Adapters between the chain and the network for block broadcast. First working test that spins up 5 servers, connects them all together, mine a block on one and watch it getting added to all the other servers chains. Some improvements still needed to automate the test properly.

This commit is contained in:
Ignotus Peverell 2016-12-20 17:39:02 -08:00
parent 17104977de
commit f067e18644
No known key found for this signature in database
GPG key ID: 99CD25F39F8F8211
5 changed files with 99 additions and 21 deletions

View file

@ -8,6 +8,7 @@ grin_chain = { path = "../chain" }
grin_core = { path = "../core" } grin_core = { path = "../core" }
grin_store = { path = "../store" } grin_store = { path = "../store" }
grin_p2p = { path = "../p2p" } grin_p2p = { path = "../p2p" }
grin_util = { path = "../util" }
secp256k1zkp = { path = "../secp256k1zkp" } secp256k1zkp = { path = "../secp256k1zkp" }
env_logger="^0.3.5" env_logger="^0.3.5"

View file

@ -14,12 +14,19 @@
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use chain; use chain::{self, ChainAdapter};
use core::core; use core::core;
use p2p::NetAdapter; use p2p::{NetAdapter, Server};
use util::OneTime;
/// Implementation of the NetAdapter for the blockchain. Gets notified when new
/// blocks and transactions are received and forwards to the chain and pool
/// implementations.
pub struct NetToChainAdapter { pub struct NetToChainAdapter {
/// the reference copy of the current chain state
chain_head: Arc<Mutex<chain::Tip>>,
chain_store: Arc<chain::ChainStore>, chain_store: Arc<chain::ChainStore>,
chain_adapter: Arc<ChainToNetAdapter>,
} }
impl NetAdapter for NetToChainAdapter { impl NetAdapter for NetToChainAdapter {
@ -27,16 +34,56 @@ impl NetAdapter for NetToChainAdapter {
unimplemented!(); unimplemented!();
} }
fn block_received(&self, b: core::Block) { fn block_received(&self, b: core::Block) {
// if let Err(e) = chain::process_block(&b, self.chain_store, // TODO delegate to a separate thread to avoid holding up the caller
// chain::pipe::NONE) { debug!("Received block {} from network, going to process.",
// debug!("Block {} refused by chain: {}", b.hash(), e); b.hash());
// } // pushing the new block through the chain pipeline
unimplemented!(); let store = self.chain_store.clone();
let chain_adapter = self.chain_adapter.clone();
let res = chain::process_block(&b, store, chain_adapter, chain::NONE);
// log errors and update the shared head reference on success
if let Err(e) = res {
debug!("Block {} refused by chain: {:?}", b.hash(), e);
} else if let Ok(Some(tip)) = res {
let chain_head = self.chain_head.clone();
let mut head = chain_head.lock().unwrap();
*head = tip;
}
} }
} }
impl NetToChainAdapter { impl NetToChainAdapter {
pub fn new(chain_store: Arc<chain::ChainStore>) -> NetToChainAdapter { pub fn new(chain_head: Arc<Mutex<chain::Tip>>,
NetToChainAdapter { chain_store: chain_store } chain_store: Arc<chain::ChainStore>,
chain_adapter: Arc<ChainToNetAdapter>)
-> NetToChainAdapter {
NetToChainAdapter {
chain_head: chain_head,
chain_store: chain_store,
chain_adapter: chain_adapter,
}
}
}
/// Implementation of the ChainAdapter for the network. Gets notified when the
/// blockchain accepted a new block and forwards it to the network for
/// broadcast.
pub struct ChainToNetAdapter {
p2p: OneTime<Arc<Server>>,
}
impl ChainAdapter for ChainToNetAdapter {
fn block_accepted(&self, b: &core::Block) {
self.p2p.borrow().broadcast_block(b);
}
}
impl ChainToNetAdapter {
pub fn new() -> ChainToNetAdapter {
ChainToNetAdapter { p2p: OneTime::new() }
}
pub fn init(&self, p2p: Arc<Server>) {
self.p2p.init(p2p);
} }
} }

View file

