Chain handles orphaned blocks and retries them

When a new block is rejected by the validation pipeline as an orphan,
the chain now keeps it in a ring buffer of capped size. Any time a
new block is accepted, all orphans in the ring buffer are retried.
This helps handle the cases where we receive blocks out of order for
networking or timing reasons.
Ignotus Peverell 2017-07-27 19:08:48 +00:00
parent f0044c631f
commit 3c3b12b13a
6 changed files with 77 additions and 25 deletions
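
The orphan handling boils down to a capped VecDeque used as a ring buffer.
Below is a minimal, self-contained sketch of that pattern, with a simplified
Block type and a hypothetical OrphanBuffer wrapper standing in for the fields
added to Chain in this commit; it is an illustration under those assumptions,
not the exact chain code.

use std::collections::VecDeque;

const MAX_ORPHANS: usize = 20;

// Simplified stand-in for core::Block, just for this sketch.
struct Block {
    height: u64,
}

// Hypothetical wrapper around the capped deque; in the commit the deque
// lives directly on Chain behind an Arc<Mutex<..>>.
struct OrphanBuffer {
    orphans: VecDeque<Block>,
}

impl OrphanBuffer {
    fn new() -> OrphanBuffer {
        OrphanBuffer { orphans: VecDeque::with_capacity(MAX_ORPHANS + 1) }
    }

    // New orphans go in at the front; truncate drops the oldest entries so
    // the buffer never holds more than MAX_ORPHANS blocks.
    fn add(&mut self, b: Block) {
        self.orphans.push_front(b);
        self.orphans.truncate(MAX_ORPHANS);
    }

    // Retries pop from the back (oldest first); a block that is still an
    // orphan simply gets added again by the caller.
    fn pop_for_retry(&mut self) -> Option<Block> {
        self.orphans.pop_back()
    }
}

fn main() {
    let mut buf = OrphanBuffer::new();
    buf.add(Block { height: 5 });
    // When a block is accepted, every buffered orphan gets one retry.
    while let Some(orphan) = buf.pop_for_retry() {
        println!("retrying orphan at height {}", orphan.height);
    }
}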


@@ -15,6 +15,7 @@
//! Facade and handler for the rest of the blockchain implementation
//! and mostly the chain pipeline.
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use secp::pedersen::Commitment;
@@ -29,7 +30,7 @@ use pipe;
use store;
use types::*;
const MAX_ORPHANS: usize = 20;
/// Helper macro to transform a Result into an Option with None in case
/// of error
@@ -49,8 +50,11 @@ macro_rules! none_err {
pub struct Chain {
store: Arc<ChainStore>,
adapter: Arc<ChainAdapter>,
head: Arc<Mutex<Tip>>,
block_process_lock: Arc<Mutex<bool>>,
orphans: Arc<Mutex<VecDeque<Block>>>,
test_mode: bool,
}
@@ -101,26 +105,37 @@ impl Chain {
adapter: adapter,
head: Arc::new(Mutex::new(head)),
block_process_lock: Arc::new(Mutex::new(true)),
orphans: Arc::new(Mutex::new(VecDeque::with_capacity(MAX_ORPHANS+1))),
test_mode: test_mode,
})
}
/// Attempt to add a new block to the chain. Returns the new chain tip if it
/// has been added to the longest chain, None if it's added to an (as of
/// now)
/// orphan chain.
pub fn process_block(&self, b: &Block, opts: Options) -> Result<Option<Tip>, Error> {
/// now) orphan chain.
pub fn process_block(&self, b: Block, opts: Options) -> Result<Option<Tip>, Error> {
let head = self.store.head().map_err(&Error::StoreErr)?;
let ctx = self.ctx_from_head(head, opts);
let res = pipe::process_block(b, ctx);
let res = pipe::process_block(&b, ctx);
if let Ok(Some(ref tip)) = res {
let chain_head = self.head.clone();
let mut head = chain_head.lock().unwrap();
*head = tip.clone();
}
match res {
Ok(Some(ref tip)) => {
// block got accepted and extended the head, updating our head
let chain_head = self.head.clone();
let mut head = chain_head.lock().unwrap();
*head = tip.clone();
self.check_orphans();
}
Err(Error::Orphan) => {
let mut orphans = self.orphans.lock().unwrap();
orphans.push_front(b);
orphans.truncate(MAX_ORPHANS);
}
_ => {}
}
res
}
@@ -152,6 +167,33 @@ impl Chain {
}
}
/// Pop orphans out of the queue and check if we can now accept them.
fn check_orphans(&self) {
// first check how many we have to retry; unfortunately we can't hold the lock
// across the loop as it needs to be released before calling process_block
let mut orphan_count = 0;
{
let orphans = self.orphans.lock().unwrap();
orphan_count = orphans.len();
}
// pop each orphan and retry it; if it's still an orphan, it will be pushed back in
let mut opts = NONE;
if self.test_mode {
opts = opts | EASY_POW;
}
for _ in 0..orphan_count {
let mut popped = None;
{
let mut orphans = self.orphans.lock().unwrap();
popped = orphans.pop_back();
}
if let Some(orphan) = popped {
self.process_block(orphan, opts);
}
}
}
/// Gets an unspent output from its commitment. Will return None if the
/// output
/// doesn't exist or has been spent. This querying is done in a way that's


@@ -103,8 +103,7 @@ fn check_known(bh: Hash, ctx: &mut BlockContext) -> Result<(), Error> {
/// TODO require only the block header (with length information)
fn validate_header(header: &BlockHeader, ctx: &mut BlockContext) -> Result<(), Error> {
if header.height > ctx.head.height + 1 {
// TODO actually handle orphans and add them to a size-limited set
return Err(Error::Unfit("orphan".to_string()));
return Err(Error::Orphan);
}
let prev = try!(ctx.store.get_block_header(&header.previous).map_err(&Error::StoreErr));


@@ -41,6 +41,8 @@ bitflags! {
pub enum Error {
/// The block doesn't fit anywhere in our chain
Unfit(String),
/// Special case of orphan blocks
Orphan,
/// Difficulty is too low either compared to ours or the block PoW hash
DifficultyTooLow,
/// Addition of difficulties on all previous block is wrong


@@ -68,13 +68,20 @@ fn mine_empty_chain() {
let difficulty = consensus::next_difficulty(chain.difficulty_iter()).unwrap();
b.header.difficulty = difficulty.clone();
pow::pow_size(&mut cuckoo_miner, &mut b.header, difficulty, consensus::TEST_SIZESHIFT as u32).unwrap();
chain.process_block(&b, grin_chain::EASY_POW).unwrap();
pow::pow_size(
&mut cuckoo_miner,
&mut b.header,
difficulty,
consensus::TEST_SIZESHIFT as u32,
).unwrap();
let bhash = b.hash();
chain.process_block(b, grin_chain::EASY_POW).unwrap();
// checking our new head
let head = chain.head().unwrap();
assert_eq!(head.height, n);
assert_eq!(head.last_block_h, b.hash());
assert_eq!(head.last_block_h, bhash);
}
}
@@ -93,26 +100,28 @@ fn mine_forks() {
let mut b = core::Block::new(&prev, vec![], reward_key).unwrap();
b.header.timestamp = prev.timestamp + time::Duration::seconds(60);
b.header.total_difficulty = Difficulty::from_num(2*n);
chain.process_block(&b, grin_chain::SKIP_POW).unwrap();
let bhash = b.hash();
chain.process_block(b, grin_chain::SKIP_POW).unwrap();
// checking our new head
thread::sleep(::std::time::Duration::from_millis(50));
let head = chain.head().unwrap();
assert_eq!(head.height, n as u64);
assert_eq!(head.last_block_h, b.hash());
assert_eq!(head.last_block_h, bhash);
assert_eq!(head.prev_block_h, prev.hash());
// build another block with higher difficulty
let mut b = core::Block::new(&prev, vec![], reward_key).unwrap();
b.header.timestamp = prev.timestamp + time::Duration::seconds(60);
b.header.total_difficulty = Difficulty::from_num(2*n+1);
chain.process_block(&b, grin_chain::SKIP_POW).unwrap();
let bhash = b.hash();
chain.process_block(b, grin_chain::SKIP_POW).unwrap();
// checking head switch
thread::sleep(::std::time::Duration::from_millis(50));
let head = chain.head().unwrap();
assert_eq!(head.height, n as u64);
assert_eq!(head.last_block_h, b.hash());
assert_eq!(head.last_block_h, bhash);
assert_eq!(head.prev_block_h, prev.hash());
}
}


@@ -56,18 +56,18 @@ impl NetAdapter for NetToChainAdapter {
}
fn block_received(&self, b: core::Block) {
debug!("Received block {} from network, going to process.",
b.hash());
let bhash = b.hash();
debug!("Received block {} from network, going to process.", bhash);
// pushing the new block through the chain pipeline
let res = self.chain.process_block(&b, self.chain_opts());
let res = self.chain.process_block(b, self.chain_opts());
if let Err(e) = res {
debug!("Block {} refused by chain: {:?}", b.hash(), e);
debug!("Block {} refused by chain: {:?}", bhash, e);
}
if self.syncer.borrow().syncing() {
self.syncer.borrow().block_received(b.hash());
self.syncer.borrow().block_received(bhash);
}
}


@@ -139,7 +139,7 @@ impl Miner {
} else {
chain::NONE
};
let res = self.chain.process_block(&b, opts);
let res = self.chain.process_block(b, opts);
if let Err(e) = res {
error!("(Server ID: {}) Error validating mined block: {:?}",
self.debug_output_id, e);