Full sync ask blocks only from other archival nodes (#930)

* Request body_sync only from archival node
* Move verification to p2p crate
* Additionnal check on capabilities
* Add check block 1
This commit is contained in:
Quentin Le Sceller 2018-04-05 23:06:34 -04:00 committed by Ignotus Peverell
parent 98efaf88df
commit b9de134209
5 changed files with 86 additions and 10 deletions

View file

@ -27,6 +27,7 @@ use api;
use chain;
use core::{consensus, genesis, global};
use core::core::target::Difficulty;
use core::core::hash::Hashed;
use dandelion_monitor;
use miner;
use p2p;
@ -85,6 +86,18 @@ impl Server {
/// Instantiates a new server associated with the provided future reactor.
pub fn new(mut config: ServerConfig) -> Result<Server, Error> {
// Defaults to None (optional) in config file.
// This translates to false here.
let archive_mode = match config.archive_mode {
None => false,
Some(b) => b,
};
// If archive mode is enabled then the flags should contains the FULL_HIST flag
if archive_mode && !config.capabilities.contains(p2p::Capabilities::FULL_HIST) {
config.capabilities.insert(p2p::Capabilities::FULL_HIST);
}
let stop = Arc::new(AtomicBool::new(false));
let pool_adapter = Arc::new(PoolToChainAdapter::new());
@ -124,6 +137,11 @@ impl Server {
config.clone(),
));
let block_1_hash = match shared_chain.get_header_by_height(1) {
Ok(header) => Some(header.hash()),
Err(_) => None,
};
let p2p_config = config.p2p_config.clone();
let p2p_server = Arc::new(p2p::Server::new(
config.db_root.clone(),
@ -132,6 +150,8 @@ impl Server {
net_adapter.clone(),
genesis.hash(),
stop.clone(),
archive_mode,
block_1_hash,
)?);
chain_adapter.init(Arc::downgrade(&p2p_server.peers));
pool_net_adapter.init(Arc::downgrade(&p2p_server.peers));
@ -158,13 +178,6 @@ impl Server {
);
}
// Defaults to None (optional) in config file.
// This translates to false here.
let archive_mode = match config.archive_mode {
None => false,
Some(b) => b,
};
// Defaults to None (optional) in config file.
// This translates to false here so we do not skip by default.
let skip_sync_wait = match config.skip_sync_wait {

View file

@ -179,7 +179,7 @@ fn body_sync(peers: Arc<Peers>, chain: Arc<chain::Chain>) {
for hash in hashes_to_get.clone() {
// TODO - Is there a threshold where we sync from most_work_peer (not
// more_work_peer)?
let peer = peers.more_work_peer();
let peer = peers.more_work_archival_peer();
if let Some(peer) = peer {
if let Ok(peer) = peer.try_read() {
if let Err(e) = peer.send_block_request(hash) {

View file

@ -163,6 +163,32 @@ impl Peers {
max_peers
}
// Return vec of connected peers that currently advertise more work
// (total_difficulty) than we do and are also full archival nodes.
pub fn more_work_archival_peers(&self) -> Vec<Arc<RwLock<Peer>>> {
let peers = self.connected_peers();
if peers.len() == 0 {
return vec![];
}
let total_difficulty = self.total_difficulty();
let mut max_peers = peers
.iter()
.filter(|x| match x.try_read() {
Ok(peer) => {
peer.info.total_difficulty > total_difficulty
&& peer.info.capabilities.contains(Capabilities::FULL_HIST)
}
Err(_) => false,
})
.cloned()
.collect::<Vec<_>>();
thread_rng().shuffle(&mut max_peers);
max_peers
}
/// Returns single random peer with more work than us.
pub fn more_work_peer(&self) -> Option<Arc<RwLock<Peer>>> {
match self.more_work_peers().first() {
@ -171,6 +197,14 @@ impl Peers {
}
}
/// Returns single random archival peer with more work than us.
pub fn more_work_archival_peer(&self) -> Option<Arc<RwLock<Peer>>> {
match self.more_work_archival_peers().first() {
Some(x) => Some(x.clone()),
None => None,
}
}
/// Return vec of connected peers that currently have the most worked branch,
/// showing the highest total difficulty.
pub fn most_work_peers(&self) -> Vec<Arc<RwLock<Peer>>> {
@ -594,7 +628,11 @@ impl NetAdapter for Peers {
/// addresses.
fn find_peer_addrs(&self, capab: Capabilities) -> Vec<SocketAddr> {
let peers = self.find_peers(State::Healthy, capab, MAX_PEER_ADDRS as usize);
trace!(LOGGER, "find_peer_addrs: {} healthy peers picked", peers.len());
trace!(
LOGGER,
"find_peer_addrs: {} healthy peers picked",
peers.len()
);
map_vec!(peers, |p| p.addr)
}

View file

@ -48,12 +48,35 @@ impl Server {
/// Creates a new idle p2p server with no peers
pub fn new(
db_root: String,
capab: Capabilities,
mut capab: Capabilities,
config: P2PConfig,
adapter: Arc<ChainAdapter>,
genesis: Hash,
stop: Arc<AtomicBool>,
archive_mode: bool,
block_1_hash: Option<Hash>,
) -> Result<Server, Error> {
// In the case of an archive node, check that we do have the first block.
// In case of first sync we do not perform this check.
if archive_mode && adapter.total_height() > 0 {
// Check that we have block 1
match block_1_hash {
Some(hash) => match adapter.get_block(hash) {
Some(_) => debug!(LOGGER, "Full block 1 found, archive capabilities confirmed"),
None => {
debug!(
LOGGER,
"Full block 1 not found, archive capabilities disabled"
);
capab.remove(Capabilities::FULL_HIST);
}
},
None => {
debug!(LOGGER, "Block 1 not found, archive capabilities disabled");
capab.remove(Capabilities::FULL_HIST);
}
}
}
Ok(Server {
config: config.clone(),
capabilities: capab,

View file

@ -56,6 +56,8 @@ fn peer_handshake() {
net_adapter.clone(),
Hash::from_vec(vec![]),
Arc::new(AtomicBool::new(false)),
false,
None,
).unwrap(),
);