From 4af049a887619989740b221b6f2f09f74ce72961 Mon Sep 17 00:00:00 2001 From: Ignotus Peverell Date: Tue, 7 Feb 2017 13:52:17 -0800 Subject: [PATCH] Added full node sync mode. Follows closely the bitcoin header-first sync. Related p2p messages and protocol. --- grin/src/adapters.rs | 121 ++++++++++++++++++- grin/src/lib.rs | 1 + grin/src/miner.rs | 2 - grin/src/server.rs | 13 +- grin/src/sync.rs | 233 ++++++++++++++++++++++++++++++++++++ grin/tests/simulnet.rs | 37 +++++- p2p/src/conn.rs | 18 +-- p2p/src/handshake.rs | 13 +- p2p/src/lib.rs | 2 +- p2p/src/msg.rs | 93 +++++++++++--- p2p/src/peer.rs | 27 ++++- p2p/src/protocol.rs | 60 ++++++++-- p2p/src/server.rs | 52 +++++++- p2p/src/types.rs | 36 +++++- p2p/tests/peer_handshake.rs | 5 +- 15 files changed, 643 insertions(+), 70 deletions(-) create mode 100644 grin/src/sync.rs diff --git a/grin/src/adapters.rs b/grin/src/adapters.rs index e07dd5714..8e8fcc0bf 100644 --- a/grin/src/adapters.rs +++ b/grin/src/adapters.rs @@ -12,12 +12,17 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::ops::Deref; use std::sync::{Arc, Mutex}; +use std::thread; use chain::{self, ChainAdapter}; use core::core; -use p2p::{NetAdapter, Server}; +use core::core::hash::{Hash, Hashed}; +use core::core::target::Difficulty; +use p2p::{self, NetAdapter, Server}; use util::OneTime; +use sync; /// Implementation of the NetAdapter for the blockchain. Gets notified when new /// blocks and transactions are received and forwards to the chain and pool @@ -27,23 +32,32 @@ pub struct NetToChainAdapter { chain_head: Arc>, chain_store: Arc, chain_adapter: Arc, + + syncer: OneTime>, } impl NetAdapter for NetToChainAdapter { - fn height(&self) -> u64 { - self.chain_head.lock().unwrap().height + fn total_difficulty(&self) -> Difficulty { + self.chain_head.lock().unwrap().clone().total_difficulty } + fn transaction_received(&self, tx: core::Transaction) { unimplemented!(); } + fn block_received(&self, b: core::Block) { - // TODO delegate to a separate thread to avoid holding up the caller debug!("Received block {} from network, going to process.", b.hash()); + // pushing the new block through the chain pipeline let store = self.chain_store.clone(); let chain_adapter = self.chain_adapter.clone(); - let res = chain::process_block(&b, store, chain_adapter, chain::NONE); + let opts = if self.syncer.borrow().syncing() { + chain::SYNC + } else { + chain::NONE + }; + let res = chain::process_block(&b, store, chain_adapter, opts); // log errors and update the shared head reference on success if let Err(e) = res { @@ -53,6 +67,94 @@ impl NetAdapter for NetToChainAdapter { let mut head = chain_head.lock().unwrap(); *head = tip; } + + if self.syncer.borrow().syncing() { + self.syncer.borrow().block_received(b.hash()); + } + } + + fn headers_received(&self, bhs: Vec) { + let opts = if self.syncer.borrow().syncing() { + chain::SYNC + } else { + chain::NONE + }; + + // try to add each header to our header chain + let mut added_hs = vec![]; + for bh in bhs { + let store = self.chain_store.clone(); + let chain_adapter = self.chain_adapter.clone(); + + let res = chain::process_block_header(&bh, store, chain_adapter, opts); + match res { + Ok(_) => { + added_hs.push(bh.hash()); + } + Err(chain::Error::Unfit(_)) => { + info!("Received unfit block header {} at {}.", + bh.hash(), + bh.height); + } + Err(chain::Error::StoreErr(e)) => { + error!("Store error processing block header {}: {:?}", bh.hash(), e); + return; + } + Err(e) => { + info!("Invalid block header {}: {:?}.", bh.hash(), e); + // TODO penalize peer somehow + } + } + } + info!("Added {} headers to the header chain.", added_hs.len()); + + if self.syncer.borrow().syncing() { + self.syncer.borrow().headers_received(added_hs); + } + } + + fn locate_headers(&self, locator: Vec) -> Vec { + if locator.len() == 0 { + return vec![]; + } + + // go through the locator vector and check if we know any of these headers + let known = self.chain_store.get_block_header(&locator[0]); + let header = match known { + Ok(header) => header, + Err(chain::types::Error::NotFoundErr) => { + return self.locate_headers(locator[1..].to_vec()); + } + Err(e) => { + error!("Could not build header locator: {:?}", e); + return vec![]; + } + }; + + // looks like we know one, getting as many following headers as allowed + let hh = header.height; + let mut headers = vec![]; + for h in (hh + 1)..(hh + (p2p::MAX_BLOCK_HEADERS as u64)) { + let header = self.chain_store.get_header_by_height(h); + match header { + Ok(head) => headers.push(head), + Err(chain::types::Error::NotFoundErr) => break, + Err(e) => { + error!("Could not build header locator: {:?}", e); + return vec![]; + } + } + } + headers + } + + fn get_block(&self, h: Hash) -> Option { + let store = self.chain_store.clone(); + let b = store.get_block(&h); + match b { + Ok(b) => Some(b), + _ => None, + } } } @@ -65,8 +167,17 @@ impl NetToChainAdapter { chain_head: chain_head, chain_store: chain_store, chain_adapter: chain_adapter, + syncer: OneTime::new(), } } + + pub fn start_sync(&self, sync: sync::Syncer) { + let arc_sync = Arc::new(sync); + self.syncer.init(arc_sync.clone()); + thread::Builder::new().name("syncer".to_string()).spawn(move || { + arc_sync.run(); + }); + } } /// Implementation of the ChainAdapter for the network. Gets notified when the diff --git a/grin/src/lib.rs b/grin/src/lib.rs index de97889d3..a3b459dc2 100644 --- a/grin/src/lib.rs +++ b/grin/src/lib.rs @@ -39,5 +39,6 @@ extern crate secp256k1zkp as secp; mod adapters; mod miner; mod server; +mod sync; pub use server::{Server, ServerConfig}; diff --git a/grin/src/miner.rs b/grin/src/miner.rs index 52967576c..4e51b2bad 100644 --- a/grin/src/miner.rs +++ b/grin/src/miner.rs @@ -23,8 +23,6 @@ use adapters::ChainToNetAdapter; use core::consensus; use core::core; use core::core::hash::{Hash, Hashed}; -use core::core::target::Difficulty; -use core::pow; use core::pow::cuckoo; use chain; use secp; diff --git a/grin/src/server.rs b/grin/src/server.rs index f0f5b8354..6e32e64c3 100644 --- a/grin/src/server.rs +++ b/grin/src/server.rs @@ -29,6 +29,7 @@ use chain::ChainStore; use core; use miner; use p2p; +use sync; /// Errors than can be reported by a server implementation, mostly wraps /// underlying components errors. @@ -89,9 +90,12 @@ impl Server { let net_adapter = Arc::new(NetToChainAdapter::new(shared_head.clone(), chain_store.clone(), chain_adapter.clone())); - let server = Arc::new(p2p::Server::new(config.p2p_config, net_adapter)); + let server = Arc::new(p2p::Server::new(config.p2p_config, net_adapter.clone())); chain_adapter.init(server.clone()); + let sync = sync::Syncer::new(chain_store.clone(), server.clone()); + net_adapter.start_sync(sync); + let mut evtlp = reactor::Core::new().unwrap(); let handle = evtlp.handle(); evtlp.run(server.start(handle.clone())).unwrap(); @@ -116,9 +120,12 @@ impl Server { let net_adapter = Arc::new(NetToChainAdapter::new(shared_head.clone(), chain_store.clone(), chain_adapter.clone())); - let server = Arc::new(p2p::Server::new(config.p2p_config, net_adapter)); + let server = Arc::new(p2p::Server::new(config.p2p_config, net_adapter.clone())); chain_adapter.init(server.clone()); + let sync = sync::Syncer::new(chain_store.clone(), server.clone()); + net_adapter.start_sync(sync); + evt_handle.spawn(server.start(evt_handle.clone()).map_err(|_| ())); warn!("Grin server started."); @@ -173,7 +180,7 @@ fn store_head(config: &ServerConfig) if config.cuckoo_size > 0 { gen.header.cuckoo_len = config.cuckoo_size; let diff = gen.header.difficulty.clone(); - core::pow::pow(&mut gen, diff).unwrap(); + core::pow::pow(&mut gen.header, diff).unwrap(); } try!(chain_store.save_block(&gen).map_err(&Error::StoreErr)); let tip = chain::types::Tip::new(gen.hash()); diff --git a/grin/src/sync.rs b/grin/src/sync.rs new file mode 100644 index 000000000..6efa49a70 --- /dev/null +++ b/grin/src/sync.rs @@ -0,0 +1,233 @@ +// Copyright 2016 The Grin Developers +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Synchronization of the local blockchain with the rest of the network. Used +//! either on a brand new node or when a node is late based on others' heads. +//! Always starts by downloading the header chain before asking either for full +//! blocks or a full UTXO set with related information. + +/// How many block bodies to download in parallel +const MAX_BODY_DOWNLOADS: usize = 8; + +use std::ops::Deref; +use std::sync::{Arc, Mutex}; +use std::thread; +use std::time::{Instant, Duration}; + +use core::core::hash::{Hash, Hashed}; +use chain; +use p2p; + +pub struct Syncer { + chain_store: Arc, + p2p: Arc, + + sync: Mutex, + last_header_req: Mutex, + blocks_to_download: Mutex>, + blocks_downloading: Mutex>, +} + +impl Syncer { + pub fn new(chain_store: Arc, p2p: Arc) -> Syncer { + Syncer { + chain_store: chain_store, + p2p: p2p, + sync: Mutex::new(true), + last_header_req: Mutex::new(Instant::now() - Duration::from_secs(2)), + blocks_to_download: Mutex::new(vec![]), + blocks_downloading: Mutex::new(vec![]), + } + } + + pub fn syncing(&self) -> bool { + *self.sync.lock().unwrap() + } + + /// Checks the local chain state, comparing it with our peers and triggers + /// syncing if required. + pub fn run(&self) -> Result<(), chain::Error> { + debug!("Starting syncer."); + let start = Instant::now(); + loop { + let pc = self.p2p.peer_count(); + if pc > 3 { + break; + } + if pc > 0 && (Instant::now() - start > Duration::from_secs(15)) { + break; + } + thread::sleep(Duration::from_millis(200)); + } + + // check if we have missing full blocks for which we already have a header + self.init_download()?; + + // main syncing loop, requests more headers and bodies periodically as long + // as a peer with higher difficulty exists and we're not fully caught up + info!("Starting sync loop."); + loop { + let tip = self.chain_store.get_header_head()?; + // TODO do something better (like trying to get more) if we lose peers + let peer = self.p2p.most_work_peer().unwrap(); + + let more_headers = peer.info.total_difficulty > tip.total_difficulty; + let more_bodies = { + let blocks_to_download = self.blocks_to_download.lock().unwrap(); + let blocks_downloading = self.blocks_downloading.lock().unwrap(); + blocks_to_download.len() > 0 || blocks_downloading.len() > 0 + }; + + { + let last_header_req = self.last_header_req.lock().unwrap().clone(); + if more_headers && (Instant::now() - Duration::from_secs(2) > last_header_req) { + self.request_headers()?; + } + } + if more_bodies { + self.request_bodies(); + } + if !more_headers && !more_bodies { + // TODO check we haven't been lied to on the total work + let mut sync = self.sync.lock().unwrap(); + *sync = false; + break; + } + + thread::sleep(Duration::from_secs(2)); + } + info!("Sync done."); + Ok(()) + } + + /// Checks the gap between the header chain and the full block chain and + /// initializes the blocks_to_download structure with the missing full + /// blocks + fn init_download(&self) -> Result<(), chain::Error> { + // compare the header's head to the full one to see what we're missing + let header_head = self.chain_store.get_header_head()?; + let full_head = self.chain_store.head()?; + let mut blocks_to_download = self.blocks_to_download.lock().unwrap(); + + // go back the chain and insert for download all blocks we only have the + // head for + let mut prev_h = header_head.last_block_h; + while prev_h != full_head.last_block_h { + let header = self.chain_store.get_block_header(&prev_h)?; + blocks_to_download.push(header.hash()); + prev_h = header.previous; + } + + debug!("Added {} full block hashes to download.", + blocks_to_download.len()); + Ok(()) + } + + /// Asks for the blocks we haven't downloaded yet and place them in the + /// downloading structure. + fn request_bodies(&self) { + let mut blocks_downloading = self.blocks_downloading.lock().unwrap(); + if blocks_downloading.len() > MAX_BODY_DOWNLOADS { + // clean up potentially dead downloads + let twenty_sec_ago = Instant::now() - Duration::from_secs(20); + blocks_downloading.iter() + .position(|&h| h.1 < twenty_sec_ago) + .map(|n| blocks_downloading.remove(n)); + } else { + // consume hashes from blocks to download, place them in downloading and + // request them from the network + let mut blocks_to_download = self.blocks_to_download.lock().unwrap(); + while blocks_to_download.len() > 0 && blocks_downloading.len() < MAX_BODY_DOWNLOADS { + let h = blocks_to_download.pop().unwrap(); + let peer = self.p2p.random_peer().unwrap(); + peer.send_block_request(h); + blocks_downloading.push((h, Instant::now())); + } + debug!("Requesting more full block hashes to download, total: {}.", + blocks_to_download.len()); + } + } + + /// We added a block, clean up the downloading structure + pub fn block_received(&self, bh: Hash) { + // just clean up the downloading list + let mut bds = self.blocks_downloading.lock().unwrap(); + bds.iter().position(|&h| h.0 == bh).map(|n| bds.remove(n)); + } + + /// Request some block headers from a peer to advance us + fn request_headers(&self) -> Result<(), chain::Error> { + { + let mut last_header_req = self.last_header_req.lock().unwrap(); + *last_header_req = Instant::now(); + } + + let tip = self.chain_store.get_header_head()?; + let peer = self.p2p.most_work_peer(); + let locator = self.get_locator(&tip)?; + if let Some(p) = peer { + debug!("Asking peer {} for more block headers.", p.info.addr); + p.send_header_request(locator)?; + } else { + warn!("Could not get most worked peer to request headers."); + } + Ok(()) + } + + /// We added a header, add it to the full block download list + pub fn headers_received(&self, bhs: Vec) { + let mut blocks_to_download = self.blocks_to_download.lock().unwrap(); + let hs_len = bhs.len(); + for h in bhs { + // enlist for full block download + blocks_to_download.insert(0, h); + } + // ask for more headers if we got as many as required + if hs_len == (p2p::MAX_BLOCK_HEADERS as usize) { + self.request_headers(); + } + } + + /// Builds a vector of block hashes that should help the remote peer sending + /// us the right block headers. + fn get_locator(&self, tip: &chain::Tip) -> Result, chain::Error> { + // Prepare the heights we want as the latests height minus increasing powers + // of 2 up to max. + let mut heights = vec![tip.height]; + let mut tail = (1..p2p::MAX_LOCATORS) + .map(|n| 2u64.pow(n)) + .filter_map(|n| if n > tip.height { + None + } else { + Some(tip.height - n) + }) + .collect::>(); + heights.append(&mut tail); + + // Iteratively travel the header chain back from our head and retain the + // headers at the wanted heights. + let mut header = self.chain_store.get_block_header(&tip.last_block_h)?; + let mut locator = vec![]; + while heights.len() > 0 { + if header.height == heights[0] { + heights = heights[1..].to_vec(); + locator.push(header.hash()); + } + if header.height > 0 { + header = self.chain_store.get_block_header(&header.previous)?; + } + } + Ok(locator) + } +} diff --git a/grin/tests/simulnet.rs b/grin/tests/simulnet.rs index 9f05df628..a5b39f406 100644 --- a/grin/tests/simulnet.rs +++ b/grin/tests/simulnet.rs @@ -30,8 +30,8 @@ use futures::task::park; use tokio_core::reactor; #[test] -fn simulate_servers() { - env_logger::init().unwrap(); +fn simulate_block_propagation() { + env_logger::init(); let mut evtlp = reactor::Core::new().unwrap(); let handle = evtlp.handle(); @@ -41,7 +41,7 @@ fn simulate_servers() { for n in 0..5 { let s = grin::Server::future( grin::ServerConfig{ - db_root: format!("target/grin-{}", n), + db_root: format!("target/grin-prop-{}", n), cuckoo_size: 12, p2p_config: p2p::P2PConfig{port: 10000+n, ..p2p::P2PConfig::default()} }, &handle).unwrap(); @@ -69,6 +69,37 @@ fn simulate_servers() { })); } +#[test] +fn simulate_full_sync() { + env_logger::init(); + + let mut evtlp = reactor::Core::new().unwrap(); + let handle = evtlp.handle(); + + // instantiates 2 servers on different ports + let mut servers = vec![]; + for n in 0..2 { + let s = grin::Server::future( + grin::ServerConfig{ + db_root: format!("target/grin-sync-{}", n), + cuckoo_size: 12, + p2p_config: p2p::P2PConfig{port: 11000+n, ..p2p::P2PConfig::default()} + }, &handle).unwrap(); + servers.push(s); + } + + // mine a few blocks on server 1 + servers[0].start_miner(); + thread::sleep(time::Duration::from_secs(15)); + + // connect 1 and 2 + let addr = format!("{}:{}", "127.0.0.1", 11001); + servers[0].connect_peer(addr.parse().unwrap()).unwrap(); + + // 2 should get blocks + evtlp.run(change(&servers[1])); +} + // Builds the change future, monitoring for a change of head on the provided server fn change<'a>(s: &'a grin::Server) -> HeadChange<'a> { let start_head = s.head(); diff --git a/p2p/src/conn.rs b/p2p/src/conn.rs index 126be5fd5..ddf17a444 100644 --- a/p2p/src/conn.rs +++ b/p2p/src/conn.rs @@ -223,7 +223,7 @@ impl Connection { pub struct TimeoutConnection { underlying: Connection, - expected_responses: Arc>>, + expected_responses: Arc, Instant)>>>, } impl TimeoutConnection { @@ -244,11 +244,15 @@ impl TimeoutConnection { let recv_h = try!(handler.handle(sender, header, data)); let mut expects = exp.lock().unwrap(); + println!("EXP1 {}", expects.len()); let filtered = expects.iter() - .filter(|&&(typ, h, _)| msg_type != typ || recv_h.is_some() && recv_h.unwrap() != h) + .filter(|&&(typ, h, _, _)| { + msg_type != typ || recv_h.is_some() && recv_h.unwrap() != h + }) .map(|&x| x) .collect::>(); *expects = filtered; + println!("EXP2 {}", expects.len()); Ok(recv_h) }); @@ -259,7 +263,7 @@ impl TimeoutConnection { .interval(Duration::new(2, 0)) .fold((), move |_, _| { let exp = exp.lock().unwrap(); - for &(_, _, t) in exp.deref() { + for &(_, _, _, t) in exp.deref() { if Instant::now() - t > Duration::new(2, 0) { return Err(TimerError::TooLong); } @@ -280,15 +284,15 @@ impl TimeoutConnection { pub fn send_request(&self, t: Type, body: &ser::Writeable, - expect_h: Option) + expect_h: Option<(Type, Hash)>) -> Result<(), ser::Error> { let sent = try!(self.underlying.send_msg(t, body)); let mut expects = self.expected_responses.lock().unwrap(); - if let Some(h) = expect_h { - expects.push((t, h, Instant::now())); + if let Some((rt, h)) = expect_h { + expects.push((rt, h, None, Instant::now())); } else { - expects.push((t, ZERO_HASH, Instant::now())); + expects.push((t, ZERO_HASH, None, Instant::now())); } Ok(()) } diff --git a/p2p/src/handshake.rs b/p2p/src/handshake.rs index 884e0ee7e..939ca42b2 100644 --- a/p2p/src/handshake.rs +++ b/p2p/src/handshake.rs @@ -21,6 +21,7 @@ use rand::os::OsRng; use tokio_core::net::TcpStream; use core::ser::Error; +use core::core::target::Difficulty; use msg::*; use types::*; use protocol::ProtocolV1; @@ -46,7 +47,7 @@ impl Handshake { /// Handles connecting to a new remote peer, starting the version handshake. pub fn connect(&self, - height: u64, + total_difficulty: Difficulty, conn: TcpStream) -> Box> { // prepare the first part of the hanshake @@ -55,7 +56,7 @@ impl Handshake { version: PROTOCOL_VERSION, capabilities: FULL_SYNC, nonce: nonce, - height: height, + total_difficulty: total_difficulty, sender_addr: SockAddr(conn.local_addr().unwrap()), receiver_addr: SockAddr(conn.peer_addr().unwrap()), user_agent: USER_AGENT.to_string(), @@ -76,7 +77,7 @@ impl Handshake { user_agent: shake.user_agent, addr: conn.peer_addr().unwrap(), version: shake.version, - height: shake.height, + total_difficulty: shake.total_difficulty, }; info!("Connected to peer {:?}", peer_info); @@ -89,7 +90,7 @@ impl Handshake { /// Handles receiving a connection from a new remote peer that started the /// version handshake. pub fn handshake(&self, - height: u64, + total_difficulty: Difficulty, conn: TcpStream) -> Box> { let nonces = self.nonces.clone(); @@ -117,13 +118,13 @@ impl Handshake { user_agent: hand.user_agent, addr: conn.peer_addr().unwrap(), version: hand.version, - height: hand.height, + total_difficulty: hand.total_difficulty, }; // send our reply with our info let shake = Shake { version: PROTOCOL_VERSION, capabilities: FULL_SYNC, - height: height, + total_difficulty: total_difficulty, user_agent: USER_AGENT.to_string(), }; Ok((conn, shake, peer_info)) diff --git a/p2p/src/lib.rs b/p2p/src/lib.rs index 499263269..0ce8dfbb9 100644 --- a/p2p/src/lib.rs +++ b/p2p/src/lib.rs @@ -47,4 +47,4 @@ mod types; pub use server::{Server, DummyAdapter}; pub use peer::Peer; -pub use types::{P2PConfig, NetAdapter}; +pub use types::{P2PConfig, NetAdapter, MAX_LOCATORS, MAX_BLOCK_HEADERS}; diff --git a/p2p/src/msg.rs b/p2p/src/msg.rs index 2e42554aa..e982f18fc 100644 --- a/p2p/src/msg.rs +++ b/p2p/src/msg.rs @@ -21,8 +21,11 @@ use futures::future::{Future, ok}; use tokio_core::net::TcpStream; use tokio_core::io::{write_all, read_exact}; -use core::ser::{self, Writeable, Readable, Writer, Reader}; use core::consensus::MAX_MSG_LEN; +use core::core::BlockHeader; +use core::core::hash::Hash; +use core::core::target::Difficulty; +use core::ser::{self, Writeable, Readable, Writer, Reader}; use types::*; @@ -53,6 +56,9 @@ enum_from_primitive! { Pong, GetPeerAddrs, PeerAddrs, + GetHeaders, + Headers, + GetBlock, Block, Transaction, } @@ -177,8 +183,10 @@ pub struct Hand { pub capabilities: Capabilities, /// randomly generated for each handshake, helps detect self pub nonce: u64, - /// current height of the sender, used to check whether sync may be needed - pub height: u64, + /// total difficulty accumulated by the sender, used to check whether sync + /// may + /// be needed + pub total_difficulty: Difficulty, /// network address of the sender pub sender_addr: SockAddr, /// network address of the receiver @@ -192,8 +200,8 @@ impl Writeable for Hand { ser_multiwrite!(writer, [write_u32, self.version], [write_u32, self.capabilities.bits()], - [write_u64, self.nonce], - [write_u64, self.height]); + [write_u64, self.nonce]); + self.total_difficulty.write(writer); self.sender_addr.write(writer); self.receiver_addr.write(writer); writer.write_bytes(&self.user_agent) @@ -202,7 +210,8 @@ impl Writeable for Hand { impl Readable for Hand { fn read(reader: &mut Reader) -> Result { - let (version, capab, nonce, height) = ser_multiread!(reader, read_u32, read_u32, read_u64, read_u64); + let (version, capab, nonce) = ser_multiread!(reader, read_u32, read_u32, read_u64); + let total_diff = try!(Difficulty::read(reader)); let sender_addr = try!(SockAddr::read(reader)); let receiver_addr = try!(SockAddr::read(reader)); let ua = try!(reader.read_vec()); @@ -212,7 +221,7 @@ impl Readable for Hand { version: version, capabilities: capabilities, nonce: nonce, - height: height, + total_difficulty: total_diff, sender_addr: sender_addr, receiver_addr: receiver_addr, user_agent: user_agent, @@ -227,8 +236,10 @@ pub struct Shake { pub version: u32, /// sender capabilities pub capabilities: Capabilities, - /// current height of the sender, used to check whether sync may be needed - pub height: u64, + /// total difficulty accumulated by the sender, used to check whether sync + /// may + /// be needed + pub total_difficulty: Difficulty, /// name of version of the software pub user_agent: String, } @@ -237,22 +248,24 @@ impl Writeable for Shake { fn write(&self, writer: &mut Writer) -> Result<(), ser::Error> { ser_multiwrite!(writer, [write_u32, self.version], - [write_u32, self.capabilities.bits()], - [write_u64, self.height], - [write_bytes, &self.user_agent]); + [write_u32, self.capabilities.bits()]); + self.total_difficulty.write(writer); + writer.write_bytes(&self.user_agent); Ok(()) } } impl Readable for Shake { fn read(reader: &mut Reader) -> Result { - let (version, capab, height, ua) = ser_multiread!(reader, read_u32, read_u32, read_u64, read_vec); + let (version, capab) = ser_multiread!(reader, read_u32, read_u32); + let total_diff = try!(Difficulty::read(reader)); + let ua = try!(reader.read_vec()); let user_agent = try!(String::from_utf8(ua).map_err(|_| ser::Error::CorruptedData)); let capabilities = try!(Capabilities::from_bits(capab).ok_or(ser::Error::CorruptedData)); Ok(Shake { version: version, capabilities: capabilities, - height: height, + total_difficulty: total_diff, user_agent: user_agent, }) } @@ -387,6 +400,58 @@ impl Readable for SockAddr { } } +/// Serializable wrapper for the block locator. +pub struct Locator { + pub hashes: Vec, +} + +impl Writeable for Locator { + fn write(&self, writer: &mut Writer) -> Result<(), ser::Error> { + writer.write_u8(self.hashes.len() as u8)?; + for h in &self.hashes { + h.write(writer)? + } + Ok(()) + } +} + +impl Readable for Locator { + fn read(reader: &mut Reader) -> Result { + let len = reader.read_u8()?; + let mut hashes = Vec::with_capacity(len as usize); + for _ in 0..len { + hashes.push(Hash::read(reader)?); + } + Ok(Locator { hashes: hashes }) + } +} + +/// Serializable wrapper for a list of block headers. +pub struct Headers { + pub headers: Vec, +} + +impl Writeable for Headers { + fn write(&self, writer: &mut Writer) -> Result<(), ser::Error> { + writer.write_u16(self.headers.len() as u16)?; + for h in &self.headers { + h.write(writer)? + } + Ok(()) + } +} + +impl Readable for Headers { + fn read(reader: &mut Reader) -> Result { + let len = reader.read_u16()?; + let mut headers = Vec::with_capacity(len as usize); + for _ in 0..len { + headers.push(BlockHeader::read(reader)?); + } + Ok(Headers { headers: headers }) + } +} + /// Placeholder for messages like Ping and Pong that don't send anything but /// the header. pub struct Empty {} diff --git a/p2p/src/peer.rs b/p2p/src/peer.rs index ee5811562..741e420bb 100644 --- a/p2p/src/peer.rs +++ b/p2p/src/peer.rs @@ -18,12 +18,14 @@ use futures::Future; use tokio_core::net::TcpStream; use core::core; +use core::core::hash::Hash; +use core::core::target::Difficulty; use core::ser::Error; use handshake::Handshake; use types::*; pub struct Peer { - info: PeerInfo, + pub info: PeerInfo, proto: Box, } @@ -33,10 +35,10 @@ unsafe impl Send for Peer {} impl Peer { /// Initiates the handshake with another peer. pub fn connect(conn: TcpStream, - height: u64, + total_difficulty: Difficulty, hs: &Handshake) -> Box> { - let connect_peer = hs.connect(height, conn).and_then(|(conn, proto, info)| { + let connect_peer = hs.connect(total_difficulty, conn).and_then(|(conn, proto, info)| { Ok((conn, Peer { info: info, @@ -48,10 +50,10 @@ impl Peer { /// Accept a handshake initiated by another peer. pub fn accept(conn: TcpStream, - height: u64, + total_difficulty: Difficulty, hs: &Handshake) -> Box> { - let hs_peer = hs.handshake(height, conn).and_then(|(conn, proto, info)| { + let hs_peer = hs.handshake(total_difficulty, conn).and_then(|(conn, proto, info)| { Ok((conn, Peer { info: info, @@ -68,7 +70,11 @@ impl Peer { na: Arc) -> Box> { - self.proto.handle(conn, na) + let addr = self.info.addr; + Box::new(self.proto.handle(conn, na).and_then(move |_| { + info!("Client {} disconnected.", addr); + Ok(()) + })) } /// Bytes sent and received by this peer to the remote peer. @@ -87,6 +93,15 @@ impl Peer { self.proto.send_block(b) } + pub fn send_header_request(&self, locator: Vec) -> Result<(), Error> { + self.proto.send_header_request(locator) + } + + pub fn send_block_request(&self, h: Hash) -> Result<(), Error> { + debug!("Requesting block {} from peer {}.", h, self.info.addr); + self.proto.send_block_request(h) + } + pub fn stop(&self) { self.proto.close(); } diff --git a/p2p/src/protocol.rs b/p2p/src/protocol.rs index 6d47f86ea..5a722af58 100644 --- a/p2p/src/protocol.rs +++ b/p2p/src/protocol.rs @@ -81,6 +81,14 @@ impl Protocol for ProtocolV1 { self.send_msg(Type::Transaction, tx) } + fn send_header_request(&self, locator: Vec) -> Result<(), ser::Error> { + self.send_request(Type::GetHeaders, &Locator { hashes: locator }, None) + } + + fn send_block_request(&self, h: Hash) -> Result<(), ser::Error> { + self.send_request(Type::GetBlock, &h, Some((Type::Block, h))) + } + /// Close the connection to the remote peer fn close(&self) { // TODO some kind of shutdown signal @@ -97,15 +105,7 @@ impl ProtocolV1 { body: &ser::Writeable, expect_resp: Option<(Type, Hash)>) -> Result<(), ser::Error> { - let sent = self.send_msg(t, body); - - if let Err(e) = sent { - warn!("Couldn't send message to remote peer: {}", e); - } else if let Some(exp) = expect_resp { - let mut expects = self.expected_responses.lock().unwrap(); - expects.push(exp); - } - Ok(()) + self.conn.borrow().send_request(t, body, expect_resp) } } @@ -116,22 +116,58 @@ fn handle_payload(adapter: &NetAdapter, -> Result, ser::Error> { match header.msg_type { Type::Ping => { - let data = try!(ser::ser_vec(&MsgHeader::new(Type::Pong, 0))); + let data = ser::ser_vec(&MsgHeader::new(Type::Pong, 0))?; sender.send(data); Ok(None) } Type::Pong => Ok(None), Type::Transaction => { - let tx = try!(ser::deserialize::(&mut &buf[..])); + let tx = ser::deserialize::(&mut &buf[..])?; adapter.transaction_received(tx); Ok(None) } + Type::GetBlock => { + let h = ser::deserialize::(&mut &buf[..])?; + let bo = adapter.get_block(h); + if let Some(b) = bo { + // serialize and send the block over + let mut body_data = vec![]; + try!(ser::serialize(&mut body_data, &b)); + let mut data = vec![]; + try!(ser::serialize(&mut data, + &MsgHeader::new(Type::Block, body_data.len() as u64))); + data.append(&mut body_data); + sender.send(data); + } + Ok(None) + } Type::Block => { - let b = try!(ser::deserialize::(&mut &buf[..])); + let b = ser::deserialize::(&mut &buf[..])?; let bh = b.hash(); adapter.block_received(b); Ok(Some(bh)) } + Type::GetHeaders => { + // load headers from the locator + let loc = ser::deserialize::(&mut &buf[..])?; + let headers = adapter.locate_headers(loc.hashes); + + // serialize and send all the headers over + let mut body_data = vec![]; + try!(ser::serialize(&mut body_data, &Headers { headers: headers })); + let mut data = vec![]; + try!(ser::serialize(&mut data, + &MsgHeader::new(Type::Headers, body_data.len() as u64))); + data.append(&mut body_data); + sender.send(data); + + Ok(None) + } + Type::Headers => { + let headers = ser::deserialize::(&mut &buf[..])?; + adapter.headers_received(headers.headers); + Ok(None) + } _ => { debug!("unknown message type {:?}", header.msg_type); Ok(None) diff --git a/p2p/src/server.rs b/p2p/src/server.rs index 8d839524f..ecf31d393 100644 --- a/p2p/src/server.rs +++ b/p2p/src/server.rs @@ -24,10 +24,13 @@ use std::time::Duration; use futures; use futures::{Future, Stream}; use futures::future::IntoFuture; +use rand::{self, Rng}; use tokio_core::net::{TcpListener, TcpStream}; use tokio_core::reactor; use core::core; +use core::core::hash::Hash; +use core::core::target::Difficulty; use core::ser::Error; use handshake::Handshake; use peer::Peer; @@ -36,9 +39,18 @@ use types::*; /// A no-op network adapter used for testing. pub struct DummyAdapter {} impl NetAdapter for DummyAdapter { - fn height(&self) -> u64 { 0 } + fn total_difficulty(&self) -> Difficulty { + Difficulty::one() + } fn transaction_received(&self, tx: core::Transaction) {} fn block_received(&self, b: core::Block) {} + fn headers_received(&self, bh: Vec) {} + fn locate_headers(&self, locator: Vec) -> Vec { + vec![] + } + fn get_block(&self, h: Hash) -> Option { + None + } } /// P2P server implementation, handling bootstrapping to find and connect to @@ -80,11 +92,11 @@ impl Server { let hp = h.clone(); let peers = socket.incoming().map_err(|e| Error::IOErr(e)).map(move |(conn, addr)| { let adapter = adapter.clone(); - let height = adapter.height(); + let total_diff = adapter.total_difficulty(); let peers = peers.clone(); // accept the peer and add it to the server map - let peer_accept = add_to_peers(peers, Peer::accept(conn, height, &hs.clone())); + let peer_accept = add_to_peers(peers, Peer::accept(conn, total_diff, &hs.clone())); // wire in a future to timeout the accept after 5 secs let timed_peer = with_timeout(Box::new(peer_accept), &hp); @@ -132,17 +144,45 @@ impl Server { let socket = TcpStream::connect(&addr, &h).map_err(|e| Error::IOErr(e)); let request = socket.and_then(move |socket| { let peers = peers.clone(); - let height = adapter1.height(); + let total_diff = adapter1.total_difficulty(); // connect to the peer and add it to the server map, wiring it a timeout for // the handhake - let peer_connect = add_to_peers(peers, Peer::connect(socket, height, &Handshake::new())); + let peer_connect = + add_to_peers(peers, Peer::connect(socket, total_diff, &Handshake::new())); with_timeout(Box::new(peer_connect), &h) }) .and_then(move |(socket, peer)| peer.run(socket, adapter2)); Box::new(request) } + /// Returns the peer with the most worked branch, showing the highest total + /// difficulty. + pub fn most_work_peer(&self) -> Option> { + let peers = self.peers.read().unwrap(); + if peers.len() == 0 { + return None; + } + let mut res = peers[0].clone(); + for p in peers.deref() { + if res.info.total_difficulty < p.info.total_difficulty { + res = (*p).clone(); + } + } + Some(res) + } + + /// Returns a random peer we're connected to. + pub fn random_peer(&self) -> Option> { + let peers = self.peers.read().unwrap(); + if peers.len() == 0 { + None + } else { + let idx = rand::thread_rng().gen_range(0, peers.len()); + Some(peers[idx].clone()) + } + } + /// Broadcasts the provided block to all our peers. A peer implementation /// may drop the broadcast request if it knows the remote peer already has /// the block. @@ -156,7 +196,7 @@ impl Server { } /// Number of peers we're currently connected to. - pub fn peers_count(&self) -> u32 { + pub fn peer_count(&self) -> u32 { self.peers.read().unwrap().len() as u32 } diff --git a/p2p/src/types.rs b/p2p/src/types.rs index 2f4c1b266..64457dc8b 100644 --- a/p2p/src/types.rs +++ b/p2p/src/types.rs @@ -19,8 +19,19 @@ use futures::Future; use tokio_core::net::TcpStream; use core::core; +use core::core::hash::Hash; +use core::core::target::Difficulty; use core::ser::Error; +/// Maximum number of hashes in a block header locator request +pub const MAX_LOCATORS: u32 = 10; + +/// Maximum number of block headers a peer should ever send +pub const MAX_BLOCK_HEADERS: u32 = 512; + +/// Maximum number of block bodies a peer should ever ask for and send +pub const MAX_BLOCK_BODIES: u32 = 16; + /// Configuration for the peer-to-peer server. #[derive(Debug, Clone, Copy)] pub struct P2PConfig { @@ -56,7 +67,7 @@ pub struct PeerInfo { pub user_agent: String, pub version: u32, pub addr: SocketAddr, - pub height: u64, + pub total_difficulty: Difficulty, } /// A given communication protocol agreed upon between 2 peers (usually @@ -81,6 +92,12 @@ pub trait Protocol { /// Relays a transaction to the remote peer. fn send_transaction(&self, tx: &core::Transaction) -> Result<(), Error>; + /// Sends a request for block headers based on the provided block locator. + fn send_header_request(&self, locator: Vec) -> Result<(), Error>; + + /// Sends a request for a block from its hash. + fn send_block_request(&self, h: Hash) -> Result<(), Error>; + /// How many bytes have been sent/received to/from the remote peer. fn transmitted_bytes(&self) -> (u64, u64); @@ -92,12 +109,25 @@ pub trait Protocol { /// forwarding or querying of blocks and transactions from the network among /// other things. pub trait NetAdapter: Sync + Send { - /// Current height of our chain. - fn height(&self) -> u64; + /// Current height of our chain. + fn total_difficulty(&self) -> Difficulty; /// A valid transaction has been received from one of our peers fn transaction_received(&self, tx: core::Transaction); /// A block has been received from one of our peers fn block_received(&self, b: core::Block); + + /// A set of block header has been received, typically in response to a + /// block + /// header request. + fn headers_received(&self, bh: Vec); + + /// Finds a list of block headers based on the provided locator. Tries to + /// identify the common chain and gets the headers that follow it + /// immediately. + fn locate_headers(&self, locator: Vec) -> Vec; + + /// Gets a full block by its hash. + fn get_block(&self, h: Hash) -> Option; } diff --git a/p2p/tests/peer_handshake.rs b/p2p/tests/peer_handshake.rs index f4ea14c1a..fb2f638ac 100644 --- a/p2p/tests/peer_handshake.rs +++ b/p2p/tests/peer_handshake.rs @@ -27,6 +27,7 @@ use tokio_core::net::TcpStream; use tokio_core::reactor::{self, Core}; use core::ser; +use core::core::target::Difficulty; use p2p::Peer; // Starts a server and connects a client peer to it to check handshake, followed by a ping/pong exchange to make sure the connection is live. @@ -50,7 +51,7 @@ fn peer_handshake() { let addr = SocketAddr::new(p2p_conf.host, p2p_conf.port); let socket = TcpStream::connect(&addr, &phandle).map_err(|e| ser::Error::IOErr(e)); socket.and_then(move |socket| { - Peer::connect(socket, 0, &p2p::handshake::Handshake::new()) + Peer::connect(socket, Difficulty::one(), &p2p::handshake::Handshake::new()) }).and_then(move |(socket, peer)| { rhandle.spawn(peer.run(socket, net_adapter.clone()).map_err(|e| { panic!("Client run failed: {}", e); @@ -63,7 +64,7 @@ fn peer_handshake() { assert!(recv > 0); Ok(()) }).and_then(|_| { - assert!(server.peers_count() > 0); + assert!(server.peer_count() > 0); server.stop(); Ok(()) })