From bcf41438dcd40580c1d865691967710f31d60702 Mon Sep 17 00:00:00 2001 From: Antioch Peverell Date: Fri, 5 Oct 2018 08:29:33 +0100 Subject: [PATCH] Simplify chain, remove head. (#1657) * Go back to db for head and header_head * Use batch consistently for db access * Pass active txhashset and batch in the ctx * Only update head in db if total work increases * Only updated header_head if total work (on header chain) increases --- chain/src/chain.rs | 211 ++++++++++++---------------- chain/src/pipe.rs | 250 ++++++++++++++------------------- chain/src/store.rs | 10 +- servers/src/common/adapters.rs | 8 +- servers/src/grin/server.rs | 2 +- servers/src/grin/sync.rs | 10 +- src/bin/tui/status.rs | 22 ++- 7 files changed, 232 insertions(+), 281 deletions(-) diff --git a/chain/src/chain.rs b/chain/src/chain.rs index fedc2d469..d8fb5493f 100644 --- a/chain/src/chain.rs +++ b/chain/src/chain.rs @@ -18,7 +18,7 @@ use std::collections::HashMap; use std::fs::File; use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::{Arc, Mutex, RwLock}; +use std::sync::{Arc, RwLock}; use std::time::{Duration, Instant}; use lmdb; @@ -29,12 +29,11 @@ use core::core::merkle_proof::MerkleProof; use core::core::verifier_cache::VerifierCache; use core::core::{Block, BlockHeader, BlockSums, Output, OutputIdentifier, Transaction, TxKernel}; use core::global; -use core::pow::{self, Difficulty}; +use core::pow; use error::{Error, ErrorKind}; use grin_store::Error::NotFoundErr; use pipe; use store; -use store::Batch; use txhashset; use types::{ChainAdapter, NoStatus, Options, Tip, TxHashsetWriteStatus}; use util::secp::pedersen::{Commitment, RangeProof}; @@ -146,8 +145,6 @@ pub struct Chain { db_root: String, store: Arc, adapter: Arc, - - head: Arc>, orphans: Arc, txhashset: Arc>, // Recently processed blocks to avoid double-processing @@ -183,9 +180,7 @@ impl Chain { setup_head(genesis, store.clone(), &mut txhashset)?; - // Now reload the chain head (either existing head or genesis from above) let head = store.head()?; - debug!( LOGGER, "Chain init: {} @ {} [{}]", @@ -198,7 +193,6 @@ impl Chain { db_root: db_root, store: store, adapter: adapter, - head: Arc::new(Mutex::new(head)), orphans: Arc::new(OrphanBlockPool::new()), txhashset: Arc::new(RwLock::new(txhashset)), pow_verifier, @@ -210,73 +204,42 @@ impl Chain { /// Processes a single block, then checks for orphans, processing /// those as well if they're found - pub fn process_block( - &self, - b: Block, - opts: Options, - ) -> Result<(Option, Option), Error> { - match self.process_block_single(b, opts) { - Ok((t, b)) => { - // We accepted a block, so see if we can accept any orphans - if let Some(ref b) = b { - self.check_orphans(b.header.height + 1); - } - Ok((t, b)) - } - Err(e) => Err(e), + pub fn process_block(&self, b: Block, opts: Options) -> Result, Error> { + let height = b.header.height; + let res = self.process_block_single(b, opts); + if res.is_ok() { + self.check_orphans(height + 1); } + res } - /// Attempt to add a new block to the chain. Returns the new chain tip if it - /// has been added to the longest chain, None if it's added to an (as of - /// now) orphan chain. - fn process_block_single( - &self, - b: Block, - opts: Options, - ) -> Result<(Option, Option), Error> { - let mut batch = self.store.batch()?; - let bhash = b.hash(); - let mut ctx = self.new_ctx(opts, &mut batch)?; + /// Attempt to add a new block to the chain. + /// Returns true if it has been added to the longest chain + /// or false if it has added to a fork (or orphan?). + fn process_block_single(&self, b: Block, opts: Options) -> Result, Error> { + let batch = self.store.batch()?; + let mut txhashset = self.txhashset.write().unwrap(); + let mut ctx = self.new_ctx(opts, batch, &mut txhashset)?; - let res = pipe::process_block(&b, &mut ctx, &mut batch); - - let add_to_hash_cache = || { + // let hash = b.hash(); + let add_to_hash_cache = |hash: Hash| { // only add to hash cache below if block is definitively accepted // or rejected let mut cache = self.block_hashes_cache.write().unwrap(); - cache.insert(bhash, true); + cache.insert(hash, true); }; - match res { - Ok(Some(ref tip)) => { - batch.commit()?; + match pipe::process_block(&b, &mut ctx) { + Ok(head) => { + // Commit the batch in the ctx to the db. + ctx.batch.commit()?; - // block got accepted and extended the head, updating our head - let chain_head = self.head.clone(); - { - let mut head = chain_head.lock().unwrap(); - *head = tip.clone(); - } - add_to_hash_cache(); + add_to_hash_cache(b.hash()); // notifying other parts of the system of the update self.adapter.block_accepted(&b, opts); - Ok((Some(tip.clone()), Some(b))) - } - Ok(None) => { - batch.commit()?; - - add_to_hash_cache(); - - // block got accepted but we did not extend the head - // so its on a fork (or is the start of a new fork) - // broadcast the block out so everyone knows about the fork - // broadcast the block - self.adapter.block_accepted(&b, opts); - - Ok((None, Some(b))) + Ok(head) } Err(e) => { match e.kind() { @@ -325,7 +288,7 @@ impl Chain { b.header.height, e ); - add_to_hash_cache(); + add_to_hash_cache(b.hash()); Err(ErrorKind::Other(format!("{:?}", e).to_owned()).into()) } } @@ -335,38 +298,45 @@ impl Chain { /// Process a block header received during "header first" propagation. pub fn process_block_header(&self, bh: &BlockHeader, opts: Options) -> Result<(), Error> { - let mut batch = self.store.batch()?; - let mut ctx = self.new_ctx(opts, &mut batch)?; - pipe::process_block_header(bh, &mut ctx, &mut batch)?; - batch.commit()?; + let batch = self.store.batch()?; + let mut txhashset = self.txhashset.write().unwrap(); + let mut ctx = self.new_ctx(opts, batch, &mut txhashset)?; + pipe::process_block_header(bh, &mut ctx)?; + ctx.batch.commit()?; Ok(()) } - /// Attempt to add a new header to the header chain. - /// This is only ever used during sync and uses sync_head. + /// Attempt to add new headers to the header chain (or fork). + /// This is only ever used during sync and is based on sync_head. + /// We update header_head here if our total work increases. pub fn sync_block_headers( &self, headers: &Vec, opts: Options, ) -> Result<(), Error> { - let mut batch = self.store.batch()?; - let mut ctx = self.new_ctx(opts, &mut batch)?; - pipe::sync_block_headers(headers, &mut ctx, &mut batch)?; - batch.commit()?; + let batch = self.store.batch()?; + let mut txhashset = self.txhashset.write().unwrap(); + let mut ctx = self.new_ctx(opts, batch, &mut txhashset)?; + + pipe::sync_block_headers(headers, &mut ctx)?; + ctx.batch.commit()?; + Ok(()) } - fn new_ctx(&self, opts: Options, batch: &mut Batch) -> Result { - let head = batch.head()?; - let header_head = batch.get_header_head()?; + fn new_ctx<'a>( + &self, + opts: Options, + batch: store::Batch<'a>, + txhashset: &'a mut txhashset::TxHashSet, + ) -> Result, Error> { Ok(pipe::BlockContext { opts, - head, - header_head, pow_verifier: self.pow_verifier, block_hashes_cache: self.block_hashes_cache.clone(), verifier_cache: self.verifier_cache.clone(), - txhashset: self.txhashset.clone(), + txhashset, + batch, orphans: self.orphans.clone(), }) } @@ -411,10 +381,11 @@ impl Chain { String::new() }, ); + let height = orphan.block.header.height; let res = self.process_block_single(orphan.block, orphan.opts); - if let Ok((_, Some(b))) = res { + if res.is_ok() { orphan_accepted = true; - height_accepted = b.header.height; + height_accepted = height; } } @@ -568,7 +539,7 @@ impl Chain { // so we can send these across as part of the zip file. // The fast sync client does *not* have the necessary data // to rewind after receiving the txhashset zip. - let header = self.store.get_block_header(&h)?; + let header = self.get_block_header(&h)?; { let mut txhashset = self.txhashset.write().unwrap(); txhashset::extending_readonly(&mut txhashset, |extension| { @@ -633,13 +604,17 @@ impl Chain { status: &TxHashsetWriteStatus, ) -> Result<(), Error> { status.on_setup(); - let head = self.head().unwrap(); - let header_head = self.get_header_head().unwrap(); - if header_head.height - head.height < global::cut_through_horizon() as u64 { - return Err(ErrorKind::InvalidTxHashSet("not needed".to_owned()).into()); + + // Initial check based on relative heights of current head and header_head. + { + let head = self.head().unwrap(); + let header_head = self.header_head().unwrap(); + if header_head.height - head.height < global::cut_through_horizon() as u64 { + return Err(ErrorKind::InvalidTxHashSet("not needed".to_owned()).into()); + } } - let header = self.store.get_block_header(&h)?; + let header = self.get_block_header(&h)?; txhashset::zip_write(self.db_root.clone(), txhashset_data, &header)?; let mut txhashset = @@ -653,7 +628,9 @@ impl Chain { LOGGER, "chain: txhashset_write: rewinding a 2nd time (writeable)" ); + let mut batch = self.store.batch()?; + txhashset::extending(&mut txhashset, &mut batch, |extension| { extension.rewind(&header)?; @@ -686,23 +663,26 @@ impl Chain { ); status.on_save(); + // Replace the chain txhashset with the newly built one. { let mut txhashset_ref = self.txhashset.write().unwrap(); *txhashset_ref = txhashset; } - // Setup new head. - let head = { - let mut head = self.head.lock().unwrap(); - *head = Tip::from_block(&header); - head.clone() - }; + + debug!( + LOGGER, + "chain: txhashset_write: replaced our txhashset with the new one" + ); + // Save the new head to the db and rebuild the header by height index. { - batch.save_body_head(&head)?; + let tip = Tip::from_block(&header); + batch.save_body_head(&tip)?; batch.save_header_height(&header)?; batch.build_by_height_index(&header, true)?; } + // Commit all the changes to the db. batch.commit()?; @@ -741,11 +721,11 @@ impl Chain { debug!(LOGGER, "Starting blockchain compaction."); // Compact the txhashset via the extension. { - let mut txhashes = self.txhashset.write().unwrap(); - txhashes.compact()?; + let mut txhashset = self.txhashset.write().unwrap(); + txhashset.compact()?; // print out useful debug info after compaction - txhashset::extending_readonly(&mut txhashes, |extension| { + txhashset::extending_readonly(&mut txhashset, |extension| { extension.dump_output_pmmr(); Ok(()) })?; @@ -771,9 +751,10 @@ impl Chain { head.height - horizon ); let mut count = 0; - let mut current = self.store.get_header_by_height(head.height - horizon - 1)?; let batch = self.store.batch()?; + let mut current = batch.get_header_by_height(head.height - horizon - 1)?; loop { + // Go to the store directly so we can handle NotFoundErr robustly. match self.store.get_block(¤t.hash()) { Ok(b) => { batch.delete_block(&b.hash())?; @@ -791,7 +772,7 @@ impl Chain { if current.height <= 1 { break; } - match self.store.get_block_header(¤t.previous) { + match batch.get_block_header(¤t.previous) { Ok(h) => current = h, Err(NotFoundErr(_)) => break, Err(e) => return Err(From::from(e)), @@ -846,21 +827,11 @@ impl Chain { Ok((outputs.0, max_index, output_vec)) } - /// Total difficulty at the head of the chain - pub fn total_difficulty(&self) -> Difficulty { - self.head.lock().unwrap().clone().total_difficulty - } - /// Orphans pool size pub fn orphans_len(&self) -> usize { self.orphans.len() } - /// Total difficulty at the head of the header chain - pub fn total_header_difficulty(&self) -> Result { - Ok(self.store.get_header_head()?.total_difficulty) - } - /// Reset header_head and sync_head to head of current body chain pub fn reset_head(&self) -> Result<(), Error> { let batch = self.store.batch()?; @@ -869,9 +840,18 @@ impl Chain { Ok(()) } - /// Get the tip that's also the head of the chain + /// Tip (head) of the block chain. pub fn head(&self) -> Result { - Ok(self.head.lock().unwrap().clone()) + self.store + .head() + .map_err(|e| ErrorKind::StoreErr(e, "chain head".to_owned()).into()) + } + + /// Tip (head) of the header chain. + pub fn header_head(&self) -> Result { + self.store + .header_head() + .map_err(|e| ErrorKind::StoreErr(e, "chain header head".to_owned()).into()) } /// Block header for the chain head @@ -918,7 +898,7 @@ impl Chain { let (_, pos) = txhashset.is_unspent(output_ref)?; let mut min = 1; let mut max = { - let h = self.head.lock().unwrap(); + let h = self.head()?; h.height }; @@ -957,19 +937,12 @@ impl Chain { .map_err(|e| ErrorKind::StoreErr(e, "chain get sync head".to_owned()).into()) } - /// Get the tip of the header chain. - pub fn get_header_head(&self) -> Result { - self.store - .get_header_head() - .map_err(|e| ErrorKind::StoreErr(e, "chain get header head".to_owned()).into()) - } - /// Builds an iterator on blocks starting from the current chain head and /// running backward. Specialized to return information pertaining to block /// difficulty calculation (timestamp and previous difficulties). pub fn difficulty_iter(&self) -> store::DifficultyIter { + let head = self.head().unwrap(); let batch = self.store.batch().unwrap(); - let head = self.head.lock().unwrap(); store::DifficultyIter::from(head.last_block_h, batch) } diff --git a/chain/src/pipe.rs b/chain/src/pipe.rs index ae6d03fff..a98943cfa 100644 --- a/chain/src/pipe.rs +++ b/chain/src/pipe.rs @@ -40,17 +40,16 @@ use failure::ResultExt; /// Contextual information required to process a new block and either reject or /// accept it. -pub struct BlockContext { +pub struct BlockContext<'a> { /// The options pub opts: Options, - /// The head - pub head: Tip, - /// The header head - pub header_head: Tip, - /// The POW verification function + /// The pow verifier to use when processing a block. pub pow_verifier: fn(&BlockHeader, u8) -> Result<(), pow::Error>, - /// MMR sum tree states - pub txhashset: Arc>, + /// The active txhashset (rewindable MMRs) to use for block processing. + pub txhashset: &'a mut txhashset::TxHashSet, + /// The active batch to use for block processing. + pub batch: store::Batch<'a>, + /// Recently processed blocks to avoid double-processing pub block_hashes_cache: Arc>>, /// The verifier cache (caching verifier for rangeproofs and kernel signatures) @@ -61,18 +60,14 @@ pub struct BlockContext { // Check if this block is the next block *immediately* // after our current chain head. -fn is_next_block(header: &BlockHeader, ctx: &mut BlockContext) -> bool { - header.previous == ctx.head.last_block_h +fn is_next_block(header: &BlockHeader, head: &Tip) -> bool { + header.previous == head.last_block_h } /// Runs the block processing pipeline, including validation and finding a -/// place for the new block in the chain. Returns the new chain head if -/// updated. -pub fn process_block( - b: &Block, - ctx: &mut BlockContext, - batch: &mut store::Batch, -) -> Result, Error> { +/// place for the new block in the chain. +/// Returns new head if chain head updated. +pub fn process_block(b: &Block, ctx: &mut BlockContext) -> Result, Error> { // TODO should just take a promise for a block with a full header so we don't // spend resources reading the full block when its header is invalid @@ -86,13 +81,6 @@ pub fn process_block( b.kernels().len(), ); - // First thing we do is take a write lock on the txhashset. - // We may receive the same block from multiple peers simultaneously. - // We want to process the first one fully to avoid redundant work - // processing the duplicates. - let txhashset = ctx.txhashset.clone(); - let mut txhashset = txhashset.write().unwrap(); - // Fast in-memory checks to avoid re-processing a block we recently processed. { // Check if we have recently processed this block (via ctx chain head). @@ -106,16 +94,11 @@ pub fn process_block( } // Header specific processing. - { - handle_block_header(&b.header, ctx, batch)?; - - // Update header_head (but only if this header increases our total known work). - // i.e. Only if this header is now the head of the current "most work" chain. - update_header_head(&b.header, ctx, batch)?; - } + handle_block_header(&b.header, ctx)?; // Check if are processing the "next" block relative to the current chain head. - if is_next_block(&b.header, ctx) { + let head = ctx.batch.head()?; + if is_next_block(&b.header, &head) { // If this is the "next" block then either - // * common case where we process blocks sequentially. // * special case where this is the first fast sync full block @@ -124,32 +107,32 @@ pub fn process_block( // Check we have *this* block in the store. // Stop if we have processed this block previously (it is in the store). // This is more expensive than the earlier check_known() as we hit the store. - check_known_store(&b.header, ctx, batch)?; + check_known_store(&b.header, ctx)?; // Check existing MMR (via rewind) to see if this block is known to us already. // This should catch old blocks before we check to see if they appear to be // orphaned due to compacting/pruning on a fast-sync node. // This is more expensive than check_known_store() as we rewind the txhashset. // But we only incur the cost of the rewind if this is an earlier block on the same chain. - check_known_mmr(&b.header, ctx, batch, &mut txhashset)?; + check_known_mmr(&b.header, ctx)?; // At this point it looks like this is a new block that we have not yet processed. // Check we have the *previous* block in the store. // If we do not then treat this block as an orphan. - check_prev_store(&b.header, batch)?; + check_prev_store(&b.header, &mut ctx.batch)?; } // Validate the block itself, make sure it is internally consistent. // Use the verifier_cache for verifying rangeproofs and kernel signatures. - validate_block(b, batch, ctx.verifier_cache.clone())?; + validate_block(b, ctx)?; // Start a chain extension unit of work dependent on the success of the // internal validation and saving operations - txhashset::extending(&mut txhashset, batch, |mut extension| { + txhashset::extending(&mut ctx.txhashset, &mut ctx.batch, |mut extension| { // First we rewind the txhashset extension if necessary // to put it into a consistent state for validating the block. // We can skip this step if the previous header is the latest header we saw. - if is_next_block(&b.header, ctx) { + if is_next_block(&b.header, &head) { // No need to rewind if we are processing the next block. } else { // Rewind the re-apply blocks on the forked chain to @@ -180,7 +163,8 @@ pub fn process_block( // If applying this block does not increase the work on the chain then // we know we have not yet updated the chain to produce a new chain head. - if !block_has_more_work(&b.header, &ctx.head) { + let head = extension.batch.head()?; + if !has_more_work(&b.header, &head) { extension.force_rollback(); } @@ -195,13 +179,14 @@ pub fn process_block( ); // Add the newly accepted block and header to our index. - add_block(b, batch)?; + add_block(b, ctx)?; - // Update the chain head in the index (if necessary) - let res = update_head(b, ctx, batch)?; - - // Return the new chain tip if we added work, or - // None if this block has not added work. + // Update the chain head (and header_head) if total work is increased. + let res = { + let _ = update_header_head(&b.header, ctx)?; + let res = update_head(b, ctx)?; + res + }; Ok(res) } @@ -210,8 +195,7 @@ pub fn process_block( pub fn sync_block_headers( headers: &Vec, ctx: &mut BlockContext, - batch: &mut store::Batch, -) -> Result<(), Error> { +) -> Result, Error> { if let Some(header) = headers.first() { debug!( LOGGER, @@ -221,18 +205,18 @@ pub fn sync_block_headers( header.height, ); } else { - return Ok(()); + return Ok(None); } let all_known = if let Some(last_header) = headers.last() { - batch.get_block_header(&last_header.hash()).is_ok() + ctx.batch.get_block_header(&last_header.hash()).is_ok() } else { false }; if !all_known { for header in headers { - handle_block_header(header, ctx, batch)?; + handle_block_header(header, ctx)?; } } @@ -242,24 +226,21 @@ pub fn sync_block_headers( // progressing. // We only need to do this once at the end of this batch of headers. if let Some(header) = headers.last() { + // Update sync_head regardless of total work. + update_sync_head(header, &mut ctx.batch)?; + // Update header_head (but only if this header increases our total known work). // i.e. Only if this header is now the head of the current "most work" chain. - update_header_head(header, ctx, batch)?; - - // Update sync_head regardless of total work. - update_sync_head(header, batch)?; + let res = update_header_head(header, ctx)?; + Ok(res) + } else { + Ok(None) } - - Ok(()) } -fn handle_block_header( - header: &BlockHeader, - ctx: &mut BlockContext, - batch: &mut store::Batch, -) -> Result<(), Error> { - validate_header(header, ctx, batch)?; - add_block_header(header, batch)?; +fn handle_block_header(header: &BlockHeader, ctx: &mut BlockContext) -> Result<(), Error> { + validate_header(header, ctx)?; + add_block_header(header, ctx)?; Ok(()) } @@ -267,27 +248,25 @@ fn handle_block_header( /// We validate the header but we do not store it or update header head based /// on this. We will update these once we get the block back after requesting /// it. -pub fn process_block_header( - bh: &BlockHeader, - ctx: &mut BlockContext, - batch: &mut store::Batch, -) -> Result<(), Error> { +pub fn process_block_header(header: &BlockHeader, ctx: &mut BlockContext) -> Result<(), Error> { debug!( LOGGER, - "pipe: process_block_header at {} [{}]", - bh.height, - bh.hash() + "pipe: process_block_header: {} at {}", + header.hash(), + header.height, ); // keep this - check_header_known(bh.hash(), ctx)?; - validate_header(&bh, ctx, batch) + check_header_known(header, ctx)?; + validate_header(header, ctx)?; + Ok(()) } /// Quick in-memory check to fast-reject any block header we've already handled /// recently. Keeps duplicates from the network in check. /// ctx here is specific to the header_head (tip of the header chain) -fn check_header_known(bh: Hash, ctx: &mut BlockContext) -> Result<(), Error> { - if bh == ctx.header_head.last_block_h || bh == ctx.header_head.prev_block_h { +fn check_header_known(header: &BlockHeader, ctx: &mut BlockContext) -> Result<(), Error> { + let header_head = ctx.batch.header_head()?; + if header.hash() == header_head.last_block_h || header.hash() == header_head.prev_block_h { return Err(ErrorKind::Unfit("header already known".to_string()).into()); } Ok(()) @@ -297,8 +276,9 @@ fn check_header_known(bh: Hash, ctx: &mut BlockContext) -> Result<(), Error> { /// Keeps duplicates from the network in check. /// Checks against the last_block_h and prev_block_h of the chain head. fn check_known_head(header: &BlockHeader, ctx: &mut BlockContext) -> Result<(), Error> { + let head = ctx.batch.head()?; let bh = header.hash(); - if bh == ctx.head.last_block_h || bh == ctx.head.prev_block_h { + if bh == head.last_block_h || bh == head.prev_block_h { return Err(ErrorKind::Unfit("already known in head".to_string()).into()); } Ok(()) @@ -325,14 +305,11 @@ fn check_known_orphans(header: &BlockHeader, ctx: &mut BlockContext) -> Result<( } // Check if this block is in the store already. -fn check_known_store( - header: &BlockHeader, - ctx: &mut BlockContext, - batch: &mut store::Batch, -) -> Result<(), Error> { - match batch.block_exists(&header.hash()) { +fn check_known_store(header: &BlockHeader, ctx: &mut BlockContext) -> Result<(), Error> { + match ctx.batch.block_exists(&header.hash()) { Ok(true) => { - if header.height < ctx.head.height.saturating_sub(50) { + let head = ctx.batch.head()?; + if header.height < head.height.saturating_sub(50) { // TODO - we flag this as an "abusive peer" but only in the case // where we have the full block in our store. // So this is not a particularly exhaustive check. @@ -377,20 +354,16 @@ fn check_prev_store(header: &BlockHeader, batch: &mut store::Batch) -> Result<() // First check the header matches via current height index. // Then peek directly into the MMRs at the appropriate pos. // We can avoid a full rewind in this case. -fn check_known_mmr( - header: &BlockHeader, - ctx: &mut BlockContext, - batch: &mut store::Batch, - write_txhashset: &mut txhashset::TxHashSet, -) -> Result<(), Error> { +fn check_known_mmr(header: &BlockHeader, ctx: &mut BlockContext) -> Result<(), Error> { // No point checking the MMR if this block is not earlier in the chain. - if header.height > ctx.head.height { + let head = ctx.batch.head()?; + if header.height > head.height { return Ok(()); } // Use "header by height" index to look at current most work chain. // Header is not "known if the header differs at the given height. - let local_header = batch.get_header_by_height(header.height)?; + let local_header = ctx.batch.get_header_by_height(header.height)?; if local_header.hash() != header.hash() { return Ok(()); } @@ -399,7 +372,7 @@ fn check_known_mmr( // roots and sizes against the header. // If everything matches then this is a "known" block // and we do not need to spend any more effort - txhashset::extending_readonly(write_txhashset, |extension| { + txhashset::extending_readonly(&mut ctx.txhashset, |extension| { extension.rewind(header)?; // We want to return an error here (block already known) @@ -423,11 +396,7 @@ fn check_known_mmr( /// First level of block validation that only needs to act on the block header /// to make it as cheap as possible. The different validations are also /// arranged by order of cost to have as little DoS surface as possible. -fn validate_header( - header: &BlockHeader, - ctx: &mut BlockContext, - batch: &mut store::Batch, -) -> Result<(), Error> { +fn validate_header(header: &BlockHeader, ctx: &mut BlockContext) -> Result<(), Error> { // check version, enforces scheduled hard fork if !consensus::valid_header_version(header.height, header.version) { error!( @@ -467,7 +436,7 @@ fn validate_header( } // first I/O cost, better as late as possible - let prev = match batch.get_block_header(&header.previous) { + let prev = match ctx.batch.get_block_header(&header.previous) { Ok(prev) => prev, Err(grin_store::Error::NotFoundErr(_)) => return Err(ErrorKind::Orphan.into()), Err(e) => { @@ -516,7 +485,7 @@ fn validate_header( // explicit check to ensure total_difficulty has increased by exactly // the _network_ difficulty of the previous block // (during testnet1 we use _block_ difficulty here) - let child_batch = batch.child()?; + let child_batch = ctx.batch.child()?; let diff_iter = store::DifficultyIter::from(header.previous, child_batch); let network_difficulty = consensus::next_difficulty(diff_iter) .context(ErrorKind::Other("network difficulty".to_owned()))?; @@ -534,17 +503,13 @@ fn validate_header( Ok(()) } -fn validate_block( - block: &Block, - batch: &mut store::Batch, - verifier_cache: Arc>, -) -> Result<(), Error> { - let prev = batch.get_block_header(&block.header.previous)?; +fn validate_block(block: &Block, ctx: &mut BlockContext) -> Result<(), Error> { + let prev = ctx.batch.get_block_header(&block.header.previous)?; block .validate( &prev.total_kernel_offset, &prev.total_kernel_sum, - verifier_cache, + ctx.verifier_cache.clone(), ).map_err(|e| ErrorKind::InvalidBlockProof(e))?; Ok(()) } @@ -610,20 +575,20 @@ fn apply_block_to_txhashset(block: &Block, ext: &mut txhashset::Extension) -> Re } /// Officially adds the block to our chain. -fn add_block(b: &Block, batch: &mut store::Batch) -> Result<(), Error> { +fn add_block(b: &Block, ctx: &mut BlockContext) -> Result<(), Error> { // Save the block itself to the db (via the batch). - batch + ctx.batch .save_block(b) .map_err(|e| ErrorKind::StoreErr(e, "pipe save block".to_owned()))?; // Build the block_input_bitmap, save to the db (via the batch) and cache locally. - batch.build_and_cache_block_input_bitmap(&b)?; + ctx.batch.build_and_cache_block_input_bitmap(&b)?; Ok(()) } /// Officially adds the block header to our header chain. -fn add_block_header(bh: &BlockHeader, batch: &mut store::Batch) -> Result<(), Error> { - batch +fn add_block_header(bh: &BlockHeader, ctx: &mut BlockContext) -> Result<(), Error> { + ctx.batch .save_block_header(bh) .map_err(|e| ErrorKind::StoreErr(e, "pipe save header".to_owned()).into()) } @@ -631,38 +596,27 @@ fn add_block_header(bh: &BlockHeader, batch: &mut store::Batch) -> Result<(), Er /// Directly updates the head if we've just appended a new block to it or handle /// the situation where we've just added enough work to have a fork with more /// work than the head. -fn update_head( - b: &Block, - ctx: &BlockContext, - batch: &mut store::Batch, -) -> Result, Error> { +fn update_head(b: &Block, ctx: &BlockContext) -> Result, Error> { // if we made a fork with more work than the head (which should also be true // when extending the head), update it - if block_has_more_work(&b.header, &ctx.head) { - // update the block height index - batch - .setup_height(&b.header, &ctx.head) + let head = ctx.batch.head()?; + if has_more_work(&b.header, &head) { + // Update the block height index based on this new head. + ctx.batch + .setup_height(&b.header, &head) .map_err(|e| ErrorKind::StoreErr(e, "pipe setup height".to_owned()))?; - // in sync mode, only update the "body chain", otherwise update both the - // "header chain" and "body chain", updating the header chain in sync resets - // all additional "future" headers we've received let tip = Tip::from_block(&b.header); - if ctx.opts.contains(Options::SYNC) { - batch - .save_body_head(&tip) - .map_err(|e| ErrorKind::StoreErr(e, "pipe save body".to_owned()))?; - } else { - batch - .save_head(&tip) - .map_err(|e| ErrorKind::StoreErr(e, "pipe save head".to_owned()))?; - } + + ctx.batch + .save_body_head(&tip) + .map_err(|e| ErrorKind::StoreErr(e, "pipe save body".to_owned()))?; + debug!( LOGGER, - "pipe: chain head {} @ {}", - b.hash(), - b.header.height + "pipe: head updated to {} at {}", tip.last_block_h, tip.height ); + Ok(Some(tip)) } else { Ok(None) @@ -670,9 +624,8 @@ fn update_head( } // Whether the provided block totals more work than the chain tip -fn block_has_more_work(header: &BlockHeader, tip: &Tip) -> bool { - let block_tip = Tip::from_block(header); - block_tip.total_difficulty > tip.total_difficulty +fn has_more_work(header: &BlockHeader, head: &Tip) -> bool { + header.total_difficulty() > head.total_difficulty } /// Update the sync head so we can keep syncing from where we left off. @@ -686,18 +639,19 @@ fn update_sync_head(bh: &BlockHeader, batch: &mut store::Batch) -> Result<(), Er } /// Update the header head if this header has most work. -fn update_header_head( - bh: &BlockHeader, - ctx: &mut BlockContext, - batch: &mut store::Batch, -) -> Result, Error> { - let tip = Tip::from_block(bh); - if tip.total_difficulty > ctx.header_head.total_difficulty { - batch +fn update_header_head(bh: &BlockHeader, ctx: &mut BlockContext) -> Result, Error> { + let header_head = ctx.batch.header_head()?; + if has_more_work(&bh, &header_head) { + let tip = Tip::from_block(bh); + ctx.batch .save_header_head(&tip) .map_err(|e| ErrorKind::StoreErr(e, "pipe save header head".to_owned()))?; - ctx.header_head = tip.clone(); - debug!(LOGGER, "header head {} @ {}", bh.hash(), bh.height); + + debug!( + LOGGER, + "pipe: header_head updated to {} at {}", tip.last_block_h, tip.height + ); + Ok(Some(tip)) } else { Ok(None) diff --git a/chain/src/store.rs b/chain/src/store.rs index 1f87b6b1a..800050c7a 100644 --- a/chain/src/store.rs +++ b/chain/src/store.rs @@ -67,11 +67,13 @@ impl ChainStore { option_to_not_found(self.db.get_ser(&vec![HEAD_PREFIX]), "HEAD") } + /// Header of the block at the head of the block chain (not the same thing as header_head). pub fn head_header(&self) -> Result { self.get_block_header(&self.head()?.last_block_h) } - pub fn get_header_head(&self) -> Result { + /// Head of the header chain (not the same thing as head_header). + pub fn header_head(&self) -> Result { option_to_not_found(self.db.get_ser(&vec![HEADER_HEAD_PREFIX]), "HEADER_HEAD") } @@ -170,11 +172,13 @@ impl<'a> Batch<'a> { option_to_not_found(self.db.get_ser(&vec![HEAD_PREFIX]), "HEAD") } + /// Header of the block at the head of the block chain (not the same thing as header_head). pub fn head_header(&self) -> Result { self.get_block_header(&self.head()?.last_block_h) } - pub fn get_header_head(&self) -> Result { + /// Head of the header chain (not the same thing as head_header). + pub fn header_head(&self) -> Result { option_to_not_found(self.db.get_ser(&vec![HEADER_HEAD_PREFIX]), "HEADER_HEAD") } @@ -207,7 +211,7 @@ impl<'a> Batch<'a> { } pub fn reset_sync_head(&self) -> Result<(), Error> { - let head = self.get_header_head()?; + let head = self.header_head()?; self.save_sync_head(&head) } diff --git a/servers/src/common/adapters.rs b/servers/src/common/adapters.rs index ccbb15765..83476501a 100644 --- a/servers/src/common/adapters.rs +++ b/servers/src/common/adapters.rs @@ -61,7 +61,7 @@ pub struct NetToChainAdapter { impl p2p::ChainAdapter for NetToChainAdapter { fn total_difficulty(&self) -> Difficulty { - w(&self.chain).total_difficulty() + w(&self.chain).head().unwrap().total_difficulty } fn total_height(&self) -> u64 { @@ -450,7 +450,7 @@ impl NetToChainAdapter { let prev_hash = b.header.previous; let bhash = b.hash(); match chain.process_block(b, self.chain_opts()) { - Ok((tip, _)) => { + Ok(tip) => { self.validate_chain(bhash); self.check_compact(tip); true @@ -523,13 +523,13 @@ impl NetToChainAdapter { } } - fn check_compact(&self, tip_res: Option) { + fn check_compact(&self, tip: Option) { // no compaction during sync or if we're in historical mode if self.archive_mode || self.sync_state.is_syncing() { return; } - if let Some(tip) = tip_res { + if let Some(tip) = tip { // trigger compaction every 2000 blocks, uses a different thread to avoid // blocking the caller thread (likely a peer) if tip.height % 2000 == 0 { diff --git a/servers/src/grin/server.rs b/servers/src/grin/server.rs index c08c20ce9..a2f0827bd 100644 --- a/servers/src/grin/server.rs +++ b/servers/src/grin/server.rs @@ -379,7 +379,7 @@ impl Server { /// The head of the block header chain pub fn header_head(&self) -> chain::Tip { - self.chain.get_header_head().unwrap() + self.chain.header_head().unwrap() } /// Returns a set of stats about this server. This and the ServerStats diff --git a/servers/src/grin/sync.rs b/servers/src/grin/sync.rs index a626664bb..c12da882c 100644 --- a/servers/src/grin/sync.rs +++ b/servers/src/grin/sync.rs @@ -148,7 +148,7 @@ pub fn run_sync( // if syncing is needed let head = chain.head().unwrap(); - let header_head = chain.get_header_head().unwrap(); + let header_head = chain.header_head().unwrap(); // run the header sync in every 10s at least if si.header_sync_due(sync_state.as_ref(), &header_head) { @@ -365,7 +365,7 @@ impl BodySyncInfo { fn body_sync(peers: Arc, chain: Arc, body_sync_info: &mut BodySyncInfo) { let horizon = global::cut_through_horizon() as u64; let body_head: chain::Tip = chain.head().unwrap(); - let header_head: chain::Tip = chain.get_header_head().unwrap(); + let header_head: chain::Tip = chain.header_head().unwrap(); let sync_head: chain::Tip = chain.get_sync_head().unwrap(); body_sync_info.reset(); @@ -455,7 +455,7 @@ fn header_sync( chain: Arc, history_locators: &mut Vec<(u64, Hash)>, ) { - if let Ok(header_head) = chain.get_header_head() { + if let Ok(header_head) = chain.header_head() { let difficulty = header_head.total_difficulty; if let Some(peer) = peers.most_work_peer() { @@ -522,7 +522,7 @@ fn needs_syncing( peers: Arc, chain: Arc, ) -> (bool, u64) { - let local_diff = chain.total_difficulty(); + let local_diff = chain.head().unwrap().total_difficulty; let peer = peers.most_work_peer(); let is_syncing = sync_state.is_syncing(); let mut most_work_height = 0; @@ -596,7 +596,7 @@ fn get_locator( // for security, clear history_locators[] in any case of header chain rollback, // the easiest way is to check whether the sync head and the header head are identical. - if history_locators.len() > 0 && tip.hash() != chain.get_header_head()?.hash() { + if history_locators.len() > 0 && tip.hash() != chain.header_head()?.hash() { history_locators.clear(); } diff --git a/src/bin/tui/status.rs b/src/bin/tui/status.rs index db137747e..cb21c659c 100644 --- a/src/bin/tui/status.rs +++ b/src/bin/tui/status.rs @@ -41,13 +41,27 @@ impl TUIStatusListener for TUIStatusView { LinearLayout::new(Orientation::Horizontal) .child(TextView::new("Connected Peers: ")) .child(TextView::new("0").with_id("connected_peers")), + ).child( + LinearLayout::new(Orientation::Horizontal) + .child(TextView::new("------------------------")), + ).child( + LinearLayout::new(Orientation::Horizontal) + .child(TextView::new("Header Chain Height: ")) + .child(TextView::new(" ").with_id("basic_header_chain_height")), + ).child( + LinearLayout::new(Orientation::Horizontal) + .child(TextView::new("Header Cumulative Difficulty: ")) + .child(TextView::new(" ").with_id("basic_header_total_difficulty")), + ).child( + LinearLayout::new(Orientation::Horizontal) + .child(TextView::new("------------------------")), ).child( LinearLayout::new(Orientation::Horizontal) .child(TextView::new("Chain Height: ")) .child(TextView::new(" ").with_id("chain_height")), ).child( LinearLayout::new(Orientation::Horizontal) - .child(TextView::new("Total Difficulty: ")) + .child(TextView::new("Cumulative Difficulty: ")) .child(TextView::new(" ").with_id("basic_total_difficulty")), ).child( LinearLayout::new(Orientation::Horizontal) @@ -178,6 +192,12 @@ impl TUIStatusListener for TUIStatusView { c.call_on_id("basic_total_difficulty", |t: &mut TextView| { t.set_content(stats.head.total_difficulty.to_string()); }); + c.call_on_id("basic_header_chain_height", |t: &mut TextView| { + t.set_content(stats.header_head.height.to_string()); + }); + c.call_on_id("basic_header_total_difficulty", |t: &mut TextView| { + t.set_content(stats.header_head.total_difficulty.to_string()); + }); /*c.call_on_id("basic_mining_config_status", |t: &mut TextView| { t.set_content(basic_mining_config_status); });