Smarter orphan blocks (#537)

* wip

* rework check_orphans to be smart about _which_ orphan(s) to check

* cleanup

* limit max of 100 blocks at a time, and corresponding 100 max orphan blocks
This commit is contained in:
AntiochP 2017-12-21 17:29:24 -05:00 committed by GitHub
parent 74ba0d5e88
commit 9ddb1489a5
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 116 additions and 40 deletions

View file

@ -15,7 +15,7 @@
//! Facade and handler for the rest of the blockchain implementation //! Facade and handler for the rest of the blockchain implementation
//! and mostly the chain pipeline. //! and mostly the chain pipeline.
use std::collections::VecDeque; use std::collections::HashMap;
use std::sync::{Arc, Mutex, RwLock}; use std::sync::{Arc, Mutex, RwLock};
use util::secp::pedersen::{Commitment, RangeProof}; use util::secp::pedersen::{Commitment, RangeProof};
@ -33,7 +33,84 @@ use sumtree;
use types::*; use types::*;
use util::LOGGER; use util::LOGGER;
const MAX_ORPHANS: usize = 50; const MAX_ORPHANS: usize = 100;
#[derive(Debug, Clone)]
struct Orphan {
block: Block,
opts: Options,
}
struct OrphanBlockPool {
// blocks indexed by their hash
orphans: RwLock<HashMap<Hash, Orphan>>,
// additional index of previous -> hash
// so we can efficiently identify a child block (ex-orphan) after processing a block
prev_idx: RwLock<HashMap<Hash, Hash>>,
}
impl OrphanBlockPool {
fn new() -> OrphanBlockPool {
OrphanBlockPool {
orphans: RwLock::new(HashMap::new()),
prev_idx: RwLock::new(HashMap::new()),
}
}
fn len(&self) -> usize {
let orphans = self.orphans.read().unwrap();
orphans.len()
}
fn add(&self, orphan: Orphan) {
{
let mut orphans = self.orphans.write().unwrap();
let mut prev_idx = self.prev_idx.write().unwrap();
orphans.insert(orphan.block.hash(), orphan.clone());
prev_idx.insert(orphan.block.header.previous, orphan.block.hash());
}
if self.len() > MAX_ORPHANS {
let max = {
let orphans = self.orphans.read().unwrap();
orphans.values().max_by_key(|x| x.block.header.height).cloned()
};
if let Some(x) = max {
let mut orphans = self.orphans.write().unwrap();
let mut prev_idx = self.prev_idx.write().unwrap();
orphans.remove(&x.block.hash());
prev_idx.remove(&x.block.header.previous);
}
}
}
fn remove(&self, hash: &Hash) -> Option<Orphan> {
let mut orphans = self.orphans.write().unwrap();
let mut prev_idx = self.prev_idx.write().unwrap();
let orphan = orphans.remove(hash);
if let Some(x) = orphan.clone() {
prev_idx.remove(&x.block.header.previous);
}
orphan
}
/// Get an orphan from the pool indexed by the hash of its parent
fn get_by_previous(&self, hash: &Hash) -> Option<Orphan> {
let orphans = self.orphans.read().unwrap();
let prev_idx = self.prev_idx.read().unwrap();
if let Some(hash) = prev_idx.get(hash) {
orphans.get(hash).cloned()
} else {
None
}
}
fn contains(&self, hash: &Hash) -> bool {
let orphans = self.orphans.read().unwrap();
orphans.contains_key(hash)
}
}
/// Facade to the blockchain block processing pipeline and storage. Provides /// Facade to the blockchain block processing pipeline and storage. Provides
/// the current view of the UTXO set according to the chain state. Also /// the current view of the UTXO set according to the chain state. Also
@ -43,7 +120,7 @@ pub struct Chain {
adapter: Arc<ChainAdapter>, adapter: Arc<ChainAdapter>,
head: Arc<Mutex<Tip>>, head: Arc<Mutex<Tip>>,
orphans: Arc<Mutex<VecDeque<(Options, Block)>>>, orphans: Arc<OrphanBlockPool>,
sumtrees: Arc<RwLock<sumtree::SumTrees>>, sumtrees: Arc<RwLock<sumtree::SumTrees>>,
// POW verification function // POW verification function
@ -123,7 +200,7 @@ impl Chain {
store: store, store: store,
adapter: adapter, adapter: adapter,
head: Arc::new(Mutex::new(head)), head: Arc::new(Mutex::new(head)),
orphans: Arc::new(Mutex::new(VecDeque::with_capacity(MAX_ORPHANS + 1))), orphans: Arc::new(OrphanBlockPool::new()),
sumtrees: Arc::new(RwLock::new(sumtrees)), sumtrees: Arc::new(RwLock::new(sumtrees)),
pow_verifier: pow_verifier, pow_verifier: pow_verifier,
}) })
@ -132,7 +209,9 @@ impl Chain {
/// Attempt to add a new block to the chain. Returns the new chain tip if it /// Attempt to add a new block to the chain. Returns the new chain tip if it
/// has been added to the longest chain, None if it's added to an (as of /// has been added to the longest chain, None if it's added to an (as of
/// now) orphan chain. /// now) orphan chain.
pub fn process_block(&self, b: Block, opts: Options) -> Result<Option<Tip>, Error> { pub fn process_block(&self, b: Block, opts: Options)
-> Result<Option<Tip>, Error>
{
let head = self.store let head = self.store
.head() .head()
.map_err(|e| Error::StoreErr(e, "chain load head".to_owned()))?; .map_err(|e| Error::StoreErr(e, "chain load head".to_owned()))?;
@ -156,7 +235,8 @@ impl Chain {
let adapter = self.adapter.clone(); let adapter = self.adapter.clone();
adapter.block_accepted(&b); adapter.block_accepted(&b);
} }
self.check_orphans(); // We just accepted a block so see if we can now accept any orphan(s)
self.check_orphans(&b);
}, },
Ok(None) => { Ok(None) => {
// block got accepted but we did not extend the head // block got accepted but we did not extend the head
@ -173,23 +253,30 @@ impl Chain {
let adapter = self.adapter.clone(); let adapter = self.adapter.clone();
adapter.block_accepted(&b); adapter.block_accepted(&b);
} }
// We accepted a block here so there is a chance we can now accept // We just accepted a block so see if we can now accept any orphan(s)
// one or more orphans. self.check_orphans(&b);
self.check_orphans();
}, },
Err(Error::Orphan) => { Err(Error::Orphan) => {
// TODO - Do we want to check that orphan height is > current height? // TODO - Do we want to check that orphan height is > current height?
// TODO - Just check heights here? Or should we be checking total_difficulty as well? // TODO - Just check heights here? Or should we be checking total_difficulty as well?
let block_hash = b.hash(); let block_hash = b.hash();
if b.header.height < height + (MAX_ORPHANS as u64) { if b.header.height < height + (MAX_ORPHANS as u64) {
let mut orphans = self.orphans.lock().unwrap(); let orphan = Orphan {
orphans.push_front((opts, b)); block: b.clone(),
orphans.truncate(MAX_ORPHANS); opts: opts,
};
// In the case of a fork - it is possible to have multiple blocks
// that are children of a given block.
// We do not handle this currently for orphans (future enhancement?).
// We just assume "last one wins" for now.
&self.orphans.add(orphan);
debug!( debug!(
LOGGER, LOGGER,
"process_block: orphan: {:?}, # orphans {}", "process_block: orphan: {:?}, # orphans {}",
block_hash, block_hash,
orphans.len(), self.orphans.len(),
); );
} else { } else {
debug!( debug!(
@ -220,7 +307,6 @@ impl Chain {
); );
} }
} }
res res
} }
@ -250,31 +336,22 @@ impl Chain {
/// Check if hash is for a known orphan. /// Check if hash is for a known orphan.
pub fn is_orphan(&self, hash: &Hash) -> bool { pub fn is_orphan(&self, hash: &Hash) -> bool {
let orphans = self.orphans.lock().unwrap(); self.orphans.contains(hash)
orphans.iter().any(|&(_, ref x)| x.hash() == hash.clone())
} }
/// Pop orphans out of the queue and check if we can now accept them. fn check_orphans(&self, block: &Block) {
fn check_orphans(&self) { debug!(
// first check how many we have to retry, unfort. we can't extend the lock LOGGER,
// in the loop as it needs to be freed before going in process_block "chain: check_orphans: # orphans {}",
let orphan_count; self.orphans.len(),
{ );
let orphans = self.orphans.lock().unwrap();
orphan_count = orphans.len();
}
debug!(LOGGER, "check_orphans: # orphans {}", orphan_count);
// pop each orphan and retry, if still orphaned, will be pushed again // Is there an orphan in our orphans that we can now process?
for _ in 0..orphan_count { // We just processed the given block, are there any orphans that have this block
let popped; // as their "previous" block?
{ if let Some(orphan) = self.orphans.get_by_previous(&block.hash()) {
let mut orphans = self.orphans.lock().unwrap(); self.orphans.remove(&orphan.block.hash());
popped = orphans.pop_back(); let _ = self.process_block(orphan.block, orphan.opts);
}
if let Some((opts, orphan)) = popped {
let _process_result = self.process_block(orphan, opts);
}
} }
} }

View file

@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
use std::thread; use std::{cmp, thread};
use std::time::Duration; use std::time::Duration;
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
@ -112,11 +112,10 @@ fn body_sync(peers: Peers, chain: Arc<chain::Chain>) {
// if we have 5 most_work_peers then ask for 50 blocks total (peer_count * 10) // if we have 5 most_work_peers then ask for 50 blocks total (peer_count * 10)
// max will be 80 if all 8 peers are advertising most_work // max will be 80 if all 8 peers are advertising most_work
let peer_count = { let peer_count = cmp::max(peers.most_work_peers().len(), 10);
peers.most_work_peers().len()
};
let block_count = peer_count * 10; let block_count = peer_count * 10;
let hashes_to_get = hashes let hashes_to_get = hashes
.iter() .iter()
.filter(|x| { .filter(|x| {