diff --git a/chain/src/chain.rs b/chain/src/chain.rs index 685381998..dc6c63858 100644 --- a/chain/src/chain.rs +++ b/chain/src/chain.rs @@ -17,6 +17,7 @@ use std::collections::{HashMap, VecDeque}; use std::fs::File; +use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, Mutex, RwLock}; use std::time::{Duration, Instant}; @@ -59,6 +60,8 @@ pub struct OrphanBlockPool { // additional index of height -> hash // so we can efficiently identify a child block (ex-orphan) after processing a block height_idx: RwLock>>, + // accumulated number of evicted block because of MAX_ORPHAN_SIZE limitation + evicted: AtomicUsize, } impl OrphanBlockPool { @@ -66,6 +69,7 @@ impl OrphanBlockPool { OrphanBlockPool { orphans: RwLock::new(HashMap::new()), height_idx: RwLock::new(HashMap::new()), + evicted: AtomicUsize::new(0), } } @@ -74,6 +78,10 @@ impl OrphanBlockPool { orphans.len() } + fn len_evicted(&self) -> usize { + self.evicted.load(Ordering::Relaxed) + } + fn add(&self, orphan: Orphan) { let mut orphans = self.orphans.write().unwrap(); let mut height_idx = self.height_idx.write().unwrap(); @@ -86,6 +94,8 @@ impl OrphanBlockPool { } if orphans.len() > MAX_ORPHAN_SIZE { + let old_len = orphans.len(); + // evict too old orphans.retain(|_, ref mut x| { x.added.elapsed() < Duration::from_secs(MAX_ORPHAN_AGE_SECS) @@ -105,6 +115,9 @@ impl OrphanBlockPool { } // cleanup index height_idx.retain(|_, ref mut xs| xs.iter().any(|x| orphans.contains_key(&x))); + + self.evicted + .fetch_add(old_len - orphans.len(), Ordering::Relaxed); } } @@ -279,9 +292,14 @@ impl Chain { debug!( LOGGER, - "process_block: orphan: {:?}, # orphans {}", + "process_block: orphan: {:?}, # orphans {}{}", block_hash, self.orphans.len(), + if self.orphans.len_evicted() > 0 { + format!(", # evicted {}", self.orphans.len_evicted()) + } else { + String::new() + }, ); Err(ErrorKind::Orphan.into()) } @@ -351,6 +369,11 @@ impl Chain { self.orphans.contains(hash) } + /// Get the OrphanBlockPool accumulated evicted number of blocks + pub fn orphans_evicted_len(&self) -> usize { + self.orphans.len_evicted() + } + /// Check for orphans, once a block is successfully added pub fn check_orphans(&self, mut height: u64) { let initial_height = height; diff --git a/servers/src/grin/sync.rs b/servers/src/grin/sync.rs index 68e23c499..0fa740b17 100644 --- a/servers/src/grin/sync.rs +++ b/servers/src/grin/sync.rs @@ -113,6 +113,13 @@ pub fn run_sync( // the genesis state let mut history_locators: Vec<(u64, Hash)> = vec![]; + let mut body_sync_info = BodySyncInfo { + sync_start_ts: Utc::now(), + body_sync_hashes: vec![], + prev_body_received: None, + prev_tip: chain.head().unwrap(), + prev_orphans_len: 0, + }; loop { let horizon = global::cut_through_horizon() as u64; @@ -192,8 +199,8 @@ pub fn run_sync( } } else { // run the body_sync every 5s - if si.body_sync_due(&head) { - body_sync(peers.clone(), chain.clone()); + if si.body_sync_due(&head, chain.clone(), &mut body_sync_info) { + body_sync(peers.clone(), chain.clone(), &mut body_sync_info); sync_state.update(SyncStatus::BodySync { current_height: head.height, highest_height: si.highest_height, @@ -213,12 +220,81 @@ pub fn run_sync( }); } -fn body_sync(peers: Arc, chain: Arc) { +struct BodySyncInfo { + sync_start_ts: DateTime, + body_sync_hashes: Vec, + prev_body_received: Option>, + prev_tip: chain::Tip, + prev_orphans_len: usize, +} + +impl BodySyncInfo { + fn reset(&mut self) { + self.body_sync_hashes.clear(); + self.prev_body_received = None; + } + + fn reset_start(&mut self, chain: Arc) { + self.prev_tip = chain.head().unwrap(); + self.prev_orphans_len = chain.orphans_len() + chain.orphans_evicted_len(); + self.sync_start_ts = Utc::now(); + } + + fn body_no_more(&mut self, chain: Arc) -> bool { + let tip = chain.head().unwrap(); + + match self.prev_body_received { + Some(prev_ts) => { + if tip.last_block_h == self.prev_tip.last_block_h + && chain.orphans_len() + chain.orphans_evicted_len() == self.prev_orphans_len + && Utc::now() - prev_ts > Duration::milliseconds(200) + { + let hashes_not_get = self + .body_sync_hashes + .iter() + .filter(|x| !chain.get_block(*x).is_ok() && !chain.is_orphan(*x)) + .collect::>(); + debug!( + LOGGER, + "body_sync: {}/{} blocks received, and no more in 200ms", + self.body_sync_hashes.len() - hashes_not_get.len(), + self.body_sync_hashes.len(), + ); + return true; + } + } + None => { + if Utc::now() - self.sync_start_ts > Duration::seconds(5) { + debug!( + LOGGER, + "body_sync: 0/{} blocks received in 5s", + self.body_sync_hashes.len(), + ); + return true; + } + } + } + + if tip.last_block_h != self.prev_tip.last_block_h + || chain.orphans_len() + chain.orphans_evicted_len() != self.prev_orphans_len + { + self.prev_tip = tip; + self.prev_body_received = Some(Utc::now()); + self.prev_orphans_len = chain.orphans_len() + chain.orphans_evicted_len(); + } + + return false; + } +} + +fn body_sync(peers: Arc, chain: Arc, body_sync_info: &mut BodySyncInfo) { let horizon = global::cut_through_horizon() as u64; let body_head: chain::Tip = chain.head().unwrap(); let header_head: chain::Tip = chain.get_header_head().unwrap(); let sync_head: chain::Tip = chain.get_sync_head().unwrap(); + body_sync_info.reset(); + debug!( LOGGER, "body_sync: body_head - {}, {}, header_head - {}, {}, sync_head - {}, {}", @@ -289,11 +365,15 @@ fn body_sync(peers: Arc, chain: Arc) { if let Ok(peer) = peer.try_read() { if let Err(e) = peer.send_block_request(*hash) { debug!(LOGGER, "Skipped request to {}: {:?}", peer.info.addr, e); + } else { + body_sync_info.body_sync_hashes.push(hash.clone()); } } } } } + + body_sync_info.reset_start(chain); } fn header_sync( @@ -606,11 +686,19 @@ impl SyncInfo { } } - fn body_sync_due(&mut self, head: &chain::Tip) -> bool { + fn body_sync_due( + &mut self, + head: &chain::Tip, + chain: Arc, + body_sync_info: &mut BodySyncInfo, + ) -> bool { let now = Utc::now(); let (prev_ts, prev_height) = self.prev_body_sync; - if head.height >= prev_height + 96 || now - prev_ts > Duration::seconds(5) { + if head.height >= prev_height + 96 + || now - prev_ts > Duration::seconds(5) + || body_sync_info.body_no_more(chain) + { self.prev_body_sync = (now, head.height); return true; }