From f067e18644d1125159dfb3b48891ccdd7dddc282 Mon Sep 17 00:00:00 2001 From: Ignotus Peverell Date: Tue, 20 Dec 2016 17:39:02 -0800 Subject: [PATCH] Adapters between the chain and the network for block broadcast. First working test that spins up 5 servers, connects them all together, mine a block on one and watch it getting added to all the other servers chains. Some improvements still needed to automate the test properly. --- grin/Cargo.toml | 1 + grin/src/adapters.rs | 65 ++++++++++++++++++++++++++++++++++++------ grin/src/lib.rs | 1 + grin/src/server.rs | 45 +++++++++++++++++++++++------ grin/tests/simulnet.rs | 8 ++++-- 5 files changed, 99 insertions(+), 21 deletions(-) diff --git a/grin/Cargo.toml b/grin/Cargo.toml index 973776018..9c215f75d 100644 --- a/grin/Cargo.toml +++ b/grin/Cargo.toml @@ -8,6 +8,7 @@ grin_chain = { path = "../chain" } grin_core = { path = "../core" } grin_store = { path = "../store" } grin_p2p = { path = "../p2p" } +grin_util = { path = "../util" } secp256k1zkp = { path = "../secp256k1zkp" } env_logger="^0.3.5" diff --git a/grin/src/adapters.rs b/grin/src/adapters.rs index f2ccdccbb..295e10424 100644 --- a/grin/src/adapters.rs +++ b/grin/src/adapters.rs @@ -14,12 +14,19 @@ use std::sync::{Arc, Mutex}; -use chain; +use chain::{self, ChainAdapter}; use core::core; -use p2p::NetAdapter; +use p2p::{NetAdapter, Server}; +use util::OneTime; +/// Implementation of the NetAdapter for the blockchain. Gets notified when new +/// blocks and transactions are received and forwards to the chain and pool +/// implementations. pub struct NetToChainAdapter { + /// the reference copy of the current chain state + chain_head: Arc>, chain_store: Arc, + chain_adapter: Arc, } impl NetAdapter for NetToChainAdapter { @@ -27,16 +34,56 @@ impl NetAdapter for NetToChainAdapter { unimplemented!(); } fn block_received(&self, b: core::Block) { - // if let Err(e) = chain::process_block(&b, self.chain_store, - // chain::pipe::NONE) { - // debug!("Block {} refused by chain: {}", b.hash(), e); - // } - unimplemented!(); + // TODO delegate to a separate thread to avoid holding up the caller + debug!("Received block {} from network, going to process.", + b.hash()); + // pushing the new block through the chain pipeline + let store = self.chain_store.clone(); + let chain_adapter = self.chain_adapter.clone(); + let res = chain::process_block(&b, store, chain_adapter, chain::NONE); + + // log errors and update the shared head reference on success + if let Err(e) = res { + debug!("Block {} refused by chain: {:?}", b.hash(), e); + } else if let Ok(Some(tip)) = res { + let chain_head = self.chain_head.clone(); + let mut head = chain_head.lock().unwrap(); + *head = tip; + } } } impl NetToChainAdapter { - pub fn new(chain_store: Arc) -> NetToChainAdapter { - NetToChainAdapter { chain_store: chain_store } + pub fn new(chain_head: Arc>, + chain_store: Arc, + chain_adapter: Arc) + -> NetToChainAdapter { + NetToChainAdapter { + chain_head: chain_head, + chain_store: chain_store, + chain_adapter: chain_adapter, + } + } +} + +/// Implementation of the ChainAdapter for the network. Gets notified when the +/// blockchain accepted a new block and forwards it to the network for +/// broadcast. +pub struct ChainToNetAdapter { + p2p: OneTime>, +} + +impl ChainAdapter for ChainToNetAdapter { + fn block_accepted(&self, b: &core::Block) { + self.p2p.borrow().broadcast_block(b); + } +} + +impl ChainToNetAdapter { + pub fn new() -> ChainToNetAdapter { + ChainToNetAdapter { p2p: OneTime::new() } + } + pub fn init(&self, p2p: Arc) { + self.p2p.init(p2p); } } diff --git a/grin/src/lib.rs b/grin/src/lib.rs index 962108fef..6cf508e97 100644 --- a/grin/src/lib.rs +++ b/grin/src/lib.rs @@ -33,6 +33,7 @@ extern crate grin_chain as chain; extern crate grin_core as core; extern crate grin_p2p as p2p; extern crate grin_store as store; +extern crate grin_util as util; extern crate secp256k1zkp as secp; mod adapters; diff --git a/grin/src/server.rs b/grin/src/server.rs index 610ac6e8c..b26483a95 100644 --- a/grin/src/server.rs +++ b/grin/src/server.rs @@ -23,7 +23,7 @@ use std::thread; use futures::Future; use tokio_core::reactor; -use adapters::NetToChainAdapter; +use adapters::{NetToChainAdapter, ChainToNetAdapter}; use chain; use chain::ChainStore; use core; @@ -74,26 +74,37 @@ pub struct Server { chain_head: Arc>, /// data store access chain_store: Arc, + /// chain adapter to net, required for miner and anything that submits + /// blocks + chain_adapter: Arc, } impl Server { /// Instantiates and starts a new server. pub fn start(config: ServerConfig) -> Result { let (chain_store, head) = try!(store_head(&config)); + let shared_head = Arc::new(Mutex::new(head)); + + let chain_adapter = Arc::new(ChainToNetAdapter::new()); + let net_adapter = Arc::new(NetToChainAdapter::new(shared_head.clone(), + chain_store.clone(), + chain_adapter.clone())); + let server = Arc::new(p2p::Server::new(config.p2p_config, net_adapter)); + chain_adapter.init(server.clone()); let mut evtlp = reactor::Core::new().unwrap(); let handle = evtlp.handle(); - let net_adapter = Arc::new(NetToChainAdapter::new(chain_store.clone())); - let server = Arc::new(p2p::Server::new(config.p2p_config, net_adapter)); evtlp.run(server.start(handle.clone())).unwrap(); + warn!("Grin server started."); Ok(Server { config: config, evt_handle: handle.clone(), p2p: server, - chain_head: Arc::new(Mutex::new(head)), + chain_head: shared_head, chain_store: chain_store, + chain_adapter: chain_adapter, }) } @@ -107,7 +118,9 @@ impl Server { /// Start mining for blocks on a separate thread. Relies on a toy miner, /// mostly for testing. pub fn start_miner(&self) { - let miner = miner::Miner::new(self.chain_head.clone(), self.chain_store.clone()); + let miner = miner::Miner::new(self.chain_head.clone(), + self.chain_store.clone(), + self.chain_adapter.clone()); thread::spawn(move || { miner.run_loop(); }); @@ -124,15 +137,24 @@ pub struct ServerFut { chain_head: Arc>, /// data store access chain_store: Arc, + /// chain adapter to net, required for miner and anything that submits + /// blocks + chain_adapter: Arc, } impl ServerFut { /// Instantiates and starts a new server. pub fn start(config: ServerConfig, evt_handle: &reactor::Handle) -> Result { let (chain_store, head) = try!(store_head(&config)); + let shared_head = Arc::new(Mutex::new(head)); - let net_adapter = Arc::new(NetToChainAdapter::new(chain_store.clone())); + let chain_adapter = Arc::new(ChainToNetAdapter::new()); + let net_adapter = Arc::new(NetToChainAdapter::new(shared_head.clone(), + chain_store.clone(), + chain_adapter.clone())); let server = Arc::new(p2p::Server::new(config.p2p_config, net_adapter)); + chain_adapter.init(server.clone()); + evt_handle.spawn(server.start(evt_handle.clone()).map_err(|_| ())); warn!("Grin server started."); @@ -140,8 +162,9 @@ impl ServerFut { config: config, evt_handle: evt_handle.clone(), p2p: server, - chain_head: Arc::new(Mutex::new(head)), + chain_head: shared_head, chain_store: chain_store, + chain_adapter: chain_adapter, }) } @@ -154,7 +177,9 @@ impl ServerFut { /// Start mining for blocks on a separate thread. Relies on a toy miner, /// mostly for testing. pub fn start_miner(&self) { - let miner = miner::Miner::new(self.chain_head.clone(), self.chain_store.clone()); + let miner = miner::Miner::new(self.chain_head.clone(), + self.chain_store.clone(), + self.chain_adapter.clone()); thread::spawn(move || { miner.run_loop(); }); @@ -172,12 +197,14 @@ fn store_head(config: &ServerConfig) let head = match chain_store.head() { Ok(tip) => tip, Err(chain::types::Error::NotFoundErr) => { + debug!("No genesis block found, creating and saving one."); let mut gen = core::genesis::genesis(); if config.cuckoo_size > 0 { gen.header.cuckoo_len = config.cuckoo_size; } + try!(chain_store.save_block(&gen).map_err(&Error::StoreErr)); let tip = chain::types::Tip::new(gen.hash()); - try!(chain_store.save_tip(&tip).map_err(&Error::StoreErr)); + try!(chain_store.save_head(&tip).map_err(&Error::StoreErr)); tip } Err(e) => return Err(Error::StoreErr(e)), diff --git a/grin/tests/simulnet.rs b/grin/tests/simulnet.rs index c9f16d01f..bfcb765b6 100644 --- a/grin/tests/simulnet.rs +++ b/grin/tests/simulnet.rs @@ -41,7 +41,7 @@ fn simulate_servers() { let s = grin::ServerFut::start( grin::ServerConfig{ db_root: format!("target/grin-{}", n), - cuckoo_size: 18, + cuckoo_size: 12, p2p_config: p2p::P2PConfig{port: 10000+n, ..p2p::P2PConfig::default()} }, &handle).unwrap(); servers.push(s); @@ -53,10 +53,12 @@ fn simulate_servers() { if m == n { continue } let addr = format!("{}:{}", "127.0.0.1", 10000+m); servers[n].connect_peer(addr.parse().unwrap()).unwrap(); - println!("c {}", m); } } - let timeout = reactor::Timeout::new(time::Duration::new(1, 0), &handle.clone()).unwrap(); + // start mining + servers[0].start_miner(); + + let timeout = reactor::Timeout::new(time::Duration::new(10, 0), &handle.clone()).unwrap(); evtlp.run(timeout); }