Improvements to orphan handling to help sync ()

Our chain orphans data structure now does:

* Eviction based on a max total number of orphaned blocks
* Evicts further away first, then too old
* Avoids all cloning (blocks can be big)

This allows sync to be a little more stupid, only reducing the
number of blocks it requests when orphans get nearly full.

Fixes 
This commit is contained in:
Ignotus Peverell 2018-04-05 03:24:43 +00:00 committed by GitHub
parent f247bda834
commit c460f9876a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 59 additions and 52 deletions

View file

@ -33,7 +33,8 @@ use types::*;
use util::secp::pedersen::RangeProof;
use util::LOGGER;
const MAX_ORPHAN_AGE_SECS: u64 = 30;
const MAX_ORPHAN_AGE_SECS: u64 = 300;
pub const MAX_ORPHAN_SIZE: usize = 200;
#[derive(Debug, Clone)]
struct Orphan {
@ -45,16 +46,16 @@ struct Orphan {
struct OrphanBlockPool {
// blocks indexed by their hash
orphans: RwLock<HashMap<Hash, Orphan>>,
// additional index of previous -> hash
// additional index of height -> hash
// so we can efficiently identify a child block (ex-orphan) after processing a block
prev_idx: RwLock<HashMap<Hash, Hash>>,
height_idx: RwLock<HashMap<u64, Vec<Hash>>>,
}
impl OrphanBlockPool {
fn new() -> OrphanBlockPool {
OrphanBlockPool {
orphans: RwLock::new(HashMap::new()),
prev_idx: RwLock::new(HashMap::new()),
height_idx: RwLock::new(HashMap::new()),
}
}
@ -64,42 +65,43 @@ impl OrphanBlockPool {
}
fn add(&self, orphan: Orphan) {
let mut orphans = self.orphans.write().unwrap();
let mut height_idx = self.height_idx.write().unwrap();
{
let mut orphans = self.orphans.write().unwrap();
let mut prev_idx = self.prev_idx.write().unwrap();
orphans.insert(orphan.block.hash(), orphan.clone());
prev_idx.insert(orphan.block.header.previous, orphan.block.hash());
let height_hashes = height_idx
.entry(orphan.block.header.height)
.or_insert(vec![]);
height_hashes.push(orphan.block.hash());
orphans.insert(orphan.block.hash(), orphan);
}
{
let mut orphans = self.orphans.write().unwrap();
let mut prev_idx = self.prev_idx.write().unwrap();
if orphans.len() > MAX_ORPHAN_SIZE {
// evict too old
orphans.retain(|_, ref mut x| {
x.added.elapsed() < Duration::from_secs(MAX_ORPHAN_AGE_SECS)
});
prev_idx.retain(|_, &mut x| orphans.contains_key(&x));
// evict too far ahead
let mut heights = height_idx.keys().cloned().collect::<Vec<u64>>();
heights.sort_unstable();
for h in heights.iter().rev() {
let _ = self.remove_by_height(h);
if orphans.len() < MAX_ORPHAN_SIZE {
break;
}
}
// cleanup index
height_idx.retain(|_, ref mut xs| xs.iter().any(|x| orphans.contains_key(&x)));
}
}
fn remove(&self, hash: &Hash) -> Option<Orphan> {
/// Get an orphan from the pool indexed by the hash of its parent, removing
/// it at the same time, preventing clone
fn remove_by_height(&self, height: &u64) -> Option<Vec<Orphan>> {
let mut orphans = self.orphans.write().unwrap();
let mut prev_idx = self.prev_idx.write().unwrap();
let orphan = orphans.remove(hash);
if let Some(x) = orphan.clone() {
prev_idx.remove(&x.block.header.previous);
}
orphan
}
/// Get an orphan from the pool indexed by the hash of its parent
fn get_by_previous(&self, hash: &Hash) -> Option<Orphan> {
let orphans = self.orphans.read().unwrap();
let prev_idx = self.prev_idx.read().unwrap();
if let Some(hash) = prev_idx.get(hash) {
orphans.get(hash).cloned()
} else {
None
}
let mut height_idx = self.height_idx.write().unwrap();
height_idx
.remove(height)
.map(|hs| hs.iter().filter_map(|h| orphans.remove(h)).collect())
}
fn contains(&self, hash: &Hash) -> bool {
@ -235,7 +237,7 @@ impl Chain {
Ok((t, b)) => {
// We accepted a block, so see if we can accept any orphans
if let Some(ref b) = b {
self.check_orphans(b.hash());
self.check_orphans(b.header.height + 1);
}
Ok((t, b))
}
@ -371,32 +373,25 @@ impl Chain {
}
/// Check for orphans, once a block is successfully added
pub fn check_orphans(&self, mut last_block_hash: Hash) {
pub fn check_orphans(&self, mut height: u64) {
trace!(
LOGGER,
"chain: check_orphans: # orphans {}",
"chain: check_orphans at {}, # orphans {}",
height,
self.orphans.len(),
);
// Is there an orphan in our orphans that we can now process?
// We just processed the given block, are there any orphans that have this block
// as their "previous" block?
loop {
if let Some(orphan) = self.orphans.get_by_previous(&last_block_hash) {
self.orphans.remove(&orphan.block.hash());
let res = self.process_block_no_orphans(orphan.block, orphan.opts);
match res {
Ok((_, b)) => {
if let Some(orphans) = self.orphans.remove_by_height(&height) {
for orphan in orphans {
let res = self.process_block_no_orphans(orphan.block, orphan.opts);
if let Ok((_, Some(b))) = res {
// We accepted a block, so see if we can accept any orphans
if b.is_some() {
last_block_hash = b.unwrap().hash();
} else {
break;
}
}
Err(_) => {
height = b.header.height + 1;
} else {
break;
}
};
}
} else {
break;
}
@ -564,7 +559,7 @@ impl Chain {
self.store.build_by_height_index(&header, true)?;
}
self.check_orphans(header.hash());
self.check_orphans(header.height + 1);
Ok(())
}
@ -659,6 +654,10 @@ impl Chain {
self.head.lock().unwrap().clone().total_difficulty
}
pub fn orphans_len(&self) -> usize {
self.orphans.len()
}
/// Total difficulty at the head of the header chain
pub fn total_header_difficulty(&self) -> Result<Difficulty, Error> {
Ok(self.store.get_header_head()?.total_difficulty)

View file

@ -30,6 +30,7 @@ extern crate serde_derive;
extern crate slog;
extern crate time;
#[macro_use]
extern crate grin_core as core;
extern crate grin_keychain as keychain;
extern crate grin_store;
@ -43,5 +44,5 @@ pub mod types;
// Re-export the base interface
pub use chain::Chain;
pub use chain::{Chain, MAX_ORPHAN_SIZE};
pub use types::{ChainAdapter, ChainStore, Error, Options, Tip};

View file

@ -143,7 +143,7 @@ log_file_append = true
#flag whether mining is enabled
enable_mining = true
enable_mining = false
#Whether to use async mode for the miner, if the plugin supports it.
#this allows for many searches to be run in parallel, e.g. if the system

View file

@ -146,7 +146,14 @@ fn body_sync(peers: Arc<Peers>, chain: Arc<chain::Chain>) {
// if we have 5 peers to sync from then ask for 50 blocks total (peer_count *
// 10) max will be 80 if all 8 peers are advertising more work
let peer_count = cmp::min(peers.more_work_peers().len(), 10);
let block_count = peer_count * 10;
let mut block_count = peer_count * 10;
// if the chain is already saturated with orphans, throttle
// still asking for at least 1 unknown block to avoid getting stuck
block_count = cmp::min(
block_count,
chain::MAX_ORPHAN_SIZE.saturating_sub(chain.orphans_len()) + 1,
);
let hashes_to_get = hashes
.iter()