Added another server implementation for when the client needs to stay in control of the event loop (i.e. multiple servers for tests). Test to spin up 5 servers and connect them all together.

This commit is contained in:
Ignotus Peverell 2016-12-14 16:10:39 -08:00
parent 8a255c0218
commit d395b3d128
No known key found for this signature in database
GPG key ID: 99CD25F39F8F8211
5 changed files with 89 additions and 29 deletions

View file

@ -119,7 +119,7 @@ impl Readable<BlockHeader> for BlockHeader {
} }
/// A block as expressed in the MimbleWimble protocol. The reward is /// A block as expressed in the MimbleWimble protocol. The reward is
/// non-explicit, assumed to be deductible from block height (similar to /// non-explicit, assumed to be deducible from block height (similar to
/// bitcoin's schedule) and expressed as a global transaction fee (added v.H), /// bitcoin's schedule) and expressed as a global transaction fee (added v.H),
/// additive to the total of fees ever collected. /// additive to the total of fees ever collected.
pub struct Block { pub struct Block {

View file

@ -38,4 +38,4 @@ extern crate secp256k1zkp as secp;
mod miner; mod miner;
mod server; mod server;
pub use server::{Server, ServerConfig}; pub use server::{Server, ServerFut, ServerConfig};

View file

@ -20,6 +20,7 @@ use std::net::SocketAddr;
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use std::thread; use std::thread;
use futures::Future;
use tokio_core::reactor; use tokio_core::reactor;
use chain; use chain;
@ -77,23 +78,7 @@ pub struct Server {
impl Server { impl Server {
/// Instantiates and starts a new server. /// Instantiates and starts a new server.
pub fn start(config: ServerConfig) -> Result<Server, Error> { pub fn start(config: ServerConfig) -> Result<Server, Error> {
let chain_store = try!(chain::store::ChainKVStore::new(config.db_root.clone()) let (chain_store, head) = try!(store_head(&config));
.map_err(&Error::StoreErr));
// check if we have a head in store, otherwise the genesis block is it
let head = match chain_store.head() {
Ok(tip) => tip,
Err(chain::types::Error::NotFoundErr) => {
let mut gen = core::genesis::genesis();
if config.cuckoo_size > 0 {
gen.header.cuckoo_len = config.cuckoo_size;
}
let tip = chain::types::Tip::new(gen.hash());
try!(chain_store.save_tip(&tip).map_err(&Error::StoreErr));
tip
}
Err(e) => return Err(Error::StoreErr(e)),
};
let mut evtlp = reactor::Core::new().unwrap(); let mut evtlp = reactor::Core::new().unwrap();
let handle = evtlp.handle(); let handle = evtlp.handle();
@ -113,7 +98,7 @@ impl Server {
/// Asks the server to connect to a peer at the provided network address. /// Asks the server to connect to a peer at the provided network address.
pub fn connect_peer(&self, addr: SocketAddr) -> Result<(), Error> { pub fn connect_peer(&self, addr: SocketAddr) -> Result<(), Error> {
let handle = self.evt_handle.clone(); let handle = self.evt_handle.clone();
handle.spawn(self.p2p.connect_peer(addr, handle.clone())); handle.spawn(self.p2p.connect_peer(addr, handle.clone()).map_err(|_| ()));
Ok(()) Ok(())
} }
@ -126,3 +111,72 @@ impl Server {
}); });
} }
} }
/// Implementation of the server that doesn't take control of the event loop
/// and returns futures instead.
pub struct ServerFut {
config: ServerConfig,
/// handle to our network server
p2p: Arc<p2p::Server>,
/// the reference copy of the current chain state
chain_head: Arc<Mutex<chain::Tip>>,
/// data store access
chain_store: Arc<Mutex<chain::ChainStore>>,
}
impl ServerFut {
/// Instantiates and starts a new server.
pub fn start(config: ServerConfig, evt_handle: &reactor::Handle) -> Result<Server, Error> {
let (chain_store, head) = try!(store_head(&config));
let server = Arc::new(p2p::Server::new(config.p2p_config));
evt_handle.spawn(server.start(evt_handle.clone()).map_err(|_| ()));
warn!("Grin server started.");
Ok(Server {
config: config,
evt_handle: evt_handle.clone(),
p2p: server,
chain_head: Arc::new(Mutex::new(head)),
chain_store: Arc::new(Mutex::new(chain_store)),
})
}
/// Asks the server to connect to a peer at the provided network address.
pub fn connect_peer(&self, addr: SocketAddr, handle: &reactor::Handle) -> Result<(), Error> {
handle.spawn(self.p2p.connect_peer(addr, handle.clone()).map_err(|_| ()));
Ok(())
}
/// Start mining for blocks on a separate thread. Relies on a toy miner,
/// mostly for testing.
pub fn start_miner(&self) {
let miner = miner::Miner::new(self.chain_head.clone(), self.chain_store.clone());
thread::spawn(move || {
miner.run_loop();
});
}
}
// Helper function to create the chain storage and check if it already has a
// genesis block
fn store_head(config: &ServerConfig) -> Result<(chain::store::ChainKVStore, chain::Tip), Error> {
let chain_store = try!(chain::store::ChainKVStore::new(config.db_root.clone())
.map_err(&Error::StoreErr));
// check if we have a head in store, otherwise the genesis block is it
let head = match chain_store.head() {
Ok(tip) => tip,
Err(chain::types::Error::NotFoundErr) => {
let mut gen = core::genesis::genesis();
if config.cuckoo_size > 0 {
gen.header.cuckoo_len = config.cuckoo_size;
}
let tip = chain::types::Tip::new(gen.hash());
try!(chain_store.save_tip(&tip).map_err(&Error::StoreErr));
tip
}
Err(e) => return Err(Error::StoreErr(e)),
};
Ok((chain_store, head))
}

View file

@ -16,32 +16,37 @@ extern crate grin_grin as grin;
extern crate grin_core as core; extern crate grin_core as core;
extern crate grin_p2p as p2p; extern crate grin_p2p as p2p;
extern crate grin_chain as chain; extern crate grin_chain as chain;
extern crate env_logger; extern crate env_logger;
extern crate futures;
extern crate tokio_core;
use std::io; use std::io;
use std::thread; use std::thread;
use std::time; use std::time;
use futures::Future;
use tokio_core::reactor;
#[test] #[test]
fn simulate_servers() { fn simulate_servers() {
env_logger::init().unwrap(); env_logger::init().unwrap();
let mut evtlp = reactor::Core::new().unwrap();
let handle = evtlp.handle();
// instantiates 5 servers on different ports // instantiates 5 servers on different ports
let mut servers = vec![]; let mut servers = vec![];
for n in 0..5 { for n in 0..5 {
thread::spawn(move || { let s = grin::ServerFut::start(
let s = grin::Server::start(
grin::ServerConfig{ grin::ServerConfig{
db_root: format!("target/grin-{}", n), db_root: format!("target/grin-{}", n),
cuckoo_size: 18, cuckoo_size: 18,
p2p_config: p2p::P2PConfig{port: 10000+n, ..p2p::P2PConfig::default()} p2p_config: p2p::P2PConfig{port: 10000+n, ..p2p::P2PConfig::default()}
}).unwrap(); }, &handle).unwrap();
servers.push(s); servers.push(s);
});
} }
thread::sleep(time::Duration::from_millis(100));
// everyone connects to everyone else // everyone connects to everyone else
for n in 0..5 { for n in 0..5 {
for m in 0..5 { for m in 0..5 {
@ -52,4 +57,6 @@ fn simulate_servers() {
} }
} }
let timeout = reactor::Timeout::new(time::Duration::new(1, 0), &handle.clone()).unwrap();
evtlp.run(timeout);
} }

View file

@ -117,7 +117,7 @@ impl Server {
pub fn connect_peer(&self, pub fn connect_peer(&self,
addr: SocketAddr, addr: SocketAddr,
h: reactor::Handle) h: reactor::Handle)
-> Box<Future<Item = (), Error = ()>> { -> Box<Future<Item = (), Error = Error>> {
let peers = self.peers.clone(); let peers = self.peers.clone();
let socket = TcpStream::connect(&addr, &h).map_err(|e| Error::IOErr(e)); let socket = TcpStream::connect(&addr, &h).map_err(|e| Error::IOErr(e));
let request = socket.and_then(move |socket| { let request = socket.and_then(move |socket| {
@ -128,8 +128,7 @@ impl Server {
let peer_connect = add_to_peers(peers, Peer::connect(socket, &Handshake::new())); let peer_connect = add_to_peers(peers, Peer::connect(socket, &Handshake::new()));
with_timeout(Box::new(peer_connect), &h) with_timeout(Box::new(peer_connect), &h)
}) })
.and_then(|(socket, peer)| peer.run(socket, &DummyAdapter {})) .and_then(|(socket, peer)| peer.run(socket, &DummyAdapter {}));
.map_err(|_| ());
Box::new(request) Box::new(request)
} }