From e688ff99e61293df3b3c924d69734dc35374029e Mon Sep 17 00:00:00 2001 From: Ignotus Peverell Date: Sun, 18 Dec 2016 15:51:54 -0800 Subject: [PATCH] Started putting in place the adapter between the chain and p2p modules to forward blocks and transactions. Cleaned up chain store references. --- chain/src/pipe.rs | 8 +++-- chain/src/store.rs | 3 ++ chain/src/types.rs | 2 +- chain/tests/mine_simple_chain.rs | 7 ++-- grin/src/adapters.rs | 42 +++++++++++++++++++++++ grin/src/lib.rs | 1 + grin/src/miner.rs | 12 +++---- grin/src/server.rs | 20 ++++++----- p2p/src/lib.rs | 2 +- p2p/src/peer.rs | 5 ++- p2p/src/protocol.rs | 58 ++++++++++++++++++-------------- p2p/src/server.rs | 12 +++++-- p2p/src/types.rs | 5 ++- p2p/tests/peer_handshake.rs | 5 +-- store/src/lib.rs | 16 ++++++--- 15 files changed, 137 insertions(+), 61 deletions(-) create mode 100644 grin/src/adapters.rs diff --git a/chain/src/pipe.rs b/chain/src/pipe.rs index 0d03af02f..239fd9551 100644 --- a/chain/src/pipe.rs +++ b/chain/src/pipe.rs @@ -14,6 +14,8 @@ //! Implementation of the chain block acceptance (or refusal) pipeline. +use std::sync::{Arc, Mutex}; + use secp; use time; @@ -36,9 +38,9 @@ bitflags! { /// Contextual information required to process a new block and either reject or /// accept it. -pub struct BlockContext<'a> { +pub struct BlockContext { opts: Options, - store: &'a ChainStore, + store: Arc, head: Tip, tip: Option, } @@ -59,7 +61,7 @@ pub enum Error { StoreErr(types::Error), } -pub fn process_block(b: &Block, store: &ChainStore, opts: Options) -> Result<(), Error> { +pub fn process_block(b: &Block, store: Arc, opts: Options) -> Result<(), Error> { // TODO should just take a promise for a block with a full header so we don't // spend resources reading the full block when its header is invalid diff --git a/chain/src/store.rs b/chain/src/store.rs index a3c5415d8..bbb256685 100644 --- a/chain/src/store.rs +++ b/chain/src/store.rs @@ -36,6 +36,9 @@ pub struct ChainKVStore { db: grin_store::Store, } +unsafe impl Sync for ChainKVStore {} +unsafe impl Send for ChainKVStore {} + impl ChainKVStore { pub fn new(root_path: String) -> Result { let db = try!(grin_store::Store::open(format!("{}/{}", root_path, STORE_SUBPATH).as_str()) diff --git a/chain/src/types.rs b/chain/src/types.rs index 22ad9b633..b58099207 100644 --- a/chain/src/types.rs +++ b/chain/src/types.rs @@ -133,7 +133,7 @@ pub enum Error { /// Trait the chain pipeline requires an implementor for in order to process /// blocks. -pub trait ChainStore: Send { +pub trait ChainStore: Send + Sync { /// Get the tip that's also the head of the chain fn head(&self) -> Result; diff --git a/chain/tests/mine_simple_chain.rs b/chain/tests/mine_simple_chain.rs index 428409e46..e426aef09 100644 --- a/chain/tests/mine_simple_chain.rs +++ b/chain/tests/mine_simple_chain.rs @@ -17,6 +17,7 @@ extern crate grin_chain; extern crate rand; extern crate secp256k1zkp as secp; +use std::sync::Arc; use rand::os::OsRng; use grin_chain::types::*; @@ -26,7 +27,6 @@ use grin_core::consensus; #[test] fn mine_empty_chain() { - let curve = secp::Secp256k1::with_caps(secp::ContextFlag::Commit); let mut rng = OsRng::new().unwrap(); let store = grin_chain::store::ChainKVStore::new(".grin".to_string()).unwrap(); @@ -42,6 +42,7 @@ fn mine_empty_chain() { let mut prev = gen; let secp = secp::Secp256k1::with_caps(secp::ContextFlag::Commit); let reward_key = secp::key::SecretKey::new(&secp, &mut rng); + let arc_store = Arc::new(store); for n in 1..4 { let mut b = core::Block::new(&prev.header, vec![], reward_key).unwrap(); @@ -55,10 +56,10 @@ fn mine_empty_chain() { b.header.pow = proof; b.header.nonce = nonce; b.header.target = diff_target; - grin_chain::pipe::process_block(&b, &store, grin_chain::pipe::EASY_POW).unwrap(); + grin_chain::pipe::process_block(&b, arc_store.clone(), grin_chain::pipe::EASY_POW).unwrap(); // checking our new head - let head = store.head().unwrap(); + let head = arc_store.clone().head().unwrap(); assert_eq!(head.height, n); assert_eq!(head.last_block_h, b.hash()); diff --git a/grin/src/adapters.rs b/grin/src/adapters.rs new file mode 100644 index 000000000..f2ccdccbb --- /dev/null +++ b/grin/src/adapters.rs @@ -0,0 +1,42 @@ +// Copyright 2016 The Grin Developers +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::{Arc, Mutex}; + +use chain; +use core::core; +use p2p::NetAdapter; + +pub struct NetToChainAdapter { + chain_store: Arc, +} + +impl NetAdapter for NetToChainAdapter { + fn transaction_received(&self, tx: core::Transaction) { + unimplemented!(); + } + fn block_received(&self, b: core::Block) { + // if let Err(e) = chain::process_block(&b, self.chain_store, + // chain::pipe::NONE) { + // debug!("Block {} refused by chain: {}", b.hash(), e); + // } + unimplemented!(); + } +} + +impl NetToChainAdapter { + pub fn new(chain_store: Arc) -> NetToChainAdapter { + NetToChainAdapter { chain_store: chain_store } + } +} diff --git a/grin/src/lib.rs b/grin/src/lib.rs index 0edd314a0..962108fef 100644 --- a/grin/src/lib.rs +++ b/grin/src/lib.rs @@ -35,6 +35,7 @@ extern crate grin_p2p as p2p; extern crate grin_store as store; extern crate secp256k1zkp as secp; +mod adapters; mod miner; mod server; diff --git a/grin/src/miner.rs b/grin/src/miner.rs index b857092fe..33a1a4b1f 100644 --- a/grin/src/miner.rs +++ b/grin/src/miner.rs @@ -30,15 +30,13 @@ use secp; pub struct Miner { chain_head: Arc>, - chain_store: Arc>, + chain_store: Arc, } impl Miner { /// Creates a new Miner. Needs references to the chain state and its /// storage. - pub fn new(chain_head: Arc>, - chain_store: Arc>) - -> Miner { + pub fn new(chain_head: Arc>, chain_store: Arc) -> Miner { Miner { chain_head: chain_head, chain_store: chain_store, @@ -54,7 +52,7 @@ impl Miner { let head: core::BlockHeader; let mut latest_hash: Hash; { - head = self.chain_store.lock().unwrap().head_header().unwrap(); + head = self.chain_store.head_header().unwrap(); latest_hash = self.chain_head.lock().unwrap().last_block_h; } let b = self.build_block(&head); @@ -86,9 +84,7 @@ impl Miner { // if we found a solution, push our block out if let Some(proof) = sol { info!("Found valid proof of work, adding block {}.", b.hash()); - if let Err(e) = chain::process_block(&b, - self.chain_store.lock().unwrap().deref(), - chain::NONE) { + if let Err(e) = chain::process_block(&b, self.chain_store.clone(), chain::NONE) { error!("Error validating mined block: {:?}", e); } } else { diff --git a/grin/src/server.rs b/grin/src/server.rs index f763034bd..610ac6e8c 100644 --- a/grin/src/server.rs +++ b/grin/src/server.rs @@ -23,6 +23,7 @@ use std::thread; use futures::Future; use tokio_core::reactor; +use adapters::NetToChainAdapter; use chain; use chain::ChainStore; use core; @@ -72,7 +73,7 @@ pub struct Server { /// the reference copy of the current chain state chain_head: Arc>, /// data store access - chain_store: Arc>, + chain_store: Arc, } impl Server { @@ -82,7 +83,8 @@ impl Server { let mut evtlp = reactor::Core::new().unwrap(); let handle = evtlp.handle(); - let server = Arc::new(p2p::Server::new(config.p2p_config)); + let net_adapter = Arc::new(NetToChainAdapter::new(chain_store.clone())); + let server = Arc::new(p2p::Server::new(config.p2p_config, net_adapter)); evtlp.run(server.start(handle.clone())).unwrap(); warn!("Grin server started."); @@ -91,7 +93,7 @@ impl Server { evt_handle: handle.clone(), p2p: server, chain_head: Arc::new(Mutex::new(head)), - chain_store: Arc::new(Mutex::new(chain_store)), + chain_store: chain_store, }) } @@ -121,7 +123,7 @@ pub struct ServerFut { /// the reference copy of the current chain state chain_head: Arc>, /// data store access - chain_store: Arc>, + chain_store: Arc, } impl ServerFut { @@ -129,7 +131,8 @@ impl ServerFut { pub fn start(config: ServerConfig, evt_handle: &reactor::Handle) -> Result { let (chain_store, head) = try!(store_head(&config)); - let server = Arc::new(p2p::Server::new(config.p2p_config)); + let net_adapter = Arc::new(NetToChainAdapter::new(chain_store.clone())); + let server = Arc::new(p2p::Server::new(config.p2p_config, net_adapter)); evt_handle.spawn(server.start(evt_handle.clone()).map_err(|_| ())); warn!("Grin server started."); @@ -138,7 +141,7 @@ impl ServerFut { evt_handle: evt_handle.clone(), p2p: server, chain_head: Arc::new(Mutex::new(head)), - chain_store: Arc::new(Mutex::new(chain_store)), + chain_store: chain_store, }) } @@ -160,7 +163,8 @@ impl ServerFut { // Helper function to create the chain storage and check if it already has a // genesis block -fn store_head(config: &ServerConfig) -> Result<(chain::store::ChainKVStore, chain::Tip), Error> { +fn store_head(config: &ServerConfig) + -> Result<(Arc, chain::Tip), Error> { let chain_store = try!(chain::store::ChainKVStore::new(config.db_root.clone()) .map_err(&Error::StoreErr)); @@ -178,5 +182,5 @@ fn store_head(config: &ServerConfig) -> Result<(chain::store::ChainKVStore, chai } Err(e) => return Err(Error::StoreErr(e)), }; - Ok((chain_store, head)) + Ok((Arc::new(chain_store), head)) } diff --git a/p2p/src/lib.rs b/p2p/src/lib.rs index c6c43377f..4905136b3 100644 --- a/p2p/src/lib.rs +++ b/p2p/src/lib.rs @@ -44,4 +44,4 @@ mod types; pub use server::{Server, DummyAdapter}; pub use peer::Peer; -pub use types::P2PConfig; +pub use types::{P2PConfig, NetAdapter}; diff --git a/p2p/src/peer.rs b/p2p/src/peer.rs index 8456ef1b0..732eaee7d 100644 --- a/p2p/src/peer.rs +++ b/p2p/src/peer.rs @@ -57,7 +57,10 @@ impl Peer { Box::new(hs_peer) } - pub fn run(&self, conn: TcpStream, na: Arc) -> Box> { + pub fn run(&self, + conn: TcpStream, + na: Arc) + -> Box> { self.proto.handle(conn, na) } diff --git a/p2p/src/protocol.rs b/p2p/src/protocol.rs index cbd20babf..fb4c354df 100644 --- a/p2p/src/protocol.rs +++ b/p2p/src/protocol.rs @@ -125,7 +125,7 @@ impl ProtocolV1 { let read_msg = iter.fold(reader, move |reader, _| { let mut sender_inner = sender.clone(); let recv_bytes = recv_bytes.clone(); - let adapter = adapter.clone(); + let adapter = adapter.clone(); // first read the message header read_exact(reader, vec![0u8; HEADER_LEN as usize]) @@ -135,8 +135,10 @@ impl ProtocolV1 { Ok((reader, header)) }) .and_then(move |(reader, header)| { - // now that we have a size, proceed with the body - read_exact(reader, vec![0u8; header.msg_len as usize]).map(|(reader, buf)| { (reader, header, buf) }).map_err(|e| ser::Error::IOErr(e)) + // now that we have a size, proceed with the body + read_exact(reader, vec![0u8; header.msg_len as usize]) + .map(|(reader, buf)| (reader, header, buf)) + .map_err(|e| ser::Error::IOErr(e)) }) .map(move |(reader, header, buf)| { // add the count of bytes received @@ -144,9 +146,9 @@ impl ProtocolV1 { *recv_bytes += header.serialized_len() + header.msg_len; // and handle the different message types - if let Err(e) = handle_payload(adapter, &header, buf, &mut sender_inner) { - debug!("Invalid {:?} message: {}", header.msg_type, e); - } + if let Err(e) = handle_payload(adapter, &header, buf, &mut sender_inner) { + debug!("Invalid {:?} message: {}", header.msg_type, e); + } reader }) @@ -192,24 +194,28 @@ impl ProtocolV1 { } } -fn handle_payload(adapter: Arc, header: &MsgHeader, buf: Vec, sender: &mut UnboundedSender>) -> Result<(), ser::Error> { - match header.msg_type { - Type::Ping => { - let data = try!(ser::ser_vec(&MsgHeader::new(Type::Pong, 0))); - sender.send(data); - } - Type::Pong => {} - Type::Transaction => { - let tx = try!(ser::deserialize::(&mut &buf[..])); - adapter.transaction_received(tx); - } - Type::Block => { - let b = try!(ser::deserialize::(&mut &buf[..])); - adapter.block_received(b); - } - _ => { - debug!("unknown message type {:?}", header.msg_type); - } - }; - Ok(()) +fn handle_payload(adapter: Arc, + header: &MsgHeader, + buf: Vec, + sender: &mut UnboundedSender>) + -> Result<(), ser::Error> { + match header.msg_type { + Type::Ping => { + let data = try!(ser::ser_vec(&MsgHeader::new(Type::Pong, 0))); + sender.send(data); + } + Type::Pong => {} + Type::Transaction => { + let tx = try!(ser::deserialize::(&mut &buf[..])); + adapter.transaction_received(tx); + } + Type::Block => { + let b = try!(ser::deserialize::(&mut &buf[..])); + adapter.block_received(b); + } + _ => { + debug!("unknown message type {:?}", header.msg_type); + } + }; + Ok(()) } diff --git a/p2p/src/server.rs b/p2p/src/server.rs index 2901da6f6..6ae2c04c3 100644 --- a/p2p/src/server.rs +++ b/p2p/src/server.rs @@ -45,6 +45,7 @@ impl NetAdapter for DummyAdapter { pub struct Server { config: P2PConfig, peers: Arc>>>, + adapter: Arc, stop: RefCell>>, } @@ -54,10 +55,11 @@ unsafe impl Send for Server {} // TODO TLS impl Server { /// Creates a new idle p2p server with no peers - pub fn new(config: P2PConfig) -> Server { + pub fn new(config: P2PConfig, adapter: Arc) -> Server { Server { config: config, peers: Arc::new(RwLock::new(Vec::new())), + adapter: adapter, stop: RefCell::new(None), } } @@ -71,10 +73,12 @@ impl Server { let hs = Arc::new(Handshake::new()); let peers = self.peers.clone(); + let adapter = self.adapter.clone(); // main peer acceptance future handling handshake let hp = h.clone(); let peers = socket.incoming().map_err(|e| Error::IOErr(e)).map(move |(conn, addr)| { + let adapter = adapter.clone(); let peers = peers.clone(); // accept the peer and add it to the server map @@ -84,7 +88,7 @@ impl Server { let timed_peer = with_timeout(Box::new(peer_accept), &hp); // run the main peer protocol - timed_peer.and_then(|(conn, peer)| peer.clone().run(conn, Arc::new(DummyAdapter {}))) + timed_peer.and_then(move |(conn, peer)| peer.clone().run(conn, adapter)) }); // spawn each peer future to its own task @@ -120,6 +124,8 @@ impl Server { h: reactor::Handle) -> Box> { let peers = self.peers.clone(); + let adapter = self.adapter.clone(); + let socket = TcpStream::connect(&addr, &h).map_err(|e| Error::IOErr(e)); let request = socket.and_then(move |socket| { let peers = peers.clone(); @@ -129,7 +135,7 @@ impl Server { let peer_connect = add_to_peers(peers, Peer::connect(socket, &Handshake::new())); with_timeout(Box::new(peer_connect), &h) }) - .and_then(|(socket, peer)| peer.run(socket, Arc::new(DummyAdapter {}))); + .and_then(move |(socket, peer)| peer.run(socket, adapter)); Box::new(request) } diff --git a/p2p/src/types.rs b/p2p/src/types.rs index 0dfafcc9d..79360bf5c 100644 --- a/p2p/src/types.rs +++ b/p2p/src/types.rs @@ -66,7 +66,10 @@ pub trait Protocol { /// be known already, usually passed during construction. Will typically /// block so needs to be called withing a coroutine. Should also be called /// only once. - fn handle(&self, conn: TcpStream, na: Arc) -> Box>; + fn handle(&self, + conn: TcpStream, + na: Arc) + -> Box>; /// Sends a ping message to the remote peer. fn send_ping(&self) -> Result<(), Error>; diff --git a/p2p/tests/peer_handshake.rs b/p2p/tests/peer_handshake.rs index 2b4875f47..8cb0df1d2 100644 --- a/p2p/tests/peer_handshake.rs +++ b/p2p/tests/peer_handshake.rs @@ -37,7 +37,8 @@ fn peer_handshake() { let mut evtlp = Core::new().unwrap(); let handle = evtlp.handle(); let p2p_conf = p2p::P2PConfig::default(); - let server = p2p::Server::new(p2p_conf); + let net_adapter = Arc::new(p2p::DummyAdapter{}); + let server = p2p::Server::new(p2p_conf, net_adapter.clone()); let run_server = server.start(handle.clone()); let phandle = handle.clone(); @@ -51,7 +52,7 @@ fn peer_handshake() { socket.and_then(move |socket| { Peer::connect(socket, &p2p::handshake::Handshake::new()) }).and_then(move |(socket, peer)| { - rhandle.spawn(peer.run(socket, Arc::new(p2p::DummyAdapter {})).map_err(|e| { + rhandle.spawn(peer.run(socket, net_adapter.clone()).map_err(|e| { panic!("Client run failed: {}", e); })); peer.send_ping().unwrap(); diff --git a/store/src/lib.rs b/store/src/lib.rs index 61d952e06..d81b166f1 100644 --- a/store/src/lib.rs +++ b/store/src/lib.rs @@ -50,6 +50,9 @@ pub struct Store { rdb: RwLock, } +unsafe impl Sync for Store {} +unsafe impl Send for Store {} + impl Store { /// Opens a new RocksDB at the specified location. pub fn open(path: &str) -> Result { @@ -87,15 +90,20 @@ impl Store { /// Gets a `Readable` value from the db, provided its key. Encapsulates /// serialization. pub fn get_ser>(&self, key: &[u8]) -> Result, Error> { - self.get_ser_limited(key, 0) + self.get_ser_limited(key, 0) } - /// Gets a `Readable` value from the db, provided its key, allowing to extract only partial data. The underlying Readable size must align accordingly. Encapsulates serialization. - pub fn get_ser_limited>(&self, key: &[u8], len: usize) -> Result, Error> { + /// Gets a `Readable` value from the db, provided its key, allowing to + /// extract only partial data. The underlying Readable size must align + /// accordingly. Encapsulates serialization. + pub fn get_ser_limited>(&self, + key: &[u8], + len: usize) + -> Result, Error> { let data = try!(self.get(key)); match data { Some(val) => { - let mut lval = if len > 0 { &val[..len] } else { &val[..] }; + let mut lval = if len > 0 { &val[..len] } else { &val[..] }; let r = try!(ser::deserialize(&mut lval).map_err(Error::SerErr)); Ok(Some(r)) }