diff --git a/core/src/core/block.rs b/core/src/core/block.rs index fcab672c1..7c02a6c28 100644 --- a/core/src/core/block.rs +++ b/core/src/core/block.rs @@ -119,7 +119,7 @@ impl Readable for BlockHeader { } /// A block as expressed in the MimbleWimble protocol. The reward is -/// non-explicit, assumed to be deductible from block height (similar to +/// non-explicit, assumed to be deducible from block height (similar to /// bitcoin's schedule) and expressed as a global transaction fee (added v.H), /// additive to the total of fees ever collected. pub struct Block { diff --git a/grin/src/lib.rs b/grin/src/lib.rs index 3c0971891..0edd314a0 100644 --- a/grin/src/lib.rs +++ b/grin/src/lib.rs @@ -38,4 +38,4 @@ extern crate secp256k1zkp as secp; mod miner; mod server; -pub use server::{Server, ServerConfig}; +pub use server::{Server, ServerFut, ServerConfig}; diff --git a/grin/src/server.rs b/grin/src/server.rs index e4824eba3..f763034bd 100644 --- a/grin/src/server.rs +++ b/grin/src/server.rs @@ -20,6 +20,7 @@ use std::net::SocketAddr; use std::sync::{Arc, Mutex}; use std::thread; +use futures::Future; use tokio_core::reactor; use chain; @@ -77,23 +78,7 @@ pub struct Server { impl Server { /// Instantiates and starts a new server. pub fn start(config: ServerConfig) -> Result { - let chain_store = try!(chain::store::ChainKVStore::new(config.db_root.clone()) - .map_err(&Error::StoreErr)); - - // check if we have a head in store, otherwise the genesis block is it - let head = match chain_store.head() { - Ok(tip) => tip, - Err(chain::types::Error::NotFoundErr) => { - let mut gen = core::genesis::genesis(); - if config.cuckoo_size > 0 { - gen.header.cuckoo_len = config.cuckoo_size; - } - let tip = chain::types::Tip::new(gen.hash()); - try!(chain_store.save_tip(&tip).map_err(&Error::StoreErr)); - tip - } - Err(e) => return Err(Error::StoreErr(e)), - }; + let (chain_store, head) = try!(store_head(&config)); let mut evtlp = reactor::Core::new().unwrap(); let handle = evtlp.handle(); @@ -113,7 +98,7 @@ impl Server { /// Asks the server to connect to a peer at the provided network address. pub fn connect_peer(&self, addr: SocketAddr) -> Result<(), Error> { let handle = self.evt_handle.clone(); - handle.spawn(self.p2p.connect_peer(addr, handle.clone())); + handle.spawn(self.p2p.connect_peer(addr, handle.clone()).map_err(|_| ())); Ok(()) } @@ -126,3 +111,72 @@ impl Server { }); } } + +/// Implementation of the server that doesn't take control of the event loop +/// and returns futures instead. +pub struct ServerFut { + config: ServerConfig, + /// handle to our network server + p2p: Arc, + /// the reference copy of the current chain state + chain_head: Arc>, + /// data store access + chain_store: Arc>, +} + +impl ServerFut { + /// Instantiates and starts a new server. + pub fn start(config: ServerConfig, evt_handle: &reactor::Handle) -> Result { + let (chain_store, head) = try!(store_head(&config)); + + let server = Arc::new(p2p::Server::new(config.p2p_config)); + evt_handle.spawn(server.start(evt_handle.clone()).map_err(|_| ())); + + warn!("Grin server started."); + Ok(Server { + config: config, + evt_handle: evt_handle.clone(), + p2p: server, + chain_head: Arc::new(Mutex::new(head)), + chain_store: Arc::new(Mutex::new(chain_store)), + }) + } + + /// Asks the server to connect to a peer at the provided network address. + pub fn connect_peer(&self, addr: SocketAddr, handle: &reactor::Handle) -> Result<(), Error> { + handle.spawn(self.p2p.connect_peer(addr, handle.clone()).map_err(|_| ())); + Ok(()) + } + + /// Start mining for blocks on a separate thread. Relies on a toy miner, + /// mostly for testing. + pub fn start_miner(&self) { + let miner = miner::Miner::new(self.chain_head.clone(), self.chain_store.clone()); + thread::spawn(move || { + miner.run_loop(); + }); + } +} + +// Helper function to create the chain storage and check if it already has a +// genesis block +fn store_head(config: &ServerConfig) -> Result<(chain::store::ChainKVStore, chain::Tip), Error> { + let chain_store = try!(chain::store::ChainKVStore::new(config.db_root.clone()) + .map_err(&Error::StoreErr)); + + // check if we have a head in store, otherwise the genesis block is it + let head = match chain_store.head() { + Ok(tip) => tip, + Err(chain::types::Error::NotFoundErr) => { + let mut gen = core::genesis::genesis(); + if config.cuckoo_size > 0 { + gen.header.cuckoo_len = config.cuckoo_size; + } + let tip = chain::types::Tip::new(gen.hash()); + try!(chain_store.save_tip(&tip).map_err(&Error::StoreErr)); + tip + } + Err(e) => return Err(Error::StoreErr(e)), + }; + Ok((chain_store, head)) +} diff --git a/grin/tests/simulnet.rs b/grin/tests/simulnet.rs index 81b5b57e5..c9f16d01f 100644 --- a/grin/tests/simulnet.rs +++ b/grin/tests/simulnet.rs @@ -16,32 +16,37 @@ extern crate grin_grin as grin; extern crate grin_core as core; extern crate grin_p2p as p2p; extern crate grin_chain as chain; + extern crate env_logger; +extern crate futures; +extern crate tokio_core; use std::io; use std::thread; use std::time; +use futures::Future; +use tokio_core::reactor; + #[test] fn simulate_servers() { env_logger::init().unwrap(); + let mut evtlp = reactor::Core::new().unwrap(); + let handle = evtlp.handle(); + // instantiates 5 servers on different ports let mut servers = vec![]; for n in 0..5 { - thread::spawn(move || { - let s = grin::Server::start( + let s = grin::ServerFut::start( grin::ServerConfig{ db_root: format!("target/grin-{}", n), cuckoo_size: 18, p2p_config: p2p::P2PConfig{port: 10000+n, ..p2p::P2PConfig::default()} - }).unwrap(); + }, &handle).unwrap(); servers.push(s); - }); } - thread::sleep(time::Duration::from_millis(100)); - // everyone connects to everyone else for n in 0..5 { for m in 0..5 { @@ -52,4 +57,6 @@ fn simulate_servers() { } } + let timeout = reactor::Timeout::new(time::Duration::new(1, 0), &handle.clone()).unwrap(); + evtlp.run(timeout); } diff --git a/p2p/src/server.rs b/p2p/src/server.rs index a128492c9..6ce52dd87 100644 --- a/p2p/src/server.rs +++ b/p2p/src/server.rs @@ -117,7 +117,7 @@ impl Server { pub fn connect_peer(&self, addr: SocketAddr, h: reactor::Handle) - -> Box> { + -> Box> { let peers = self.peers.clone(); let socket = TcpStream::connect(&addr, &h).map_err(|e| Error::IOErr(e)); let request = socket.and_then(move |socket| { @@ -128,8 +128,7 @@ impl Server { let peer_connect = add_to_peers(peers, Peer::connect(socket, &Handshake::new())); with_timeout(Box::new(peer_connect), &h) }) - .and_then(|(socket, peer)| peer.run(socket, &DummyAdapter {})) - .map_err(|_| ()); + .and_then(|(socket, peer)| peer.run(socket, &DummyAdapter {})); Box::new(request) }