Refactored server to share the same struct whether futures are exposed or not. Multi-server test now properly waits for a block instead of a timer.

This commit is contained in:
Ignotus Peverell 2016-12-23 12:15:36 -08:00
parent f067e18644
commit 0cc786a1e5
No known key found for this signature in database
GPG key ID: 99CD25F39F8F8211
3 changed files with 51 additions and 43 deletions

View file

@ -40,4 +40,4 @@ mod adapters;
mod miner;
mod server;
pub use server::{Server, ServerFut, ServerConfig};
pub use server::{Server, ServerConfig};

View file

@ -108,43 +108,8 @@ impl Server {
})
}
/// Asks the server to connect to a peer at the provided network address.
pub fn connect_peer(&self, addr: SocketAddr) -> Result<(), Error> {
let handle = self.evt_handle.clone();
handle.spawn(self.p2p.connect_peer(addr, handle.clone()).map_err(|_| ()));
Ok(())
}
/// Start mining for blocks on a separate thread. Relies on a toy miner,
/// mostly for testing.
pub fn start_miner(&self) {
let miner = miner::Miner::new(self.chain_head.clone(),
self.chain_store.clone(),
self.chain_adapter.clone());
thread::spawn(move || {
miner.run_loop();
});
}
}
/// Implementation of the server that doesn't take control of the event loop
/// and returns futures instead.
pub struct ServerFut {
config: ServerConfig,
/// handle to our network server
p2p: Arc<p2p::Server>,
/// the reference copy of the current chain state
chain_head: Arc<Mutex<chain::Tip>>,
/// data store access
chain_store: Arc<chain::ChainStore>,
/// chain adapter to net, required for miner and anything that submits
/// blocks
chain_adapter: Arc<ChainToNetAdapter>,
}
impl ServerFut {
/// Instantiates and starts a new server.
pub fn start(config: ServerConfig, evt_handle: &reactor::Handle) -> Result<Server, Error> {
/// Instantiates a new server associated with the provided future reactor.
pub fn future(config: ServerConfig, evt_handle: &reactor::Handle) -> Result<Server, Error> {
let (chain_store, head) = try!(store_head(&config));
let shared_head = Arc::new(Mutex::new(head));
@ -169,7 +134,8 @@ impl ServerFut {
}
/// Asks the server to connect to a peer at the provided network address.
pub fn connect_peer(&self, addr: SocketAddr, handle: &reactor::Handle) -> Result<(), Error> {
pub fn connect_peer(&self, addr: SocketAddr) -> Result<(), Error> {
let handle = self.evt_handle.clone();
handle.spawn(self.p2p.connect_peer(addr, handle.clone()).map_err(|_| ()));
Ok(())
}
@ -184,6 +150,12 @@ impl ServerFut {
miner.run_loop();
});
}
pub fn head(&self) -> chain::Tip {
let head = self.chain_head.clone();
let h = head.lock().unwrap();
h.clone()
}
}
// Helper function to create the chain storage and check if it already has a

View file

@ -25,7 +25,8 @@ use std::io;
use std::thread;
use std::time;
use futures::Future;
use futures::{Future, Poll, Async};
use futures::task::park;
use tokio_core::reactor;
#[test]
@ -38,7 +39,7 @@ fn simulate_servers() {
// instantiates 5 servers on different ports
let mut servers = vec![];
for n in 0..5 {
let s = grin::ServerFut::start(
let s = grin::Server::future(
grin::ServerConfig{
db_root: format!("target/grin-{}", n),
cuckoo_size: 12,
@ -58,7 +59,42 @@ fn simulate_servers() {
// start mining
servers[0].start_miner();
let original_height = servers[0].head().height;
let timeout = reactor::Timeout::new(time::Duration::new(10, 0), &handle.clone()).unwrap();
evtlp.run(timeout);
// monitor for a change of head on a different server and check we
evtlp.run(change(&servers[4]).and_then(|tip| {
assert!(tip.height == original_height+1);
Ok(())
}));
}
// Builds the change future, monitoring for a change of head on the provided server
fn change<'a>(s: &'a grin::Server) -> HeadChange<'a> {
let start_head = s.head();
HeadChange {
server: s,
original: start_head,
}
}
/// Future that monitors when a server has had its head updated.
struct HeadChange<'a> {
server: &'a grin::Server,
original: chain::Tip,
}
impl<'a> Future for HeadChange<'a> {
type Item = chain::Tip;
type Error = ();
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let new_head = self.server.head();
if new_head.last_block_h != self.original.last_block_h {
Ok(Async::Ready(new_head))
} else {
// egregious polling, asking the task to schedule us every iteration
park().unpark();
Ok(Async::NotReady)
}
}
}