From 383985292cd5610ff2e9b8a9ac4fc3722c5baf70 Mon Sep 17 00:00:00 2001 From: Antioch Peverell Date: Sat, 7 Sep 2019 00:28:26 +0100 Subject: [PATCH] Split header MMR (and sync MMR) out from txhashset (#3004) * wip * sync sort of works now * get rid of the deadlock during compaction there is *always* a deadlock in there when we make changes like this... * cleanup how we rebuild the sync MMR on init * cleanup rewind logic * roll the "fix invalid root" changes into this PR * move rebuild_height_pos_index into txhashset and pass in header_pmmr for header lookups * cleanup and remember to setup sync head on init * cleanup unnecessary ref muts * rebuild_height_pos_index when writing txhashset --- chain/src/chain.rs | 254 ++++++----- chain/src/pipe.rs | 146 +++---- chain/src/txhashset/txhashset.rs | 647 +++++++++------------------- chain/src/txhashset/utxo_view.rs | 2 +- chain/src/types.rs | 4 +- chain/tests/store_indices.rs | 1 - chain/tests/test_txhashset.rs | 2 - core/src/core/pmmr/pmmr.rs | 7 - core/src/core/pmmr/readonly_pmmr.rs | 5 + servers/src/common/adapters.rs | 19 +- servers/src/grin/server.rs | 8 +- src/bin/tui/ui.rs | 5 +- 12 files changed, 437 insertions(+), 663 deletions(-) diff --git a/chain/src/chain.rs b/chain/src/chain.rs index 2a4cc21dd..c76fca973 100644 --- a/chain/src/chain.rs +++ b/chain/src/chain.rs @@ -29,7 +29,7 @@ use crate::error::{Error, ErrorKind}; use crate::pipe; use crate::store; use crate::txhashset; -use crate::txhashset::TxHashSet; +use crate::txhashset::{PMMRHandle, TxHashSet}; use crate::types::{ BlockStatus, ChainAdapter, NoStatus, Options, OutputMMRPosition, Tip, TxHashSetRoots, TxHashsetWriteStatus, @@ -150,6 +150,8 @@ pub struct Chain { adapter: Arc, orphans: Arc, txhashset: Arc>, + header_pmmr: Arc>>, + sync_pmmr: Arc>>, verifier_cache: Arc>, // POW verification function pow_verifier: fn(&BlockHeader) -> Result<(), pow::Error>, @@ -174,7 +176,17 @@ impl Chain { // open the txhashset, creating a new one if necessary let mut txhashset = txhashset::TxHashSet::open(db_root.clone(), store.clone(), None)?; - setup_head(&genesis, &store, &mut txhashset)?; + let mut header_pmmr = + PMMRHandle::new(&db_root, "header", "header_head", false, true, None)?; + let mut sync_pmmr = PMMRHandle::new(&db_root, "header", "sync_head", false, true, None)?; + + setup_head( + &genesis, + &store, + &mut header_pmmr, + &mut sync_pmmr, + &mut txhashset, + )?; Chain::log_heads(&store)?; Ok(Chain { @@ -183,6 +195,8 @@ impl Chain { adapter, orphans: Arc::new(OrphanBlockPool::new()), txhashset: Arc::new(RwLock::new(txhashset)), + header_pmmr: Arc::new(RwLock::new(header_pmmr)), + sync_pmmr: Arc::new(RwLock::new(sync_pmmr)), pow_verifier, verifier_cache, archive_mode, @@ -190,6 +204,11 @@ impl Chain { }) } + /// Return our shared header MMR handle. + pub fn header_pmmr(&self) -> Arc>> { + self.header_pmmr.clone() + } + /// Return our shared txhashset instance. pub fn txhashset(&self) -> Arc> { self.txhashset.clone() @@ -277,9 +296,10 @@ impl Chain { /// or false if it has added to a fork (or orphan?). 
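// Illustrative sketch (hypothetical stand-in types, not the grin API): with the
// header MMR and sync MMR split out of TxHashSet into their own Arc<RwLock<...>>
// handles, every call site in this patch acquires the header MMR lock before the
// txhashset lock. Keeping that acquisition order fixed across all paths is what
// avoids the compaction deadlock the commit message mentions.
use std::sync::{Arc, RwLock};

struct HeaderPmmr;
struct TxHashSetState;

struct ChainSketch {
    header_pmmr: Arc<RwLock<HeaderPmmr>>,
    txhashset: Arc<RwLock<TxHashSetState>>,
}

impl ChainSketch {
    fn process_block(&self) {
        // Fixed order: header MMR first, then txhashset.
        let _header = self.header_pmmr.write().unwrap();
        let _txhs = self.txhashset.write().unwrap();
        // ... build a BlockContext from both guards and process the block ...
    }

    fn validate_tx(&self) {
        // Read paths take the locks in the same order.
        let _header = self.header_pmmr.read().unwrap();
        let _txhs = self.txhashset.read().unwrap();
        // ... build a utxo_view over both guards ...
    }
}

fn main() {
    let chain = ChainSketch {
        header_pmmr: Arc::new(RwLock::new(HeaderPmmr)),
        txhashset: Arc::new(RwLock::new(TxHashSetState)),
    };
    chain.process_block();
    chain.validate_tx();
}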
fn process_block_single(&self, b: Block, opts: Options) -> Result, Error> { let (maybe_new_head, prev_head) = { + let mut header_pmmr = self.header_pmmr.write(); let mut txhashset = self.txhashset.write(); let batch = self.store.batch()?; - let mut ctx = self.new_ctx(opts, batch, &mut txhashset)?; + let mut ctx = self.new_ctx(opts, batch, &mut header_pmmr, &mut txhashset)?; let prev_head = ctx.batch.head()?; @@ -355,9 +375,10 @@ impl Chain { /// Note: This will update header MMR and corresponding header_head /// if total work increases (on the header chain). pub fn process_block_header(&self, bh: &BlockHeader, opts: Options) -> Result<(), Error> { + let mut header_pmmr = self.header_pmmr.write(); let mut txhashset = self.txhashset.write(); let batch = self.store.batch()?; - let mut ctx = self.new_ctx(opts, batch, &mut txhashset)?; + let mut ctx = self.new_ctx(opts, batch, &mut header_pmmr, &mut txhashset)?; pipe::process_block_header(bh, &mut ctx)?; ctx.batch.commit()?; Ok(()) @@ -367,19 +388,26 @@ impl Chain { /// This is only ever used during sync and is based on sync_head. /// We update header_head here if our total work increases. pub fn sync_block_headers(&self, headers: &[BlockHeader], opts: Options) -> Result<(), Error> { + let mut sync_pmmr = self.sync_pmmr.write(); + let mut header_pmmr = self.header_pmmr.write(); let mut txhashset = self.txhashset.write(); - let batch = self.store.batch()?; - let mut ctx = self.new_ctx(opts, batch, &mut txhashset)?; // Sync the chunk of block headers, updating sync_head as necessary. - pipe::sync_block_headers(headers, &mut ctx)?; + { + let batch = self.store.batch()?; + let mut ctx = self.new_ctx(opts, batch, &mut sync_pmmr, &mut txhashset)?; + pipe::sync_block_headers(headers, &mut ctx)?; + ctx.batch.commit()?; + } // Now "process" the last block header, updating header_head to match sync_head. if let Some(header) = headers.last() { + let batch = self.store.batch()?; + let mut ctx = self.new_ctx(opts, batch, &mut header_pmmr, &mut txhashset)?; pipe::process_block_header(header, &mut ctx)?; + ctx.batch.commit()?; } - ctx.batch.commit()?; Ok(()) } @@ -387,12 +415,14 @@ impl Chain { &self, opts: Options, batch: store::Batch<'a>, + header_pmmr: &'a mut txhashset::PMMRHandle, txhashset: &'a mut txhashset::TxHashSet, ) -> Result, Error> { Ok(pipe::BlockContext { opts, pow_verifier: self.pow_verifier, verifier_cache: self.verifier_cache.clone(), + header_pmmr, txhashset, batch, }) @@ -476,8 +506,9 @@ impl Chain { /// Validate the tx against the current UTXO set. pub fn validate_tx(&self, tx: &Transaction) -> Result<(), Error> { + let header_pmmr = self.header_pmmr.read(); let txhashset = self.txhashset.read(); - txhashset::utxo_view(&txhashset, |utxo| { + txhashset::utxo_view(&header_pmmr, &txhashset, |utxo| { utxo.validate_tx(tx)?; Ok(()) }) @@ -492,8 +523,9 @@ impl Chain { /// that has not yet sufficiently matured. pub fn verify_coinbase_maturity(&self, tx: &Transaction) -> Result<(), Error> { let height = self.next_block_height()?; + let header_pmmr = self.header_pmmr.read(); let txhashset = self.txhashset.read(); - txhashset::utxo_view(&txhashset, |utxo| { + txhashset::utxo_view(&header_pmmr, &txhashset, |utxo| { utxo.verify_coinbase_maturity(&tx.inputs(), height)?; Ok(()) }) @@ -519,15 +551,16 @@ impl Chain { return Ok(()); } + let mut header_pmmr = self.header_pmmr.write(); let mut txhashset = self.txhashset.write(); // Now create an extension from the txhashset and validate against the // latest block header. 
Rewind the extension to the specified header to // ensure the view is consistent. - txhashset::extending_readonly(&mut txhashset, |extension| { - let head = extension.batch.head()?; - pipe::rewind_and_apply_fork(&header, &head, extension)?; - extension.validate(fast_validation, &NoStatus)?; + txhashset::extending_readonly(&mut header_pmmr, &mut txhashset, |mut ext| { + pipe::rewind_and_apply_fork(&header, &mut ext)?; + let ref mut extension = ext.extension; + extension.validate(&self.genesis, fast_validation, &NoStatus)?; Ok(()) }) } @@ -535,15 +568,19 @@ impl Chain { /// Sets the txhashset roots on a brand new block by applying the block on /// the current txhashset state. pub fn set_txhashset_roots(&self, b: &mut Block) -> Result<(), Error> { + let mut header_pmmr = self.header_pmmr.write(); let mut txhashset = self.txhashset.write(); + let (prev_root, roots, sizes) = - txhashset::extending_readonly(&mut txhashset, |extension| { - let previous_header = extension.batch.get_previous_header(&b.header)?; - let head = extension.batch.head()?; - pipe::rewind_and_apply_fork(&previous_header, &head, extension)?; + txhashset::extending_readonly(&mut header_pmmr, &mut txhashset, |ext| { + let previous_header = ext.batch().get_previous_header(&b.header)?; + pipe::rewind_and_apply_fork(&previous_header, ext)?; + + let ref mut extension = ext.extension; + let ref mut header_extension = ext.header_extension; // Retrieve the header root before we apply the new block - let prev_root = extension.header_root()?; + let prev_root = header_extension.root()?; // Apply the latest block to the chain state via the extension. extension.apply_block(b)?; @@ -562,7 +599,7 @@ impl Chain { // Set the output and kernel MMR sizes. { // Carefully destructure these correctly... - let (_, output_mmr_size, _, kernel_mmr_size) = sizes; + let (output_mmr_size, _, kernel_mmr_size) = sizes; b.header.output_mmr_size = output_mmr_size; b.header.kernel_mmr_size = kernel_mmr_size; } @@ -576,12 +613,14 @@ impl Chain { output: &OutputIdentifier, header: &BlockHeader, ) -> Result { + let mut header_pmmr = self.header_pmmr.write(); let mut txhashset = self.txhashset.write(); - let merkle_proof = txhashset::extending_readonly(&mut txhashset, |extension| { - let head = extension.batch.head()?; - pipe::rewind_and_apply_fork(&header, &head, extension)?; - extension.merkle_proof(output) - })?; + let merkle_proof = + txhashset::extending_readonly(&mut header_pmmr, &mut txhashset, |ext| { + pipe::rewind_and_apply_fork(&header, ext)?; + let ref mut extension = ext.extension; + extension.merkle_proof(output) + })?; Ok(merkle_proof) } @@ -631,11 +670,11 @@ impl Chain { // to rewind after receiving the txhashset zip. let header = self.get_block_header(&h)?; { + let mut header_pmmr = self.header_pmmr.write(); let mut txhashset = self.txhashset.write(); - txhashset::extending_readonly(&mut txhashset, |extension| { - let head = extension.batch.head()?; - pipe::rewind_and_apply_fork(&header, &head, extension)?; - + txhashset::extending_readonly(&mut header_pmmr, &mut txhashset, |ext| { + pipe::rewind_and_apply_fork(&header, ext)?; + let ref mut extension = ext.extension; extension.snapshot()?; Ok(()) })?; @@ -706,31 +745,13 @@ impl Chain { /// Rebuild the sync MMR based on current header_head. /// We rebuild the sync MMR when first entering sync mode so ensure we /// have an MMR we can safely rewind based on the headers received from a peer. - /// TODO - think about how to optimize this. 
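// Toy model (u64 values stand in for block headers) of the rewind-and-reapply
// strategy the new rebuild_sync_mmr below uses: rather than truncating the sync
// MMR and reapplying every header from genesis, rewind only to the fork point
// with the current header chain and reapply the headers past it.
fn rebuild_sync(sync_chain: &mut Vec<u64>, header_chain: &[u64]) {
    // The length of the common prefix is the fork point.
    let fork = sync_chain
        .iter()
        .zip(header_chain)
        .take_while(|(a, b)| a == b)
        .count();
    sync_chain.truncate(fork); // rewind
    sync_chain.extend(&header_chain[fork..]); // reapply the missing headers
}

fn main() {
    let mut sync = vec![1, 2, 3, 90, 91];
    let head = vec![1, 2, 3, 4, 5, 6];
    rebuild_sync(&mut sync, &head);
    assert_eq!(sync, head);
}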
pub fn rebuild_sync_mmr(&self, head: &Tip) -> Result<(), Error> { - let mut txhashset = self.txhashset.write(); + let mut sync_pmmr = self.sync_pmmr.write(); let mut batch = self.store.batch()?; - txhashset::sync_extending(&mut txhashset, &mut batch, |extension| { - extension.rebuild(head, &self.genesis)?; - Ok(()) - })?; - batch.commit()?; - Ok(()) - } - - /// Rebuild the header MMR based on current header_head. - /// We rebuild the header MMR after receiving a txhashset from a peer. - /// The txhashset contains output, rangeproof and kernel MMRs but we construct - /// the header MMR locally based on headers from our db. - /// TODO - think about how to optimize this. - fn rebuild_header_mmr( - &self, - head: &Tip, - txhashset: &mut txhashset::TxHashSet, - ) -> Result<(), Error> { - let mut batch = self.store.batch()?; - txhashset::header_extending(txhashset, &mut batch, |extension| { - extension.rebuild(head, &self.genesis)?; + let sync_head = batch.get_sync_head()?; + let header = batch.get_block_header(&head.hash())?; + txhashset::header_extending(&mut sync_pmmr, &sync_head, &mut batch, |extension| { + pipe::rewind_and_apply_header_fork(&header, extension)?; Ok(()) })?; batch.commit()?; @@ -824,7 +845,6 @@ impl Chain { /// Clean the temporary sandbox folder pub fn clean_txhashset_sandbox(&self) { txhashset::clean_txhashset_folder(&self.get_tmp_dir()); - txhashset::clean_header_folder(&self.get_tmp_dir()); } /// Specific tmp dir. @@ -889,7 +909,6 @@ impl Chain { // Write txhashset to sandbox (in the Grin specific tmp dir) let sandbox_dir = self.get_tmp_dir(); txhashset::clean_txhashset_folder(&sandbox_dir); - txhashset::clean_header_folder(&sandbox_dir); txhashset::zip_write(sandbox_dir.clone(), txhashset_data.try_clone()?, &header)?; let mut txhashset = txhashset::TxHashSet::open( @@ -901,24 +920,21 @@ impl Chain { Some(&header), )?; - // The txhashset.zip contains the output, rangeproof and kernel MMRs. - // We must rebuild the header MMR ourselves based on the headers in our db. - self.rebuild_header_mmr(&Tip::from_header(&header), &mut txhashset)?; - // Validate the full kernel history (kernel MMR root for every block header). self.validate_kernel_history(&header, &txhashset)?; // all good, prepare a new batch and update all the required records debug!("txhashset_write: rewinding a 2nd time (writeable)"); + let mut header_pmmr = self.header_pmmr.write(); let mut batch = self.store.batch()?; - - txhashset::extending(&mut txhashset, &mut batch, |extension| { + txhashset::extending(&mut header_pmmr, &mut txhashset, &mut batch, |ext| { + let extension = &mut ext.extension; extension.rewind(&header)?; // Validate the extension, generating the utxo_sum and kernel_sum. // Full validation, including rangeproofs and kernel signature verification. - let (utxo_sum, kernel_sum) = extension.validate(false, status)?; + let (utxo_sum, kernel_sum) = extension.validate(&self.genesis, false, status)?; // Save the block_sums (utxo_sum, kernel_sum) to the db for use later. extension.batch.save_block_sums( @@ -946,6 +962,9 @@ impl Chain { batch.save_body_tail(&tip)?; } + // Rebuild our output_pos index in the db based on current UTXO set. + txhashset.rebuild_height_pos_index(&header_pmmr, &mut batch)?; + // Commit all the changes to the db. batch.commit()?; @@ -969,21 +988,17 @@ impl Chain { Some(&header), )?; - self.rebuild_header_mmr(&Tip::from_header(&header), &mut txhashset)?; - txhashset::clean_header_folder(&sandbox_dir); - // Replace the chain txhashset with the newly built one. 
*txhashset_ref = txhashset; } debug!("txhashset_write: replaced our txhashset with the new one"); - self.rebuild_height_for_pos()?; - // Check for any orphan blocks and process them based on the new chain state. self.check_orphans(header.height + 1); status.on_done(); + Ok(false) } @@ -992,7 +1007,7 @@ impl Chain { /// *Only* runs if we are not in archive mode. fn remove_historical_blocks( &self, - txhashset: &txhashset::TxHashSet, + header_pmmr: &txhashset::PMMRHandle, batch: &mut store::Batch<'_>, ) -> Result<(), Error> { if self.archive_mode { @@ -1019,7 +1034,7 @@ impl Chain { } let mut count = 0; - let tail_hash = txhashset.get_header_hash_by_height(head.height - horizon)?; + let tail_hash = header_pmmr.get_header_hash_by_height(head.height - horizon)?; let tail = batch.get_block_header(&tail_hash)?; // Remove old blocks (including short lived fork blocks) which height < tail.height @@ -1067,29 +1082,34 @@ impl Chain { } } + // Take a write lock on the txhashet and start a new writeable db batch. + let header_pmmr = self.header_pmmr.read(); + let mut txhashset = self.txhashset.write(); + let mut batch = self.store.batch()?; + + // Compact the txhashset itself (rewriting the pruned backend files). { - // Take a write lock on the txhashet and start a new writeable db batch. - let mut txhashset = self.txhashset.write(); - let mut batch = self.store.batch()?; + let head_header = batch.head_header()?; + let current_height = head_header.height; + let horizon_height = + current_height.saturating_sub(global::cut_through_horizon().into()); + let horizon_hash = header_pmmr.get_header_hash_by_height(horizon_height)?; + let horizon_header = batch.get_block_header(&horizon_hash)?; - // Compact the txhashset itself (rewriting the pruned backend files). - txhashset.compact(&mut batch)?; - - // Rebuild our output_pos index in the db based on current UTXO set. - txhashset::extending(&mut txhashset, &mut batch, |extension| { - extension.rebuild_height_pos_index()?; - Ok(()) - })?; - - // If we are not in archival mode remove historical blocks from the db. - if !self.archive_mode { - self.remove_historical_blocks(&txhashset, &mut batch)?; - } - - // Commit all the above db changes. - batch.commit()?; + txhashset.compact(&horizon_header, &mut batch)?; } + // Rebuild our output_pos index in the db based on current UTXO set. + txhashset.rebuild_height_pos_index(&header_pmmr, &mut batch)?; + + // If we are not in archival mode remove historical blocks from the db. + if !self.archive_mode { + self.remove_historical_blocks(&header_pmmr, &mut batch)?; + } + + // Commit all the above db changes. + batch.commit()?; + Ok(()) } @@ -1202,20 +1222,16 @@ impl Chain { } /// Gets the block header at the provided height. - /// Note: Takes a read lock on the txhashset. - /// Take care not to call this repeatedly in a tight loop. + /// Note: Takes a read lock on the header_pmmr. pub fn get_header_by_height(&self, height: u64) -> Result { let hash = self.get_header_hash_by_height(height)?; self.get_block_header(&hash) } /// Gets the header hash at the provided height. - /// Note: Takes a read lock on the txhashset. - /// Take care not to call this repeatedly in a tight loop. + /// Note: Takes a read lock on the header_pmmr. 
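// The height-indexed lookups used throughout rely on the header MMR being
// append-only with exactly one leaf per header: the header at height h is the
// (h + 1)-th insertion. A sketch of the arithmetic grin's
// pmmr::insertion_to_pmmr_index performs, which accounts for the parent nodes
// interleaved with leaves in the 1-based MMR numbering.
fn insertion_to_pmmr_index(insertion: u64) -> u64 {
    let n = insertion - 1;
    2 * n - u64::from(n.count_ones()) + 1
}

fn main() {
    assert_eq!(insertion_to_pmmr_index(1), 1); // genesis header, height 0
    assert_eq!(insertion_to_pmmr_index(2), 2); // height 1
    assert_eq!(insertion_to_pmmr_index(3), 4); // height 2 (position 3 is a parent)
    assert_eq!(insertion_to_pmmr_index(5), 8); // height 4
}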
fn get_header_hash_by_height(&self, height: u64) -> Result { - let txhashset = self.txhashset.read(); - let hash = txhashset.get_header_hash_by_height(height)?; - Ok(hash) + self.header_pmmr.read().get_header_hash_by_height(height) } /// Migrate the index 'commitment -> output_pos' to index 'commitment -> (output_pos, block_height)' @@ -1223,6 +1239,7 @@ impl Chain { /// - Node start-up. For database migration from the old version. /// - After the txhashset 'rebuild_index' when state syncing. pub fn rebuild_height_for_pos(&self) -> Result<(), Error> { + let header_pmmr = self.header_pmmr.read(); let txhashset = self.txhashset.read(); let mut outputs_pos = txhashset.get_all_output_pos()?; let total_outputs = outputs_pos.len(); @@ -1248,7 +1265,8 @@ impl Chain { let mut i = 0; for search_height in 0..max_height { - let h = txhashset.get_header_by_height(search_height + 1)?; + let hash = header_pmmr.get_header_hash_by_height(search_height + 1)?; + let h = batch.get_block_header(&hash)?; while i < total_outputs { let (commit, pos) = outputs_pos[i]; if pos > h.output_mmr_size { @@ -1274,10 +1292,11 @@ impl Chain { &self, output_ref: &OutputIdentifier, ) -> Result { + let header_pmmr = self.header_pmmr.read(); let txhashset = self.txhashset.read(); - let output_pos = txhashset.is_unspent(output_ref)?; - Ok(txhashset.get_header_by_height(output_pos.height)?) + let hash = header_pmmr.get_header_hash_by_height(output_pos.height)?; + Ok(self.get_block_header(&hash)?) } /// Gets the kernel with a given excess and the block height it is included in. @@ -1318,7 +1337,7 @@ impl Chain { min_height: Option, max_height: Option, ) -> Result { - let txhashset = self.txhashset.read(); + let header_pmmr = self.header_pmmr.read(); let mut min = min_height.unwrap_or(0).saturating_sub(1); let mut max = match max_height { @@ -1328,11 +1347,13 @@ impl Chain { loop { let search_height = max - (max - min) / 2; - let h = txhashset.get_header_by_height(search_height)?; + let hash = header_pmmr.get_header_hash_by_height(search_height)?; + let h = self.get_block_header(&hash)?; if search_height == 0 { return Ok(h); } - let h_prev = txhashset.get_header_by_height(search_height - 1)?; + let hash_prev = header_pmmr.get_header_hash_by_height(search_height - 1)?; + let h_prev = self.get_block_header(&hash_prev)?; if kernel_mmr_index > h.kernel_mmr_size { min = search_height; } else if kernel_mmr_index < h_prev.kernel_mmr_size { @@ -1386,6 +1407,8 @@ impl Chain { fn setup_head( genesis: &Block, store: &store::ChainStore, + header_pmmr: &mut txhashset::PMMRHandle, + sync_pmmr: &mut txhashset::PMMRHandle, txhashset: &mut txhashset::TxHashSet, ) -> Result<(), Error> { let mut batch = store.batch()?; @@ -1403,24 +1426,11 @@ fn setup_head( // to match the provided block header. let header = batch.get_block_header(&head.last_block_h)?; - // If we have no header MMR then rebuild as necessary. - // Supports old nodes with no header MMR. 
- txhashset::header_extending(txhashset, &mut batch, |extension| { - let needs_rebuild = match extension.get_header_by_height(head.height) { - Ok(header) => header.hash() != head.last_block_h, - Err(_) => true, - }; + let res = txhashset::extending(header_pmmr, txhashset, &mut batch, |ext| { + pipe::rewind_and_apply_fork(&header, ext)?; - if needs_rebuild { - extension.rebuild(&head, &genesis.header)?; - } + let ref mut extension = ext.extension; - Ok(()) - })?; - - let res = txhashset::extending(txhashset, &mut batch, |extension| { - let head = extension.batch.head()?; - pipe::rewind_and_apply_fork(&header, &head, extension)?; extension.validate_roots()?; // now check we have the "block sums" for the block in question @@ -1436,7 +1446,8 @@ fn setup_head( // Do a full (and slow) validation of the txhashset extension // to calculate the utxo_sum and kernel_sum at this block height. - let (utxo_sum, kernel_sum) = extension.validate_kernel_sums()?; + let (utxo_sum, kernel_sum) = + extension.validate_kernel_sums(&genesis.header)?; // Save the block_sums to the db for use later. extension.batch.save_block_sums( @@ -1494,11 +1505,16 @@ fn setup_head( kernel_sum, }; } - txhashset::header_extending(txhashset, &mut batch, |extension| { + txhashset::header_extending(header_pmmr, &tip, &mut batch, |extension| { extension.apply_header(&genesis.header)?; Ok(()) })?; - txhashset::extending(txhashset, &mut batch, |extension| { + txhashset::header_extending(sync_pmmr, &tip, &mut batch, |extension| { + extension.apply_header(&genesis.header)?; + Ok(()) + })?; + txhashset::extending(header_pmmr, txhashset, &mut batch, |ext| { + let ref mut extension = ext.extension; extension.apply_block(&genesis)?; extension.validate_roots()?; extension.validate_sizes()?; diff --git a/chain/src/pipe.rs b/chain/src/pipe.rs index 4d3c8e1ba..26d8c9dfd 100644 --- a/chain/src/pipe.rs +++ b/chain/src/pipe.rs @@ -39,6 +39,8 @@ pub struct BlockContext<'a> { pub pow_verifier: fn(&BlockHeader) -> Result<(), pow::Error>, /// The active txhashset (rewindable MMRs) to use for block processing. pub txhashset: &'a mut txhashset::TxHashSet, + /// The active header MMR handle. + pub header_pmmr: &'a mut txhashset::PMMRHandle, /// The active batch to use for block processing. pub batch: store::Batch<'a>, /// The verifier cache (caching verifier for rangeproofs and kernel signatures) @@ -113,35 +115,38 @@ pub fn process_block(b: &Block, ctx: &mut BlockContext<'_>) -> Result) -> // Check this header is not an orphan, we must know about the previous header to continue. let prev_header = ctx.batch.get_previous_header(&header)?; - // If this header is "known" then stop processing the header. - // Do not stop processing with an error though. + // Check if we know about the full block for this header. 
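// Sketch (stand-in types) of the pattern process_block_header follows below: the
// header is validated and applied inside a header extension, but unless it brings
// more total work than the current header_head the extension is force-rolled-back,
// so validation happens without disturbing the on-disk header MMR.
struct HeaderExt {
    applied: Vec<u64>,
    rollback: bool,
}

impl HeaderExt {
    fn apply_header(&mut self, work: u64) {
        self.applied.push(work);
    }
    fn force_rollback(&mut self) {
        self.rollback = true;
    }
}

fn process_header(ext: &mut HeaderExt, header_work: u64, head_work: u64) {
    ext.apply_header(header_work);
    if header_work <= head_work {
        // Valid header on a less-work fork: keep the validation result,
        // discard the MMR changes.
        ext.force_rollback();
    }
}

fn main() {
    let mut ext = HeaderExt { applied: vec![], rollback: false };
    process_header(&mut ext, 10, 25);
    assert!(ext.rollback);
}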
if check_known(header, ctx).is_err() { return Ok(()); } @@ -238,12 +236,12 @@ pub fn process_block_header(header: &BlockHeader, ctx: &mut BlockContext<'_>) -> } } - txhashset::header_extending(&mut ctx.txhashset, &mut ctx.batch, |extension| { - rewind_and_apply_header_fork(&prev_header, extension)?; - extension.validate_root(header)?; - extension.apply_header(header)?; + txhashset::header_extending(&mut ctx.header_pmmr, &header_head, &mut ctx.batch, |ext| { + rewind_and_apply_header_fork(&prev_header, ext)?; + ext.validate_root(header)?; + ext.apply_header(header)?; if !has_more_work(&header, &header_head) { - extension.force_rollback(); + ext.force_rollback(); } Ok(()) })?; @@ -413,8 +411,14 @@ fn validate_block(block: &Block, ctx: &mut BlockContext<'_>) -> Result<(), Error } /// Verify the block is not spending coinbase outputs before they have sufficiently matured. -fn verify_coinbase_maturity(block: &Block, ext: &txhashset::Extension<'_>) -> Result<(), Error> { - ext.utxo_view() +fn verify_coinbase_maturity( + block: &Block, + ext: &txhashset::ExtensionPair<'_>, +) -> Result<(), Error> { + let ref extension = ext.extension; + let ref header_extension = ext.header_extension; + extension + .utxo_view(header_extension) .verify_coinbase_maturity(&block.inputs(), block.header.height) } @@ -445,12 +449,12 @@ fn verify_block_sums(b: &Block, batch: &store::Batch<'_>) -> Result, + ext: &mut txhashset::ExtensionPair<'_>, ) -> Result<(), Error> { - ext.validate_header_root(&block.header)?; - ext.apply_block(block)?; - ext.validate_roots()?; - ext.validate_sizes()?; + let ref mut extension = ext.extension; + extension.apply_block(block)?; + extension.validate_roots()?; + extension.validate_sizes()?; Ok(()) } @@ -545,6 +549,7 @@ pub fn rewind_and_apply_header_fork( .batch .get_block_header(&h) .map_err(|e| ErrorKind::StoreErr(e, format!("getting forked headers")))?; + ext.validate_root(&header)?; ext.apply_header(&header)?; } @@ -557,48 +562,35 @@ pub fn rewind_and_apply_header_fork( /// the expected state. pub fn rewind_and_apply_fork( header: &BlockHeader, - header_head: &Tip, - ext: &mut txhashset::Extension<'_>, + ext: &mut txhashset::ExtensionPair<'_>, ) -> Result<(), Error> { - // TODO - Skip the "rewind and reapply" if everything is aligned and this is the "next" block. - // This will be significantly easier once we break out the header extension. + let ref mut batch = ext.batch(); + let ref mut extension = ext.extension; + let ref mut header_extension = ext.header_extension; - // Find the fork point where head and header_head diverge. - // We may need to rewind back to this fork point if they diverged - // prior to the fork point for the provided header. - let header_forked_header = { - let mut current = ext.batch.get_block_header(&header_head.last_block_h)?; - while current.height > 0 && !ext.is_on_current_chain(¤t).is_ok() { - current = ext.batch.get_previous_header(¤t)?; - } - current - }; + // Prepare the header MMR. + rewind_and_apply_header_fork(header, header_extension)?; - // Find the fork point where the provided header diverges from our main chain. - // Account for the header fork point. Use the earliest fork point to determine - // where we need to rewind to. 
We need to do this - let (forked_header, fork_hashes) = { - let mut fork_hashes = vec![]; - let mut current = header.clone(); - while current.height > 0 - && (!ext.is_on_current_chain(¤t).is_ok() - || current.height > header_forked_header.height) - { - fork_hashes.push(current.hash()); - current = ext.batch.get_previous_header(¤t)?; - } - fork_hashes.reverse(); + // Rewind the txhashset extension back to common ancestor based on header MMR. + let mut current = batch.head_header()?; + while current.height > 0 && !header_extension.is_on_current_chain(¤t).is_ok() { + current = batch.get_previous_header(¤t)?; + } + let fork_point = current; + extension.rewind(&fork_point)?; - (current, fork_hashes) - }; + // Then apply all full blocks since this common ancestor + // to put txhashet extension in a state to accept the new block. + let mut fork_hashes = vec![]; + let mut current = header.clone(); + while current.height > fork_point.height { + fork_hashes.push(current.hash()); + current = batch.get_previous_header(¤t)?; + } + fork_hashes.reverse(); - // Rewind the txhashset state back to the block where we forked from the most work chain. - ext.rewind(&forked_header)?; - - // Now re-apply all blocks on this fork. for h in fork_hashes { - let fb = ext - .batch + let fb = batch .get_block(&h) .map_err(|e| ErrorKind::StoreErr(e, format!("getting forked blocks")))?; @@ -607,7 +599,7 @@ pub fn rewind_and_apply_fork( // Validate the block against the UTXO set. validate_utxo(&fb, ext)?; // Re-verify block_sums to set the block_sums up on this fork correctly. - verify_block_sums(&fb, &ext.batch)?; + verify_block_sums(&fb, batch)?; // Re-apply the blocks. apply_block_to_txhashset(&fb, ext)?; } @@ -615,6 +607,8 @@ pub fn rewind_and_apply_fork( Ok(()) } -fn validate_utxo(block: &Block, ext: &txhashset::Extension<'_>) -> Result<(), Error> { - ext.utxo_view().validate_block(block) +fn validate_utxo(block: &Block, ext: &mut txhashset::ExtensionPair<'_>) -> Result<(), Error> { + let ref mut extension = ext.extension; + let ref mut header_extension = ext.header_extension; + extension.utxo_view(header_extension).validate_block(block) } diff --git a/chain/src/txhashset/txhashset.rs b/chain/src/txhashset/txhashset.rs index 3ffa9f373..6f7e2c456 100644 --- a/chain/src/txhashset/txhashset.rs +++ b/chain/src/txhashset/txhashset.rs @@ -22,7 +22,6 @@ use crate::core::core::pmmr::{self, Backend, ReadonlyPMMR, RewindablePMMR, PMMR} use crate::core::core::{ Block, BlockHeader, Input, Output, OutputIdentifier, TxKernel, TxKernelEntry, }; -use crate::core::global; use crate::core::ser::{PMMRIndexHashable, PMMRable}; use crate::error::{Error, ErrorKind}; use crate::store::{Batch, ChainStore}; @@ -38,25 +37,26 @@ use std::path::{Path, PathBuf}; use std::sync::Arc; use std::time::Instant; -const HEADERHASHSET_SUBDIR: &'static str = "header"; const TXHASHSET_SUBDIR: &'static str = "txhashset"; -const HEADER_HEAD_SUBDIR: &'static str = "header_head"; -const SYNC_HEAD_SUBDIR: &'static str = "sync_head"; - const OUTPUT_SUBDIR: &'static str = "output"; const RANGE_PROOF_SUBDIR: &'static str = "rangeproof"; const KERNEL_SUBDIR: &'static str = "kernel"; const TXHASHSET_ZIP: &'static str = "txhashset_snapshot"; -struct PMMRHandle { - backend: PMMRBackend, - last_pos: u64, +/// Convenience wrapper around a single prunable MMR backend. +pub struct PMMRHandle { + /// The backend storage for the MMR. + pub backend: PMMRBackend, + /// The last position accessible via this MMR handle (backend may continue out beyond this). 
+ pub last_pos: u64, } impl PMMRHandle { - fn new( + /// Constructor to create a PMMR handle from an existing directory structure on disk. + /// Creates the backend files as necessary if they do not already exist. + pub fn new( root_dir: &str, sub_dir: &str, file_name: &str, @@ -75,6 +75,19 @@ impl PMMRHandle { } } +impl PMMRHandle { + /// Get the header hash at the specified height based on the current header MMR state. + pub fn get_header_hash_by_height(&self, height: u64) -> Result { + let pos = pmmr::insertion_to_pmmr_index(height + 1); + let header_pmmr = ReadonlyPMMR::at(&self.backend, self.last_pos); + if let Some(entry) = header_pmmr.get_data(pos) { + Ok(entry.hash()) + } else { + Err(ErrorKind::Other(format!("get header hash by height")).into()) + } + } +} + /// An easy to manipulate structure holding the 3 sum trees necessary to /// validate blocks and capturing the Output set, the range proofs and the /// kernels. Also handles the index of Commitments to positions in the @@ -85,21 +98,6 @@ impl PMMRHandle { /// may have commitments that have already been spent, even with /// pruning enabled. pub struct TxHashSet { - /// Header MMR to support the header_head chain. - /// This is rewound and applied transactionally with the - /// output, rangeproof and kernel MMRs during an extension or a - /// readonly_extension. - /// It can also be rewound and applied separately via a header_extension. - header_pmmr_h: PMMRHandle, - - /// Header MMR to support exploratory sync_head. - /// The header_head and sync_head chains can diverge so we need to maintain - /// multiple header MMRs during the sync process. - /// - /// Note: this is rewound and applied separately to the other MMRs - /// via a "sync_extension". - sync_pmmr_h: PMMRHandle, - output_pmmr_h: PMMRHandle, rproof_pmmr_h: PMMRHandle, kernel_pmmr_h: PMMRHandle, @@ -116,22 +114,6 @@ impl TxHashSet { header: Option<&BlockHeader>, ) -> Result { Ok(TxHashSet { - header_pmmr_h: PMMRHandle::new( - &root_dir, - HEADERHASHSET_SUBDIR, - HEADER_HEAD_SUBDIR, - false, - true, - None, - )?, - sync_pmmr_h: PMMRHandle::new( - &root_dir, - HEADERHASHSET_SUBDIR, - SYNC_HEAD_SUBDIR, - false, - true, - None, - )?, output_pmmr_h: PMMRHandle::new( &root_dir, TXHASHSET_SUBDIR, @@ -162,8 +144,6 @@ impl TxHashSet { /// Close all backend file handles pub fn release_backend_files(&mut self) { - self.header_pmmr_h.backend.release_files(); - self.sync_pmmr_h.backend.release_files(); self.output_pmmr_h.backend.release_files(); self.rproof_pmmr_h.backend.release_files(); self.kernel_pmmr_h.backend.release_files(); @@ -217,25 +197,9 @@ impl TxHashSet { .get_last_n_insertions(distance) } - /// Get the header hash at the specified height based on the current state of the txhashset. - pub fn get_header_hash_by_height(&self, height: u64) -> Result { - let pos = pmmr::insertion_to_pmmr_index(height + 1); - let header_pmmr = - ReadonlyPMMR::at(&self.header_pmmr_h.backend, self.header_pmmr_h.last_pos); - if let Some(entry) = header_pmmr.get_data(pos) { - Ok(entry.hash()) - } else { - Err(ErrorKind::Other(format!("get header hash by height")).into()) - } - } - - /// Get the header at the specified height based on the current state of the txhashset. - /// Derives the MMR pos from the height (insertion index) and retrieves the header hash. - /// Looks the header up in the db by hash. 
- pub fn get_header_by_height(&self, height: u64) -> Result { - let hash = self.get_header_hash_by_height(height)?; - let header = self.commit_index.get_block_header(&hash)?; - Ok(header) + /// Convenience function to query the db for a header by its hash. + pub fn get_block_header(&self, hash: &Hash) -> Result { + Ok(self.commit_index.get_block_header(&hash)?) } /// Get all outputs MMR pos @@ -294,8 +258,8 @@ impl TxHashSet { /// Get MMR roots. pub fn roots(&self) -> TxHashSetRoots { - let header_pmmr = - ReadonlyPMMR::at(&self.header_pmmr_h.backend, self.header_pmmr_h.last_pos); + // let header_pmmr = + // ReadonlyPMMR::at(&self.header_pmmr_h.backend, self.header_pmmr_h.last_pos); let output_pmmr = ReadonlyPMMR::at(&self.output_pmmr_h.backend, self.output_pmmr_h.last_pos); let rproof_pmmr = @@ -304,7 +268,7 @@ impl TxHashSet { ReadonlyPMMR::at(&self.kernel_pmmr_h.backend, self.kernel_pmmr_h.last_pos); TxHashSetRoots { - header_root: header_pmmr.root(), + // header_root: header_pmmr.root(), output_root: output_pmmr.root(), rproof_root: rproof_pmmr.root(), kernel_root: kernel_pmmr.root(), @@ -325,17 +289,14 @@ impl TxHashSet { } /// Compact the MMR data files and flush the rm logs - pub fn compact(&mut self, batch: &mut Batch<'_>) -> Result<(), Error> { + pub fn compact( + &mut self, + horizon_header: &BlockHeader, + batch: &mut Batch<'_>, + ) -> Result<(), Error> { debug!("txhashset: starting compaction..."); let head_header = batch.head_header()?; - let current_height = head_header.height; - - // horizon for compacting is based on current_height - let horizon_height = current_height.saturating_sub(global::cut_through_horizon().into()); - let horizon_hash = self.get_header_hash_by_height(horizon_height)?; - let horizon_header = batch.get_block_header(&horizon_hash)?; - let rewind_rm_pos = input_pos_to_rewind(&horizon_header, &head_header, batch)?; debug!("txhashset: check_compact output mmr backend..."); @@ -352,6 +313,65 @@ impl TxHashSet { Ok(()) } + + /// Rebuild the index of block height & MMR positions to the corresponding UTXOs. + /// This is a costly operation performed only when we receive a full new chain state. + /// Note: only called by compact. 
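// The rebuild below is a single merge-style pass. A sketch under the same
// assumptions (output MMR positions are 1-based and sorted; each header records
// its cumulative output_mmr_size; heights here start at 1, genesis omitted):
fn assign_heights(outputs_pos: &[u64], mmr_size_at_height: &[u64]) -> Vec<(u64, u64)> {
    let mut res = Vec::new();
    let mut i = 0;
    for (idx, &size) in mmr_size_at_height.iter().enumerate() {
        let height = (idx + 1) as u64;
        // Every not-yet-assigned position at or below this header's
        // output_mmr_size was created at this height.
        while i < outputs_pos.len() && outputs_pos[i] <= size {
            res.push((outputs_pos[i], height));
            i += 1;
        }
    }
    res
}

fn main() {
    // Heights 1..=3 end with output MMR sizes 4, 4, 8 (block 2 added no outputs).
    let index = assign_heights(&[1, 2, 4, 5, 8], &[4, 4, 8]);
    assert_eq!(index, vec![(1, 1), (2, 1), (4, 1), (5, 3), (8, 3)]);
}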
+ pub fn rebuild_height_pos_index( + &self, + header_pmmr: &PMMRHandle, + batch: &mut Batch<'_>, + ) -> Result<(), Error> { + let now = Instant::now(); + + let output_pmmr = + ReadonlyPMMR::at(&self.output_pmmr_h.backend, self.output_pmmr_h.last_pos); + + // clear it before rebuilding + batch.clear_output_pos_height()?; + + let mut outputs_pos: Vec<(Commitment, u64)> = vec![]; + for pos in output_pmmr.leaf_pos_iter() { + if let Some(out) = output_pmmr.get_data(pos) { + outputs_pos.push((out.commit, pos)); + } + } + let total_outputs = outputs_pos.len(); + if total_outputs == 0 { + debug!("rebuild_height_pos_index: nothing to be rebuilt"); + return Ok(()); + } else { + debug!( + "rebuild_height_pos_index: rebuilding {} outputs position & height...", + total_outputs + ); + } + + let max_height = batch.head()?.height; + + let mut i = 0; + for search_height in 0..max_height { + let hash = header_pmmr.get_header_hash_by_height(search_height + 1)?; + let h = batch.get_block_header(&hash)?; + while i < total_outputs { + let (commit, pos) = outputs_pos[i]; + if pos > h.output_mmr_size { + // Note: MMR position is 1-based and not 0-based, so here must be '>' instead of '>=' + break; + } + batch.save_output_pos_height(&commit, pos, h.height)?; + trace!("rebuild_height_pos_index: {:?}", (commit, pos, h.height)); + i += 1; + } + } + + debug!( + "rebuild_height_pos_index: {} UTXOs, took {}s", + total_outputs, + now.elapsed().as_secs(), + ); + Ok(()) + } } /// Starts a new unit of work to extend (or rewind) the chain with additional @@ -360,28 +380,37 @@ impl TxHashSet { /// of blocks to the txhashset and the checking of the current tree roots. /// /// The unit of work is always discarded (always rollback) as this is read-only. -pub fn extending_readonly(trees: &mut TxHashSet, inner: F) -> Result +pub fn extending_readonly( + handle: &mut PMMRHandle, + trees: &mut TxHashSet, + inner: F, +) -> Result where - F: FnOnce(&mut Extension<'_>) -> Result, + F: FnOnce(&mut ExtensionPair<'_>) -> Result, { let commit_index = trees.commit_index.clone(); let batch = commit_index.batch()?; - // We want to use the current head of the most work chain unless - // we explicitly rewind the extension. - let head = batch.head()?; - trace!("Starting new txhashset (readonly) extension."); + let head = batch.head()?; + let header_head = batch.header_head()?; + let res = { + let header_pmmr = PMMR::at(&mut handle.backend, handle.last_pos); + let mut header_extension = HeaderExtension::new(header_pmmr, &batch, header_head); let mut extension = Extension::new(trees, &batch, head); - extension.force_rollback(); - inner(&mut extension) + let mut extension_pair = ExtensionPair { + header_extension: &mut header_extension, + extension: &mut extension, + }; + inner(&mut extension_pair) }; trace!("Rollbacking txhashset (readonly) extension."); - trees.header_pmmr_h.backend.discard(); + handle.backend.discard(); + trees.output_pmmr_h.backend.discard(); trees.rproof_pmmr_h.backend.discard(); trees.kernel_pmmr_h.backend.discard(); @@ -393,7 +422,11 @@ where /// Readonly view on the UTXO set. /// Based on the current txhashset output_pmmr. 
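// extending_readonly above always discards, whatever the closure returns. A
// minimal sketch of that contract, with a Vec standing in for the MMR backends:
// mutate freely inside the closure, restore state unconditionally afterwards.
fn extending_readonly_sketch<T>(
    state: &mut Vec<u64>,
    inner: impl FnOnce(&mut Vec<u64>) -> Result<T, String>,
) -> Result<T, String> {
    let checkpoint = state.clone();
    let res = inner(state);
    *state = checkpoint; // always roll back, even on success
    res
}

fn main() {
    let mut mmr = vec![1, 2, 3];
    let sum = extending_readonly_sketch(&mut mmr, |m| {
        m.push(4);
        Ok(m.iter().sum::<u64>())
    })
    .unwrap();
    assert_eq!(sum, 10);
    assert_eq!(mmr, vec![1, 2, 3]); // backing state untouched
}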
-pub fn utxo_view(trees: &TxHashSet, inner: F) -> Result +pub fn utxo_view( + handle: &PMMRHandle, + trees: &TxHashSet, + inner: F, +) -> Result where F: FnOnce(&UTXOView<'_>) -> Result, { @@ -401,8 +434,7 @@ where { let output_pmmr = ReadonlyPMMR::at(&trees.output_pmmr_h.backend, trees.output_pmmr_h.last_pos); - let header_pmmr = - ReadonlyPMMR::at(&trees.header_pmmr_h.backend, trees.header_pmmr_h.last_pos); + let header_pmmr = ReadonlyPMMR::at(&handle.backend, handle.last_pos); // Create a new batch here to pass into the utxo_view. // Discard it (rollback) after we finish with the utxo_view. @@ -445,20 +477,20 @@ where /// If the closure returns an error, modifications are canceled and the unit /// of work is abandoned. Otherwise, the unit of work is permanently applied. pub fn extending<'a, F, T>( + header_pmmr: &'a mut PMMRHandle, trees: &'a mut TxHashSet, batch: &'a mut Batch<'_>, inner: F, ) -> Result where - F: FnOnce(&mut Extension<'_>) -> Result, + F: FnOnce(&mut ExtensionPair<'_>) -> Result, { - let sizes: (u64, u64, u64, u64); + let sizes: (u64, u64, u64); let res: Result; let rollback: bool; - // We want to use the current head of the most work chain unless - // we explicitly rewind the extension. let head = batch.head()?; + let header_head = batch.header_head()?; // create a child transaction so if the state is rolled back by itself, all // index saving can be undone @@ -466,17 +498,26 @@ where { trace!("Starting new txhashset extension."); + let header_pmmr = PMMR::at(&mut header_pmmr.backend, header_pmmr.last_pos); + let mut header_extension = HeaderExtension::new(header_pmmr, &child_batch, header_head); let mut extension = Extension::new(trees, &child_batch, head); - res = inner(&mut extension); + let mut extension_pair = ExtensionPair { + header_extension: &mut header_extension, + extension: &mut extension, + }; + res = inner(&mut extension_pair); - rollback = extension.rollback; - sizes = extension.sizes(); + rollback = extension_pair.extension.rollback; + sizes = extension_pair.extension.sizes(); } + // During an extension we do not want to modify the header_extension (and only read from it). + // So make sure we discard any changes to the header MMR backed. + header_pmmr.backend.discard(); + match res { Err(e) => { debug!("Error returned, discarding txhashset extension: {}", e); - trees.header_pmmr_h.backend.discard(); trees.output_pmmr_h.backend.discard(); trees.rproof_pmmr_h.backend.discard(); trees.kernel_pmmr_h.backend.discard(); @@ -485,21 +526,18 @@ where Ok(r) => { if rollback { trace!("Rollbacking txhashset extension. sizes {:?}", sizes); - trees.header_pmmr_h.backend.discard(); trees.output_pmmr_h.backend.discard(); trees.rproof_pmmr_h.backend.discard(); trees.kernel_pmmr_h.backend.discard(); } else { trace!("Committing txhashset extension. sizes {:?}", sizes); child_batch.commit()?; - // NOTE: The header MMR is readonly for a txhashset extension. - trees.header_pmmr_h.backend.discard(); trees.output_pmmr_h.backend.sync()?; trees.rproof_pmmr_h.backend.sync()?; trees.kernel_pmmr_h.backend.sync()?; - trees.output_pmmr_h.last_pos = sizes.1; - trees.rproof_pmmr_h.last_pos = sizes.2; - trees.kernel_pmmr_h.last_pos = sizes.3; + trees.output_pmmr_h.last_pos = sizes.0; + trees.rproof_pmmr_h.last_pos = sizes.1; + trees.kernel_pmmr_h.last_pos = sizes.2; } trace!("TxHashSet extension done."); @@ -508,70 +546,12 @@ where } } -/// Start a new sync MMR unit of work. This MMR tracks the sync_head. 
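// The reworked utxo_view takes the header PMMR handle alongside the txhashset and
// hands the closure a strictly read-only view. A reduced sketch of that shape,
// with plain slices standing in for the readonly PMMRs:
struct ReadonlyView<'a> {
    outputs: &'a [u64],
    headers: &'a [u64],
}

fn utxo_view_sketch<T>(
    headers: &[u64],
    outputs: &[u64],
    inner: impl FnOnce(&ReadonlyView<'_>) -> Result<T, String>,
) -> Result<T, String> {
    let view = ReadonlyView { outputs, headers };
    inner(&view)
}

fn main() {
    let checked = utxo_view_sketch(&[10, 11], &[1, 2, 3], |v| {
        Ok(v.outputs.len() + v.headers.len())
    })
    .unwrap();
    assert_eq!(checked, 5);
}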
-/// This is used during header sync to validate batches of headers as they arrive -/// without needing to repeatedly rewind the header MMR that continues to track -/// the header_head as they diverge during sync. -pub fn sync_extending<'a, F, T>( - trees: &'a mut TxHashSet, - batch: &'a mut Batch<'_>, - inner: F, -) -> Result -where - F: FnOnce(&mut HeaderExtension<'_>) -> Result, -{ - let size: u64; - let res: Result; - let rollback: bool; - - // We want to use the current sync_head unless - // we explicitly rewind the extension. - let head = batch.get_sync_head()?; - - // create a child transaction so if the state is rolled back by itself, all - // index saving can be undone - let child_batch = batch.child()?; - { - trace!("Starting new txhashset sync_head extension."); - let pmmr = PMMR::at(&mut trees.sync_pmmr_h.backend, trees.sync_pmmr_h.last_pos); - let mut extension = HeaderExtension::new(pmmr, &child_batch, head); - - res = inner(&mut extension); - - rollback = extension.rollback; - size = extension.size(); - } - - match res { - Err(e) => { - debug!( - "Error returned, discarding txhashset sync_head extension: {}", - e - ); - trees.sync_pmmr_h.backend.discard(); - Err(e) - } - Ok(r) => { - if rollback { - trace!("Rollbacking txhashset sync_head extension. size {:?}", size); - trees.sync_pmmr_h.backend.discard(); - } else { - trace!("Committing txhashset sync_head extension. size {:?}", size); - child_batch.commit()?; - trees.sync_pmmr_h.backend.sync()?; - trees.sync_pmmr_h.last_pos = size; - } - trace!("TxHashSet sync_head extension done."); - Ok(r) - } - } -} - /// Start a new header MMR unit of work. This MMR tracks the header_head. /// This MMR can be extended individually beyond the other (output, rangeproof and kernel) MMRs /// to allow headers to be validated before we receive the full block data. pub fn header_extending<'a, F, T>( - trees: &'a mut TxHashSet, + handle: &'a mut PMMRHandle, + head: &Tip, batch: &'a mut Batch<'_>, inner: F, ) -> Result @@ -582,21 +562,12 @@ where let res: Result; let rollback: bool; - // We want to use the current head of header chain here. - // Caller is responsible for rewinding the header MMR back - // to a previous header as necessary when processing a fork. - let head = batch.header_head()?; - // create a child transaction so if the state is rolled back by itself, all // index saving can be undone let child_batch = batch.child()?; { - trace!("Starting new txhashset header extension."); - let pmmr = PMMR::at( - &mut trees.header_pmmr_h.backend, - trees.header_pmmr_h.last_pos, - ); - let mut extension = HeaderExtension::new(pmmr, &child_batch, head); + let pmmr = PMMR::at(&mut handle.backend, handle.last_pos); + let mut extension = HeaderExtension::new(pmmr, &child_batch, head.clone()); res = inner(&mut extension); rollback = extension.rollback; @@ -605,24 +576,17 @@ where match res { Err(e) => { - debug!( - "Error returned, discarding txhashset header extension: {}", - e - ); - trees.header_pmmr_h.backend.discard(); + handle.backend.discard(); Err(e) } Ok(r) => { if rollback { - trace!("Rollbacking txhashset header extension. size {:?}", size); - trees.header_pmmr_h.backend.discard(); + handle.backend.discard(); } else { - trace!("Committing txhashset header extension. 
size {:?}", size); child_batch.commit()?; - trees.header_pmmr_h.backend.sync()?; - trees.header_pmmr_h.last_pos = size; + handle.backend.sync()?; + handle.last_pos = size; } - trace!("TxHashSet header extension done."); Ok(r) } } @@ -674,8 +638,7 @@ impl<'a> HeaderExtension<'a> { pub fn get_header_by_height(&self, height: u64) -> Result { let pos = pmmr::insertion_to_pmmr_index(height + 1); if let Some(hash) = self.get_header_hash(pos) { - let header = self.batch.get_block_header(&hash)?; - Ok(header) + Ok(self.batch.get_block_header(&hash)?) } else { Err(ErrorKind::Other(format!("get header by height")).into()) } @@ -684,6 +647,9 @@ impl<'a> HeaderExtension<'a> { /// Compares the provided header to the header in the header MMR at that height. /// If these match we know the header is on the current chain. pub fn is_on_current_chain(&self, header: &BlockHeader) -> Result<(), Error> { + if header.height > self.head.height { + return Err(ErrorKind::Other(format!("not on current chain, out beyond")).into()); + } let chain_header = self.get_header_by_height(header.height)?; if chain_header.hash() == header.hash() { Ok(()) @@ -700,19 +666,21 @@ impl<'a> HeaderExtension<'a> { /// Apply a new header to the header MMR extension. /// This may be either the header MMR or the sync MMR depending on the /// extension. - pub fn apply_header(&mut self, header: &BlockHeader) -> Result { + pub fn apply_header(&mut self, header: &BlockHeader) -> Result<(), Error> { self.pmmr.push(header).map_err(&ErrorKind::TxHashSetErr)?; self.head = Tip::from_header(header); - Ok(self.root()?) + Ok(()) } /// Rewind the header extension to the specified header. /// Note the close relationship between header height and insertion index. pub fn rewind(&mut self, header: &BlockHeader) -> Result<(), Error> { debug!( - "Rewind header extension to {} at {}", + "Rewind header extension to {} at {} from {} at {}", header.hash(), - header.height + header.height, + self.head.hash(), + self.head.height, ); let header_pos = pmmr::insertion_to_pmmr_index(header.height + 1); @@ -726,61 +694,11 @@ impl<'a> HeaderExtension<'a> { Ok(()) } - /// Truncate the header MMR (rewind all the way back to pos 0). - /// Used when rebuilding the header MMR by reapplying all headers - /// including the genesis block header. - pub fn truncate(&mut self) -> Result<(), Error> { - debug!("Truncating header extension."); - self.pmmr.truncate().map_err(&ErrorKind::TxHashSetErr)?; - Ok(()) - } - /// The size of the header MMR. pub fn size(&self) -> u64 { self.pmmr.unpruned_size() } - /// TODO - think about how to optimize this. - /// Requires *all* header hashes to be iterated over in ascending order. - pub fn rebuild(&mut self, head: &Tip, genesis: &BlockHeader) -> Result<(), Error> { - debug!( - "About to rebuild header extension from {:?} to {:?}.", - genesis.hash(), - head.last_block_h, - ); - - let mut header_hashes = vec![]; - let mut current = self.batch.get_block_header(&head.last_block_h)?; - while current.height > 0 { - header_hashes.push(current.hash()); - current = self.batch.get_previous_header(¤t)?; - } - - header_hashes.reverse(); - - // Trucate the extension (back to pos 0). - self.truncate()?; - - // Re-apply the genesis header after truncation. 
- self.apply_header(&genesis)?; - - if header_hashes.len() > 0 { - debug!( - "Re-applying {} headers to extension, from {:?} to {:?}.", - header_hashes.len(), - header_hashes.first().unwrap(), - header_hashes.last().unwrap(), - ); - - for h in header_hashes { - let header = self.batch.get_block_header(&h)?; - self.validate_root(&header)?; - self.apply_header(&header)?; - } - } - Ok(()) - } - /// The root of the header MMR for convenience. pub fn root(&self) -> Result { Ok(self.pmmr.root().map_err(|_| ErrorKind::InvalidRoot)?) @@ -801,13 +719,28 @@ impl<'a> HeaderExtension<'a> { } } +/// An extension "pair" consisting of a txhashet extension (outputs, rangeproofs, kernels) +/// and the associated header extension. +pub struct ExtensionPair<'a> { + /// The header extension. + pub header_extension: &'a mut HeaderExtension<'a>, + /// The txhashset extension. + pub extension: &'a mut Extension<'a>, +} + +impl<'a> ExtensionPair<'a> { + /// Accessor for the batch associated with this extension pair. + pub fn batch(&mut self) -> &'a Batch<'a> { + self.extension.batch + } +} + /// Allows the application of new blocks on top of the sum trees in a /// reversible manner within a unit of work provided by the `extending` /// function. pub struct Extension<'a> { head: Tip, - header_pmmr: PMMR<'a, BlockHeader, PMMRBackend>, output_pmmr: PMMR<'a, Output, PMMRBackend>, rproof_pmmr: PMMR<'a, RangeProof, PMMRBackend>, kernel_pmmr: PMMR<'a, TxKernel, PMMRBackend>, @@ -853,10 +786,6 @@ impl<'a> Extension<'a> { fn new(trees: &'a mut TxHashSet, batch: &'a Batch<'_>, head: Tip) -> Extension<'a> { Extension { head, - header_pmmr: PMMR::at( - &mut trees.header_pmmr_h.backend, - trees.header_pmmr_h.last_pos, - ), output_pmmr: PMMR::at( &mut trees.output_pmmr_h.backend, trees.output_pmmr_h.last_pos, @@ -879,26 +808,18 @@ impl<'a> Extension<'a> { self.head.clone() } - /// Build a view of the current UTXO set based on the output PMMR. - pub fn utxo_view(&'a self) -> UTXOView<'a> { + /// Build a view of the current UTXO set based on the output PMMR + /// and the provided header extension. + pub fn utxo_view(&'a self, header_ext: &'a HeaderExtension<'a>) -> UTXOView<'a> { UTXOView::new( self.output_pmmr.readonly_pmmr(), - self.header_pmmr.readonly_pmmr(), + header_ext.pmmr.readonly_pmmr(), self.batch, ) } - /// Apply a new block to the existing state. - /// - /// Applies the following - - /// * header - /// * outputs - /// * inputs - /// * kernels - /// + /// Apply a new block to the current txhashet extension (output, rangeproof, kernel MMRs). pub fn apply_block(&mut self, b: &Block) -> Result<(), Error> { - self.apply_header(&b.header)?; - for out in b.outputs() { let pos = self.apply_output(out)?; // Update the (output_pos,height) index for the new output. @@ -988,7 +909,6 @@ impl<'a> Extension<'a> { ); } } - Ok(output_pos) } @@ -1000,42 +920,6 @@ impl<'a> Extension<'a> { Ok(()) } - fn apply_header(&mut self, header: &BlockHeader) -> Result<(), Error> { - self.header_pmmr - .push(header) - .map_err(&ErrorKind::TxHashSetErr)?; - Ok(()) - } - - /// Get the header hash for the specified pos from the underlying MMR backend. - fn get_header_hash(&self, pos: u64) -> Option { - self.header_pmmr.get_data(pos).map(|x| x.hash()) - } - - /// Get the header at the specified height based on the current state of the extension. - /// Derives the MMR pos from the height (insertion index) and retrieves the header hash. - /// Looks the header up in the db by hash. 
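// A compilable sketch of the borrow structure behind ExtensionPair above: both
// extensions share one lifetime, and the underlying batch is reached through the
// txhashset extension, mirroring the batch() accessor (types reduced to stubs).
struct BatchStub;

struct HeaderExtensionStub<'a> {
    batch: &'a BatchStub,
}

struct ExtensionStub<'a> {
    batch: &'a BatchStub,
}

struct ExtensionPairStub<'a> {
    header_extension: &'a mut HeaderExtensionStub<'a>,
    extension: &'a mut ExtensionStub<'a>,
}

impl<'a> ExtensionPairStub<'a> {
    fn batch(&mut self) -> &'a BatchStub {
        self.extension.batch
    }
}

fn main() {
    let batch = BatchStub;
    let mut header_ext = HeaderExtensionStub { batch: &batch };
    let mut ext = ExtensionStub { batch: &batch };
    let mut pair = ExtensionPairStub {
        header_extension: &mut header_ext,
        extension: &mut ext,
    };
    let _batch = pair.batch();
}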
- pub fn get_header_by_height(&self, height: u64) -> Result { - let pos = pmmr::insertion_to_pmmr_index(height + 1); - if let Some(hash) = self.get_header_hash(pos) { - let header = self.batch.get_block_header(&hash)?; - Ok(header) - } else { - Err(ErrorKind::Other(format!("get header by height")).into()) - } - } - - /// Compares the provided header to the header in the header MMR at that height. - /// If these match we know the header is on the current chain. - pub fn is_on_current_chain(&self, header: &BlockHeader) -> Result<(), Error> { - let chain_header = self.get_header_by_height(header.height)?; - if chain_header.hash() == header.hash() { - Ok(()) - } else { - Err(ErrorKind::Other(format!("not on current chain")).into()) - } - } - /// Build a Merkle proof for the given output and the block /// this extension is currently referencing. /// Note: this relies on the MMR being stable even after pruning/compaction. @@ -1072,7 +956,13 @@ impl<'a> Extension<'a> { /// Rewinds the MMRs to the provided block, rewinding to the last output pos /// and last kernel pos of that block. pub fn rewind(&mut self, header: &BlockHeader) -> Result<(), Error> { - debug!("Rewind extension to {} at {}", header.hash(), header.height,); + debug!( + "Rewind extension to {} at {} from {} at {}", + header.hash(), + header.height, + self.head.hash(), + self.head.height + ); // We need to build bitmaps of added and removed output positions // so we can correctly rewind all operations applied to the output MMR @@ -1080,13 +970,10 @@ impl<'a> Extension<'a> { // undone during rewind). // Rewound output pos will be removed from the MMR. // Rewound input (spent) pos will be added back to the MMR. - let head_header = self.batch.get_block_header(&self.head.last_block_h)?; + let head_header = self.batch.get_block_header(&self.head.hash())?; let rewind_rm_pos = input_pos_to_rewind(header, &head_header, &self.batch)?; - let header_pos = pmmr::insertion_to_pmmr_index(header.height + 1); - self.rewind_to_pos( - header_pos, header.output_mmr_size, header.kernel_mmr_size, &rewind_rm_pos, @@ -1102,19 +989,10 @@ impl<'a> Extension<'a> { /// kernel we want to rewind to. fn rewind_to_pos( &mut self, - header_pos: u64, output_pos: u64, kernel_pos: u64, rewind_rm_pos: &Bitmap, ) -> Result<(), Error> { - debug!( - "txhashset: rewind_to_pos: header {}, output {}, kernel {}", - header_pos, output_pos, kernel_pos, - ); - - self.header_pmmr - .rewind(header_pos, &Bitmap::create()) - .map_err(&ErrorKind::TxHashSetErr)?; self.output_pmmr .rewind(output_pos, rewind_rm_pos) .map_err(&ErrorKind::TxHashSetErr)?; @@ -1131,10 +1009,6 @@ impl<'a> Extension<'a> { /// and kernel sum trees. pub fn roots(&self) -> Result { Ok(TxHashSetRoots { - header_root: self - .header_pmmr - .root() - .map_err(|_| ErrorKind::InvalidRoot)?, output_root: self .output_pmmr .root() @@ -1150,50 +1024,18 @@ impl<'a> Extension<'a> { }) } - /// Get the root of the current header MMR. - pub fn header_root(&self) -> Result { - Ok(self - .header_pmmr - .root() - .map_err(|_| ErrorKind::InvalidRoot)?) - } - - /// Validate the following MMR roots against the latest header applied - - /// * output - /// * rangeproof - /// * kernel - /// - /// Note we do not validate the header MMR root here as we need to validate - /// a header against the state of the MMR *prior* to applying it. - /// Each header commits to the root of the MMR of all previous headers, - /// not including the header itself. 
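// Toy illustration of the prev_root rule described above: each header commits to
// the root of the MMR of all previous headers (not including itself), which is
// why rewind_and_apply_header_fork can call validate_root *before* apply_header.
// A fold over u64s stands in for the real MMR root here.
struct ToyHeader {
    prev_root: u64,
    id: u64,
}

fn toy_root(mmr: &[u64]) -> u64 {
    mmr.iter()
        .fold(17u64, |acc, x| acc.wrapping_mul(31).wrapping_add(*x))
}

fn apply_header(mmr: &mut Vec<u64>, h: &ToyHeader) -> Result<(), String> {
    // Check the commitment against the state *prior* to applying the header.
    if h.prev_root != toy_root(mmr) {
        return Err("invalid prev_root".to_string());
    }
    mmr.push(h.id);
    Ok(())
}

fn main() {
    let mut mmr: Vec<u64> = vec![];
    let genesis = ToyHeader { prev_root: toy_root(&mmr), id: 1 };
    apply_header(&mut mmr, &genesis).unwrap();
    let next = ToyHeader { prev_root: toy_root(&mmr), id: 2 };
    apply_header(&mut mmr, &next).unwrap();
    // A header carrying a stale root is rejected before it is applied.
    let bad = ToyHeader { prev_root: 0, id: 3 };
    assert!(apply_header(&mut mmr, &bad).is_err());
}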
- /// + /// Validate the MMR (output, rangeproof, kernel) roots against the latest header. pub fn validate_roots(&self) -> Result<(), Error> { - // If we are validating the genesis block then we have no outputs or - // kernels. So we are done here. if self.head.height == 0 { return Ok(()); } - let head_header = self.batch.get_block_header(&self.head.last_block_h)?; - let roots = self.roots()?; - if roots.output_root != head_header.output_root - || roots.rproof_root != head_header.range_proof_root - || roots.kernel_root != head_header.kernel_root - { - Err(ErrorKind::InvalidRoot.into()) - } else { - Ok(()) - } - } - - /// Validate the provided header by comparing its prev_root to the - /// root of the current header MMR. - pub fn validate_header_root(&self, header: &BlockHeader) -> Result<(), Error> { - if header.height == 0 { - return Ok(()); - } - let roots = self.roots()?; - if roots.header_root != header.prev_root { + let head_header = self.batch.get_block_header(&self.head.hash())?; + let header_roots = TxHashSetRoots { + output_root: head_header.output_root, + rproof_root: head_header.range_proof_root, + kernel_root: head_header.kernel_root, + }; + if header_roots != self.roots()? { Err(ErrorKind::InvalidRoot.into()) } else { Ok(()) @@ -1202,24 +1044,16 @@ impl<'a> Extension<'a> { /// Validate the header, output and kernel MMR sizes against the block header. pub fn validate_sizes(&self) -> Result<(), Error> { - // If we are validating the genesis block then we have no outputs or - // kernels. So we are done here. if self.head.height == 0 { return Ok(()); } - let head_header = self.batch.get_block_header(&self.head.last_block_h)?; - let (header_mmr_size, output_mmr_size, rproof_mmr_size, kernel_mmr_size) = self.sizes(); - let expected_header_mmr_size = - pmmr::insertion_to_pmmr_index(self.head.height + 2).saturating_sub(1); - - if header_mmr_size != expected_header_mmr_size { - Err(ErrorKind::InvalidMMRSize.into()) - } else if output_mmr_size != head_header.output_mmr_size { - Err(ErrorKind::InvalidMMRSize.into()) - } else if kernel_mmr_size != head_header.kernel_mmr_size { - Err(ErrorKind::InvalidMMRSize.into()) - } else if output_mmr_size != rproof_mmr_size { + if ( + head_header.output_mmr_size, + head_header.output_mmr_size, + head_header.kernel_mmr_size, + ) != self.sizes() + { Err(ErrorKind::InvalidMMRSize.into()) } else { Ok(()) @@ -1230,9 +1064,6 @@ impl<'a> Extension<'a> { let now = Instant::now(); // validate all hashes and sums within the trees - if let Err(e) = self.header_pmmr.validate() { - return Err(ErrorKind::InvalidTxHashSet(e).into()); - } if let Err(e) = self.output_pmmr.validate() { return Err(ErrorKind::InvalidTxHashSet(e).into()); } @@ -1244,8 +1075,7 @@ impl<'a> Extension<'a> { } debug!( - "txhashset: validated the header {}, output {}, rproof {}, kernel {} mmrs, took {}s", - self.header_pmmr.unpruned_size(), + "txhashset: validated the output {}, rproof {}, kernel {} mmrs, took {}s", self.output_pmmr.unpruned_size(), self.rproof_pmmr.unpruned_size(), self.kernel_pmmr.unpruned_size(), @@ -1259,11 +1089,13 @@ impl<'a> Extension<'a> { /// This is an expensive operation as we need to retrieve all the UTXOs and kernels /// from the respective MMRs. /// For a significantly faster way of validating full kernel sums see BlockSums. 
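// Toy integer arithmetic for the invariant validate_kernel_sums checks below:
// the sum of unspent outputs minus total coin issuance (the overage) must equal
// the sum of kernel excesses plus the accumulated kernel offset. Grin performs
// this over Pedersen commitments; plain i64 values stand in for curve points here.
fn verify_kernel_sums(utxo_sum: i64, overage: i64, kernel_sum: i64, offset: i64) -> bool {
    utxo_sum - overage == kernel_sum + offset
}

fn main() {
    // 100 coins issued, outputs summing to 100, excess and offset cancelling out.
    assert!(verify_kernel_sums(100, 100, -3, 3));
}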
@@ -1259,11 +1089,13 @@ impl<'a> Extension<'a> {
 	/// This is an expensive operation as we need to retrieve all the UTXOs and kernels
 	/// from the respective MMRs.
 	/// For a significantly faster way of validating full kernel sums see BlockSums.
-	pub fn validate_kernel_sums(&self) -> Result<((Commitment, Commitment)), Error> {
+	pub fn validate_kernel_sums(
+		&self,
+		genesis: &BlockHeader,
+	) -> Result<((Commitment, Commitment)), Error> {
 		let now = Instant::now();

 		let head_header = self.batch.get_block_header(&self.head.last_block_h)?;
-		let genesis = self.get_header_by_height(0)?;
 		let (utxo_sum, kernel_sum) = self.verify_kernel_sums(
 			head_header.total_overage(genesis.kernel_mmr_size > 0),
 			head_header.total_kernel_offset(),
@@ -1281,6 +1113,7 @@ impl<'a> Extension<'a> {
 	/// A "fast validation" will skip rangeproof verification and kernel signature verification.
 	pub fn validate(
 		&self,
+		genesis: &BlockHeader,
 		fast_validation: bool,
 		status: &dyn TxHashsetWriteStatus,
 	) -> Result<((Commitment, Commitment)), Error> {
@@ -1295,7 +1128,7 @@ impl<'a> Extension<'a> {

 		// The real magicking happens here. Sum of kernel excesses should equal
 		// sum of unspent outputs minus total supply.
-		let (output_sum, kernel_sum) = self.validate_kernel_sums()?;
+		let (output_sum, kernel_sum) = self.validate_kernel_sums(genesis)?;

 		// These are expensive verification step (skipped for "fast validation").
 		if !fast_validation {
@@ -1336,57 +1169,6 @@ impl<'a> Extension<'a> {
 		Ok(())
 	}

-	/// Rebuild the index of block height & MMR positions to the corresponding UTXOs.
-	/// This is a costly operation performed only when we receive a full new chain state.
-	/// Note: only called by compact.
-	pub fn rebuild_height_pos_index(&self) -> Result<(), Error> {
-		let now = Instant::now();
-
-		// clear it before rebuilding
-		self.batch.clear_output_pos_height()?;
-
-		let mut outputs_pos: Vec<(Commitment, u64)> = vec![];
-		for pos in self.output_pmmr.leaf_pos_iter() {
-			if let Some(out) = self.output_pmmr.get_data(pos) {
-				outputs_pos.push((out.commit, pos));
-			}
-		}
-		let total_outputs = outputs_pos.len();
-		if total_outputs == 0 {
-			debug!("rebuild_height_pos_index: nothing to be rebuilt");
-			return Ok(());
-		} else {
-			debug!(
-				"rebuild_height_pos_index: rebuilding {} outputs position & height...",
-				total_outputs
-			);
-		}
-
-		let max_height = self.head().height;
-
-		let mut i = 0;
-		for search_height in 0..max_height {
-			let h = self.get_header_by_height(search_height + 1)?;
-			while i < total_outputs {
-				let (commit, pos) = outputs_pos[i];
-				if pos > h.output_mmr_size {
-					// Note: MMR position is 1-based and not 0-based, so here must be '>' instead of '>='
-					break;
-				}
-				self.batch.save_output_pos_height(&commit, pos, h.height)?;
-				trace!("rebuild_height_pos_index: {:?}", (commit, pos, h.height));
-				i += 1;
-			}
-		}
-
-		debug!(
-			"txhashset: rebuild_height_pos_index: {} UTXOs, took {}s",
-			total_outputs,
-			now.elapsed().as_secs(),
-		);
-		Ok(())
-	}
-
 	/// Force the rollback of this extension, no matter the result
 	pub fn force_rollback(&mut self) {
 		self.rollback = true;
@@ -1416,9 +1198,8 @@ impl<'a> Extension<'a> {
 	}

 	/// Sizes of each of the sum trees
-	pub fn sizes(&self) -> (u64, u64, u64, u64) {
+	pub fn sizes(&self) -> (u64, u64, u64) {
 		(
-			self.header_pmmr.unpruned_size(),
 			self.output_pmmr.unpruned_size(),
 			self.rproof_pmmr.unpruned_size(),
 			self.kernel_pmmr.unpruned_size(),
@@ -1668,31 +1449,17 @@ pub fn clean_txhashset_folder(root_dir: &PathBuf) {
 	}
 }

-/// Clean the header folder
-pub fn clean_header_folder(root_dir: &PathBuf) {
-	let header_path = root_dir.clone().join(HEADERHASHSET_SUBDIR);
-	if header_path.exists() {
-		if let Err(e) = fs::remove_dir_all(header_path.clone()) {
-			warn!("clean_header_folder: fail on {:?}. err: {}", header_path, e);
-		}
-	}
-}
-
 /// Given a block header to rewind to and the block header at the
 /// head of the current chain state, we need to calculate the positions
 /// of all inputs (spent outputs) we need to "undo" during a rewind.
 /// We do this by leveraging the "block_input_bitmap" cache and OR'ing
 /// the set of bitmaps together for the set of blocks being rewound.
-pub fn input_pos_to_rewind(
+fn input_pos_to_rewind(
 	block_header: &BlockHeader,
 	head_header: &BlockHeader,
 	batch: &Batch<'_>,
 ) -> Result<Bitmap, Error> {
-	if head_header.height < block_header.height {
-		debug!(
-			"input_pos_to_rewind: {} < {}, nothing to rewind",
-			head_header.height, block_header.height
-		);
+	if head_header.height <= block_header.height {
 		return Ok(Bitmap::create());
 	}
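[Editor's note] `input_pos_to_rewind` (just above) unions the per-block input bitmaps for every block being rewound; those positions were spent after the rewind point and must be un-removed from the output MMR. A standalone sketch of the same computation, using `std::collections::BTreeSet` in place of `croaring::Bitmap` (the function and closure names are illustrative):

```rust
// Sketch only: union the "input bitmaps" of all blocks between the rewind
// point (exclusive) and the current head (inclusive).
use std::collections::BTreeSet;

fn input_pos_to_rewind(
    rewind_to_height: u64,
    head_height: u64,
    // per-block input positions, indexed by height (hypothetical accessor)
    block_input_pos: &dyn Fn(u64) -> BTreeSet<u64>,
) -> BTreeSet<u64> {
    // Mirrors the `<=` early return in the hunk above: nothing to rewind.
    if head_height <= rewind_to_height {
        return BTreeSet::new();
    }
    let mut all = BTreeSet::new();
    for h in (rewind_to_height + 1)..=head_height {
        all.extend(block_input_pos(h)); // set union, i.e. bitmap OR
    }
    all
}

fn main() {
    let spent = |h: u64| -> BTreeSet<u64> {
        match h {
            3 => [5u64, 9].into_iter().collect(),
            4 => [11u64].into_iter().collect(),
            _ => BTreeSet::new(),
        }
    };
    let rm = input_pos_to_rewind(2, 4, &spent);
    assert_eq!(rm, [5u64, 9, 11].into_iter().collect::<BTreeSet<u64>>());
}
```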
err: {}", header_path, e); - } - } -} - /// Given a block header to rewind to and the block header at the /// head of the current chain state, we need to calculate the positions /// of all inputs (spent outputs) we need to "undo" during a rewind. /// We do this by leveraging the "block_input_bitmap" cache and OR'ing /// the set of bitmaps together for the set of blocks being rewound. -pub fn input_pos_to_rewind( +fn input_pos_to_rewind( block_header: &BlockHeader, head_header: &BlockHeader, batch: &Batch<'_>, ) -> Result { - if head_header.height < block_header.height { - debug!( - "input_pos_to_rewind: {} < {}, nothing to rewind", - head_header.height, block_header.height - ); + if head_header.height <= block_header.height { return Ok(Bitmap::create()); } diff --git a/chain/src/txhashset/utxo_view.rs b/chain/src/txhashset/utxo_view.rs index 2823acbe8..28cb5504c 100644 --- a/chain/src/txhashset/utxo_view.rs +++ b/chain/src/txhashset/utxo_view.rs @@ -111,7 +111,7 @@ impl<'a> UTXOView<'a> { .unwrap_or(0); if pos > 0 { - // If we have not yet reached 1,000 / 1,440 blocks then + // If we have not yet reached 1440 blocks then // we can fail immediately as coinbase cannot be mature. if height < global::coinbase_maturity() { return Err(ErrorKind::ImmatureCoinbase.into()); diff --git a/chain/src/types.rs b/chain/src/types.rs index 0e54a22d5..c17f6ada7 100644 --- a/chain/src/types.rs +++ b/chain/src/types.rs @@ -205,10 +205,8 @@ impl TxHashsetWriteStatus for SyncState { /// A helper to hold the roots of the txhashset in order to keep them /// readable. -#[derive(Debug)] +#[derive(Debug, PartialEq)] pub struct TxHashSetRoots { - /// Header root - pub header_root: Hash, /// Output root pub output_root: Hash, /// Range Proof root diff --git a/chain/tests/store_indices.rs b/chain/tests/store_indices.rs index 36b010c2a..08a3c2a58 100644 --- a/chain/tests/store_indices.rs +++ b/chain/tests/store_indices.rs @@ -13,7 +13,6 @@ // limitations under the License. use self::core::core::hash::Hashed; -use grin_chain as chain; use grin_core as core; use grin_util as util; diff --git a/chain/tests/test_txhashset.rs b/chain/tests/test_txhashset.rs index 618747f85..419e5bfaf 100644 --- a/chain/tests/test_txhashset.rs +++ b/chain/tests/test_txhashset.rs @@ -84,8 +84,6 @@ fn test_unexpected_zip() { ); assert!(txhashset::zip_read(db_root.clone(), &head).is_ok()); - let txhashset_zip_path = - Path::new(&db_root).join(format!("txhashset_zip_{}", head.hash().to_string())); let _ = fs::remove_dir_all( Path::new(&db_root).join(format!("txhashset_zip_{}", head.hash().to_string())), ); diff --git a/core/src/core/pmmr/pmmr.rs b/core/src/core/pmmr/pmmr.rs index 6e19addf2..07380379e 100644 --- a/core/src/core/pmmr/pmmr.rs +++ b/core/src/core/pmmr/pmmr.rs @@ -216,13 +216,6 @@ where Ok(()) } - /// Truncate the MMR by rewinding back to empty state. - pub fn truncate(&mut self) -> Result<(), String> { - self.backend.rewind(0, &Bitmap::create())?; - self.last_pos = 0; - Ok(()) - } - /// Rewind the PMMR to a previous position, as if all push operations after /// that had been canceled. Expects a position in the PMMR to rewind and /// bitmaps representing the positions added and removed that we want to diff --git a/core/src/core/pmmr/readonly_pmmr.rs b/core/src/core/pmmr/readonly_pmmr.rs index 3a54a50f0..89b37e21e 100644 --- a/core/src/core/pmmr/readonly_pmmr.rs +++ b/core/src/core/pmmr/readonly_pmmr.rs @@ -86,6 +86,11 @@ where } } + /// Iterator over current (unpruned, unremoved) leaf positions. 
+	pub fn leaf_pos_iter(&self) -> impl Iterator<Item = u64> + '_ {
+		self.backend.leaf_pos_iter()
+	}
+
 	/// Is the MMR empty?
 	pub fn is_empty(&self) -> bool {
 		self.last_pos == 0
diff --git a/servers/src/common/adapters.rs b/servers/src/common/adapters.rs
index 9d8a11850..aea5b9042 100644
--- a/servers/src/common/adapters.rs
+++ b/servers/src/common/adapters.rs
@@ -310,8 +310,8 @@ impl p2p::ChainAdapter for NetToChainAdapter {

 		let max_height = self.chain().header_head()?.height;

-		let txhashset = self.chain().txhashset();
-		let txhashset = txhashset.read();
+		let header_pmmr = self.chain().header_pmmr();
+		let header_pmmr = header_pmmr.read();

 		// looks like we know one, getting as many following headers as allowed
 		let hh = header.height;
@@ -321,7 +321,8 @@ impl p2p::ChainAdapter for NetToChainAdapter {
 				break;
 			}

-			if let Ok(header) = txhashset.get_header_by_height(h) {
+			if let Ok(hash) = header_pmmr.get_header_hash_by_height(h) {
+				let header = self.chain().get_block_header(&hash)?;
 				headers.push(header);
 			} else {
 				error!("Failed to locate headers successfully.");
@@ -498,14 +499,16 @@ impl NetToChainAdapter {

 	// Find the first locator hash that refers to a known header on our main chain.
 	fn find_common_header(&self, locator: &[Hash]) -> Option<BlockHeader> {
-		let txhashset = self.chain().txhashset();
-		let txhashset = txhashset.read();
+		let header_pmmr = self.chain().header_pmmr();
+		let header_pmmr = header_pmmr.read();

 		for hash in locator {
 			if let Ok(header) = self.chain().get_block_header(&hash) {
-				if let Ok(header_at_height) = txhashset.get_header_by_height(header.height) {
-					if header.hash() == header_at_height.hash() {
-						return Some(header);
+				if let Ok(hash_at_height) = header_pmmr.get_header_hash_by_height(header.height) {
+					if let Ok(header_at_height) = self.chain().get_block_header(&hash_at_height) {
+						if header.hash() == header_at_height.hash() {
+							return Some(header);
+						}
 					}
 				}
 			}
diff --git a/servers/src/grin/server.rs b/servers/src/grin/server.rs
index a5e61e511..46319e23c 100644
--- a/servers/src/grin/server.rs
+++ b/servers/src/grin/server.rs
@@ -435,8 +435,8 @@ impl Server {
 		let tip_height = self.head()?.height as i64;
 		let mut height = tip_height as i64 - last_blocks.len() as i64 + 1;

-		let txhashset = self.chain.txhashset();
-		let txhashset = txhashset.read();
+		let header_pmmr = self.chain.header_pmmr();
+		let header_pmmr = header_pmmr.read();

 		let diff_entries: Vec<DiffBlock> = last_blocks
 			.windows(2)
@@ -449,8 +449,8 @@ impl Server {
 				// Use header hash if real header.
 				// Default to "zero" hash if synthetic header_info.
 				let hash = if height >= 0 {
-					if let Ok(header) = txhashset.get_header_by_height(height as u64) {
-						header.hash()
+					if let Ok(hash) = header_pmmr.get_header_hash_by_height(height as u64) {
+						hash
 					} else {
 						ZERO_HASH
 					}
diff --git a/src/bin/tui/ui.rs b/src/bin/tui/ui.rs
index 60071f003..4b8f8a14e 100644
--- a/src/bin/tui/ui.rs
+++ b/src/bin/tui/ui.rs
@@ -182,9 +182,10 @@ impl Controller {
 			}

 			if Utc::now().timestamp() > next_stat_update {
-				let stats = server.get_server_stats().unwrap();
-				self.ui.ui_tx.send(UIMessage::UpdateStatus(stats)).unwrap();
 				next_stat_update = Utc::now().timestamp() + stat_update_interval;
+				if let Ok(stats) = server.get_server_stats() {
+					self.ui.ui_tx.send(UIMessage::UpdateStatus(stats)).unwrap();
+				}
 			}
 		}
 		server.stop();
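[Editor's note] The `ui.rs` hunk above does two things: it schedules the next refresh *before* calling the fallible `get_server_stats()`, and it drops the `unwrap()` so a transient failure skips one refresh instead of panicking the TUI thread (or retrying in a tight loop). A toy sketch of that scheduling pattern, with all names illustrative:

```rust
use std::time::{Duration, Instant};

// Advance the deadline first, then publish only on success.
fn run_status_loop(mut get_stats: impl FnMut() -> Result<String, String>) {
    let interval = Duration::from_secs(1);
    let mut next_update = Instant::now();
    for _ in 0..3 {
        if Instant::now() >= next_update {
            next_update = Instant::now() + interval; // schedule first...
            if let Ok(stats) = get_stats() {
                // ...then publish only when the snapshot succeeded.
                println!("status: {}", stats);
            }
        }
        std::thread::sleep(Duration::from_millis(200));
    }
}

fn main() {
    let mut n = 0;
    run_status_loop(move || {
        n += 1;
        if n % 2 == 0 {
            Err("stats unavailable".into())
        } else {
            Ok(format!("tick {}", n))
        }
    });
}
```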