From b9de13420926b5d0bf1881869560df17a4e08fe3 Mon Sep 17 00:00:00 2001 From: Quentin Le Sceller Date: Thu, 5 Apr 2018 23:06:34 -0400 Subject: [PATCH] Full sync ask blocks only from other archival nodes (#930) * Request body_sync only from archival node * Move verification to p2p crate * Additionnal check on capabilities * Add check block 1 --- grin/src/server.rs | 27 ++++++++++++++++++------- grin/src/sync.rs | 2 +- p2p/src/peers.rs | 40 ++++++++++++++++++++++++++++++++++++- p2p/src/serv.rs | 25 ++++++++++++++++++++++- p2p/tests/peer_handshake.rs | 2 ++ 5 files changed, 86 insertions(+), 10 deletions(-) diff --git a/grin/src/server.rs b/grin/src/server.rs index 33faa5561..26aec924d 100644 --- a/grin/src/server.rs +++ b/grin/src/server.rs @@ -27,6 +27,7 @@ use api; use chain; use core::{consensus, genesis, global}; use core::core::target::Difficulty; +use core::core::hash::Hashed; use dandelion_monitor; use miner; use p2p; @@ -85,6 +86,18 @@ impl Server { /// Instantiates a new server associated with the provided future reactor. pub fn new(mut config: ServerConfig) -> Result { + // Defaults to None (optional) in config file. + // This translates to false here. + let archive_mode = match config.archive_mode { + None => false, + Some(b) => b, + }; + + // If archive mode is enabled then the flags should contains the FULL_HIST flag + if archive_mode && !config.capabilities.contains(p2p::Capabilities::FULL_HIST) { + config.capabilities.insert(p2p::Capabilities::FULL_HIST); + } + let stop = Arc::new(AtomicBool::new(false)); let pool_adapter = Arc::new(PoolToChainAdapter::new()); @@ -124,6 +137,11 @@ impl Server { config.clone(), )); + let block_1_hash = match shared_chain.get_header_by_height(1) { + Ok(header) => Some(header.hash()), + Err(_) => None, + }; + let p2p_config = config.p2p_config.clone(); let p2p_server = Arc::new(p2p::Server::new( config.db_root.clone(), @@ -132,6 +150,8 @@ impl Server { net_adapter.clone(), genesis.hash(), stop.clone(), + archive_mode, + block_1_hash, )?); chain_adapter.init(Arc::downgrade(&p2p_server.peers)); pool_net_adapter.init(Arc::downgrade(&p2p_server.peers)); @@ -158,13 +178,6 @@ impl Server { ); } - // Defaults to None (optional) in config file. - // This translates to false here. - let archive_mode = match config.archive_mode { - None => false, - Some(b) => b, - }; - // Defaults to None (optional) in config file. // This translates to false here so we do not skip by default. let skip_sync_wait = match config.skip_sync_wait { diff --git a/grin/src/sync.rs b/grin/src/sync.rs index 77acb2cec..3e11280e7 100644 --- a/grin/src/sync.rs +++ b/grin/src/sync.rs @@ -179,7 +179,7 @@ fn body_sync(peers: Arc, chain: Arc) { for hash in hashes_to_get.clone() { // TODO - Is there a threshold where we sync from most_work_peer (not // more_work_peer)? - let peer = peers.more_work_peer(); + let peer = peers.more_work_archival_peer(); if let Some(peer) = peer { if let Ok(peer) = peer.try_read() { if let Err(e) = peer.send_block_request(hash) { diff --git a/p2p/src/peers.rs b/p2p/src/peers.rs index b17fc48af..de2ce5e1a 100644 --- a/p2p/src/peers.rs +++ b/p2p/src/peers.rs @@ -163,6 +163,32 @@ impl Peers { max_peers } + // Return vec of connected peers that currently advertise more work + // (total_difficulty) than we do and are also full archival nodes. + pub fn more_work_archival_peers(&self) -> Vec>> { + let peers = self.connected_peers(); + if peers.len() == 0 { + return vec![]; + } + + let total_difficulty = self.total_difficulty(); + + let mut max_peers = peers + .iter() + .filter(|x| match x.try_read() { + Ok(peer) => { + peer.info.total_difficulty > total_difficulty + && peer.info.capabilities.contains(Capabilities::FULL_HIST) + } + Err(_) => false, + }) + .cloned() + .collect::>(); + + thread_rng().shuffle(&mut max_peers); + max_peers + } + /// Returns single random peer with more work than us. pub fn more_work_peer(&self) -> Option>> { match self.more_work_peers().first() { @@ -171,6 +197,14 @@ impl Peers { } } + /// Returns single random archival peer with more work than us. + pub fn more_work_archival_peer(&self) -> Option>> { + match self.more_work_archival_peers().first() { + Some(x) => Some(x.clone()), + None => None, + } + } + /// Return vec of connected peers that currently have the most worked branch, /// showing the highest total difficulty. pub fn most_work_peers(&self) -> Vec>> { @@ -594,7 +628,11 @@ impl NetAdapter for Peers { /// addresses. fn find_peer_addrs(&self, capab: Capabilities) -> Vec { let peers = self.find_peers(State::Healthy, capab, MAX_PEER_ADDRS as usize); - trace!(LOGGER, "find_peer_addrs: {} healthy peers picked", peers.len()); + trace!( + LOGGER, + "find_peer_addrs: {} healthy peers picked", + peers.len() + ); map_vec!(peers, |p| p.addr) } diff --git a/p2p/src/serv.rs b/p2p/src/serv.rs index 73348d839..59bacf57d 100644 --- a/p2p/src/serv.rs +++ b/p2p/src/serv.rs @@ -48,12 +48,35 @@ impl Server { /// Creates a new idle p2p server with no peers pub fn new( db_root: String, - capab: Capabilities, + mut capab: Capabilities, config: P2PConfig, adapter: Arc, genesis: Hash, stop: Arc, + archive_mode: bool, + block_1_hash: Option, ) -> Result { + // In the case of an archive node, check that we do have the first block. + // In case of first sync we do not perform this check. + if archive_mode && adapter.total_height() > 0 { + // Check that we have block 1 + match block_1_hash { + Some(hash) => match adapter.get_block(hash) { + Some(_) => debug!(LOGGER, "Full block 1 found, archive capabilities confirmed"), + None => { + debug!( + LOGGER, + "Full block 1 not found, archive capabilities disabled" + ); + capab.remove(Capabilities::FULL_HIST); + } + }, + None => { + debug!(LOGGER, "Block 1 not found, archive capabilities disabled"); + capab.remove(Capabilities::FULL_HIST); + } + } + } Ok(Server { config: config.clone(), capabilities: capab, diff --git a/p2p/tests/peer_handshake.rs b/p2p/tests/peer_handshake.rs index 770cc8bae..5a8d576ec 100644 --- a/p2p/tests/peer_handshake.rs +++ b/p2p/tests/peer_handshake.rs @@ -56,6 +56,8 @@ fn peer_handshake() { net_adapter.clone(), Hash::from_vec(vec![]), Arc::new(AtomicBool::new(false)), + false, + None, ).unwrap(), );