From 7649d361e4c27d84789df2a8fff2ed362c7ef420 Mon Sep 17 00:00:00 2001 From: Antioch Peverell Date: Mon, 15 Feb 2021 13:47:59 +0000 Subject: [PATCH] Block sync hash traversal perf (#3558) * sync traversal performance improvements * rework how body_sync determines block hashes to request --- chain/src/chain.rs | 116 ++++++----------------------- servers/src/grin/sync/body_sync.rs | 65 ++++++++-------- 2 files changed, 54 insertions(+), 127 deletions(-) diff --git a/chain/src/chain.rs b/chain/src/chain.rs index e6e89d74e..9d4993388 100644 --- a/chain/src/chain.rs +++ b/chain/src/chain.rs @@ -15,7 +15,7 @@ //! Facade and handler for the rest of the blockchain implementation //! and mostly the chain pipeline. -use crate::core::core::hash::{Hash, Hashed, ZERO_HASH}; +use crate::core::core::hash::{Hash, Hashed}; use crate::core::core::merkle_proof::MerkleProof; use crate::core::core::verifier_cache::VerifierCache; use crate::core::core::{ @@ -959,100 +959,26 @@ impl Chain { Ok(()) } - /// Check chain status whether a txhashset downloading is needed - pub fn check_txhashset_needed( - &self, - caller: String, - hashes: &mut Option>, - ) -> Result { - let horizon = global::cut_through_horizon() as u64; + /// Finds the "fork point" where header chain diverges from full block chain. + /// If we are syncing this will correspond to the last full block where + /// the next header is known but we do not yet have the full block. + /// i.e. This is the last known full block and all subsequent blocks are missing. + pub fn fork_point(&self) -> Result { let body_head = self.head()?; + let mut current = self.get_block_header(&body_head.hash())?; + while !self.is_on_current_chain(¤t).is_ok() { + current = self.get_previous_header(¤t)?; + } + Ok(current) + } + + /// Compare fork point to our horizon. + /// If beyond the horizon then we cannot sync via recent full blocks + /// and we need a state (txhashset) sync. + pub fn check_txhashset_needed(&self, fork_point: &BlockHeader) -> Result { let header_head = self.header_head()?; - let sync_head = self.get_sync_head()?; - - debug!( - "{}: body_head - {}, {}, header_head - {}, {}, sync_head - {}, {}", - caller, - body_head.last_block_h, - body_head.height, - header_head.last_block_h, - header_head.height, - sync_head.last_block_h, - sync_head.height, - ); - - if body_head.total_difficulty >= header_head.total_difficulty { - debug!( - "{}: no need txhashset. header_head.total_difficulty: {} <= body_head.total_difficulty: {}", - caller, header_head.total_difficulty, body_head.total_difficulty, - ); - return Ok(false); - } - - let mut oldest_height = 0; - let mut oldest_hash = ZERO_HASH; - - // Start with body_head (head of the full block chain) - let mut current = self.get_block_header(&body_head.last_block_h); - if current.is_err() { - error!( - "{}: body_head not found in chain db: {} at {}", - caller, body_head.last_block_h, body_head.height, - ); - return Ok(false); - } - - // - // TODO - Investigate finding the "common header" by comparing header_mmr and - // sync_mmr (bytes will be identical up to the common header). - // - // Traverse back through the full block chain from body head until we find a header - // that "is on current chain", which is the "fork point" between existing header chain - // and full block chain. - while let Ok(header) = current { - // break out of the while loop when we find a header common - // between the header chain and the current body chain - if self.is_on_current_chain(&header).is_ok() { - oldest_height = header.height; - oldest_hash = header.hash(); - break; - } - - current = self.get_previous_header(&header); - } - - // Traverse back through the header chain from header_head back to this fork point. - // These are the blocks that we need to request in body sync (we have the header but not the full block). - if let Some(hs) = hashes { - let mut h = self.get_block_header(&header_head.last_block_h); - while let Ok(header) = h { - if header.height <= oldest_height { - break; - } - hs.push(header.hash()); - h = self.get_previous_header(&header); - } - } - - if oldest_height < header_head.height.saturating_sub(horizon) { - if oldest_hash != ZERO_HASH { - // this is the normal case. for example: - // body head height is 1 (and not a fork), oldest_height will be 1 - // body head height is 0 (a typical fresh node), oldest_height will be 0 - // body head height is 10,001 (but at a fork with depth 1), oldest_height will be 10,000 - // body head height is 10,005 (but at a fork with depth 5), oldest_height will be 10,000 - debug!( - "{}: need a state sync for txhashset. oldest block which is not on local chain: {} at {}", - caller, oldest_hash, oldest_height, - ); - } else { - // this is the abnormal case, when is_on_current_chain() always return Err, and even for genesis block. - error!("{}: corrupted storage? state sync is needed", caller); - } - Ok(true) - } else { - Ok(false) - } + let horizon = global::cut_through_horizon() as u64; + Ok(fork_point.height < header_head.height.saturating_sub(horizon)) } /// Clean the temporary sandbox folder @@ -1104,8 +1030,8 @@ impl Chain { status.on_setup(); // Initial check whether this txhashset is needed or not - let mut hashes: Option> = None; - if !self.check_txhashset_needed("txhashset_write".to_owned(), &mut hashes)? { + let fork_point = self.fork_point()?; + if !self.check_txhashset_needed(&fork_point)? { warn!("txhashset_write: txhashset received but it's not needed! ignored."); return Err(ErrorKind::InvalidTxHashSet("not needed".to_owned()).into()); } diff --git a/servers/src/grin/sync/body_sync.rs b/servers/src/grin/sync/body_sync.rs index 6a048c5a0..e78441f78 100644 --- a/servers/src/grin/sync/body_sync.rs +++ b/servers/src/grin/sync/body_sync.rs @@ -18,8 +18,9 @@ use rand::prelude::*; use std::cmp; use std::sync::Arc; -use crate::chain::{self, SyncState, SyncStatus}; -use crate::core::core::hash::Hash; +use crate::chain::{self, SyncState, SyncStatus, Tip}; +use crate::core::core::hash::{Hash, Hashed}; +use crate::core::core::BlockHeader; use crate::p2p; pub struct BodySync { @@ -71,27 +72,19 @@ impl BodySync { } /// Return true if txhashset download is needed (when requested block is under the horizon). + /// Otherwise go request some missing blocks and return false. fn body_sync(&mut self) -> Result { - let mut hashes: Option> = Some(vec![]); - let txhashset_needed = self - .chain - .check_txhashset_needed("body_sync".to_owned(), &mut hashes)?; + let head = self.chain.head()?; + let header_head = self.chain.header_head()?; + let fork_point = self.chain.fork_point()?; - if txhashset_needed { + if self.chain.check_txhashset_needed(&fork_point)? { debug!( "body_sync: cannot sync full blocks earlier than horizon. will request txhashset", ); return Ok(true); } - let mut hashes = hashes.ok_or_else(|| { - chain::ErrorKind::SyncError("Got no hashes for body sync".to_string()) - })?; - - hashes.reverse(); - - let head = self.chain.head()?; - // Find connected peers with strictly greater difficulty than us. let peers_iter = || { self.peers @@ -121,25 +114,14 @@ impl BodySync { chain::MAX_ORPHAN_SIZE.saturating_sub(self.chain.orphans_len()) + 1, ); - let hashes_to_get = hashes - .iter() - .filter(|x| { - // only ask for blocks that we have not yet processed - // either successfully stored or in our orphan list - self.chain.get_block(x).is_err() && !self.chain.is_orphan(x) - }) - .take(block_count) - .collect::>(); - - if !hashes_to_get.is_empty() { - let body_head = self.chain.head()?; - let header_head = self.chain.header_head()?; + let hashes = self.block_hashes_to_sync(&fork_point, &header_head, block_count as u64)?; + if !hashes.is_empty() { debug!( "block_sync: {}/{} requesting blocks {:?} from {} peers", - body_head.height, + head.height, header_head.height, - hashes_to_get, + hashes, peers.len(), ); @@ -148,9 +130,9 @@ impl BodySync { self.receive_timeout = Utc::now() + Duration::seconds(6); let mut rng = rand::thread_rng(); - for hash in hashes_to_get.clone() { + for hash in hashes { if let Some(peer) = peers.choose(&mut rng) { - if let Err(e) = peer.send_block_request(*hash, chain::Options::SYNC) { + if let Err(e) = peer.send_block_request(hash, chain::Options::SYNC) { debug!("Skipped request to {}: {:?}", peer.info.addr, e); peer.stop(); } else { @@ -162,6 +144,25 @@ impl BodySync { return Ok(false); } + fn block_hashes_to_sync( + &self, + fork_point: &BlockHeader, + header_head: &Tip, + count: u64, + ) -> Result, chain::Error> { + let mut hashes = vec![]; + let max_height = cmp::min(fork_point.height + count, header_head.height); + let mut current = self.chain.get_header_by_height(max_height)?; + while current.height > fork_point.height { + if !self.chain.is_orphan(¤t.hash()) { + hashes.push(current.hash()); + } + current = self.chain.get_previous_header(¤t)?; + } + hashes.reverse(); + Ok(hashes) + } + // Should we run block body sync and ask for more full blocks? fn body_sync_due(&mut self) -> Result { let blocks_received = self.blocks_received()?;