Block sync hash traversal perf (#3558)

* sync traversal performance improvements

* rework how body_sync determines block hashes to request
This commit is contained in:
Antioch Peverell 2021-02-15 13:47:59 +00:00 committed by GitHub
parent a3aed4aae5
commit 7649d361e4
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 54 additions and 127 deletions

View file

@ -15,7 +15,7 @@
//! Facade and handler for the rest of the blockchain implementation //! Facade and handler for the rest of the blockchain implementation
//! and mostly the chain pipeline. //! and mostly the chain pipeline.
use crate::core::core::hash::{Hash, Hashed, ZERO_HASH}; use crate::core::core::hash::{Hash, Hashed};
use crate::core::core::merkle_proof::MerkleProof; use crate::core::core::merkle_proof::MerkleProof;
use crate::core::core::verifier_cache::VerifierCache; use crate::core::core::verifier_cache::VerifierCache;
use crate::core::core::{ use crate::core::core::{
@ -959,100 +959,26 @@ impl Chain {
Ok(()) Ok(())
} }
/// Check chain status whether a txhashset downloading is needed /// Finds the "fork point" where header chain diverges from full block chain.
pub fn check_txhashset_needed( /// If we are syncing this will correspond to the last full block where
&self, /// the next header is known but we do not yet have the full block.
caller: String, /// i.e. This is the last known full block and all subsequent blocks are missing.
hashes: &mut Option<Vec<Hash>>, pub fn fork_point(&self) -> Result<BlockHeader, Error> {
) -> Result<bool, Error> {
let horizon = global::cut_through_horizon() as u64;
let body_head = self.head()?; let body_head = self.head()?;
let mut current = self.get_block_header(&body_head.hash())?;
while !self.is_on_current_chain(&current).is_ok() {
current = self.get_previous_header(&current)?;
}
Ok(current)
}
/// Compare fork point to our horizon.
/// If beyond the horizon then we cannot sync via recent full blocks
/// and we need a state (txhashset) sync.
pub fn check_txhashset_needed(&self, fork_point: &BlockHeader) -> Result<bool, Error> {
let header_head = self.header_head()?; let header_head = self.header_head()?;
let sync_head = self.get_sync_head()?; let horizon = global::cut_through_horizon() as u64;
Ok(fork_point.height < header_head.height.saturating_sub(horizon))
debug!(
"{}: body_head - {}, {}, header_head - {}, {}, sync_head - {}, {}",
caller,
body_head.last_block_h,
body_head.height,
header_head.last_block_h,
header_head.height,
sync_head.last_block_h,
sync_head.height,
);
if body_head.total_difficulty >= header_head.total_difficulty {
debug!(
"{}: no need txhashset. header_head.total_difficulty: {} <= body_head.total_difficulty: {}",
caller, header_head.total_difficulty, body_head.total_difficulty,
);
return Ok(false);
}
let mut oldest_height = 0;
let mut oldest_hash = ZERO_HASH;
// Start with body_head (head of the full block chain)
let mut current = self.get_block_header(&body_head.last_block_h);
if current.is_err() {
error!(
"{}: body_head not found in chain db: {} at {}",
caller, body_head.last_block_h, body_head.height,
);
return Ok(false);
}
//
// TODO - Investigate finding the "common header" by comparing header_mmr and
// sync_mmr (bytes will be identical up to the common header).
//
// Traverse back through the full block chain from body head until we find a header
// that "is on current chain", which is the "fork point" between existing header chain
// and full block chain.
while let Ok(header) = current {
// break out of the while loop when we find a header common
// between the header chain and the current body chain
if self.is_on_current_chain(&header).is_ok() {
oldest_height = header.height;
oldest_hash = header.hash();
break;
}
current = self.get_previous_header(&header);
}
// Traverse back through the header chain from header_head back to this fork point.
// These are the blocks that we need to request in body sync (we have the header but not the full block).
if let Some(hs) = hashes {
let mut h = self.get_block_header(&header_head.last_block_h);
while let Ok(header) = h {
if header.height <= oldest_height {
break;
}
hs.push(header.hash());
h = self.get_previous_header(&header);
}
}
if oldest_height < header_head.height.saturating_sub(horizon) {
if oldest_hash != ZERO_HASH {
// this is the normal case. for example:
// body head height is 1 (and not a fork), oldest_height will be 1
// body head height is 0 (a typical fresh node), oldest_height will be 0
// body head height is 10,001 (but at a fork with depth 1), oldest_height will be 10,000
// body head height is 10,005 (but at a fork with depth 5), oldest_height will be 10,000
debug!(
"{}: need a state sync for txhashset. oldest block which is not on local chain: {} at {}",
caller, oldest_hash, oldest_height,
);
} else {
// this is the abnormal case, when is_on_current_chain() always return Err, and even for genesis block.
error!("{}: corrupted storage? state sync is needed", caller);
}
Ok(true)
} else {
Ok(false)
}
} }
/// Clean the temporary sandbox folder /// Clean the temporary sandbox folder
@ -1104,8 +1030,8 @@ impl Chain {
status.on_setup(); status.on_setup();
// Initial check whether this txhashset is needed or not // Initial check whether this txhashset is needed or not
let mut hashes: Option<Vec<Hash>> = None; let fork_point = self.fork_point()?;
if !self.check_txhashset_needed("txhashset_write".to_owned(), &mut hashes)? { if !self.check_txhashset_needed(&fork_point)? {
warn!("txhashset_write: txhashset received but it's not needed! ignored."); warn!("txhashset_write: txhashset received but it's not needed! ignored.");
return Err(ErrorKind::InvalidTxHashSet("not needed".to_owned()).into()); return Err(ErrorKind::InvalidTxHashSet("not needed".to_owned()).into());
} }

View file

@ -18,8 +18,9 @@ use rand::prelude::*;
use std::cmp; use std::cmp;
use std::sync::Arc; use std::sync::Arc;
use crate::chain::{self, SyncState, SyncStatus}; use crate::chain::{self, SyncState, SyncStatus, Tip};
use crate::core::core::hash::Hash; use crate::core::core::hash::{Hash, Hashed};
use crate::core::core::BlockHeader;
use crate::p2p; use crate::p2p;
pub struct BodySync { pub struct BodySync {
@ -71,27 +72,19 @@ impl BodySync {
} }
/// Return true if txhashset download is needed (when requested block is under the horizon). /// Return true if txhashset download is needed (when requested block is under the horizon).
/// Otherwise go request some missing blocks and return false.
fn body_sync(&mut self) -> Result<bool, chain::Error> { fn body_sync(&mut self) -> Result<bool, chain::Error> {
let mut hashes: Option<Vec<Hash>> = Some(vec![]); let head = self.chain.head()?;
let txhashset_needed = self let header_head = self.chain.header_head()?;
.chain let fork_point = self.chain.fork_point()?;
.check_txhashset_needed("body_sync".to_owned(), &mut hashes)?;
if txhashset_needed { if self.chain.check_txhashset_needed(&fork_point)? {
debug!( debug!(
"body_sync: cannot sync full blocks earlier than horizon. will request txhashset", "body_sync: cannot sync full blocks earlier than horizon. will request txhashset",
); );
return Ok(true); return Ok(true);
} }
let mut hashes = hashes.ok_or_else(|| {
chain::ErrorKind::SyncError("Got no hashes for body sync".to_string())
})?;
hashes.reverse();
let head = self.chain.head()?;
// Find connected peers with strictly greater difficulty than us. // Find connected peers with strictly greater difficulty than us.
let peers_iter = || { let peers_iter = || {
self.peers self.peers
@ -121,25 +114,14 @@ impl BodySync {
chain::MAX_ORPHAN_SIZE.saturating_sub(self.chain.orphans_len()) + 1, chain::MAX_ORPHAN_SIZE.saturating_sub(self.chain.orphans_len()) + 1,
); );
let hashes_to_get = hashes let hashes = self.block_hashes_to_sync(&fork_point, &header_head, block_count as u64)?;
.iter()
.filter(|x| {
// only ask for blocks that we have not yet processed
// either successfully stored or in our orphan list
self.chain.get_block(x).is_err() && !self.chain.is_orphan(x)
})
.take(block_count)
.collect::<Vec<_>>();
if !hashes_to_get.is_empty() {
let body_head = self.chain.head()?;
let header_head = self.chain.header_head()?;
if !hashes.is_empty() {
debug!( debug!(
"block_sync: {}/{} requesting blocks {:?} from {} peers", "block_sync: {}/{} requesting blocks {:?} from {} peers",
body_head.height, head.height,
header_head.height, header_head.height,
hashes_to_get, hashes,
peers.len(), peers.len(),
); );
@ -148,9 +130,9 @@ impl BodySync {
self.receive_timeout = Utc::now() + Duration::seconds(6); self.receive_timeout = Utc::now() + Duration::seconds(6);
let mut rng = rand::thread_rng(); let mut rng = rand::thread_rng();
for hash in hashes_to_get.clone() { for hash in hashes {
if let Some(peer) = peers.choose(&mut rng) { if let Some(peer) = peers.choose(&mut rng) {
if let Err(e) = peer.send_block_request(*hash, chain::Options::SYNC) { if let Err(e) = peer.send_block_request(hash, chain::Options::SYNC) {
debug!("Skipped request to {}: {:?}", peer.info.addr, e); debug!("Skipped request to {}: {:?}", peer.info.addr, e);
peer.stop(); peer.stop();
} else { } else {
@ -162,6 +144,25 @@ impl BodySync {
return Ok(false); return Ok(false);
} }
fn block_hashes_to_sync(
&self,
fork_point: &BlockHeader,
header_head: &Tip,
count: u64,
) -> Result<Vec<Hash>, chain::Error> {
let mut hashes = vec![];
let max_height = cmp::min(fork_point.height + count, header_head.height);
let mut current = self.chain.get_header_by_height(max_height)?;
while current.height > fork_point.height {
if !self.chain.is_orphan(&current.hash()) {
hashes.push(current.hash());
}
current = self.chain.get_previous_header(&current)?;
}
hashes.reverse();
Ok(hashes)
}
// Should we run block body sync and ask for more full blocks? // Should we run block body sync and ask for more full blocks?
fn body_sync_due(&mut self) -> Result<bool, chain::Error> { fn body_sync_due(&mut self) -> Result<bool, chain::Error> {
let blocks_received = self.blocks_received()?; let blocks_received = self.blocks_received()?;