@ -33,6 +33,7 @@ extern crate grin_chain as chain;
extern crate grin_core as core; extern crate grin_core as core;
extern crate grin_p2p as p2p; extern crate grin_p2p as p2p;
extern crate grin_store as store; extern crate grin_store as store;
extern crate grin_util as util;
extern crate secp256k1zkp as secp; extern crate secp256k1zkp as secp;
mod adapters; mod adapters;

View file

@ -23,7 +23,7 @@ use std::thread;
use futures::Future; use futures::Future;
use tokio_core::reactor; use tokio_core::reactor;
use adapters::NetToChainAdapter; use adapters::{NetToChainAdapter, ChainToNetAdapter};
use chain; use chain;
use chain::ChainStore; use chain::ChainStore;
use core; use core;
@ -74,26 +74,37 @@ pub struct Server {
chain_head: Arc<Mutex<chain::Tip>>, chain_head: Arc<Mutex<chain::Tip>>,
/// data store access /// data store access
chain_store: Arc<chain::ChainStore>, chain_store: Arc<chain::ChainStore>,
/// chain adapter to net, required for miner and anything that submits
/// blocks
chain_adapter: Arc<ChainToNetAdapter>,
} }
impl Server { impl Server {
/// Instantiates and starts a new server. /// Instantiates and starts a new server.
pub fn start(config: ServerConfig) -> Result<Server, Error> { pub fn start(config: ServerConfig) -> Result<Server, Error> {
let (chain_store, head) = try!(store_head(&config)); let (chain_store, head) = try!(store_head(&config));
let shared_head = Arc::new(Mutex::new(head));
let chain_adapter = Arc::new(ChainToNetAdapter::new());
let net_adapter = Arc::new(NetToChainAdapter::new(shared_head.clone(),
chain_store.clone(),
chain_adapter.clone()));
let server = Arc::new(p2p::Server::new(config.p2p_config, net_adapter));
chain_adapter.init(server.clone());
let mut evtlp = reactor::Core::new().unwrap(); let mut evtlp = reactor::Core::new().unwrap();
let handle = evtlp.handle(); let handle = evtlp.handle();
let net_adapter = Arc::new(NetToChainAdapter::new(chain_store.clone()));
let server = Arc::new(p2p::Server::new(config.p2p_config, net_adapter));
evtlp.run(server.start(handle.clone())).unwrap(); evtlp.run(server.start(handle.clone())).unwrap();
warn!("Grin server started."); warn!("Grin server started.");
Ok(Server { Ok(Server {
config: config, config: config,
evt_handle: handle.clone(), evt_handle: handle.clone(),
p2p: server, p2p: server,
chain_head: Arc::new(Mutex::new(head)), chain_head: shared_head,
chain_store: chain_store, chain_store: chain_store,
chain_adapter: chain_adapter,
}) })
} }
@ -107,7 +118,9 @@ impl Server {
/// Start mining for blocks on a separate thread. Relies on a toy miner, /// Start mining for blocks on a separate thread. Relies on a toy miner,
/// mostly for testing. /// mostly for testing.
pub fn start_miner(&self) { pub fn start_miner(&self) {
let miner = miner::Miner::new(self.chain_head.clone(), self.chain_store.clone()); let miner = miner::Miner::new(self.chain_head.clone(),
self.chain_store.clone(),
self.chain_adapter.clone());
thread::spawn(move || { thread::spawn(move || {
miner.run_loop(); miner.run_loop();
}); });
@ -124,15 +137,24 @@ pub struct ServerFut {
chain_head: Arc<Mutex<chain::Tip>>, chain_head: Arc<Mutex<chain::Tip>>,
/// data store access /// data store access
chain_store: Arc<chain::ChainStore>, chain_store: Arc<chain::ChainStore>,
/// chain adapter to net, required for miner and anything that submits
/// blocks
chain_adapter: Arc<ChainToNetAdapter>,
} }
impl ServerFut { impl ServerFut {
/// Instantiates and starts a new server. /// Instantiates and starts a new server.
pub fn start(config: ServerConfig, evt_handle: &reactor::Handle) -> Result<Server, Error> { pub fn start(config: ServerConfig, evt_handle: &reactor::Handle) -> Result<Server, Error> {
let (chain_store, head) = try!(store_head(&config)); let (chain_store, head) = try!(store_head(&config));
let shared_head = Arc::new(Mutex::new(head));
let net_adapter = Arc::new(NetToChainAdapter::new(chain_store.clone())); let chain_adapter = Arc::new(ChainToNetAdapter::new());
let net_adapter = Arc::new(NetToChainAdapter::new(shared_head.clone(),
chain_store.clone(),
chain_adapter.clone()));
let server = Arc::new(p2p::Server::new(config.p2p_config, net_adapter)); let server = Arc::new(p2p::Server::new(config.p2p_config, net_adapter));
chain_adapter.init(server.clone());
evt_handle.spawn(server.start(evt_handle.clone()).map_err(|_| ())); evt_handle.spawn(server.start(evt_handle.clone()).map_err(|_| ()));
warn!("Grin server started."); warn!("Grin server started.");
@ -140,8 +162,9 @@ impl ServerFut {
config: config, config: config,
evt_handle: evt_handle.clone(), evt_handle: evt_handle.clone(),
p2p: server, p2p: server,
chain_head: Arc::new(Mutex::new(head)), chain_head: shared_head,
chain_store: chain_store, chain_store: chain_store,
chain_adapter: chain_adapter,
}) })
} }
@ -154,7 +177,9 @@ impl ServerFut {
/// Start mining for blocks on a separate thread. Relies on a toy miner, /// Start mining for blocks on a separate thread. Relies on a toy miner,
/// mostly for testing. /// mostly for testing.
pub fn start_miner(&self) { pub fn start_miner(&self) {
let miner = miner::Miner::new(self.chain_head.clone(), self.chain_store.clone()); let miner = miner::Miner::new(self.chain_head.clone(),
self.chain_store.clone(),
self.chain_adapter.clone());
thread::spawn(move || { thread::spawn(move || {
miner.run_loop(); miner.run_loop();
}); });
@ -172,12 +197,14 @@ fn store_head(config: &ServerConfig)
let head = match chain_store.head() { let head = match chain_store.head() {
Ok(tip) => tip, Ok(tip) => tip,
Err(chain::types::Error::NotFoundErr) => { Err(chain::types::Error::NotFoundErr) => {
debug!("No genesis block found, creating and saving one.");
let mut gen = core::genesis::genesis(); let mut gen = core::genesis::genesis();
if config.cuckoo_size > 0 { if config.cuckoo_size > 0 {
gen.header.cuckoo_len = config.cuckoo_size; gen.header.cuckoo_len = config.cuckoo_size;
} }
try!(chain_store.save_block(&gen).map_err(&Error::StoreErr));
let tip = chain::types::Tip::new(gen.hash()); let tip = chain::types::Tip::new(gen.hash());
try!(chain_store.save_tip(&tip).map_err(&Error::StoreErr)); try!(chain_store.save_head(&tip).map_err(&Error::StoreErr));
tip tip
} }
Err(e) => return Err(Error::StoreErr(e)), Err(e) => return Err(Error::StoreErr(e)),

View file

@ -41,7 +41,7 @@ fn simulate_servers() {
let s = grin::ServerFut::start( let s = grin::ServerFut::start(
grin::ServerConfig{ grin::ServerConfig{
db_root: format!("target/grin-{}", n), db_root: format!("target/grin-{}", n),
cuckoo_size: 18, cuckoo_size: 12,
p2p_config: p2p::P2PConfig{port: 10000+n, ..p2p::P2PConfig::default()} p2p_config: p2p::P2PConfig{port: 10000+n, ..p2p::P2PConfig::default()}
}, &handle).unwrap(); }, &handle).unwrap();
servers.push(s); servers.push(s);
@ -53,10 +53,12 @@ fn simulate_servers() {
if m == n { continue } if m == n { continue }
let addr = format!("{}:{}", "127.0.0.1", 10000+m); let addr = format!("{}:{}", "127.0.0.1", 10000+m);
servers[n].connect_peer(addr.parse().unwrap()).unwrap(); servers[n].connect_peer(addr.parse().unwrap()).unwrap();
println!("c {}", m);
} }
} }
let timeout = reactor::Timeout::new(time::Duration::new(1, 0), &handle.clone()).unwrap(); // start mining
servers[0].start_miner();
let timeout = reactor::Timeout::new(time::Duration::new(10, 0), &handle.clone()).unwrap();
evtlp.run(timeout); evtlp.run(timeout);
} }