From 8b7a20f8b03488e1e60df005068715bfb943b088 Mon Sep 17 00:00:00 2001 From: Antioch Peverell Date: Mon, 15 Oct 2018 17:16:34 +0100 Subject: [PATCH] The Header MMR (One MMR To Rule Them All) (#1716) * header MMR in use within txhashset itself works with fast sync not yet in place for initial header sync * add the (currently unused) sync_head mmr * use sync MMR during fast sync rebuild header MMR after we validate full txhashset after download * support missing header MMR (rebuild as necessary) for legacy nodes * rustfmt * comments/docs * rustfmt * cleanup DBBackend * cleanup DBBackend * cleanup * rename to HashOnly * rustfmt * cleanup backend.append() * simply pmmr append api no need to pass position when appending * cleanup * simplify vec_backend to match simpler append api * rustfmt * docs/comments * rustfmt * cleanup --- api/src/types.rs | 6 +- chain/src/chain.rs | 161 +++++--- chain/src/pipe.rs | 48 ++- chain/src/txhashset/txhashset.rs | 508 +++++++++++++++++++++++--- chain/src/types.rs | 4 +- core/src/core/block.rs | 10 +- core/src/core/hash.rs | 6 +- core/src/core/pmmr/backend.rs | 10 +- core/src/core/pmmr/db_pmmr.rs | 173 +++++++++ core/src/core/pmmr/mod.rs | 2 + core/src/core/pmmr/pmmr.rs | 9 +- core/src/core/pmmr/rewindable_pmmr.rs | 2 +- core/tests/pmmr.rs | 16 +- core/tests/vec_backend/mod.rs | 56 +-- servers/src/grin/sync/header_sync.rs | 6 + store/src/lib.rs | 25 -- store/src/pmmr.rs | 100 +++-- store/src/rm_log.rs | 3 +- store/src/types.rs | 70 ++++ 19 files changed, 978 insertions(+), 237 deletions(-) create mode 100644 core/src/core/pmmr/db_pmmr.rs diff --git a/api/src/types.rs b/api/src/types.rs index 1e3818d84..eefd39896 100644 --- a/api/src/types.rs +++ b/api/src/types.rs @@ -97,9 +97,9 @@ impl TxHashSet { pub fn from_head(head: Arc) -> TxHashSet { let roots = head.get_txhashset_roots(); TxHashSet { - output_root_hash: roots.0.to_hex(), - range_proof_root_hash: roots.1.to_hex(), - kernel_root_hash: roots.2.to_hex(), + output_root_hash: roots.output_root.to_hex(), + range_proof_root_hash: roots.rproof_root.to_hex(), + kernel_root_hash: roots.kernel_root.to_hex(), } } } diff --git a/chain/src/chain.rs b/chain/src/chain.rs index 53d31f952..948ab4fa7 100644 --- a/chain/src/chain.rs +++ b/chain/src/chain.rs @@ -35,7 +35,7 @@ use grin_store::Error::NotFoundErr; use pipe; use store; use txhashset; -use types::{ChainAdapter, NoStatus, Options, Tip, TxHashsetWriteStatus}; +use types::{ChainAdapter, NoStatus, Options, Tip, TxHashSetRoots, TxHashsetWriteStatus}; use util::secp::pedersen::{Commitment, RangeProof}; use util::LOGGER; @@ -153,6 +153,7 @@ pub struct Chain { // POW verification function pow_verifier: fn(&BlockHeader, u8) -> Result<(), pow::Error>, archive_mode: bool, + genesis: BlockHeader, } unsafe impl Sync for Chain {} @@ -178,7 +179,7 @@ impl Chain { // open the txhashset, creating a new one if necessary let mut txhashset = txhashset::TxHashSet::open(db_root.clone(), store.clone(), None)?; - setup_head(genesis, store.clone(), &mut txhashset)?; + setup_head(genesis.clone(), store.clone(), &mut txhashset)?; let head = store.head()?; debug!( @@ -199,6 +200,7 @@ impl Chain { verifier_cache, block_hashes_cache: Arc::new(RwLock::new(LruCache::new(HASHES_CACHE_SIZE))), archive_mode, + genesis: genesis.header.clone(), }) } @@ -246,54 +248,52 @@ impl Chain { Ok(head) } - Err(e) => { - match e.kind() { - ErrorKind::Orphan => { - let block_hash = b.hash(); - let orphan = Orphan { - block: b, - opts: opts, - added: Instant::now(), - }; + Err(e) => match e.kind() { + 
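+ // Bespoke handling per error kind below: orphans are queued for
+ // later processing, "unfit" blocks are logged and skipped, and
+ // anything else is cached as a known-bad block hash.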
ErrorKind::Orphan => { + let block_hash = b.hash(); + let orphan = Orphan { + block: b, + opts: opts, + added: Instant::now(), + }; - &self.orphans.add(orphan); + &self.orphans.add(orphan); - debug!( - LOGGER, - "process_block: orphan: {:?}, # orphans {}{}", - block_hash, - self.orphans.len(), - if self.orphans.len_evicted() > 0 { - format!(", # evicted {}", self.orphans.len_evicted()) - } else { - String::new() - }, - ); - Err(ErrorKind::Orphan.into()) - } - ErrorKind::Unfit(ref msg) => { - debug!( - LOGGER, - "Block {} at {} is unfit at this time: {}", - b.hash(), - b.header.height, - msg - ); - Err(ErrorKind::Unfit(msg.clone()).into()) - } - _ => { - info!( - LOGGER, - "Rejected block {} at {}: {:?}", - b.hash(), - b.header.height, - e - ); - add_to_hash_cache(b.hash()); - Err(ErrorKind::Other(format!("{:?}", e).to_owned()).into()) - } + debug!( + LOGGER, + "process_block: orphan: {:?}, # orphans {}{}", + block_hash, + self.orphans.len(), + if self.orphans.len_evicted() > 0 { + format!(", # evicted {}", self.orphans.len_evicted()) + } else { + String::new() + }, + ); + Err(ErrorKind::Orphan.into()) } - } + ErrorKind::Unfit(ref msg) => { + debug!( + LOGGER, + "Block {} at {} is unfit at this time: {}", + b.hash(), + b.header.height, + msg + ); + Err(ErrorKind::Unfit(msg.clone()).into()) + } + _ => { + info!( + LOGGER, + "Rejected block {} at {}: {:?}", + b.hash(), + b.header.height, + e + ); + add_to_hash_cache(b.hash()); + Err(ErrorKind::Other(format!("{:?}", e).to_owned()).into()) + } + }, } } @@ -494,11 +494,15 @@ impl Chain { Ok((extension.roots(), extension.sizes())) })?; + // Carefully destructure these correctly... + // TODO - Maybe sizes should be a struct to add some type safety here... + let (_, output_mmr_size, _, kernel_mmr_size) = sizes; + b.header.output_root = roots.output_root; b.header.range_proof_root = roots.rproof_root; b.header.kernel_root = roots.kernel_root; - b.header.output_mmr_size = sizes.0; - b.header.kernel_mmr_size = sizes.2; + b.header.output_mmr_size = output_mmr_size; + b.header.kernel_mmr_size = kernel_mmr_size; Ok(()) } @@ -526,7 +530,7 @@ impl Chain { } /// Returns current txhashset roots - pub fn get_txhashset_roots(&self) -> (Hash, Hash, Hash) { + pub fn get_txhashset_roots(&self) -> TxHashSetRoots { let mut txhashset = self.txhashset.write().unwrap(); txhashset.roots() } @@ -594,6 +598,40 @@ impl Chain { Ok(()) } + /// Rebuild the sync MMR based on current header_head. + /// We rebuild the sync MMR when first entering sync mode so ensure we + /// have an MMR we can safely rewind based on the headers received from a peer. + /// TODO - think about how to optimize this. + pub fn rebuild_sync_mmr(&self, head: &Tip) -> Result<(), Error> { + let mut txhashset = self.txhashset.write().unwrap(); + let mut batch = self.store.batch()?; + txhashset::sync_extending(&mut txhashset, &mut batch, |extension| { + extension.rebuild(head, &self.genesis)?; + Ok(()) + })?; + batch.commit()?; + Ok(()) + } + + /// Rebuild the header MMR based on current header_head. + /// We rebuild the header MMR after receiving a txhashset from a peer. + /// The txhashset contains output, rangeproof and kernel MMRs but we construct + /// the header MMR locally based on headers from our db. + /// TODO - think about how to optimize this. 
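+ ///
+ /// A sketch of the call pattern (this mirrors the function body below,
+ /// using the header_extending API introduced in this patch):
+ ///
+ /// ```ignore
+ /// txhashset::header_extending(txhashset, &mut batch, |extension| {
+ ///     extension.rebuild(head, &self.genesis)?;
+ ///     Ok(())
+ /// })?;
+ /// batch.commit()?;
+ /// ```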
+ fn rebuild_header_mmr( + &self, + head: &Tip, + txhashset: &mut txhashset::TxHashSet, + ) -> Result<(), Error> { + let mut batch = self.store.batch()?; + txhashset::header_extending(txhashset, &mut batch, |extension| { + extension.rebuild(head, &self.genesis)?; + Ok(()) + })?; + batch.commit()?; + Ok(()) + } + /// Writes a reading view on a txhashset state that's been provided to us. /// If we're willing to accept that new state, the data stream will be /// read as a zip file, unzipped and the resulting state files should be @@ -621,6 +659,10 @@ impl Chain { let mut txhashset = txhashset::TxHashSet::open(self.db_root.clone(), self.store.clone(), Some(&header))?; + // The txhashset.zip contains the output, rangeproof and kernel MMRs. + // We must rebuild the header MMR ourselves based on the headers in our db. + self.rebuild_header_mmr(&Tip::from_block(&header), &mut txhashset)?; + // Validate the full kernel history (kernel MMR root for every block header). self.validate_kernel_history(&header, &txhashset)?; @@ -983,6 +1025,15 @@ fn setup_head( // to match the provided block header. let header = batch.get_block_header(&head.last_block_h)?; + // If we have no header MMR then rebuild as necessary. + // Supports old nodes with no header MMR. + txhashset::header_extending(txhashset, &mut batch, |extension| { + if extension.size() == 0 { + extension.rebuild(&head, &genesis.header)?; + } + Ok(()) + })?; + let res = txhashset::extending(txhashset, &mut batch, |extension| { extension.rewind(&header)?; extension.validate_roots()?; @@ -1042,17 +1093,15 @@ fn setup_head( batch.save_head(&tip)?; batch.setup_height(&genesis.header, &tip)?; + // Apply the genesis block to our empty MMRs. txhashset::extending(txhashset, &mut batch, |extension| { extension.apply_block(&genesis)?; - - // Save the block_sums to the db for use later. - extension - .batch - .save_block_sums(&genesis.hash(), &BlockSums::default())?; - Ok(()) })?; + // Save the block_sums to the db for use later. + batch.save_block_sums(&genesis.hash(), &BlockSums::default())?; + info!(LOGGER, "chain: init: saved genesis: {:?}", genesis.hash()); } Err(e) => return Err(ErrorKind::StoreErr(e, "chain init load head".to_owned()))?, diff --git a/chain/src/pipe.rs b/chain/src/pipe.rs index 92839e894..4530a6cd0 100644 --- a/chain/src/pipe.rs +++ b/chain/src/pipe.rs @@ -91,10 +91,19 @@ pub fn process_block(b: &Block, ctx: &mut BlockContext) -> Result, E // Check if this block is already know due it being in the current set of orphan blocks. check_known_orphans(&b.header, ctx)?; + + // Check we have *this* block in the store. + // Stop if we have processed this block previously (it is in the store). + // This is more expensive than the earlier check_known() as we hit the store. + check_known_store(&b.header, ctx)?; } // Header specific processing. - handle_block_header(&b.header, ctx)?; + { + validate_header(&b.header, ctx)?; + add_block_header(&b.header, ctx)?; + update_header_head(&b.header, ctx)?; + } // Check if are processing the "next" block relative to the current chain head. let head = ctx.batch.head()?; @@ -104,11 +113,6 @@ pub fn process_block(b: &Block, ctx: &mut BlockContext) -> Result, E // * special case where this is the first fast sync full block // Either way we can proceed (and we know the block is new and unprocessed). } else { - // Check we have *this* block in the store. - // Stop if we have processed this block previously (it is in the store). - // This is more expensive than the earlier check_known() as we hit the store. 
- check_known_store(&b.header, ctx)?; - // At this point it looks like this is a new block that we have not yet processed. // Check we have the *previous* block in the store. // If we do not then treat this block as an orphan. @@ -128,7 +132,7 @@ pub fn process_block(b: &Block, ctx: &mut BlockContext) -> Result, E if is_next_block(&b.header, &head) { // No need to rewind if we are processing the next block. } else { - // Rewind the re-apply blocks on the forked chain to + // Rewind and re-apply blocks on the forked chain to // put the txhashset in the correct forked state // (immediately prior to this new block). rewind_and_apply_fork(b, extension)?; @@ -174,12 +178,8 @@ pub fn process_block(b: &Block, ctx: &mut BlockContext) -> Result, E // Add the newly accepted block and header to our index. add_block(b, ctx)?; - // Update the chain head (and header_head) if total work is increased. - let res = { - let _ = update_header_head(&b.header, ctx)?; - let res = update_head(b, ctx)?; - res - }; + // Update the chain head if total work is increased. + let res = update_head(b, ctx)?; Ok(res) } @@ -209,8 +209,22 @@ pub fn sync_block_headers( if !all_known { for header in headers { - handle_block_header(header, ctx)?; + validate_header(header, ctx)?; + add_block_header(header, ctx)?; } + + let first_header = headers.first().unwrap(); + let prev_header = ctx.batch.get_block_header(&first_header.previous)?; + txhashset::sync_extending(&mut ctx.txhashset, &mut ctx.batch, |extension| { + // Optimize this if "next" header + extension.rewind(&prev_header)?; + + for header in headers { + extension.apply_header(header)?; + } + + Ok(()) + })?; } // Update header_head (if most work) and sync_head (regardless) in all cases, @@ -231,12 +245,6 @@ pub fn sync_block_headers( } } -fn handle_block_header(header: &BlockHeader, ctx: &mut BlockContext) -> Result<(), Error> { - validate_header(header, ctx)?; - add_block_header(header, ctx)?; - Ok(()) -} - /// Process block header as part of "header first" block propagation. /// We validate the header but we do not store it or update header head based /// on this. We will update these once we get the block back after requesting diff --git a/chain/src/txhashset/txhashset.rs b/chain/src/txhashset/txhashset.rs index a5eaa48dc..7670fbfd3 100644 --- a/chain/src/txhashset/txhashset.rs +++ b/chain/src/txhashset/txhashset.rs @@ -12,8 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -//! Utility structs to handle the 3 hashtrees (output, range proof, -//! kernel) more conveniently and transactionally. +//! Utility structs to handle the 3 MMRs (output, rangeproof, +//! kernel) along the overall header MMR conveniently and transactionally. 
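+//!
+//! As a rough sketch of the shape of things after this patch, the roots
+//! of all four MMRs are surfaced together (see TxHashSetRoots below):
+//!
+//! ```ignore
+//! let roots = txhashset.roots();
+//! // roots.header_root, roots.output_root,
+//! // roots.rproof_root, roots.kernel_root
+//! ```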
use std::collections::HashSet; use std::fs::{self, File}; @@ -28,26 +28,47 @@ use util::secp::pedersen::{Commitment, RangeProof}; use core::core::committed::Committed; use core::core::hash::{Hash, Hashed}; use core::core::merkle_proof::MerkleProof; -use core::core::pmmr::{self, ReadonlyPMMR, RewindablePMMR, PMMR}; +use core::core::pmmr::{self, ReadonlyPMMR, RewindablePMMR, DBPMMR, PMMR}; use core::core::{Block, BlockHeader, Input, Output, OutputFeatures, OutputIdentifier, TxKernel}; use core::global; use core::ser::{PMMRIndexHashable, PMMRable}; use error::{Error, ErrorKind}; use grin_store; -use grin_store::pmmr::{PMMRBackend, PMMR_FILES}; +use grin_store::pmmr::{HashOnlyMMRBackend, PMMRBackend, PMMR_FILES}; use grin_store::types::prune_noop; use store::{Batch, ChainStore}; use txhashset::{RewindableKernelView, UTXOView}; -use types::{TxHashSetRoots, TxHashsetWriteStatus}; +use types::{Tip, TxHashSetRoots, TxHashsetWriteStatus}; use util::{file, secp_static, zip, LOGGER}; +const HEADERHASHSET_SUBDIR: &'static str = "header"; const TXHASHSET_SUBDIR: &'static str = "txhashset"; + +const HEADER_HEAD_SUBDIR: &'static str = "header_head"; +const SYNC_HEAD_SUBDIR: &'static str = "sync_head"; + const OUTPUT_SUBDIR: &'static str = "output"; const RANGE_PROOF_SUBDIR: &'static str = "rangeproof"; const KERNEL_SUBDIR: &'static str = "kernel"; + const TXHASHSET_ZIP: &'static str = "txhashset_snapshot.zip"; +struct HashOnlyMMRHandle { + backend: HashOnlyMMRBackend, + last_pos: u64, +} + +impl HashOnlyMMRHandle { + fn new(root_dir: &str, sub_dir: &str, file_name: &str) -> Result { + let path = Path::new(root_dir).join(sub_dir).join(file_name); + fs::create_dir_all(path.clone())?; + let backend = HashOnlyMMRBackend::new(path.to_str().unwrap().to_string())?; + let last_pos = backend.unpruned_size()?; + Ok(HashOnlyMMRHandle { backend, last_pos }) + } +} + struct PMMRHandle where T: PMMRable, @@ -61,19 +82,17 @@ where T: PMMRable + ::std::fmt::Debug, { fn new( - root_dir: String, + root_dir: &str, + sub_dir: &str, file_name: &str, prunable: bool, header: Option<&BlockHeader>, ) -> Result, Error> { - let path = Path::new(&root_dir).join(TXHASHSET_SUBDIR).join(file_name); + let path = Path::new(root_dir).join(sub_dir).join(file_name); fs::create_dir_all(path.clone())?; - let be = PMMRBackend::new(path.to_str().unwrap().to_string(), prunable, header)?; - let sz = be.unpruned_size()?; - Ok(PMMRHandle { - backend: be, - last_pos: sz, - }) + let backend = PMMRBackend::new(path.to_str().unwrap().to_string(), prunable, header)?; + let last_pos = backend.unpruned_size()?; + Ok(PMMRHandle { backend, last_pos }) } } @@ -86,8 +105,24 @@ where /// guaranteed to indicate whether an output is spent or not. The index /// may have commitments that have already been spent, even with /// pruning enabled. - pub struct TxHashSet { + /// Header MMR to support the header_head chain. + /// This is rewound and applied transactionally with the + /// output, rangeproof and kernel MMRs during an extension or a + /// readonly_extension. + /// It can also be rewound and applied separately via a header_extension. + /// Note: the header MMR is backed by the database maintains just the hash file. + header_pmmr_h: HashOnlyMMRHandle, + + /// Header MMR to support exploratory sync_head. + /// The header_head and sync_head chains can diverge so we need to maintain + /// multiple header MMRs during the sync process. + /// + /// Note: this is rewound and applied separately to the other MMRs + /// via a "sync_extension". 
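+ /// (e.g. during sync we extend sync_head with batches of headers from a
+ /// peer while header_head may still reflect a different chain.)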
+ /// Note: the sync MMR is backed by the database and maintains just the hash file. + sync_pmmr_h: HashOnlyMMRHandle, + output_pmmr_h: PMMRHandle, rproof_pmmr_h: PMMRHandle, kernel_pmmr_h: PMMRHandle, @@ -104,9 +139,33 @@ impl TxHashSet { header: Option<&BlockHeader>, ) -> Result { Ok(TxHashSet { - output_pmmr_h: PMMRHandle::new(root_dir.clone(), OUTPUT_SUBDIR, true, header)?, - rproof_pmmr_h: PMMRHandle::new(root_dir.clone(), RANGE_PROOF_SUBDIR, true, header)?, - kernel_pmmr_h: PMMRHandle::new(root_dir.clone(), KERNEL_SUBDIR, false, None)?, + header_pmmr_h: HashOnlyMMRHandle::new( + &root_dir, + HEADERHASHSET_SUBDIR, + HEADER_HEAD_SUBDIR, + )?, + sync_pmmr_h: HashOnlyMMRHandle::new(&root_dir, HEADERHASHSET_SUBDIR, SYNC_HEAD_SUBDIR)?, + output_pmmr_h: PMMRHandle::new( + &root_dir, + TXHASHSET_SUBDIR, + OUTPUT_SUBDIR, + true, + header, + )?, + rproof_pmmr_h: PMMRHandle::new( + &root_dir, + TXHASHSET_SUBDIR, + RANGE_PROOF_SUBDIR, + true, + header, + )?, + kernel_pmmr_h: PMMRHandle::new( + &root_dir, + TXHASHSET_SUBDIR, + KERNEL_SUBDIR, + false, + None, + )?, commit_index, }) } @@ -186,16 +245,23 @@ impl TxHashSet { rproof_pmmr.elements_from_insertion_index(start_index, max_count) } - /// Get sum tree roots - /// TODO: Return data instead of hashes - pub fn roots(&mut self) -> (Hash, Hash, Hash) { + /// Get MMR roots. + pub fn roots(&mut self) -> TxHashSetRoots { + let header_pmmr: DBPMMR = + DBPMMR::at(&mut self.header_pmmr_h.backend, self.header_pmmr_h.last_pos); let output_pmmr: PMMR = PMMR::at(&mut self.output_pmmr_h.backend, self.output_pmmr_h.last_pos); let rproof_pmmr: PMMR = PMMR::at(&mut self.rproof_pmmr_h.backend, self.rproof_pmmr_h.last_pos); let kernel_pmmr: PMMR = PMMR::at(&mut self.kernel_pmmr_h.backend, self.kernel_pmmr_h.last_pos); - (output_pmmr.root(), rproof_pmmr.root(), kernel_pmmr.root()) + + TxHashSetRoots { + header_root: header_pmmr.root(), + output_root: output_pmmr.root(), + rproof_root: rproof_pmmr.root(), + kernel_root: kernel_pmmr.root(), + } } /// build a new merkle proof for the given position @@ -255,23 +321,28 @@ pub fn extending_readonly<'a, F, T>(trees: &'a mut TxHashSet, inner: F) -> Resul where F: FnOnce(&mut Extension) -> Result, { - let res: Result; - { - let commit_index = trees.commit_index.clone(); - let batch = commit_index.batch()?; + let commit_index = trees.commit_index.clone(); + let batch = commit_index.batch()?; - // We want to use the current head of the most work chain unless - // we explicitly rewind the extension. - let header = batch.head_header()?; + // We want to use the current head of the most work chain unless + // we explicitly rewind the extension. + let header = batch.head_header()?; - trace!(LOGGER, "Starting new txhashset (readonly) extension."); + trace!(LOGGER, "Starting new txhashset (readonly) extension."); + + let res = { let mut extension = Extension::new(trees, &batch, header); extension.force_rollback(); - res = inner(&mut extension); - } + + // TODO - header_mmr may be out ahead via the header_head + // TODO - do we need to handle this via an explicit rewind on the header_mmr? 
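+ // (Context for the TODOs above: the header MMR tracks header_head,
+ // which can be extended beyond the block head this readonly extension
+ // was opened at.)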
+ + inner(&mut extension) + }; trace!(LOGGER, "Rollbacking txhashset (readonly) extension."); + trees.header_pmmr_h.backend.discard(); trees.output_pmmr_h.backend.discard(); trees.rproof_pmmr_h.backend.discard(); trees.kernel_pmmr_h.backend.discard(); @@ -340,7 +411,7 @@ pub fn extending<'a, F, T>( where F: FnOnce(&mut Extension) -> Result, { - let sizes: (u64, u64, u64); + let sizes: (u64, u64, u64, u64); let res: Result; let rollback: bool; @@ -353,6 +424,9 @@ where let child_batch = batch.child()?; { trace!(LOGGER, "Starting new txhashset extension."); + + // TODO - header_mmr may be out ahead via the header_head + // TODO - do we need to handle this via an explicit rewind on the header_mmr? let mut extension = Extension::new(trees, &child_batch, header); res = inner(&mut extension); @@ -366,6 +440,7 @@ where LOGGER, "Error returned, discarding txhashset extension: {}", e ); + trees.header_pmmr_h.backend.discard(); trees.output_pmmr_h.backend.discard(); trees.rproof_pmmr_h.backend.discard(); trees.kernel_pmmr_h.backend.discard(); @@ -374,18 +449,21 @@ where Ok(r) => { if rollback { trace!(LOGGER, "Rollbacking txhashset extension. sizes {:?}", sizes); + trees.header_pmmr_h.backend.discard(); trees.output_pmmr_h.backend.discard(); trees.rproof_pmmr_h.backend.discard(); trees.kernel_pmmr_h.backend.discard(); } else { trace!(LOGGER, "Committing txhashset extension. sizes {:?}", sizes); child_batch.commit()?; + trees.header_pmmr_h.backend.sync()?; trees.output_pmmr_h.backend.sync()?; trees.rproof_pmmr_h.backend.sync()?; trees.kernel_pmmr_h.backend.sync()?; - trees.output_pmmr_h.last_pos = sizes.0; - trees.rproof_pmmr_h.last_pos = sizes.1; - trees.kernel_pmmr_h.last_pos = sizes.2; + trees.header_pmmr_h.last_pos = sizes.0; + trees.output_pmmr_h.last_pos = sizes.1; + trees.rproof_pmmr_h.last_pos = sizes.2; + trees.kernel_pmmr_h.last_pos = sizes.3; } trace!(LOGGER, "TxHashSet extension done."); @@ -394,12 +472,266 @@ where } } +/// Start a new sync MMR unit of work. This MMR tracks the sync_head. +/// This is used during header sync to validate batches of headers as they arrive +/// without needing to repeatedly rewind the header MMR that continues to track +/// the header_head as they diverge during sync. +pub fn sync_extending<'a, F, T>( + trees: &'a mut TxHashSet, + batch: &'a mut Batch, + inner: F, +) -> Result +where + F: FnOnce(&mut HeaderExtension) -> Result, +{ + let size: u64; + let res: Result; + let rollback: bool; + + // We want to use the current sync_head unless + // we explicitly rewind the extension. + let head = batch.get_sync_head()?; + let header = batch.get_block_header(&head.last_block_h)?; + + // create a child transaction so if the state is rolled back by itself, all + // index saving can be undone + let child_batch = batch.child()?; + { + trace!(LOGGER, "Starting new txhashset sync_head extension."); + let pmmr = DBPMMR::at(&mut trees.sync_pmmr_h.backend, trees.sync_pmmr_h.last_pos); + let mut extension = HeaderExtension::new(pmmr, &child_batch, header); + + res = inner(&mut extension); + + rollback = extension.rollback; + size = extension.size(); + } + + match res { + Err(e) => { + debug!( + LOGGER, + "Error returned, discarding txhashset sync_head extension: {}", e + ); + trees.sync_pmmr_h.backend.discard(); + Err(e) + } + Ok(r) => { + if rollback { + trace!( + LOGGER, + "Rollbacking txhashset sync_head extension. size {:?}", + size + ); + trees.sync_pmmr_h.backend.discard(); + } else { + trace!( + LOGGER, + "Committing txhashset sync_head extension. 
size {:?}", + size + ); + child_batch.commit()?; + trees.sync_pmmr_h.backend.sync()?; + trees.sync_pmmr_h.last_pos = size; + } + trace!(LOGGER, "TxHashSet sync_head extension done."); + Ok(r) + } + } +} + +/// Start a new header MMR unit of work. This MMR tracks the header_head. +/// This MMR can be extended individually beyond the other (output, rangeproof and kernel) MMRs +/// to allow headers to be validated before we receive the full block data. +pub fn header_extending<'a, F, T>( + trees: &'a mut TxHashSet, + batch: &'a mut Batch, + inner: F, +) -> Result +where + F: FnOnce(&mut HeaderExtension) -> Result, +{ + let size: u64; + let res: Result; + let rollback: bool; + + // We want to use the current head of the header chain unless + // we explicitly rewind the extension. + let head = batch.header_head()?; + let header = batch.get_block_header(&head.last_block_h)?; + + // create a child transaction so if the state is rolled back by itself, all + // index saving can be undone + let child_batch = batch.child()?; + { + trace!(LOGGER, "Starting new txhashset header extension."); + let pmmr = DBPMMR::at( + &mut trees.header_pmmr_h.backend, + trees.header_pmmr_h.last_pos, + ); + let mut extension = HeaderExtension::new(pmmr, &child_batch, header); + res = inner(&mut extension); + + rollback = extension.rollback; + size = extension.size(); + } + + match res { + Err(e) => { + debug!( + LOGGER, + "Error returned, discarding txhashset header extension: {}", e + ); + trees.header_pmmr_h.backend.discard(); + Err(e) + } + Ok(r) => { + if rollback { + trace!( + LOGGER, + "Rollbacking txhashset header extension. size {:?}", + size + ); + trees.header_pmmr_h.backend.discard(); + } else { + trace!( + LOGGER, + "Committing txhashset header extension. size {:?}", + size + ); + child_batch.commit()?; + trees.header_pmmr_h.backend.sync()?; + trees.header_pmmr_h.last_pos = size; + } + trace!(LOGGER, "TxHashSet header extension done."); + Ok(r) + } + } +} + +/// A header extension to allow the header MMR to extend beyond the other MMRs individually. +/// This is to allow headers to be validated against the MMR before we have the full block data. +pub struct HeaderExtension<'a> { + header: BlockHeader, + + pmmr: DBPMMR<'a, BlockHeader, HashOnlyMMRBackend>, + + /// Rollback flag. + rollback: bool, + + /// Batch in which the extension occurs, public so it can be used within + /// an `extending` closure. Just be careful using it that way as it will + /// get rolled back with the extension (i.e on a losing fork). + pub batch: &'a Batch<'a>, +} + +impl<'a> HeaderExtension<'a> { + fn new( + pmmr: DBPMMR<'a, BlockHeader, HashOnlyMMRBackend>, + batch: &'a Batch, + header: BlockHeader, + ) -> HeaderExtension<'a> { + HeaderExtension { + header, + pmmr, + rollback: false, + batch, + } + } + + /// Apply a new header to the header MMR extension. + /// This may be either the header MMR or the sync MMR depending on the + /// extension. + pub fn apply_header(&mut self, header: &BlockHeader) -> Result<(), Error> { + self.pmmr + .push(header.clone()) + .map_err(&ErrorKind::TxHashSetErr)?; + self.header = header.clone(); + Ok(()) + } + + /// Rewind the header extension to the specified header. + /// Note the close relationship between header height and insertion index. 
+ pub fn rewind(&mut self, header: &BlockHeader) -> Result<(), Error> { + debug!( + LOGGER, + "Rewind header extension to {} at {}", + header.hash(), + header.height + ); + + let header_pos = pmmr::insertion_to_pmmr_index(header.height + 1); + self.pmmr + .rewind(header_pos) + .map_err(&ErrorKind::TxHashSetErr)?; + + // Update our header to reflect the one we rewound to. + self.header = header.clone(); + + Ok(()) + } + + /// Truncate the header MMR (rewind all the way back to pos 0). + /// Used when rebuilding the header MMR by reapplying all headers + /// including the genesis block header. + pub fn truncate(&mut self) -> Result<(), Error> { + debug!(LOGGER, "Truncating header extension."); + self.pmmr.rewind(0).map_err(&ErrorKind::TxHashSetErr)?; + Ok(()) + } + + /// The size of the header MMR. + pub fn size(&self) -> u64 { + self.pmmr.unpruned_size() + } + + /// TODO - think about how to optimize this. + /// Requires *all* header hashes to be iterated over in ascending order. + pub fn rebuild(&mut self, head: &Tip, genesis: &BlockHeader) -> Result<(), Error> { + debug!( + LOGGER, + "About to rebuild header extension from {:?} to {:?}.", + genesis.hash(), + head.last_block_h, + ); + + let mut header_hashes = vec![]; + let mut current = self.batch.get_block_header(&head.last_block_h)?; + while current.height > 0 { + header_hashes.push(current.hash()); + current = self.batch.get_block_header(¤t.previous)?; + } + // Include the genesis header as we will re-apply it after truncating the extension. + header_hashes.push(genesis.hash()); + header_hashes.reverse(); + + // Trucate the extension (back to pos 0). + self.truncate()?; + + debug!( + LOGGER, + "Re-applying {} headers to extension, from {:?} to {:?}.", + header_hashes.len(), + header_hashes.first().unwrap(), + header_hashes.last().unwrap(), + ); + + for h in header_hashes { + let header = self.batch.get_block_header(&h)?; + // self.validate_header_root()?; + self.apply_header(&header)?; + } + Ok(()) + } +} + /// Allows the application of new blocks on top of the sum trees in a /// reversible manner within a unit of work provided by the `extending` /// function. pub struct Extension<'a> { header: BlockHeader, + header_pmmr: DBPMMR<'a, BlockHeader, HashOnlyMMRBackend>, output_pmmr: PMMR<'a, OutputIdentifier, PMMRBackend>, rproof_pmmr: PMMR<'a, RangeProof, PMMRBackend>, kernel_pmmr: PMMR<'a, TxKernel, PMMRBackend>, @@ -447,6 +779,10 @@ impl<'a> Extension<'a> { fn new(trees: &'a mut TxHashSet, batch: &'a Batch, header: BlockHeader) -> Extension<'a> { Extension { header, + header_pmmr: DBPMMR::at( + &mut trees.header_pmmr_h.backend, + trees.header_pmmr_h.last_pos, + ), output_pmmr: PMMR::at( &mut trees.output_pmmr_h.backend, trees.output_pmmr_h.last_pos, @@ -508,7 +844,16 @@ impl<'a> Extension<'a> { } /// Apply a new block to the existing state. + /// + /// Applies the following - + /// * header + /// * outputs + /// * inputs + /// * kernels + /// pub fn apply_block(&mut self, b: &Block) -> Result<(), Error> { + self.apply_header(&b.header)?; + for out in b.outputs() { let pos = self.apply_output(out)?; // Update the output_pos index for the new output. @@ -606,12 +951,18 @@ impl<'a> Extension<'a> { Ok(output_pos) } + /// Push kernel onto MMR (hash and data files). 
fn apply_kernel(&mut self, kernel: &TxKernel) -> Result<(), Error> { - // push kernels in their MMR and file self.kernel_pmmr .push(kernel.clone()) .map_err(&ErrorKind::TxHashSetErr)?; + Ok(()) + } + fn apply_header(&mut self, header: &BlockHeader) -> Result<(), Error> { + self.header_pmmr + .push(header.clone()) + .map_err(&ErrorKind::TxHashSetErr)?; Ok(()) } @@ -653,12 +1004,12 @@ impl<'a> Extension<'a> { /// Rewinds the MMRs to the provided block, rewinding to the last output pos /// and last kernel pos of that block. - pub fn rewind(&mut self, block_header: &BlockHeader) -> Result<(), Error> { - trace!( + pub fn rewind(&mut self, header: &BlockHeader) -> Result<(), Error> { + debug!( LOGGER, - "Rewind to header {} @ {}", - block_header.height, - block_header.hash(), + "Rewind to header {} at {}", + header.hash(), + header.height, ); // We need to build bitmaps of added and removed output positions @@ -667,16 +1018,19 @@ impl<'a> Extension<'a> { // undone during rewind). // Rewound output pos will be removed from the MMR. // Rewound input (spent) pos will be added back to the MMR. - let rewind_rm_pos = input_pos_to_rewind(block_header, &self.header, &self.batch)?; + let rewind_rm_pos = input_pos_to_rewind(header, &self.header, &self.batch)?; + + let header_pos = pmmr::insertion_to_pmmr_index(header.height + 1); self.rewind_to_pos( - block_header.output_mmr_size, - block_header.kernel_mmr_size, + header_pos, + header.output_mmr_size, + header.kernel_mmr_size, &rewind_rm_pos, )?; // Update our header to reflect the one we rewound to. - self.header = block_header.clone(); + self.header = header.clone(); Ok(()) } @@ -685,17 +1039,22 @@ impl<'a> Extension<'a> { /// kernel we want to rewind to. fn rewind_to_pos( &mut self, + header_pos: u64, output_pos: u64, kernel_pos: u64, rewind_rm_pos: &Bitmap, ) -> Result<(), Error> { - trace!( + debug!( LOGGER, - "Rewind txhashset to output {}, kernel {}", + "txhashset: rewind_to_pos: header {}, output {}, kernel {}", + header_pos, output_pos, kernel_pos, ); + self.header_pmmr + .rewind(header_pos) + .map_err(&ErrorKind::TxHashSetErr)?; self.output_pmmr .rewind(output_pos, rewind_rm_pos) .map_err(&ErrorKind::TxHashSetErr)?; @@ -712,13 +1071,23 @@ impl<'a> Extension<'a> { /// and kernel sum trees. pub fn roots(&self) -> TxHashSetRoots { TxHashSetRoots { + header_root: self.header_pmmr.root(), output_root: self.output_pmmr.root(), rproof_root: self.rproof_pmmr.root(), kernel_root: self.kernel_pmmr.root(), } } - /// Validate the various MMR roots against the block header. + /// Validate the following MMR roots against the latest header applied - + /// * output + /// * rangeproof + /// * kernel + /// + /// Note we do not validate the header MMR roots here as we need to validate + /// a header against the state of the MMR *prior* to applying it as + /// each header commits to the root of the MMR of all previous headers, + /// not including the header itself. + /// pub fn validate_roots(&self) -> Result<(), Error> { // If we are validating the genesis block then we have no outputs or // kernels. So we are done here. @@ -727,6 +1096,7 @@ impl<'a> Extension<'a> { } let roots = self.roots(); + if roots.output_root != self.header.output_root || roots.rproof_root != self.header.range_proof_root || roots.kernel_root != self.header.kernel_root @@ -737,7 +1107,26 @@ impl<'a> Extension<'a> { } } - /// Validate the output and kernel MMR sizes against the block header. 
+ /// Validate the provided header by comparing its "prev_root" to the + /// root of the current header MMR. + /// + /// TODO - Implement this once we commit to prev_root in block headers. + /// + pub fn validate_header_root(&self, _header: &BlockHeader) -> Result<(), Error> { + if self.header.height == 0 { + return Ok(()); + } + + let _roots = self.roots(); + + // TODO - validate once we commit to header MMR root in the header + // (not just previous hash) + // if roots.header_root != header.prev_root + + Ok(()) + } + + /// Validate the header, output and kernel MMR sizes against the block header. pub fn validate_sizes(&self) -> Result<(), Error> { // If we are validating the genesis block then we have no outputs or // kernels. So we are done here. @@ -745,10 +1134,14 @@ impl<'a> Extension<'a> { return Ok(()); } - let (output_mmr_size, rproof_mmr_size, kernel_mmr_size) = self.sizes(); - if output_mmr_size != self.header.output_mmr_size - || kernel_mmr_size != self.header.kernel_mmr_size - { + let (header_mmr_size, output_mmr_size, rproof_mmr_size, kernel_mmr_size) = self.sizes(); + let expected_header_mmr_size = pmmr::insertion_to_pmmr_index(self.header.height + 2) - 1; + + if header_mmr_size != expected_header_mmr_size { + Err(ErrorKind::InvalidMMRSize.into()) + } else if output_mmr_size != self.header.output_mmr_size { + Err(ErrorKind::InvalidMMRSize.into()) + } else if kernel_mmr_size != self.header.kernel_mmr_size { Err(ErrorKind::InvalidMMRSize.into()) } else if output_mmr_size != rproof_mmr_size { Err(ErrorKind::InvalidMMRSize.into()) @@ -761,6 +1154,9 @@ impl<'a> Extension<'a> { let now = Instant::now(); // validate all hashes and sums within the trees + if let Err(e) = self.header_pmmr.validate() { + return Err(ErrorKind::InvalidTxHashSet(e).into()); + } if let Err(e) = self.output_pmmr.validate() { return Err(ErrorKind::InvalidTxHashSet(e).into()); } @@ -773,7 +1169,8 @@ impl<'a> Extension<'a> { debug!( LOGGER, - "txhashset: validated the output {}, rproof {}, kernel {} mmrs, took {}s", + "txhashset: validated the header {}, output {}, rproof {}, kernel {} mmrs, took {}s", + self.header_pmmr.unpruned_size(), self.output_pmmr.unpruned_size(), self.rproof_pmmr.unpruned_size(), self.kernel_pmmr.unpruned_size(), @@ -871,8 +1268,9 @@ impl<'a> Extension<'a> { } /// Sizes of each of the sum trees - pub fn sizes(&self) -> (u64, u64, u64) { + pub fn sizes(&self) -> (u64, u64, u64, u64) { ( + self.header_pmmr.unpruned_size(), self.output_pmmr.unpruned_size(), self.rproof_pmmr.unpruned_size(), self.kernel_pmmr.unpruned_size(), diff --git a/chain/src/types.rs b/chain/src/types.rs index 9779cd838..560458d01 100644 --- a/chain/src/types.rs +++ b/chain/src/types.rs @@ -34,9 +34,11 @@ bitflags! { } /// A helper to hold the roots of the txhashset in order to keep them -/// readable +/// readable. 
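+///
+/// With this patch the header MMR root travels here alongside the
+/// output, rangeproof and kernel roots.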
#[derive(Debug)] pub struct TxHashSetRoots { + /// Header root + pub header_root: Hash, /// Output root pub output_root: Hash, /// Range Proof root diff --git a/core/src/core/block.rs b/core/src/core/block.rs index 51267f45a..880ef453f 100644 --- a/core/src/core/block.rs +++ b/core/src/core/block.rs @@ -34,7 +34,7 @@ use core::{ use global; use keychain::{self, BlindingFactor}; use pow::{Difficulty, Proof, ProofOfWork}; -use ser::{self, Readable, Reader, Writeable, Writer}; +use ser::{self, PMMRable, Readable, Reader, Writeable, Writer}; use util::{secp, secp_static, static_secp_instance, LOGGER}; /// Errors thrown by Block validation @@ -196,6 +196,14 @@ impl Default for BlockHeader { } } +/// Block header hashes are maintained in the header MMR +/// but we store the data itself in the db. +impl PMMRable for BlockHeader { + fn len() -> usize { + 0 + } +} + /// Serialization of a block header impl Writeable for BlockHeader { fn write(&self, writer: &mut W) -> Result<(), ser::Error> { diff --git a/core/src/core/hash.rs b/core/src/core/hash.rs index 842db1464..873a3b001 100644 --- a/core/src/core/hash.rs +++ b/core/src/core/hash.rs @@ -53,11 +53,13 @@ impl fmt::Display for Hash { } impl Hash { + pub const SIZE: usize = 32; + /// Builds a Hash from a byte vector. If the vector is too short, it will be /// completed by zeroes. If it's too long, it will be truncated. pub fn from_vec(v: &[u8]) -> Hash { - let mut h = [0; 32]; - let copy_size = min(v.len(), 32); + let mut h = [0; Hash::SIZE]; + let copy_size = min(v.len(), Hash::SIZE); h[..copy_size].copy_from_slice(&v[..copy_size]); Hash(h) } diff --git a/core/src/core/pmmr/backend.rs b/core/src/core/pmmr/backend.rs index 2907a53a5..5ce9e0b19 100644 --- a/core/src/core/pmmr/backend.rs +++ b/core/src/core/pmmr/backend.rs @@ -18,6 +18,14 @@ use core::hash::Hash; use core::BlockHeader; use ser::PMMRable; +pub trait HashOnlyBackend { + fn append(&mut self, data: Vec) -> Result<(), String>; + + fn rewind(&mut self, position: u64) -> Result<(), String>; + + fn get_hash(&self, position: u64) -> Option; +} + /// Storage backend for the MMR, just needs to be indexed by order of insertion. /// The PMMR itself does not need the Backend to be accurate on the existence /// of an element (i.e. remove could be a no-op) but layers above can @@ -30,7 +38,7 @@ where /// associated data element to flatfile storage (for leaf nodes only). The /// position of the first element of the Vec in the MMR is provided to /// help the implementation. - fn append(&mut self, position: u64, data: Vec<(Hash, Option)>) -> Result<(), String>; + fn append(&mut self, data: T, hashes: Vec) -> Result<(), String>; /// Rewind the backend state to a previous position, as if all append /// operations after that had been canceled. Expects a position in the PMMR diff --git a/core/src/core/pmmr/db_pmmr.rs b/core/src/core/pmmr/db_pmmr.rs new file mode 100644 index 000000000..58a8904f8 --- /dev/null +++ b/core/src/core/pmmr/db_pmmr.rs @@ -0,0 +1,173 @@ +// Copyright 2018 The Grin Developers +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +//! Database backed MMR. + +use std::marker; + +use core::hash::Hash; +use core::pmmr::{bintree_postorder_height, is_leaf, peak_map_height, peaks, HashOnlyBackend}; +use ser::{PMMRIndexHashable, PMMRable}; + +/// Database backed MMR. +pub struct DBPMMR<'a, T, B> +where + T: PMMRable, + B: 'a + HashOnlyBackend, +{ + /// The last position in the PMMR + last_pos: u64, + /// The backend for this readonly PMMR + backend: &'a mut B, + // only needed to parameterise Backend + _marker: marker::PhantomData, +} + +impl<'a, T, B> DBPMMR<'a, T, B> +where + T: PMMRable + ::std::fmt::Debug, + B: 'a + HashOnlyBackend, +{ + /// Build a new db backed MMR. + pub fn new(backend: &'a mut B) -> DBPMMR { + DBPMMR { + last_pos: 0, + backend: backend, + _marker: marker::PhantomData, + } + } + + /// Build a new db backed MMR initialized to + /// last_pos with the provided db backend. + pub fn at(backend: &'a mut B, last_pos: u64) -> DBPMMR { + DBPMMR { + last_pos: last_pos, + backend: backend, + _marker: marker::PhantomData, + } + } + + pub fn unpruned_size(&self) -> u64 { + self.last_pos + } + + pub fn is_empty(&self) -> bool { + self.last_pos == 0 + } + + pub fn rewind(&mut self, position: u64) -> Result<(), String> { + // Identify which actual position we should rewind to as the provided + // position is a leaf. We traverse the MMR to include any parent(s) that + // need to be included for the MMR to be valid. + let mut pos = position; + while bintree_postorder_height(pos + 1) > 0 { + pos += 1; + } + self.backend.rewind(pos)?; + self.last_pos = pos; + Ok(()) + } + + /// Get the hash element at provided position in the MMR. + pub fn get_hash(&self, pos: u64) -> Option { + if pos > self.last_pos { + // If we are beyond the rhs of the MMR return None. + None + } else if is_leaf(pos) { + // If we are a leaf then get data from the backend. + self.backend.get_hash(pos) + } else { + // If we are not a leaf then return None as only leaves have data. + None + } + } + + /// Push a new element into the MMR. Computes new related peaks at + /// the same time if applicable. 
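+ ///
+ /// e.g. with leaves at pos 1, 2 and 4 (parent at 3), pushing the next
+ /// element appends the leaf at pos 5, the parent of (4, 5) at pos 6 and
+ /// the parent of (3, 6) at pos 7, returning the leaf pos:
+ ///
+ /// ```ignore
+ /// let elmt_pos = pmmr.push(elmt)?; // 5 in the example above
+ /// ```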
+ pub fn push(&mut self, elmt: T) -> Result { + let elmt_pos = self.last_pos + 1; + let mut current_hash = elmt.hash_with_index(elmt_pos - 1); + + let mut to_append = vec![current_hash]; + let mut pos = elmt_pos; + + let (peak_map, height) = peak_map_height(pos - 1); + if height != 0 { + return Err(format!("bad mmr size {}", pos - 1)); + } + // hash with all immediately preceding peaks, as indicated by peak map + let mut peak = 1; + while (peak_map & peak) != 0 { + let left_sibling = pos + 1 - 2 * peak; + let left_hash = self + .backend + .get_hash(left_sibling) + .ok_or("missing left sibling in tree, should not have been pruned")?; + peak *= 2; + pos += 1; + current_hash = (left_hash, current_hash).hash_with_index(pos - 1); + to_append.push(current_hash); + } + + // append all the new nodes and update the MMR index + self.backend.append(to_append)?; + self.last_pos = pos; + Ok(elmt_pos) + } + + pub fn peaks(&self) -> Vec { + let peaks_pos = peaks(self.last_pos); + peaks_pos + .into_iter() + .filter_map(|pi| self.backend.get_hash(pi)) + .collect() + } + + pub fn root(&self) -> Hash { + let mut res = None; + for peak in self.peaks().iter().rev() { + res = match res { + None => Some(*peak), + Some(rhash) => Some((*peak, rhash).hash_with_index(self.unpruned_size())), + } + } + res.expect("no root, invalid tree") + } + + pub fn validate(&self) -> Result<(), String> { + // iterate on all parent nodes + for n in 1..(self.last_pos + 1) { + let height = bintree_postorder_height(n); + if height > 0 { + if let Some(hash) = self.get_hash(n) { + let left_pos = n - (1 << height); + let right_pos = n - 1; + if let Some(left_child_hs) = self.get_hash(left_pos) { + if let Some(right_child_hs) = self.get_hash(right_pos) { + // hash the two child nodes together with parent_pos and compare + if (left_child_hs, right_child_hs).hash_with_index(n - 1) != hash { + return Err(format!( + "Invalid MMR, hash of parent at {} does \ + not match children.", + n + )); + } + } + } + } + } + } + Ok(()) + } +} diff --git a/core/src/core/pmmr/mod.rs b/core/src/core/pmmr/mod.rs index 22f468f84..a15cbfce9 100644 --- a/core/src/core/pmmr/mod.rs +++ b/core/src/core/pmmr/mod.rs @@ -37,11 +37,13 @@ //! either be a simple Vec or a database. 
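+//!
+//! This patch adds a db-backed, hash-only MMR (DBPMMR) alongside the
+//! existing flavors. A minimal usage sketch, assuming some backend that
+//! implements HashOnlyBackend:
+//!
+//! ```ignore
+//! let mut pmmr = DBPMMR::new(&mut backend);
+//! let pos = pmmr.push(header)?; // position of the newly added element
+//! let root = pmmr.root();      // peaks bagged into a single root hash
+//! pmmr.rewind(pos)?;           // rewind back to a previous position
+//! ```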
mod backend; +mod db_pmmr; mod pmmr; mod readonly_pmmr; mod rewindable_pmmr; pub use self::backend::*; +pub use self::db_pmmr::*; pub use self::pmmr::*; pub use self::readonly_pmmr::*; pub use self::rewindable_pmmr::*; diff --git a/core/src/core/pmmr/pmmr.rs b/core/src/core/pmmr/pmmr.rs index 46e0e89c9..4efc3f50b 100644 --- a/core/src/core/pmmr/pmmr.rs +++ b/core/src/core/pmmr/pmmr.rs @@ -173,7 +173,7 @@ where let elmt_pos = self.last_pos + 1; let mut current_hash = elmt.hash_with_index(elmt_pos - 1); - let mut to_append = vec![(current_hash, Some(elmt))]; + let mut to_append = vec![current_hash]; let mut pos = elmt_pos; let (peak_map, height) = peak_map_height(pos - 1); @@ -191,11 +191,11 @@ where peak *= 2; pos += 1; current_hash = (left_hash, current_hash).hash_with_index(pos - 1); - to_append.push((current_hash, None)); + to_append.push(current_hash); } // append all the new nodes and update the MMR index - self.backend.append(elmt_pos, to_append)?; + self.backend.append(elmt, to_append)?; self.last_pos = pos; Ok(elmt_pos) } @@ -463,6 +463,9 @@ pub fn n_leaves(size: u64) -> u64 { /// Returns the pmmr index of the nth inserted element pub fn insertion_to_pmmr_index(mut sz: u64) -> u64 { + if sz == 0 { + return 0; + } // 1 based pmmrs sz -= 1; 2 * sz - sz.count_ones() as u64 + 1 diff --git a/core/src/core/pmmr/rewindable_pmmr.rs b/core/src/core/pmmr/rewindable_pmmr.rs index 43dc0f40b..eded64035 100644 --- a/core/src/core/pmmr/rewindable_pmmr.rs +++ b/core/src/core/pmmr/rewindable_pmmr.rs @@ -19,7 +19,7 @@ use std::marker; use core::hash::Hash; use core::pmmr::{bintree_postorder_height, is_leaf, peaks, Backend}; -use ser::{PMMRable, PMMRIndexHashable}; +use ser::{PMMRIndexHashable, PMMRable}; /// Rewindable (but still readonly) view of a PMMR. pub struct RewindablePMMR<'a, T, B> diff --git a/core/tests/pmmr.rs b/core/tests/pmmr.rs index 3860a831e..11e6b9801 100644 --- a/core/tests/pmmr.rs +++ b/core/tests/pmmr.rs @@ -438,7 +438,7 @@ fn pmmr_prune() { } // First check the initial numbers of elements. 
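// (VecBackend now stores hashes and data in separate vecs to mirror the
// new append API, hence elems.len() becomes hashes.len() below.)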
- assert_eq!(ba.elems.len(), 16); + assert_eq!(ba.hashes.len(), 16); assert_eq!(ba.remove_list.len(), 0); // pruning a leaf with no parent should do nothing @@ -447,7 +447,7 @@ fn pmmr_prune() { pmmr.prune(16).unwrap(); assert_eq!(orig_root, pmmr.root()); } - assert_eq!(ba.elems.len(), 16); + assert_eq!(ba.hashes.len(), 16); assert_eq!(ba.remove_list.len(), 1); // pruning leaves with no shared parent just removes 1 element @@ -456,7 +456,7 @@ fn pmmr_prune() { pmmr.prune(2).unwrap(); assert_eq!(orig_root, pmmr.root()); } - assert_eq!(ba.elems.len(), 16); + assert_eq!(ba.hashes.len(), 16); assert_eq!(ba.remove_list.len(), 2); { @@ -464,7 +464,7 @@ fn pmmr_prune() { pmmr.prune(4).unwrap(); assert_eq!(orig_root, pmmr.root()); } - assert_eq!(ba.elems.len(), 16); + assert_eq!(ba.hashes.len(), 16); assert_eq!(ba.remove_list.len(), 3); // pruning a non-leaf node has no effect @@ -473,7 +473,7 @@ fn pmmr_prune() { pmmr.prune(3).unwrap_err(); assert_eq!(orig_root, pmmr.root()); } - assert_eq!(ba.elems.len(), 16); + assert_eq!(ba.hashes.len(), 16); assert_eq!(ba.remove_list.len(), 3); // TODO - no longer true (leaves only now) - pruning sibling removes subtree @@ -482,7 +482,7 @@ fn pmmr_prune() { pmmr.prune(5).unwrap(); assert_eq!(orig_root, pmmr.root()); } - assert_eq!(ba.elems.len(), 16); + assert_eq!(ba.hashes.len(), 16); assert_eq!(ba.remove_list.len(), 4); // TODO - no longer true (leaves only now) - pruning all leaves under level >1 @@ -492,7 +492,7 @@ fn pmmr_prune() { pmmr.prune(1).unwrap(); assert_eq!(orig_root, pmmr.root()); } - assert_eq!(ba.elems.len(), 16); + assert_eq!(ba.hashes.len(), 16); assert_eq!(ba.remove_list.len(), 5); // pruning everything should only leave us with a single peak @@ -503,7 +503,7 @@ fn pmmr_prune() { } assert_eq!(orig_root, pmmr.root()); } - assert_eq!(ba.elems.len(), 16); + assert_eq!(ba.hashes.len(), 16); assert_eq!(ba.remove_list.len(), 9); } diff --git a/core/tests/vec_backend/mod.rs b/core/tests/vec_backend/mod.rs index 93f1d32a9..c51de08e8 100644 --- a/core/tests/vec_backend/mod.rs +++ b/core/tests/vec_backend/mod.rs @@ -17,7 +17,7 @@ extern crate croaring; use croaring::Bitmap; use core::core::hash::Hash; -use core::core::pmmr::Backend; +use core::core::pmmr::{self, Backend}; use core::core::BlockHeader; use core::ser; use core::ser::{PMMRable, Readable, Reader, Writeable, Writer}; @@ -59,7 +59,8 @@ where T: PMMRable, { /// Backend elements - pub elems: Vec)>>, + pub data: Vec, + pub hashes: Vec, /// Positions of removed elements pub remove_list: Vec, } @@ -68,8 +69,9 @@ impl Backend for VecBackend where T: PMMRable, { - fn append(&mut self, _position: u64, data: Vec<(Hash, Option)>) -> Result<(), String> { - self.elems.append(&mut map_vec!(data, |d| Some(d.clone()))); + fn append(&mut self, data: T, hashes: Vec) -> Result<(), String> { + self.data.push(data); + self.hashes.append(&mut hashes.clone()); Ok(()) } @@ -77,11 +79,7 @@ where if self.remove_list.contains(&position) { None } else { - if let Some(ref elem) = self.elems[(position - 1) as usize] { - Some(elem.0) - } else { - None - } + self.get_from_file(position) } } @@ -89,28 +87,19 @@ where if self.remove_list.contains(&position) { None } else { - if let Some(ref elem) = self.elems[(position - 1) as usize] { - elem.1.clone() - } else { - None - } + self.get_data_from_file(position) } } fn get_from_file(&self, position: u64) -> Option { - if let Some(ref x) = self.elems[(position - 1) as usize] { - Some(x.0) - } else { - None - } + let hash = &self.hashes[(position - 1) as usize]; + 
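+ // hashes are kept for every position (leaves and parents alike) so we
+ // can index directly by (1-based) position here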
Some(hash.clone()) } fn get_data_from_file(&self, position: u64) -> Option { - if let Some(ref x) = self.elems[(position - 1) as usize] { - x.1.clone() - } else { - None - } + let idx = pmmr::n_leaves(position); + let data = &self.data[(idx - 1) as usize]; + Some(data.clone()) } fn remove(&mut self, position: u64) -> Result<(), String> { @@ -119,7 +108,9 @@ where } fn rewind(&mut self, position: u64, _rewind_rm_pos: &Bitmap) -> Result<(), String> { - self.elems = self.elems[0..(position as usize) + 1].to_vec(); + let idx = pmmr::n_leaves(position); + self.data = self.data[0..(idx as usize) + 1].to_vec(); + self.hashes = self.hashes[0..(position as usize) + 1].to_vec(); Ok(()) } @@ -141,20 +132,9 @@ where /// Instantiates a new VecBackend pub fn new() -> VecBackend { VecBackend { - elems: vec![], + data: vec![], + hashes: vec![], remove_list: vec![], } } - - // /// Current number of elements in the underlying Vec. - // pub fn used_size(&self) -> usize { - // let mut usz = self.elems.len(); - // for (idx, _) in self.elems.iter().enumerate() { - // let idx = idx as u64; - // if self.remove_list.contains(&idx) { - // usz -= 1; - // } - // } - // usz - // } } diff --git a/servers/src/grin/sync/header_sync.rs b/servers/src/grin/sync/header_sync.rs index 8200dbf4a..1053ab8b0 100644 --- a/servers/src/grin/sync/header_sync.rs +++ b/servers/src/grin/sync/header_sync.rs @@ -67,7 +67,13 @@ impl HeaderSync { header_head.hash(), header_head.height, ); + + // Reset sync_head to the same as current header_head. self.chain.reset_sync_head(&header_head).unwrap(); + + // Rebuild the sync MMR to match our updates sync_head. + self.chain.rebuild_sync_mmr(&header_head).unwrap(); + self.history_locators.clear(); true } diff --git a/store/src/lib.rs b/store/src/lib.rs index 05870792c..57c563798 100644 --- a/store/src/lib.rs +++ b/store/src/lib.rs @@ -50,31 +50,6 @@ use byteorder::{BigEndian, WriteBytesExt}; pub use lmdb::*; -/// An iterator thad produces Readable instances back. Wraps the lower level -/// DBIterator and deserializes the returned values. -// pub struct SerIterator -// where -// T: ser::Readable, -// { -// iter: DBIterator, -// _marker: marker::PhantomData, -// } -// -// impl Iterator for SerIterator -// where -// T: ser::Readable, -// { -// type Item = T; -// -// fn next(&mut self) -> Option { -// let next = self.iter.next(); -// next.and_then(|r| { -// let (_, v) = r; -// ser::deserialize(&mut &v[..]).ok() -// }) -// } -// } - /// Build a db key from a prefix and a byte vector identifier. pub fn to_key(prefix: u8, k: &mut Vec) -> Vec { let mut res = Vec::with_capacity(k.len() + 2); diff --git a/store/src/pmmr.rs b/store/src/pmmr.rs index 369aece3c..5ffe95881 100644 --- a/store/src/pmmr.rs +++ b/store/src/pmmr.rs @@ -18,12 +18,12 @@ use std::{fs, io, marker}; use croaring::Bitmap; use core::core::hash::{Hash, Hashed}; -use core::core::pmmr::{self, family, Backend}; +use core::core::pmmr::{self, family, Backend, HashOnlyBackend}; use core::core::BlockHeader; use core::ser::{self, PMMRable}; use leaf_set::LeafSet; use prune_list::PruneList; -use types::{prune_noop, AppendOnlyFile}; +use types::{prune_noop, AppendOnlyFile, HashFile}; use util::LOGGER; const PMMR_HASH_FILE: &'static str = "pmmr_hash.bin"; @@ -67,19 +67,19 @@ impl Backend for PMMRBackend where T: PMMRable + ::std::fmt::Debug, { - /// Append the provided Hashes to the backend storage. + /// Append the provided data and hashes to the backend storage. + /// Add the new leaf pos to our leaf_set if this is a prunable MMR. 
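+ /// The new leaf position is derived from the unsynced size of the hash
+ /// file, adjusted for pruning, as in the body below:
+ ///
+ /// ```ignore
+ /// let position = (hash_file.size_unsync() / record_len)
+ ///     + prune_list.get_total_shift() + 1;
+ /// ```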
#[allow(unused_variables)] - fn append(&mut self, position: u64, data: Vec<(Hash, Option)>) -> Result<(), String> { - for d in data { - self.hash_file.append(&mut ser::ser_vec(&d.0).unwrap()); - if let Some(elem) = d.1 { - self.data_file.append(&mut ser::ser_vec(&elem).unwrap()); - - if self.prunable { - // Add the new position to our leaf_set. - self.leaf_set.add(position); - } - } + fn append(&mut self, data: T, hashes: Vec) -> Result<(), String> { + if self.prunable { + let record_len = Hash::SIZE as u64; + let shift = self.prune_list.get_total_shift(); + let position = (self.hash_file.size_unsync() / record_len) + shift + 1; + self.leaf_set.add(position); + } + self.data_file.append(&mut ser::ser_vec(&data).unwrap()); + for ref h in hashes { + self.hash_file.append(&mut ser::ser_vec(h).unwrap()); } Ok(()) } @@ -96,7 +96,7 @@ where let pos = position - 1; // Must be on disk, doing a read at the correct position - let hash_record_len = 32; + let hash_record_len = Hash::SIZE; let file_offset = ((pos - shift) as usize) * hash_record_len; let data = self.hash_file.read(file_offset, hash_record_len); match ser::deserialize(&mut &data[..]) { @@ -165,7 +165,7 @@ where // Rewind the hash file accounting for pruned/compacted pos let shift = self.prune_list.get_shift(position); - let record_len = 32 as u64; + let record_len = Hash::SIZE as u64; let file_pos = (position - shift) * record_len; self.hash_file.rewind(file_pos); @@ -265,7 +265,7 @@ where pub fn unpruned_size(&self) -> io::Result { let total_shift = self.prune_list.get_total_shift(); - let record_len = 32; + let record_len = Hash::SIZE as u64; let sz = self.hash_file.size()?; Ok(sz / record_len + total_shift) } @@ -280,7 +280,7 @@ where /// Size of the underlying hashed data. Extremely dependent on pruning /// and compaction. pub fn hash_size(&self) -> io::Result { - self.hash_file.size().map(|sz| sz / 32) + self.hash_file.size().map(|sz| sz / Hash::SIZE as u64) } /// Syncs all files to disk. A call to sync is required to ensure all the @@ -350,7 +350,7 @@ where // 1. Save compact copy of the hash file, skipping removed data. { - let record_len = 32; + let record_len = Hash::SIZE as u64; let off_to_rm = map_vec!(pos_to_rm, |pos| { let shift = self.prune_list.get_shift(pos.into()); @@ -451,6 +451,65 @@ where } } +/// Simple MMR Backend for hashes only (data maintained in the db). +pub struct HashOnlyMMRBackend { + /// The hash file underlying this MMR backend. + hash_file: HashFile, +} + +impl HashOnlyBackend for HashOnlyMMRBackend { + fn append(&mut self, hashes: Vec) -> Result<(), String> { + for ref h in hashes { + self.hash_file + .append(h) + .map_err(|e| format!("Failed to append to backend, {:?}", e))?; + } + Ok(()) + } + + fn rewind(&mut self, position: u64) -> Result<(), String> { + self.hash_file + .rewind(position) + .map_err(|e| format!("Failed to rewind backend, {:?}", e))?; + Ok(()) + } + + fn get_hash(&self, position: u64) -> Option { + self.hash_file.read(position) + } +} + +impl HashOnlyMMRBackend { + /// Instantiates a new PMMR backend. + /// Use the provided dir to store its files. + pub fn new(data_dir: String) -> io::Result { + let hash_file = HashFile::open(format!("{}/{}", data_dir, PMMR_HASH_FILE))?; + Ok(HashOnlyMMRBackend { hash_file }) + } + + /// The unpruned size of this MMR backend. + pub fn unpruned_size(&self) -> io::Result { + let sz = self.hash_file.size()?; + Ok(sz / Hash::SIZE as u64) + } + + /// Discard any pending changes to this MMR backend. 
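+ /// Simply drops any hashes appended to the underlying hash file since
+ /// the last sync.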
+ pub fn discard(&mut self) { + self.hash_file.discard(); + } + + /// Sync pending changes to the backend file on disk. + pub fn sync(&mut self) -> io::Result<()> { + if let Err(e) = self.hash_file.flush() { + return Err(io::Error::new( + io::ErrorKind::Interrupted, + format!("Could not write to hash storage, disk full? {:?}", e), + )); + } + Ok(()) + } +} + /// Filter remove list to exclude roots. /// We want to keep roots around so we have hashes for Merkle proofs. fn removed_excl_roots(removed: Bitmap) -> Bitmap { @@ -459,6 +518,5 @@ fn removed_excl_roots(removed: Bitmap) -> Bitmap { .filter(|pos| { let (parent_pos, _) = family(*pos as u64); removed.contains(parent_pos as u32) - }) - .collect() + }).collect() } diff --git a/store/src/rm_log.rs b/store/src/rm_log.rs index 0f0fc1709..e3b8de09f 100644 --- a/store/src/rm_log.rs +++ b/store/src/rm_log.rs @@ -134,8 +134,7 @@ impl RemoveLog { None } }, - ) - .collect() + ).collect() } } diff --git a/store/src/types.rs b/store/src/types.rs index af9366688..b914ec186 100644 --- a/store/src/types.rs +++ b/store/src/types.rs @@ -25,11 +25,76 @@ use libc::{ftruncate as ftruncate64, off_t as off64_t}; #[cfg(any(target_os = "linux"))] use libc::{ftruncate64, off64_t}; +use core::core::hash::Hash; use core::ser; +use util::LOGGER; /// A no-op function for doing nothing with some pruned data. pub fn prune_noop(_pruned_data: &[u8]) {} +/// Hash file (MMR) wrapper around an append only file. +pub struct HashFile { + file: AppendOnlyFile, +} + +impl HashFile { + /// Open (or create) a hash file at the provided path on disk. + pub fn open(path: String) -> io::Result { + let file = AppendOnlyFile::open(path)?; + Ok(HashFile { file }) + } + + /// Append a hash to this hash file. + /// Will not be written to disk until flush() is subsequently called. + /// Alternatively discard() may be called to discard any pending changes. + pub fn append(&mut self, hash: &Hash) -> io::Result<()> { + let mut bytes = ser::ser_vec(hash).map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; + self.file.append(&mut bytes); + Ok(()) + } + + /// Read a hash from the hash file by position. + pub fn read(&self, position: u64) -> Option { + // The MMR starts at 1, our binary backend starts at 0. + let pos = position - 1; + + // Must be on disk, doing a read at the correct position + let file_offset = (pos as usize) * Hash::SIZE; + let data = self.file.read(file_offset, Hash::SIZE); + match ser::deserialize(&mut &data[..]) { + Ok(h) => Some(h), + Err(e) => { + error!( + LOGGER, + "Corrupted storage, could not read an entry from hash file: {:?}", e + ); + return None; + } + } + } + + /// Rewind the backend file to the specified position. + pub fn rewind(&mut self, position: u64) -> io::Result<()> { + self.file.rewind(position * Hash::SIZE as u64); + Ok(()) + } + + /// Flush unsynced changes to the hash file to disk. + pub fn flush(&mut self) -> io::Result<()> { + self.file.flush() + } + + /// Discard any unsynced changes to the hash file. + pub fn discard(&mut self) { + self.file.discard() + } + + /// Size of the hash file in bytes. + pub fn size(&self) -> io::Result { + self.file.size() + } +} + /// Wrapper for a file that can be read at any position (random read) but for /// which writes are append only. Reads are backed by a memory map (mmap(2)), /// relying on the operating system for fast access and caching. The memory @@ -246,6 +311,11 @@ impl AppendOnlyFile { fs::metadata(&self.path).map(|md| md.len()) } + /// Current size of the (unsynced) file in bytes. 
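+ /// This is the flushed length plus any bytes sitting in the in-memory
+ /// buffer that have not yet been flushed to disk.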
+ pub fn size_unsync(&self) -> u64 { + (self.buffer_start + self.buffer.len()) as u64 + } + /// Path of the underlying file pub fn path(&self) -> String { self.path.clone()