From 9ddb1489a56cc405fe8982a18db2511929ac3bb8 Mon Sep 17 00:00:00 2001 From: AntiochP <30642645+antiochp@users.noreply.github.com> Date: Thu, 21 Dec 2017 17:29:24 -0500 Subject: [PATCH] Smarter orphan blocks (#537) * wip * rework check_orphans to be smart about _which_ orphan(s) to check * cleanup * limit max of 100 blocks at a time, and corresponding 100 max orphan blocks --- chain/src/chain.rs | 149 ++++++++++++++++++++++++++++++++++----------- grin/src/sync.rs | 7 +-- 2 files changed, 116 insertions(+), 40 deletions(-) diff --git a/chain/src/chain.rs b/chain/src/chain.rs index cf4c1a428..3b46e57b0 100644 --- a/chain/src/chain.rs +++ b/chain/src/chain.rs @@ -15,7 +15,7 @@ //! Facade and handler for the rest of the blockchain implementation //! and mostly the chain pipeline. -use std::collections::VecDeque; +use std::collections::HashMap; use std::sync::{Arc, Mutex, RwLock}; use util::secp::pedersen::{Commitment, RangeProof}; @@ -33,7 +33,84 @@ use sumtree; use types::*; use util::LOGGER; -const MAX_ORPHANS: usize = 50; +const MAX_ORPHANS: usize = 100; + +#[derive(Debug, Clone)] +struct Orphan { + block: Block, + opts: Options, +} + +struct OrphanBlockPool { + // blocks indexed by their hash + orphans: RwLock>, + // additional index of previous -> hash + // so we can efficiently identify a child block (ex-orphan) after processing a block + prev_idx: RwLock>, +} + +impl OrphanBlockPool { + fn new() -> OrphanBlockPool { + OrphanBlockPool { + orphans: RwLock::new(HashMap::new()), + prev_idx: RwLock::new(HashMap::new()), + } + } + + fn len(&self) -> usize { + let orphans = self.orphans.read().unwrap(); + orphans.len() + } + + fn add(&self, orphan: Orphan) { + { + let mut orphans = self.orphans.write().unwrap(); + let mut prev_idx = self.prev_idx.write().unwrap(); + orphans.insert(orphan.block.hash(), orphan.clone()); + prev_idx.insert(orphan.block.header.previous, orphan.block.hash()); + } + + if self.len() > MAX_ORPHANS { + let max = { + let orphans = self.orphans.read().unwrap(); + orphans.values().max_by_key(|x| x.block.header.height).cloned() + }; + + if let Some(x) = max { + let mut orphans = self.orphans.write().unwrap(); + let mut prev_idx = self.prev_idx.write().unwrap(); + orphans.remove(&x.block.hash()); + prev_idx.remove(&x.block.header.previous); + } + } + } + + fn remove(&self, hash: &Hash) -> Option { + let mut orphans = self.orphans.write().unwrap(); + let mut prev_idx = self.prev_idx.write().unwrap(); + let orphan = orphans.remove(hash); + if let Some(x) = orphan.clone() { + prev_idx.remove(&x.block.header.previous); + } + orphan + } + + /// Get an orphan from the pool indexed by the hash of its parent + fn get_by_previous(&self, hash: &Hash) -> Option { + let orphans = self.orphans.read().unwrap(); + let prev_idx = self.prev_idx.read().unwrap(); + if let Some(hash) = prev_idx.get(hash) { + orphans.get(hash).cloned() + } else { + None + } + } + + fn contains(&self, hash: &Hash) -> bool { + let orphans = self.orphans.read().unwrap(); + orphans.contains_key(hash) + } +} /// Facade to the blockchain block processing pipeline and storage. Provides /// the current view of the UTXO set according to the chain state. Also @@ -43,7 +120,7 @@ pub struct Chain { adapter: Arc, head: Arc>, - orphans: Arc>>, + orphans: Arc, sumtrees: Arc>, // POW verification function @@ -123,7 +200,7 @@ impl Chain { store: store, adapter: adapter, head: Arc::new(Mutex::new(head)), - orphans: Arc::new(Mutex::new(VecDeque::with_capacity(MAX_ORPHANS + 1))), + orphans: Arc::new(OrphanBlockPool::new()), sumtrees: Arc::new(RwLock::new(sumtrees)), pow_verifier: pow_verifier, }) @@ -132,7 +209,9 @@ impl Chain { /// Attempt to add a new block to the chain. Returns the new chain tip if it /// has been added to the longest chain, None if it's added to an (as of /// now) orphan chain. - pub fn process_block(&self, b: Block, opts: Options) -> Result, Error> { + pub fn process_block(&self, b: Block, opts: Options) + -> Result, Error> + { let head = self.store .head() .map_err(|e| Error::StoreErr(e, "chain load head".to_owned()))?; @@ -156,7 +235,8 @@ impl Chain { let adapter = self.adapter.clone(); adapter.block_accepted(&b); } - self.check_orphans(); + // We just accepted a block so see if we can now accept any orphan(s) + self.check_orphans(&b); }, Ok(None) => { // block got accepted but we did not extend the head @@ -173,23 +253,30 @@ impl Chain { let adapter = self.adapter.clone(); adapter.block_accepted(&b); } - // We accepted a block here so there is a chance we can now accept - // one or more orphans. - self.check_orphans(); + // We just accepted a block so see if we can now accept any orphan(s) + self.check_orphans(&b); }, Err(Error::Orphan) => { // TODO - Do we want to check that orphan height is > current height? // TODO - Just check heights here? Or should we be checking total_difficulty as well? let block_hash = b.hash(); if b.header.height < height + (MAX_ORPHANS as u64) { - let mut orphans = self.orphans.lock().unwrap(); - orphans.push_front((opts, b)); - orphans.truncate(MAX_ORPHANS); + let orphan = Orphan { + block: b.clone(), + opts: opts, + }; + + // In the case of a fork - it is possible to have multiple blocks + // that are children of a given block. + // We do not handle this currently for orphans (future enhancement?). + // We just assume "last one wins" for now. + &self.orphans.add(orphan); + debug!( LOGGER, "process_block: orphan: {:?}, # orphans {}", block_hash, - orphans.len(), + self.orphans.len(), ); } else { debug!( @@ -220,7 +307,6 @@ impl Chain { ); } } - res } @@ -250,31 +336,22 @@ impl Chain { /// Check if hash is for a known orphan. pub fn is_orphan(&self, hash: &Hash) -> bool { - let orphans = self.orphans.lock().unwrap(); - orphans.iter().any(|&(_, ref x)| x.hash() == hash.clone()) + self.orphans.contains(hash) } - /// Pop orphans out of the queue and check if we can now accept them. - fn check_orphans(&self) { - // first check how many we have to retry, unfort. we can't extend the lock - // in the loop as it needs to be freed before going in process_block - let orphan_count; - { - let orphans = self.orphans.lock().unwrap(); - orphan_count = orphans.len(); - } - debug!(LOGGER, "check_orphans: # orphans {}", orphan_count); + fn check_orphans(&self, block: &Block) { + debug!( + LOGGER, + "chain: check_orphans: # orphans {}", + self.orphans.len(), + ); - // pop each orphan and retry, if still orphaned, will be pushed again - for _ in 0..orphan_count { - let popped; - { - let mut orphans = self.orphans.lock().unwrap(); - popped = orphans.pop_back(); - } - if let Some((opts, orphan)) = popped { - let _process_result = self.process_block(orphan, opts); - } + // Is there an orphan in our orphans that we can now process? + // We just processed the given block, are there any orphans that have this block + // as their "previous" block? + if let Some(orphan) = self.orphans.get_by_previous(&block.hash()) { + self.orphans.remove(&orphan.block.hash()); + let _ = self.process_block(orphan.block, orphan.opts); } } diff --git a/grin/src/sync.rs b/grin/src/sync.rs index 96150f009..f6d90da42 100644 --- a/grin/src/sync.rs +++ b/grin/src/sync.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::thread; +use std::{cmp, thread}; use std::time::Duration; use std::sync::{Arc, RwLock}; use std::sync::atomic::{AtomicBool, Ordering}; @@ -112,11 +112,10 @@ fn body_sync(peers: Peers, chain: Arc) { // if we have 5 most_work_peers then ask for 50 blocks total (peer_count * 10) // max will be 80 if all 8 peers are advertising most_work - let peer_count = { - peers.most_work_peers().len() - }; + let peer_count = cmp::max(peers.most_work_peers().len(), 10); let block_count = peer_count * 10; + let hashes_to_get = hashes .iter() .filter(|x| {