diff --git a/chain/src/chain.rs b/chain/src/chain.rs index 03914ddff..a48a5bf38 100644 --- a/chain/src/chain.rs +++ b/chain/src/chain.rs @@ -15,6 +15,7 @@ //! Facade and handler for the rest of the blockchain implementation //! and mostly the chain pipeline. +use std::collections::VecDeque; use std::sync::{Arc, Mutex}; use secp::pedersen::Commitment; @@ -29,7 +30,7 @@ use pipe; use store; use types::*; - +const MAX_ORPHANS: usize = 20; /// Helper macro to transform a Result into an Option with None in case /// of error @@ -49,8 +50,11 @@ macro_rules! none_err { pub struct Chain { store: Arc, adapter: Arc, + head: Arc>, block_process_lock: Arc>, + orphans: Arc>>, + test_mode: bool, } @@ -101,26 +105,37 @@ impl Chain { adapter: adapter, head: Arc::new(Mutex::new(head)), block_process_lock: Arc::new(Mutex::new(true)), + orphans: Arc::new(Mutex::new(VecDeque::with_capacity(MAX_ORPHANS+1))), test_mode: test_mode, }) } /// Attempt to add a new block to the chain. Returns the new chain tip if it /// has been added to the longest chain, None if it's added to an (as of - /// now) - /// orphan chain. - pub fn process_block(&self, b: &Block, opts: Options) -> Result, Error> { + /// now) orphan chain. + pub fn process_block(&self, b: Block, opts: Options) -> Result, Error> { let head = self.store.head().map_err(&Error::StoreErr)?; let ctx = self.ctx_from_head(head, opts); - let res = pipe::process_block(b, ctx); + let res = pipe::process_block(&b, ctx); - if let Ok(Some(ref tip)) = res { - let chain_head = self.head.clone(); - let mut head = chain_head.lock().unwrap(); - *head = tip.clone(); - } + match res { + Ok(Some(ref tip)) => { + // block got accepted and extended the head, updating our head + let chain_head = self.head.clone(); + let mut head = chain_head.lock().unwrap(); + *head = tip.clone(); + + self.check_orphans(); + } + Err(Error::Orphan) => { + let mut orphans = self.orphans.lock().unwrap(); + orphans.push_front(b); + orphans.truncate(MAX_ORPHANS); + } + _ => {} + } res } @@ -152,6 +167,33 @@ impl Chain { } } + /// Pop orphans out of the queue and check if we can now accept them. + fn check_orphans(&self) { + // first check how many we have to retry, unfort. we can't extend the lock + // in the loop as it needs to be freed before going in process_block + let mut orphan_count = 0; + { + let orphans = self.orphans.lock().unwrap(); + orphan_count = orphans.len(); + } + + // pop each orphan and retry, if still orphaned, will be pushed again + let mut opts = NONE; + if self.test_mode { + opts = opts | EASY_POW; + } + for _ in 0..orphan_count { + let mut popped = None; + { + let mut orphans = self.orphans.lock().unwrap(); + popped = orphans.pop_back(); + } + if let Some(orphan) = popped { + self.process_block(orphan, opts); + } + } + } + /// Gets an unspent output from its commitment. With return None if the /// output /// doesn't exist or has been spent. This querying is done in a way that's diff --git a/chain/src/pipe.rs b/chain/src/pipe.rs index 1ec2d0ab6..64c2fed37 100644 --- a/chain/src/pipe.rs +++ b/chain/src/pipe.rs @@ -103,8 +103,7 @@ fn check_known(bh: Hash, ctx: &mut BlockContext) -> Result<(), Error> { /// TODO require only the block header (with length information) fn validate_header(header: &BlockHeader, ctx: &mut BlockContext) -> Result<(), Error> { if header.height > ctx.head.height + 1 { - // TODO actually handle orphans and add them to a size-limited set - return Err(Error::Unfit("orphan".to_string())); + return Err(Error::Orphan); } let prev = try!(ctx.store.get_block_header(&header.previous).map_err(&Error::StoreErr)); diff --git a/chain/src/types.rs b/chain/src/types.rs index 3daf6af1e..a19e235f9 100644 --- a/chain/src/types.rs +++ b/chain/src/types.rs @@ -41,6 +41,8 @@ bitflags! { pub enum Error { /// The block doesn't fit anywhere in our chain Unfit(String), + /// Special case of orphan blocks + Orphan, /// Difficulty is too low either compared to ours or the block PoW hash DifficultyTooLow, /// Addition of difficulties on all previous block is wrong diff --git a/chain/tests/mine_simple_chain.rs b/chain/tests/mine_simple_chain.rs index afe795683..5d28cb22f 100644 --- a/chain/tests/mine_simple_chain.rs +++ b/chain/tests/mine_simple_chain.rs @@ -68,13 +68,20 @@ fn mine_empty_chain() { let difficulty = consensus::next_difficulty(chain.difficulty_iter()).unwrap(); b.header.difficulty = difficulty.clone(); - pow::pow_size(&mut cuckoo_miner, &mut b.header, difficulty, consensus::TEST_SIZESHIFT as u32).unwrap(); - chain.process_block(&b, grin_chain::EASY_POW).unwrap(); + pow::pow_size( + &mut cuckoo_miner, + &mut b.header, + difficulty, + consensus::TEST_SIZESHIFT as u32, + ).unwrap(); + + let bhash = b.hash(); + chain.process_block(b, grin_chain::EASY_POW).unwrap(); // checking our new head let head = chain.head().unwrap(); assert_eq!(head.height, n); - assert_eq!(head.last_block_h, b.hash()); + assert_eq!(head.last_block_h, bhash); } } @@ -93,26 +100,28 @@ fn mine_forks() { let mut b = core::Block::new(&prev, vec![], reward_key).unwrap(); b.header.timestamp = prev.timestamp + time::Duration::seconds(60); b.header.total_difficulty = Difficulty::from_num(2*n); - chain.process_block(&b, grin_chain::SKIP_POW).unwrap(); + let bhash = b.hash(); + chain.process_block(b, grin_chain::SKIP_POW).unwrap(); // checking our new head thread::sleep(::std::time::Duration::from_millis(50)); let head = chain.head().unwrap(); assert_eq!(head.height, n as u64); - assert_eq!(head.last_block_h, b.hash()); + assert_eq!(head.last_block_h, bhash); assert_eq!(head.prev_block_h, prev.hash()); // build another block with higher difficulty let mut b = core::Block::new(&prev, vec![], reward_key).unwrap(); b.header.timestamp = prev.timestamp + time::Duration::seconds(60); b.header.total_difficulty = Difficulty::from_num(2*n+1); - chain.process_block(&b, grin_chain::SKIP_POW).unwrap(); + let bhash = b.hash(); + chain.process_block(b, grin_chain::SKIP_POW).unwrap(); // checking head switch thread::sleep(::std::time::Duration::from_millis(50)); let head = chain.head().unwrap(); assert_eq!(head.height, n as u64); - assert_eq!(head.last_block_h, b.hash()); + assert_eq!(head.last_block_h, bhash); assert_eq!(head.prev_block_h, prev.hash()); } } diff --git a/grin/src/adapters.rs b/grin/src/adapters.rs index 7869f9193..8a614da57 100644 --- a/grin/src/adapters.rs +++ b/grin/src/adapters.rs @@ -56,18 +56,18 @@ impl NetAdapter for NetToChainAdapter { } fn block_received(&self, b: core::Block) { - debug!("Received block {} from network, going to process.", - b.hash()); + let bhash = b.hash(); + debug!("Received block {} from network, going to process.", bhash); // pushing the new block through the chain pipeline - let res = self.chain.process_block(&b, self.chain_opts()); + let res = self.chain.process_block(b, self.chain_opts()); if let Err(e) = res { - debug!("Block {} refused by chain: {:?}", b.hash(), e); + debug!("Block {} refused by chain: {:?}", bhash, e); } if self.syncer.borrow().syncing() { - self.syncer.borrow().block_received(b.hash()); + self.syncer.borrow().block_received(bhash); } } diff --git a/grin/src/miner.rs b/grin/src/miner.rs index 0feafbd91..9078fa215 100644 --- a/grin/src/miner.rs +++ b/grin/src/miner.rs @@ -139,7 +139,7 @@ impl Miner { } else { chain::NONE }; - let res = self.chain.process_block(&b, opts); + let res = self.chain.process_block(b, opts); if let Err(e) = res { error!("(Server ID: {}) Error validating mined block: {:?}", self.debug_output_id, e);