reset header_head and sync_head on peer ban (#551) (#553)

handle orphan blocks based on age (not fixed count)
This commit is contained in:
AntiochP 2017-12-28 19:49:27 -05:00 committed by GitHub
parent 2057525251
commit 30c20294f5
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 89 additions and 76 deletions

View file

@ -17,6 +17,7 @@
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::{Arc, Mutex, RwLock}; use std::sync::{Arc, Mutex, RwLock};
use std::time::{Duration, Instant};
use util::secp::pedersen::{Commitment, RangeProof}; use util::secp::pedersen::{Commitment, RangeProof};
@ -33,12 +34,13 @@ use sumtree;
use types::*; use types::*;
use util::LOGGER; use util::LOGGER;
const MAX_ORPHANS: usize = 100; const MAX_ORPHAN_AGE_SECS: u64 = 30;
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
struct Orphan { struct Orphan {
block: Block, block: Block,
opts: Options, opts: Options,
added: Instant,
} }
struct OrphanBlockPool { struct OrphanBlockPool {
@ -70,18 +72,11 @@ impl OrphanBlockPool {
prev_idx.insert(orphan.block.header.previous, orphan.block.hash()); prev_idx.insert(orphan.block.header.previous, orphan.block.hash());
} }
if self.len() > MAX_ORPHANS { {
let max = { let mut orphans = self.orphans.write().unwrap();
let orphans = self.orphans.read().unwrap(); let mut prev_idx = self.prev_idx.write().unwrap();
orphans.values().max_by_key(|x| x.block.header.height).cloned() orphans.retain(|_, ref mut x| x.added.elapsed() < Duration::from_secs(MAX_ORPHAN_AGE_SECS));
}; prev_idx.retain(|_, &mut x| orphans.contains_key(&x));
if let Some(x) = max {
let mut orphans = self.orphans.write().unwrap();
let mut prev_idx = self.prev_idx.write().unwrap();
orphans.remove(&x.block.hash());
prev_idx.remove(&x.block.header.previous);
}
} }
} }
@ -176,13 +171,9 @@ impl Chain {
Err(e) => return Err(Error::StoreErr(e, "chain init load head".to_owned())), Err(e) => return Err(Error::StoreErr(e, "chain init load head".to_owned())),
}; };
// Make sure we have a sync_head available for later use. // Reset sync_head and header_head to head of current chain.
// We may have been tracking an invalid chain on a now banned peer // Make sure sync_head is available for later use when needed.
// so we want to reset the both sync_head and header_head on restart to handle this. chain_store.reset_head()?;
// TODO - handle sync_head/header_head and peer banning in a more effective way.
let tip = chain_store.head().unwrap();
chain_store.save_header_head(&tip)?;
chain_store.save_sync_head(&tip)?;
info!( info!(
LOGGER, LOGGER,
@ -212,7 +203,6 @@ impl Chain {
let head = self.store let head = self.store
.head() .head()
.map_err(|e| Error::StoreErr(e, "chain load head".to_owned()))?; .map_err(|e| Error::StoreErr(e, "chain load head".to_owned()))?;
let height = head.height;
let ctx = self.ctx_from_head(head, opts); let ctx = self.ctx_from_head(head, opts);
let res = pipe::process_block(&b, ctx); let res = pipe::process_block(&b, ctx);
@ -254,36 +244,25 @@ impl Chain {
self.check_orphans(&b); self.check_orphans(&b);
}, },
Err(Error::Orphan) => { Err(Error::Orphan) => {
// TODO - Do we want to check that orphan height is > current height?
// TODO - Just check heights here? Or should we be checking total_difficulty as well?
let block_hash = b.hash(); let block_hash = b.hash();
if b.header.height < height + (MAX_ORPHANS as u64) { let orphan = Orphan {
let orphan = Orphan { block: b.clone(),
block: b.clone(), opts: opts,
opts: opts, added: Instant::now(),
}; };
// In the case of a fork - it is possible to have multiple blocks // In the case of a fork - it is possible to have multiple blocks
// that are children of a given block. // that are children of a given block.
// We do not handle this currently for orphans (future enhancement?). // We do not handle this currently for orphans (future enhancement?).
// We just assume "last one wins" for now. // We just assume "last one wins" for now.
&self.orphans.add(orphan); &self.orphans.add(orphan);
debug!( debug!(
LOGGER, LOGGER,
"process_block: orphan: {:?}, # orphans {}", "process_block: orphan: {:?}, # orphans {}",
block_hash, block_hash,
self.orphans.len(), self.orphans.len(),
); );
} else {
debug!(
LOGGER,
"process_block: orphan: {:?}, (dropping, height {} vs {})",
block_hash,
b.header.height,
height,
);
}
}, },
Err(Error::Unfit(ref msg)) => { Err(Error::Unfit(ref msg)) => {
debug!( debug!(
@ -409,17 +388,6 @@ impl Chain {
sumtrees.roots() sumtrees.roots()
} }
/// Reset the header head to the same as the main head. When sync is running,
/// the header head will go ahead to try to download as many as possible.
/// However if a block, when fully received, is found invalid, the header
/// head need to backtrack to the last known valid position.
pub fn reset_header_head(&self) -> Result<(), Error> {
let head = self.head.lock().unwrap();
debug!(LOGGER, "Reset header head to {} at {}",
head.last_block_h, head.height);
self.store.save_header_head(&head).map_err(From::from)
}
/// returns the last n nodes inserted into the utxo sum tree /// returns the last n nodes inserted into the utxo sum tree
/// returns sum tree hash plus output itself (as the sum is contained /// returns sum tree hash plus output itself (as the sum is contained
/// in the output anyhow) /// in the output anyhow)
@ -451,6 +419,13 @@ impl Chain {
self.head.lock().unwrap().clone().total_difficulty self.head.lock().unwrap().clone().total_difficulty
} }
/// Reset header_head and sync_head to head of current body chain
pub fn reset_head(&self) -> Result<(), Error> {
self.store
.reset_head()
.map_err(|e| Error::StoreErr(e, "chain reset_head".to_owned()))
}
/// Get the tip that's also the head of the chain /// Get the tip that's also the head of the chain
pub fn head(&self) -> Result<Tip, Error> { pub fn head(&self) -> Result<Tip, Error> {
Ok(self.head.lock().unwrap().clone()) Ok(self.head.lock().unwrap().clone())

View file

@ -93,7 +93,7 @@ pub fn process_block(b: &Block, mut ctx: BlockContext) -> Result<Option<Tip>, Er
validate_block(b, &mut ctx, &mut extension)?; validate_block(b, &mut ctx, &mut extension)?;
debug!( debug!(
LOGGER, LOGGER,
"pipe: proces_block {} at {} is valid, save and append.", "pipe: process_block {} at {} is valid, save and append.",
b.hash(), b.hash(),
b.header.height, b.header.height,
); );

View file

@ -89,6 +89,13 @@ impl ChainStore for ChainKVStore {
self.db.put_ser(&vec![SYNC_HEAD_PREFIX], t) self.db.put_ser(&vec![SYNC_HEAD_PREFIX], t)
} }
// Reset both header_head and sync_head to the current head of the body chain
fn reset_head(&self) -> Result<(), Error> {
let tip = self.head()?;
self.save_header_head(&tip)?;
self.save_sync_head(&tip)
}
fn get_block(&self, h: &Hash) -> Result<Block, Error> { fn get_block(&self, h: &Hash) -> Result<Block, Error> {
option_to_not_found(self.db.get_ser(&to_key(BLOCK_PREFIX, &mut h.to_vec()))) option_to_not_found(self.db.get_ser(&to_key(BLOCK_PREFIX, &mut h.to_vec())))
} }

View file

@ -223,6 +223,9 @@ pub trait ChainStore: Send + Sync {
/// Save the provided tip as the current head of the sync header chain /// Save the provided tip as the current head of the sync header chain
fn save_sync_head(&self, t: &Tip) -> Result<(), store::Error>; fn save_sync_head(&self, t: &Tip) -> Result<(), store::Error>;
/// Reset header_head and sync_head to head of current body chain
fn reset_head(&self) -> Result<(), store::Error>;
/// Gets the block header at the provided height /// Gets the block header at the provided height
fn get_header_by_height(&self, height: u64) -> Result<BlockHeader, store::Error>; fn get_header_by_height(&self, height: u64) -> Result<BlockHeader, store::Error>;

View file

@ -79,11 +79,8 @@ impl p2p::ChainAdapter for NetToChainAdapter {
if let &Err(ref e) = &res { if let &Err(ref e) = &res {
debug!(LOGGER, "Block {} refused by chain: {:?}", bhash, e); debug!(LOGGER, "Block {} refused by chain: {:?}", bhash, e);
if e.is_bad_block() { if e.is_bad_block() {
// header chain should be consistent with the sync head here debug!(LOGGER, "block_received: {} is a bad block, resetting head", bhash);
// we just banned the peer that sent a bad block so let _ = self.chain.reset_head();
// sync head should resolve itself if/when we find an alternative peer
// with more work than ourselves
// we should not need to reset the header head here
return false; return false;
} }
} }

View file

@ -110,9 +110,9 @@ fn body_sync(peers: Peers, chain: Arc<chain::Chain>) {
} }
hashes.reverse(); hashes.reverse();
// if we have 5 most_work_peers then ask for 50 blocks total (peer_count * 10) // if we have 5 peers to sync from then ask for 50 blocks total (peer_count * 10)
// max will be 80 if all 8 peers are advertising most_work // max will be 80 if all 8 peers are advertising more work
let peer_count = cmp::min(peers.most_work_peers().len(), 10); let peer_count = cmp::min(peers.more_work_peers().len(), 10);
let block_count = peer_count * 10; let block_count = peer_count * 10;
let hashes_to_get = hashes let hashes_to_get = hashes
@ -137,7 +137,8 @@ fn body_sync(peers: Peers, chain: Arc<chain::Chain>) {
); );
for hash in hashes_to_get.clone() { for hash in hashes_to_get.clone() {
let peer = peers.most_work_peer(); // TODO - Is there a threshold where we sync from most_work_peer (not more_work_peer)?
let peer = peers.more_work_peer();
if let Some(peer) = peer { if let Some(peer) = peer {
if let Ok(peer) = peer.try_read() { if let Ok(peer) = peer.try_read() {
let _ = peer.send_block_request(hash); let _ = peer.send_block_request(hash);
@ -209,6 +210,7 @@ pub fn needs_syncing(
if peer.info.total_difficulty <= local_diff { if peer.info.total_difficulty <= local_diff {
info!(LOGGER, "synchronize stopped, at {:?} @ {:?}", local_diff, chain.head().unwrap().height); info!(LOGGER, "synchronize stopped, at {:?} @ {:?}", local_diff, chain.head().unwrap().height);
currently_syncing.store(false, Ordering::Relaxed); currently_syncing.store(false, Ordering::Relaxed);
let _ = chain.reset_head();
} }
} }
} else { } else {

View file

@ -90,7 +90,42 @@ impl Peers {
self.connected_peers().len() as u32 self.connected_peers().len() as u32
} }
/// Return vec of all peers that currently have the most worked branch, // Return vec of connected peers that currently advertise more work (total_difficulty)
// than we do.
pub fn more_work_peers(&self) -> Vec<Arc<RwLock<Peer>>> {
let peers = self.connected_peers();
if peers.len() == 0 {
return vec![];
}
let total_difficulty = self.total_difficulty();
let mut max_peers = peers
.iter()
.filter(|x| {
match x.try_read() {
Ok(peer) => {
peer.info.total_difficulty > total_difficulty
},
Err(_) => false,
}
})
.cloned()
.collect::<Vec<_>>();
thread_rng().shuffle(&mut max_peers);
max_peers
}
/// Returns single random peer with more work than us.
pub fn more_work_peer(&self) -> Option<Arc<RwLock<Peer>>> {
match self.more_work_peers().first() {
Some(x) => Some(x.clone()),
None => None
}
}
/// Return vec of connected peers that currently have the most worked branch,
/// showing the highest total difficulty. /// showing the highest total difficulty.
pub fn most_work_peers(&self) -> Vec<Arc<RwLock<Peer>>> { pub fn most_work_peers(&self) -> Vec<Arc<RwLock<Peer>>> {
let peers = self.connected_peers(); let peers = self.connected_peers();
@ -135,12 +170,6 @@ impl Peers {
} }
} }
/// Returns a random connected peer.
// pub fn random_peer(&self) -> Option<Arc<RwLock<Peer>>> {
// let peers = self.connected_peers();
// Some(thread_rng().choose(&peers).unwrap().clone())
// }
pub fn is_banned(&self, peer_addr: SocketAddr) -> bool { pub fn is_banned(&self, peer_addr: SocketAddr) -> bool {
if let Ok(peer_data) = self.store.get_peer(peer_addr) { if let Ok(peer_data) = self.store.get_peer(peer_addr) {
if peer_data.flags == State::Banned { if peer_data.flags == State::Banned {