From 30c20294f59a05a0beac62d6384cc9d52fd682d8 Mon Sep 17 00:00:00 2001 From: AntiochP <30642645+antiochp@users.noreply.github.com> Date: Thu, 28 Dec 2017 19:49:27 -0500 Subject: [PATCH] reset header_head and sync_head on peer ban (#551) (#553) handle orphan blocks based on age (not fixed count) --- chain/src/chain.rs | 93 ++++++++++++++++---------------------------- chain/src/pipe.rs | 2 +- chain/src/store.rs | 7 ++++ chain/src/types.rs | 3 ++ grin/src/adapters.rs | 7 +--- grin/src/sync.rs | 10 +++-- p2p/src/peers.rs | 43 ++++++++++++++++---- 7 files changed, 89 insertions(+), 76 deletions(-) diff --git a/chain/src/chain.rs b/chain/src/chain.rs index 61aac87da..bdc319270 100644 --- a/chain/src/chain.rs +++ b/chain/src/chain.rs @@ -17,6 +17,7 @@ use std::collections::HashMap; use std::sync::{Arc, Mutex, RwLock}; +use std::time::{Duration, Instant}; use util::secp::pedersen::{Commitment, RangeProof}; @@ -33,12 +34,13 @@ use sumtree; use types::*; use util::LOGGER; -const MAX_ORPHANS: usize = 100; +const MAX_ORPHAN_AGE_SECS: u64 = 30; #[derive(Debug, Clone)] struct Orphan { block: Block, opts: Options, + added: Instant, } struct OrphanBlockPool { @@ -70,18 +72,11 @@ impl OrphanBlockPool { prev_idx.insert(orphan.block.header.previous, orphan.block.hash()); } - if self.len() > MAX_ORPHANS { - let max = { - let orphans = self.orphans.read().unwrap(); - orphans.values().max_by_key(|x| x.block.header.height).cloned() - }; - - if let Some(x) = max { - let mut orphans = self.orphans.write().unwrap(); - let mut prev_idx = self.prev_idx.write().unwrap(); - orphans.remove(&x.block.hash()); - prev_idx.remove(&x.block.header.previous); - } + { + let mut orphans = self.orphans.write().unwrap(); + let mut prev_idx = self.prev_idx.write().unwrap(); + orphans.retain(|_, ref mut x| x.added.elapsed() < Duration::from_secs(MAX_ORPHAN_AGE_SECS)); + prev_idx.retain(|_, &mut x| orphans.contains_key(&x)); } } @@ -176,13 +171,9 @@ impl Chain { Err(e) => return Err(Error::StoreErr(e, "chain init load head".to_owned())), }; - // Make sure we have a sync_head available for later use. - // We may have been tracking an invalid chain on a now banned peer - // so we want to reset the both sync_head and header_head on restart to handle this. - // TODO - handle sync_head/header_head and peer banning in a more effective way. - let tip = chain_store.head().unwrap(); - chain_store.save_header_head(&tip)?; - chain_store.save_sync_head(&tip)?; + // Reset sync_head and header_head to head of current chain. + // Make sure sync_head is available for later use when needed. + chain_store.reset_head()?; info!( LOGGER, @@ -212,7 +203,6 @@ impl Chain { let head = self.store .head() .map_err(|e| Error::StoreErr(e, "chain load head".to_owned()))?; - let height = head.height; let ctx = self.ctx_from_head(head, opts); let res = pipe::process_block(&b, ctx); @@ -254,36 +244,25 @@ impl Chain { self.check_orphans(&b); }, Err(Error::Orphan) => { - // TODO - Do we want to check that orphan height is > current height? - // TODO - Just check heights here? Or should we be checking total_difficulty as well? let block_hash = b.hash(); - if b.header.height < height + (MAX_ORPHANS as u64) { - let orphan = Orphan { - block: b.clone(), - opts: opts, - }; + let orphan = Orphan { + block: b.clone(), + opts: opts, + added: Instant::now(), + }; - // In the case of a fork - it is possible to have multiple blocks - // that are children of a given block. - // We do not handle this currently for orphans (future enhancement?). - // We just assume "last one wins" for now. - &self.orphans.add(orphan); + // In the case of a fork - it is possible to have multiple blocks + // that are children of a given block. + // We do not handle this currently for orphans (future enhancement?). + // We just assume "last one wins" for now. + &self.orphans.add(orphan); - debug!( - LOGGER, - "process_block: orphan: {:?}, # orphans {}", - block_hash, - self.orphans.len(), - ); - } else { - debug!( - LOGGER, - "process_block: orphan: {:?}, (dropping, height {} vs {})", - block_hash, - b.header.height, - height, - ); - } + debug!( + LOGGER, + "process_block: orphan: {:?}, # orphans {}", + block_hash, + self.orphans.len(), + ); }, Err(Error::Unfit(ref msg)) => { debug!( @@ -409,17 +388,6 @@ impl Chain { sumtrees.roots() } - /// Reset the header head to the same as the main head. When sync is running, - /// the header head will go ahead to try to download as many as possible. - /// However if a block, when fully received, is found invalid, the header - /// head need to backtrack to the last known valid position. - pub fn reset_header_head(&self) -> Result<(), Error> { - let head = self.head.lock().unwrap(); - debug!(LOGGER, "Reset header head to {} at {}", - head.last_block_h, head.height); - self.store.save_header_head(&head).map_err(From::from) - } - /// returns the last n nodes inserted into the utxo sum tree /// returns sum tree hash plus output itself (as the sum is contained /// in the output anyhow) @@ -451,6 +419,13 @@ impl Chain { self.head.lock().unwrap().clone().total_difficulty } + /// Reset header_head and sync_head to head of current body chain + pub fn reset_head(&self) -> Result<(), Error> { + self.store + .reset_head() + .map_err(|e| Error::StoreErr(e, "chain reset_head".to_owned())) + } + /// Get the tip that's also the head of the chain pub fn head(&self) -> Result { Ok(self.head.lock().unwrap().clone()) diff --git a/chain/src/pipe.rs b/chain/src/pipe.rs index da217dd28..dc652c6f0 100644 --- a/chain/src/pipe.rs +++ b/chain/src/pipe.rs @@ -93,7 +93,7 @@ pub fn process_block(b: &Block, mut ctx: BlockContext) -> Result, Er validate_block(b, &mut ctx, &mut extension)?; debug!( LOGGER, - "pipe: proces_block {} at {} is valid, save and append.", + "pipe: process_block {} at {} is valid, save and append.", b.hash(), b.header.height, ); diff --git a/chain/src/store.rs b/chain/src/store.rs index 8c17c5837..093479999 100644 --- a/chain/src/store.rs +++ b/chain/src/store.rs @@ -89,6 +89,13 @@ impl ChainStore for ChainKVStore { self.db.put_ser(&vec![SYNC_HEAD_PREFIX], t) } + // Reset both header_head and sync_head to the current head of the body chain + fn reset_head(&self) -> Result<(), Error> { + let tip = self.head()?; + self.save_header_head(&tip)?; + self.save_sync_head(&tip) + } + fn get_block(&self, h: &Hash) -> Result { option_to_not_found(self.db.get_ser(&to_key(BLOCK_PREFIX, &mut h.to_vec()))) } diff --git a/chain/src/types.rs b/chain/src/types.rs index 15c08a068..a9cfab279 100644 --- a/chain/src/types.rs +++ b/chain/src/types.rs @@ -223,6 +223,9 @@ pub trait ChainStore: Send + Sync { /// Save the provided tip as the current head of the sync header chain fn save_sync_head(&self, t: &Tip) -> Result<(), store::Error>; + /// Reset header_head and sync_head to head of current body chain + fn reset_head(&self) -> Result<(), store::Error>; + /// Gets the block header at the provided height fn get_header_by_height(&self, height: u64) -> Result; diff --git a/grin/src/adapters.rs b/grin/src/adapters.rs index 8f771007b..4cf763333 100644 --- a/grin/src/adapters.rs +++ b/grin/src/adapters.rs @@ -79,11 +79,8 @@ impl p2p::ChainAdapter for NetToChainAdapter { if let &Err(ref e) = &res { debug!(LOGGER, "Block {} refused by chain: {:?}", bhash, e); if e.is_bad_block() { - // header chain should be consistent with the sync head here - // we just banned the peer that sent a bad block so - // sync head should resolve itself if/when we find an alternative peer - // with more work than ourselves - // we should not need to reset the header head here + debug!(LOGGER, "block_received: {} is a bad block, resetting head", bhash); + let _ = self.chain.reset_head(); return false; } } diff --git a/grin/src/sync.rs b/grin/src/sync.rs index ec7c79a1f..ce0a1b400 100644 --- a/grin/src/sync.rs +++ b/grin/src/sync.rs @@ -110,9 +110,9 @@ fn body_sync(peers: Peers, chain: Arc) { } hashes.reverse(); - // if we have 5 most_work_peers then ask for 50 blocks total (peer_count * 10) - // max will be 80 if all 8 peers are advertising most_work - let peer_count = cmp::min(peers.most_work_peers().len(), 10); + // if we have 5 peers to sync from then ask for 50 blocks total (peer_count * 10) + // max will be 80 if all 8 peers are advertising more work + let peer_count = cmp::min(peers.more_work_peers().len(), 10); let block_count = peer_count * 10; let hashes_to_get = hashes @@ -137,7 +137,8 @@ fn body_sync(peers: Peers, chain: Arc) { ); for hash in hashes_to_get.clone() { - let peer = peers.most_work_peer(); + // TODO - Is there a threshold where we sync from most_work_peer (not more_work_peer)? + let peer = peers.more_work_peer(); if let Some(peer) = peer { if let Ok(peer) = peer.try_read() { let _ = peer.send_block_request(hash); @@ -209,6 +210,7 @@ pub fn needs_syncing( if peer.info.total_difficulty <= local_diff { info!(LOGGER, "synchronize stopped, at {:?} @ {:?}", local_diff, chain.head().unwrap().height); currently_syncing.store(false, Ordering::Relaxed); + let _ = chain.reset_head(); } } } else { diff --git a/p2p/src/peers.rs b/p2p/src/peers.rs index e26964935..5bb162a07 100644 --- a/p2p/src/peers.rs +++ b/p2p/src/peers.rs @@ -90,7 +90,42 @@ impl Peers { self.connected_peers().len() as u32 } - /// Return vec of all peers that currently have the most worked branch, + // Return vec of connected peers that currently advertise more work (total_difficulty) + // than we do. + pub fn more_work_peers(&self) -> Vec>> { + let peers = self.connected_peers(); + if peers.len() == 0 { + return vec![]; + } + + let total_difficulty = self.total_difficulty(); + + let mut max_peers = peers + .iter() + .filter(|x| { + match x.try_read() { + Ok(peer) => { + peer.info.total_difficulty > total_difficulty + }, + Err(_) => false, + } + }) + .cloned() + .collect::>(); + + thread_rng().shuffle(&mut max_peers); + max_peers + } + + /// Returns single random peer with more work than us. + pub fn more_work_peer(&self) -> Option>> { + match self.more_work_peers().first() { + Some(x) => Some(x.clone()), + None => None + } + } + + /// Return vec of connected peers that currently have the most worked branch, /// showing the highest total difficulty. pub fn most_work_peers(&self) -> Vec>> { let peers = self.connected_peers(); @@ -135,12 +170,6 @@ impl Peers { } } - /// Returns a random connected peer. - // pub fn random_peer(&self) -> Option>> { - // let peers = self.connected_peers(); - // Some(thread_rng().choose(&peers).unwrap().clone()) - // } - pub fn is_banned(&self, peer_addr: SocketAddr) -> bool { if let Ok(peer_data) = self.store.get_peer(peer_addr) { if peer_data.flags == State::Banned {