a little bit speed-up for full node syncing (#1463)

* simplified: track the head and orphans instead of every single block hash that sync requests
* change 1st block timeout from 1s to 5s
This commit is contained in:
Gary Yu 2018-09-11 03:32:20 +08:00 committed by Ignotus Peverell
parent 277687c17c
commit e760ea2ce2
2 changed files with 117 additions and 6 deletions

View file

@ -17,6 +17,7 @@
use std::collections::{HashMap, VecDeque}; use std::collections::{HashMap, VecDeque};
use std::fs::File; use std::fs::File;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex, RwLock}; use std::sync::{Arc, Mutex, RwLock};
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
@ -59,6 +60,8 @@ pub struct OrphanBlockPool {
// additional index of height -> hash // additional index of height -> hash
// so we can efficiently identify a child block (ex-orphan) after processing a block // so we can efficiently identify a child block (ex-orphan) after processing a block
height_idx: RwLock<HashMap<u64, Vec<Hash>>>, height_idx: RwLock<HashMap<u64, Vec<Hash>>>,
// accumulated number of evicted block because of MAX_ORPHAN_SIZE limitation
evicted: AtomicUsize,
} }
impl OrphanBlockPool { impl OrphanBlockPool {
@ -66,6 +69,7 @@ impl OrphanBlockPool {
OrphanBlockPool { OrphanBlockPool {
orphans: RwLock::new(HashMap::new()), orphans: RwLock::new(HashMap::new()),
height_idx: RwLock::new(HashMap::new()), height_idx: RwLock::new(HashMap::new()),
evicted: AtomicUsize::new(0),
} }
} }
@ -74,6 +78,10 @@ impl OrphanBlockPool {
orphans.len() orphans.len()
} }
fn len_evicted(&self) -> usize {
self.evicted.load(Ordering::Relaxed)
}
fn add(&self, orphan: Orphan) { fn add(&self, orphan: Orphan) {
let mut orphans = self.orphans.write().unwrap(); let mut orphans = self.orphans.write().unwrap();
let mut height_idx = self.height_idx.write().unwrap(); let mut height_idx = self.height_idx.write().unwrap();
@ -86,6 +94,8 @@ impl OrphanBlockPool {
} }
if orphans.len() > MAX_ORPHAN_SIZE { if orphans.len() > MAX_ORPHAN_SIZE {
let old_len = orphans.len();
// evict too old // evict too old
orphans.retain(|_, ref mut x| { orphans.retain(|_, ref mut x| {
x.added.elapsed() < Duration::from_secs(MAX_ORPHAN_AGE_SECS) x.added.elapsed() < Duration::from_secs(MAX_ORPHAN_AGE_SECS)
@ -105,6 +115,9 @@ impl OrphanBlockPool {
} }
// cleanup index // cleanup index
height_idx.retain(|_, ref mut xs| xs.iter().any(|x| orphans.contains_key(&x))); height_idx.retain(|_, ref mut xs| xs.iter().any(|x| orphans.contains_key(&x)));
self.evicted
.fetch_add(old_len - orphans.len(), Ordering::Relaxed);
} }
} }
@ -279,9 +292,14 @@ impl Chain {
debug!( debug!(
LOGGER, LOGGER,
"process_block: orphan: {:?}, # orphans {}", "process_block: orphan: {:?}, # orphans {}{}",
block_hash, block_hash,
self.orphans.len(), self.orphans.len(),
if self.orphans.len_evicted() > 0 {
format!(", # evicted {}", self.orphans.len_evicted())
} else {
String::new()
},
); );
Err(ErrorKind::Orphan.into()) Err(ErrorKind::Orphan.into())
} }
@ -351,6 +369,11 @@ impl Chain {
self.orphans.contains(hash) self.orphans.contains(hash)
} }
/// Get the OrphanBlockPool accumulated evicted number of blocks
pub fn orphans_evicted_len(&self) -> usize {
self.orphans.len_evicted()
}
/// Check for orphans, once a block is successfully added /// Check for orphans, once a block is successfully added
pub fn check_orphans(&self, mut height: u64) { pub fn check_orphans(&self, mut height: u64) {
let initial_height = height; let initial_height = height;

View file

@ -113,6 +113,13 @@ pub fn run_sync(
// the genesis state // the genesis state
let mut history_locators: Vec<(u64, Hash)> = vec![]; let mut history_locators: Vec<(u64, Hash)> = vec![];
let mut body_sync_info = BodySyncInfo {
sync_start_ts: Utc::now(),
body_sync_hashes: vec![],
prev_body_received: None,
prev_tip: chain.head().unwrap(),
prev_orphans_len: 0,
};
loop { loop {
let horizon = global::cut_through_horizon() as u64; let horizon = global::cut_through_horizon() as u64;
@ -192,8 +199,8 @@ pub fn run_sync(
} }
} else { } else {
// run the body_sync every 5s // run the body_sync every 5s
if si.body_sync_due(&head) { if si.body_sync_due(&head, chain.clone(), &mut body_sync_info) {
body_sync(peers.clone(), chain.clone()); body_sync(peers.clone(), chain.clone(), &mut body_sync_info);
sync_state.update(SyncStatus::BodySync { sync_state.update(SyncStatus::BodySync {
current_height: head.height, current_height: head.height,
highest_height: si.highest_height, highest_height: si.highest_height,
@ -213,12 +220,81 @@ pub fn run_sync(
}); });
} }
fn body_sync(peers: Arc<Peers>, chain: Arc<chain::Chain>) { struct BodySyncInfo {
sync_start_ts: DateTime<Utc>,
body_sync_hashes: Vec<Hash>,
prev_body_received: Option<DateTime<Utc>>,
prev_tip: chain::Tip,
prev_orphans_len: usize,
}
impl BodySyncInfo {
fn reset(&mut self) {
self.body_sync_hashes.clear();
self.prev_body_received = None;
}
fn reset_start(&mut self, chain: Arc<chain::Chain>) {
self.prev_tip = chain.head().unwrap();
self.prev_orphans_len = chain.orphans_len() + chain.orphans_evicted_len();
self.sync_start_ts = Utc::now();
}
fn body_no_more(&mut self, chain: Arc<chain::Chain>) -> bool {
let tip = chain.head().unwrap();
match self.prev_body_received {
Some(prev_ts) => {
if tip.last_block_h == self.prev_tip.last_block_h
&& chain.orphans_len() + chain.orphans_evicted_len() == self.prev_orphans_len
&& Utc::now() - prev_ts > Duration::milliseconds(200)
{
let hashes_not_get = self
.body_sync_hashes
.iter()
.filter(|x| !chain.get_block(*x).is_ok() && !chain.is_orphan(*x))
.collect::<Vec<_>>();
debug!(
LOGGER,
"body_sync: {}/{} blocks received, and no more in 200ms",
self.body_sync_hashes.len() - hashes_not_get.len(),
self.body_sync_hashes.len(),
);
return true;
}
}
None => {
if Utc::now() - self.sync_start_ts > Duration::seconds(5) {
debug!(
LOGGER,
"body_sync: 0/{} blocks received in 5s",
self.body_sync_hashes.len(),
);
return true;
}
}
}
if tip.last_block_h != self.prev_tip.last_block_h
|| chain.orphans_len() + chain.orphans_evicted_len() != self.prev_orphans_len
{
self.prev_tip = tip;
self.prev_body_received = Some(Utc::now());
self.prev_orphans_len = chain.orphans_len() + chain.orphans_evicted_len();
}
return false;
}
}
fn body_sync(peers: Arc<Peers>, chain: Arc<chain::Chain>, body_sync_info: &mut BodySyncInfo) {
let horizon = global::cut_through_horizon() as u64; let horizon = global::cut_through_horizon() as u64;
let body_head: chain::Tip = chain.head().unwrap(); let body_head: chain::Tip = chain.head().unwrap();
let header_head: chain::Tip = chain.get_header_head().unwrap(); let header_head: chain::Tip = chain.get_header_head().unwrap();
let sync_head: chain::Tip = chain.get_sync_head().unwrap(); let sync_head: chain::Tip = chain.get_sync_head().unwrap();
body_sync_info.reset();
debug!( debug!(
LOGGER, LOGGER,
"body_sync: body_head - {}, {}, header_head - {}, {}, sync_head - {}, {}", "body_sync: body_head - {}, {}, header_head - {}, {}, sync_head - {}, {}",
@ -289,11 +365,15 @@ fn body_sync(peers: Arc<Peers>, chain: Arc<chain::Chain>) {
if let Ok(peer) = peer.try_read() { if let Ok(peer) = peer.try_read() {
if let Err(e) = peer.send_block_request(*hash) { if let Err(e) = peer.send_block_request(*hash) {
debug!(LOGGER, "Skipped request to {}: {:?}", peer.info.addr, e); debug!(LOGGER, "Skipped request to {}: {:?}", peer.info.addr, e);
} else {
body_sync_info.body_sync_hashes.push(hash.clone());
} }
} }
} }
} }
} }
body_sync_info.reset_start(chain);
} }
fn header_sync( fn header_sync(
@ -606,11 +686,19 @@ impl SyncInfo {
} }
} }
fn body_sync_due(&mut self, head: &chain::Tip) -> bool { fn body_sync_due(
&mut self,
head: &chain::Tip,
chain: Arc<chain::Chain>,
body_sync_info: &mut BodySyncInfo,
) -> bool {
let now = Utc::now(); let now = Utc::now();
let (prev_ts, prev_height) = self.prev_body_sync; let (prev_ts, prev_height) = self.prev_body_sync;
if head.height >= prev_height + 96 || now - prev_ts > Duration::seconds(5) { if head.height >= prev_height + 96
|| now - prev_ts > Duration::seconds(5)
|| body_sync_info.body_no_more(chain)
{
self.prev_body_sync = (now, head.height); self.prev_body_sync = (now, head.height);
return true; return true;
} }