diff --git a/chain/src/chain.rs b/chain/src/chain.rs
index 80906fe79..cb76e2d7c 100644
--- a/chain/src/chain.rs
+++ b/chain/src/chain.rs
@@ -935,35 +935,22 @@ impl Chain {
 		Ok(())
 	}
 
-	fn compact_txhashset(&self) -> Result<(), Error> {
-		debug!("Starting txhashset compaction...");
-		{
-			// Note: We take a lock on the stop_state here and do not release it until
-			// we have finished processing this chain compaction operation.
-			let stop_lock = self.stop_state.lock();
-			if stop_lock.is_stopped() {
-				return Err(ErrorKind::Stopped.into());
-			}
-
-			let mut txhashset = self.txhashset.write();
-			txhashset.compact()?;
-		}
-		debug!("... finished txhashset compaction.");
-		Ok(())
-	}
-
 	/// Cleanup old blocks from the db.
 	/// Determine the cutoff height from the horizon and the current block height.
 	/// *Only* runs if we are not in archive mode.
-	fn compact_blocks_db(&self) -> Result<(), Error> {
+	fn remove_historical_blocks(
+		&self,
+		txhashset: &txhashset::TxHashSet,
+		batch: &mut store::Batch<'_>,
+	) -> Result<(), Error> {
 		if self.archive_mode {
 			return Ok(());
 		}
 
 		let horizon = global::cut_through_horizon() as u64;
-		let head = self.head()?;
+		let head = batch.head()?;
 
-		let tail = match self.tail() {
+		let tail = match batch.tail() {
 			Ok(tail) => tail,
 			Err(_) => Tip::from_header(&self.genesis),
 		};
@@ -971,7 +958,7 @@ impl Chain {
 		let cutoff = head.height.saturating_sub(horizon);
 
 		debug!(
-			"compact_blocks_db: head height: {}, tail height: {}, horizon: {}, cutoff: {}",
+			"remove_historical_blocks: head height: {}, tail height: {}, horizon: {}, cutoff: {}",
 			head.height, tail.height, horizon, cutoff,
 		);
@@ -981,10 +968,12 @@ impl Chain {
 
 		let mut count = 0;
 
-		let tail = self.get_header_by_height(head.height - horizon)?;
-		let mut current = self.get_header_by_height(head.height - horizon - 1)?;
+		let tail_hash = txhashset.get_header_hash_by_height(head.height - horizon)?;
+		let tail = batch.get_block_header(&tail_hash)?;
+
+		let current_hash = txhashset.get_header_hash_by_height(head.height - horizon - 1)?;
+		let mut current = batch.get_block_header(&current_hash)?;
 
-		let batch = self.store.batch()?;
 		loop {
 			// Go to the store directly so we can handle NotFoundErr robustly.
 			match self.store.get_block(&current.hash()) {
@@ -1011,11 +1000,12 @@ impl Chain {
 			}
 		}
 		batch.save_body_tail(&Tip::from_header(&tail))?;
-		batch.commit()?;
+
 		debug!(
-			"compact_blocks_db: removed {} blocks. tail height: {}",
+			"remove_historical_blocks: removed {} blocks. tail height: {}",
 			count, tail.height
 		);
+
 		Ok(())
 	}
 
@@ -1025,12 +1015,35 @@ impl Chain {
 	/// * removes historical blocks and associated data from the db (unless archive mode)
 	///
 	pub fn compact(&self) -> Result<(), Error> {
-		self.compact_txhashset()?;
-
-		if !self.archive_mode {
-			self.compact_blocks_db()?;
+		// Note: We take a lock on the stop_state here and do not release it until
+		// we have finished processing this chain compaction operation.
+		// We want to avoid shutting the node down in the middle of compacting the data.
+		let stop_lock = self.stop_state.lock();
+		if stop_lock.is_stopped() {
+			return Err(ErrorKind::Stopped.into());
 		}
 
+		// Take a write lock on the txhashset and start a new writeable db batch.
+		let mut txhashset = self.txhashset.write();
+		let mut batch = self.store.batch()?;
+
+		// Compact the txhashset itself (rewriting the pruned backend files).
+		txhashset.compact(&mut batch)?;
+
+		// Rebuild our output_pos index in the db based on the current UTXO set.
+		txhashset::extending(&mut txhashset, &mut batch, |extension| {
+			extension.rebuild_index()?;
+			Ok(())
+		})?;
+
+		// If we are not in archival mode remove historical blocks from the db.
+		if !self.archive_mode {
+			self.remove_historical_blocks(&txhashset, &mut batch)?;
+		}
+
+		// Commit all the above db changes.
+		batch.commit()?;
+
 		Ok(())
 	}
 
@@ -1143,12 +1156,20 @@ impl Chain {
 	}
 
 	/// Gets the block header at the provided height.
-	/// Note: This takes a read lock on the txhashset.
+	/// Note: Takes a read lock on the txhashset.
 	/// Take care not to call this repeatedly in a tight loop.
 	pub fn get_header_by_height(&self, height: u64) -> Result<BlockHeader, Error> {
+		let hash = self.get_header_hash_by_height(height)?;
+		self.get_block_header(&hash)
+	}
+
+	/// Gets the header hash at the provided height.
+	/// Note: Takes a read lock on the txhashset.
+	/// Take care not to call this repeatedly in a tight loop.
+	fn get_header_hash_by_height(&self, height: u64) -> Result<Hash, Error> {
 		let txhashset = self.txhashset.read();
-		let header = txhashset.get_header_by_height(height)?;
-		Ok(header)
+		let hash = txhashset.get_header_hash_by_height(height)?;
+		Ok(hash)
 	}
 
 	/// Gets the block header in which a given output appears in the txhashset.
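Note on the restructured `compact()`: the stop_state lock, the txhashset write lock, and a single db batch now bracket the whole operation, so compaction either fully commits or leaves the db untouched. A minimal standalone sketch of that lock-then-commit-once ordering (toy `Node` type and `Vec` stand-ins, not the grin API):

    use std::sync::{Mutex, RwLock};

    struct Node {
        stop_state: Mutex<bool>,     // stands in for StopState; true == stopped
        txhashset: RwLock<Vec<u64>>, // stands in for the real TxHashSet
    }

    impl Node {
        fn compact(&self) -> Result<(), String> {
            // Hold the stop lock for the whole operation so shutdown
            // cannot interleave with compaction.
            let stop = self.stop_state.lock().unwrap();
            if *stop {
                return Err("stopped".to_string());
            }
            // One write lock, one logical batch: all mutations below
            // commit together (or not at all).
            let mut txhashset = self.txhashset.write().unwrap();
            txhashset.retain(|pos| pos % 2 == 0); // stand-in for compact + rebuild_index
            Ok(()) // batch.commit() happens here in the real code
        }
    }
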
diff --git a/chain/src/error.rs b/chain/src/error.rs
index b39404423..4ba115269 100644
--- a/chain/src/error.rs
+++ b/chain/src/error.rs
@@ -89,9 +89,15 @@ pub enum ErrorKind {
 	/// Error validating a Merkle proof (coinbase output)
 	#[fail(display = "Error validating merkle proof")]
 	MerkleProof,
-	/// output not found
+	/// Output not found
 	#[fail(display = "Output not found")]
 	OutputNotFound,
+	/// Rangeproof not found
+	#[fail(display = "Rangeproof not found")]
+	RangeproofNotFound,
+	/// Tx kernel not found
+	#[fail(display = "Tx kernel not found")]
+	TxKernelNotFound,
 	/// output spent
 	#[fail(display = "Output is spent")]
 	OutputSpent,
diff --git a/chain/src/store.rs b/chain/src/store.rs
index 1b0265316..e1d602265 100644
--- a/chain/src/store.rs
+++ b/chain/src/store.rs
@@ -51,12 +51,13 @@ impl ChainStore {
 	}
 }
 
-#[allow(missing_docs)]
 impl ChainStore {
+	/// The current chain head.
 	pub fn head(&self) -> Result<Tip, Error> {
 		option_to_not_found(self.db.get_ser(&vec![HEAD_PREFIX]), "HEAD")
 	}
 
+	/// The current chain "tail" (earliest block in the store).
 	pub fn tail(&self) -> Result<Tip, Error> {
 		option_to_not_found(self.db.get_ser(&vec![TAIL_PREFIX]), "TAIL")
 	}
@@ -71,10 +72,12 @@ impl ChainStore {
 		option_to_not_found(self.db.get_ser(&vec![HEADER_HEAD_PREFIX]), "HEADER_HEAD")
 	}
 
+	/// The "sync" head.
 	pub fn get_sync_head(&self) -> Result<Tip, Error> {
 		option_to_not_found(self.db.get_ser(&vec![SYNC_HEAD_PREFIX]), "SYNC_HEAD")
 	}
 
+	/// Get full block.
 	pub fn get_block(&self, h: &Hash) -> Result<Block, Error> {
 		option_to_not_found(
 			self.db.get_ser(&to_key(BLOCK_PREFIX, &mut h.to_vec())),
 			&format!("BLOCK: {}", h),
 		)
 	}
@@ -82,10 +85,12 @@ impl ChainStore {
 
+	/// Does this full block exist?
 	pub fn block_exists(&self, h: &Hash) -> Result<bool, Error> {
 		self.db.exists(&to_key(BLOCK_PREFIX, &mut h.to_vec()))
 	}
 
+	/// Get block_sums for the block hash.
 	pub fn get_block_sums(&self, h: &Hash) -> Result<BlockSums, Error> {
 		option_to_not_found(
 			self.db.get_ser(&to_key(BLOCK_SUMS_PREFIX, &mut h.to_vec())),
 			&format!("Block sums for block: {}", h),
 		)
 	}
@@ -93,10 +98,12 @@ impl ChainStore {
 
+	/// Get previous header.
 	pub fn get_previous_header(&self, header: &BlockHeader) -> Result<BlockHeader, Error> {
 		self.get_block_header(&header.prev_hash)
 	}
 
+	/// Get block header.
 	pub fn get_block_header(&self, h: &Hash) -> Result<BlockHeader, Error> {
 		option_to_not_found(
 			self.db
 				.get_ser(&to_key(BLOCK_HEADER_PREFIX, &mut h.to_vec())),
 			&format!("BLOCK HEADER: {}", h),
 		)
 	}
@@ -105,6 +112,7 @@ impl ChainStore {
 	}
 
+	/// Get PMMR pos for the given output commitment.
 	pub fn get_output_pos(&self, commit: &Commitment) -> Result<u64, Error> {
 		option_to_not_found(
 			self.db
 				.get_ser(&to_key(COMMIT_POS_PREFIX, &mut commit.as_ref().to_vec())),
 			&format!("Output position for: {:?}", commit),
 		)
 	}
@@ -127,12 +135,13 @@ pub struct Batch<'a> {
 	db: store::Batch<'a>,
 }
 
-#[allow(missing_docs)]
 impl<'a> Batch<'a> {
+	/// The head.
 	pub fn head(&self) -> Result<Tip, Error> {
 		option_to_not_found(self.db.get_ser(&vec![HEAD_PREFIX]), "HEAD")
 	}
 
+	/// The tail.
 	pub fn tail(&self) -> Result<Tip, Error> {
 		option_to_not_found(self.db.get_ser(&vec![TAIL_PREFIX]), "TAIL")
 	}
@@ -147,27 +156,33 @@ impl<'a> Batch<'a> {
 		option_to_not_found(self.db.get_ser(&vec![HEADER_HEAD_PREFIX]), "HEADER_HEAD")
 	}
 
+	/// Get "sync" head.
 	pub fn get_sync_head(&self) -> Result<Tip, Error> {
 		option_to_not_found(self.db.get_ser(&vec![SYNC_HEAD_PREFIX]), "SYNC_HEAD")
 	}
 
+	/// Save head to db.
 	pub fn save_head(&self, t: &Tip) -> Result<(), Error> {
 		self.db.put_ser(&vec![HEAD_PREFIX], t)?;
 		self.db.put_ser(&vec![HEADER_HEAD_PREFIX], t)
 	}
 
+	/// Save body head to db.
 	pub fn save_body_head(&self, t: &Tip) -> Result<(), Error> {
 		self.db.put_ser(&vec![HEAD_PREFIX], t)
 	}
 
+	/// Save body "tail" to db.
 	pub fn save_body_tail(&self, t: &Tip) -> Result<(), Error> {
 		self.db.put_ser(&vec![TAIL_PREFIX], t)
 	}
 
+	/// Save header_head to db.
 	pub fn save_header_head(&self, t: &Tip) -> Result<(), Error> {
 		self.db.put_ser(&vec![HEADER_HEAD_PREFIX], t)
 	}
 
+	/// Save "sync" head to db.
 	pub fn save_sync_head(&self, t: &Tip) -> Result<(), Error> {
 		self.db.put_ser(&vec![SYNC_HEAD_PREFIX], t)
 	}
@@ -192,6 +207,7 @@ impl<'a> Batch<'a> {
 		)
 	}
 
+	/// Does the block exist?
 	pub fn block_exists(&self, h: &Hash) -> Result<bool, Error> {
 		self.db.exists(&to_key(BLOCK_PREFIX, &mut h.to_vec()))
 	}
@@ -225,6 +241,7 @@ impl<'a> Batch<'a> {
 		Ok(())
 	}
 
+	/// Save block header to db.
 	pub fn save_block_header(&self, header: &BlockHeader) -> Result<(), Error> {
 		let hash = header.hash();
@@ -235,6 +252,7 @@ impl<'a> Batch<'a> {
 		Ok(())
 	}
 
+	/// Save output_pos to index.
 	pub fn save_output_pos(&self, commit: &Commitment, pos: u64) -> Result<(), Error> {
 		self.db.put_ser(
 			&to_key(COMMIT_POS_PREFIX, &mut commit.as_ref().to_vec())[..],
 			&pos,
 		)
 	}
@@ -242,6 +260,7 @@ impl<'a> Batch<'a> {
 
+	/// Get output_pos from index.
 	pub fn get_output_pos(&self, commit: &Commitment) -> Result<u64, Error> {
 		option_to_not_found(
 			self.db
 				.get_ser(&to_key(COMMIT_POS_PREFIX, &mut commit.as_ref().to_vec())),
 			&format!("Output position for: {:?}", commit),
 		)
 	}
@@ -250,15 +269,21 @@ impl<'a> Batch<'a> {
 
-	pub fn delete_output_pos(&self, commit: &[u8]) -> Result<(), Error> {
-		self.db
-			.delete(&to_key(COMMIT_POS_PREFIX, &mut commit.to_vec()))
+	/// Clear all entries from the output_pos index (must be rebuilt after).
+	pub fn clear_output_pos(&self) -> Result<(), Error> {
+		let key = to_key(COMMIT_POS_PREFIX, &mut "".to_string().into_bytes());
+		for (k, _) in self.db.iter::<u64>(&key).unwrap() {
+			self.db.delete(&k)?;
+		}
+		Ok(())
 	}
 
+	/// Get the previous header.
 	pub fn get_previous_header(&self, header: &BlockHeader) -> Result<BlockHeader, Error> {
 		self.get_block_header(&header.prev_hash)
 	}
 
+	/// Get block header.
 	pub fn get_block_header(&self, h: &Hash) -> Result<BlockHeader, Error> {
 		option_to_not_found(
 			self.db
 				.get_ser(&to_key(BLOCK_HEADER_PREFIX, &mut h.to_vec())),
 			&format!("BLOCK HEADER: {}", h),
 		)
 	}
@@ -267,6 +292,7 @@ impl<'a> Batch<'a> {
 
+	/// Save the input bitmap for the block.
 	fn save_block_input_bitmap(&self, bh: &Hash, bm: &Bitmap) -> Result<(), Error> {
 		self.db.put(
 			&to_key(BLOCK_INPUT_BITMAP_PREFIX, &mut bh.to_vec())[..],
 			bm.serialize(),
 		)
 	}
@@ -274,16 +300,19 @@ impl<'a> Batch<'a> {
 
+	/// Delete the block input bitmap.
 	fn delete_block_input_bitmap(&self, bh: &Hash) -> Result<(), Error> {
 		self.db
 			.delete(&to_key(BLOCK_INPUT_BITMAP_PREFIX, &mut bh.to_vec()))
 	}
 
+	/// Save block_sums for the block.
 	pub fn save_block_sums(&self, h: &Hash, sums: &BlockSums) -> Result<(), Error> {
 		self.db
 			.put_ser(&to_key(BLOCK_SUMS_PREFIX, &mut h.to_vec())[..], &sums)
 	}
 
+	/// Get block_sums for the block.
 	pub fn get_block_sums(&self, h: &Hash) -> Result<BlockSums, Error> {
 		option_to_not_found(
 			self.db.get_ser(&to_key(BLOCK_SUMS_PREFIX, &mut h.to_vec())),
 			&format!("Block sums for block: {}", h),
 		)
 	}
@@ -291,10 +320,12 @@ impl<'a> Batch<'a> {
 	}
 
+	/// Delete the block_sums for the block.
 	fn delete_block_sums(&self, bh: &Hash) -> Result<(), Error> {
 		self.db.delete(&to_key(BLOCK_SUMS_PREFIX, &mut bh.to_vec()))
 	}
 
+	/// Build the input bitmap for the given block.
 	fn build_block_input_bitmap(&self, block: &Block) -> Result<Bitmap, Error> {
 		let bitmap = block
 			.inputs()
@@ -305,6 +336,7 @@ impl<'a> Batch<'a> {
 		Ok(bitmap)
 	}
 
+	/// Build and store the input bitmap for the given block.
 	fn build_and_store_block_input_bitmap(&self, block: &Block) -> Result<Bitmap, Error> {
 		// Build the bitmap.
 		let bitmap = self.build_block_input_bitmap(block)?;
@@ -315,8 +347,8 @@ impl<'a> Batch<'a> {
 		Ok(bitmap)
 	}
 
-	// Get the block input bitmap from the db or build the bitmap from
-	// the full block from the db (if the block is found).
+	/// Get the block input bitmap from the db or build the bitmap from
+	/// the full block from the db (if the block is found).
 	pub fn get_block_input_bitmap(&self, bh: &Hash) -> Result<Bitmap, Error> {
 		if let Ok(Some(bytes)) = self
 			.db
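The per-commitment `delete_output_pos` is gone: compaction now clears the whole output_pos index by key prefix and rebuilds it from the UTXO set. The prefix-scan-and-delete pattern, modeled standalone over an ordered map (`BTreeMap` standing in for the lmdb batch; `clear_prefix` is a hypothetical helper name):

    use std::collections::BTreeMap;

    /// Delete every entry whose key starts with `prefix`
    /// (models Batch::clear_output_pos above).
    fn clear_prefix(db: &mut BTreeMap<Vec<u8>, u64>, prefix: &[u8]) {
        let doomed: Vec<Vec<u8>> = db
            .range(prefix.to_vec()..)
            .take_while(|(k, _)| k.starts_with(prefix))
            .map(|(k, _)| k.clone())
            .collect();
        for k in doomed {
            db.remove(&k);
        }
    }
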
diff --git a/chain/src/txhashset/txhashset.rs b/chain/src/txhashset/txhashset.rs
index 31fc9f7e0..866b91818 100644
--- a/chain/src/txhashset/txhashset.rs
+++ b/chain/src/txhashset/txhashset.rs
@@ -33,7 +33,6 @@ use crate::util::{file, secp_static, zip};
 use croaring::Bitmap;
 use grin_store;
 use grin_store::pmmr::{PMMRBackend, PMMR_FILES};
-use grin_store::types::prune_noop;
 use std::collections::HashSet;
 use std::fs::{self, File};
 use std::path::{Path, PathBuf};
@@ -206,21 +205,27 @@ impl TxHashSet {
 			.get_last_n_insertions(distance)
 	}
 
-	/// Get the header at the specified height based on the current state of the txhashset.
-	/// Derives the MMR pos from the height (insertion index) and retrieves the header hash.
-	/// Looks the header up in the db by hash.
-	pub fn get_header_by_height(&self, height: u64) -> Result<BlockHeader, Error> {
+	/// Get the header hash at the specified height based on the current state of the txhashset.
+	pub fn get_header_hash_by_height(&self, height: u64) -> Result<Hash, Error> {
 		let pos = pmmr::insertion_to_pmmr_index(height + 1);
 		let header_pmmr =
 			ReadonlyPMMR::at(&self.header_pmmr_h.backend, self.header_pmmr_h.last_pos);
 		if let Some(entry) = header_pmmr.get_data(pos) {
-			let header = self.commit_index.get_block_header(&entry.hash())?;
-			Ok(header)
+			Ok(entry.hash())
 		} else {
-			Err(ErrorKind::Other(format!("get header by height")).into())
+			Err(ErrorKind::Other(format!("get header hash by height")).into())
 		}
 	}
 
+	/// Get the header at the specified height based on the current state of the txhashset.
+	/// Derives the MMR pos from the height (insertion index) and retrieves the header hash.
+	/// Looks the header up in the db by hash.
+	pub fn get_header_by_height(&self, height: u64) -> Result<BlockHeader, Error> {
+		let hash = self.get_header_hash_by_height(height)?;
+		let header = self.commit_index.get_block_header(&hash)?;
+		Ok(header)
+	}
+
 	/// returns outputs from the given insertion (leaf) index up to the
 	/// specified limit. Also returns the last index actually populated
 	pub fn outputs_by_insertion_index(
@@ -280,39 +285,30 @@ impl TxHashSet {
 	}
 
 	/// Compact the MMR data files and flush the rm logs
-	pub fn compact(&mut self) -> Result<(), Error> {
-		let commit_index = self.commit_index.clone();
-		let head_header = commit_index.head_header()?;
+	pub fn compact(&mut self, batch: &mut Batch<'_>) -> Result<(), Error> {
+		debug!("txhashset: starting compaction...");
+
+		let head_header = batch.head_header()?;
 		let current_height = head_header.height;
 
 		// horizon for compacting is based on current_height
-		let horizon = current_height.saturating_sub(global::cut_through_horizon().into());
-		let horizon_header = self.get_header_by_height(horizon)?;
+		let horizon_height = current_height.saturating_sub(global::cut_through_horizon().into());
+		let horizon_hash = self.get_header_hash_by_height(horizon_height)?;
+		let horizon_header = batch.get_block_header(&horizon_hash)?;
 
-		let batch = self.commit_index.batch()?;
+		let rewind_rm_pos = input_pos_to_rewind(&horizon_header, &head_header, batch)?;
 
-		let rewind_rm_pos = input_pos_to_rewind(&horizon_header, &head_header, &batch)?;
+		debug!("txhashset: check_compact output mmr backend...");
+		self.output_pmmr_h
+			.backend
+			.check_compact(horizon_header.output_mmr_size, &rewind_rm_pos)?;
 
-		{
-			let clean_output_index = |commit: &[u8]| {
-				let _ = batch.delete_output_pos(commit);
-			};
+		debug!("txhashset: check_compact rangeproof mmr backend...");
+		self.rproof_pmmr_h
+			.backend
+			.check_compact(horizon_header.output_mmr_size, &rewind_rm_pos)?;
 
-			self.output_pmmr_h.backend.check_compact(
-				horizon_header.output_mmr_size,
-				&rewind_rm_pos,
-				clean_output_index,
-			)?;
-
-			self.rproof_pmmr_h.backend.check_compact(
-				horizon_header.output_mmr_size,
-				&rewind_rm_pos,
-				&prune_noop,
-			)?;
-		}
-
-		// Finally commit the batch, saving everything to the db.
-		batch.commit()?;
+		debug!("txhashset: ... compaction finished");
 
 		Ok(())
 	}
@@ -797,11 +793,9 @@ impl<'a> Committed for Extension<'a> {
 	fn outputs_committed(&self) -> Vec<Commitment> {
 		let mut commitments = vec![];
-		for n in 1..self.output_pmmr.unpruned_size() + 1 {
-			if pmmr::is_leaf(n) {
-				if let Some(out) = self.output_pmmr.get_data(n) {
-					commitments.push(out.commit);
-				}
+		for pos in self.output_pmmr.leaf_pos_iter() {
+			if let Some(out) = self.output_pmmr.get_data(pos) {
+				commitments.push(out.commit);
 			}
 		}
 		commitments
@@ -1259,20 +1253,18 @@ impl<'a> Extension<'a> {
 	pub fn rebuild_index(&self) -> Result<(), Error> {
 		let now = Instant::now();
 
-		let mut count = 0;
+		self.batch.clear_output_pos()?;
 
-		for n in 1..self.output_pmmr.unpruned_size() + 1 {
-			// non-pruned leaves only
-			if pmmr::bintree_postorder_height(n) == 0 {
-				if let Some(out) = self.output_pmmr.get_data(n) {
-					self.batch.save_output_pos(&out.commit, n)?;
-					count += 1;
-				}
+		let mut count = 0;
+		for pos in self.output_pmmr.leaf_pos_iter() {
+			if let Some(out) = self.output_pmmr.get_data(pos) {
+				self.batch.save_output_pos(&out.commit, pos)?;
+				count += 1;
 			}
 		}
 
 		debug!(
-			"txhashset: rebuild_index ({} UTXOs), took {}s",
+			"txhashset: rebuild_index: {} UTXOs, took {}s",
 			count,
 			now.elapsed().as_secs(),
 		);
@@ -1325,13 +1317,23 @@ impl<'a> Extension<'a> {
 		let total_kernels = pmmr::n_leaves(self.kernel_pmmr.unpruned_size());
 		for n in 1..self.kernel_pmmr.unpruned_size() + 1 {
 			if pmmr::is_leaf(n) {
-				if let Some(kernel) = self.kernel_pmmr.get_data(n) {
-					kernel.verify()?;
-					kern_count += 1;
+				let kernel = self
+					.kernel_pmmr
+					.get_data(n)
+					.ok_or::<Error>(ErrorKind::TxKernelNotFound.into())?;
+
+				kernel.verify()?;
+				kern_count += 1;
+
+				if kern_count % 20 == 0 {
+					status.on_validation(kern_count, total_kernels, 0, 0);
+				}
+				if kern_count % 1_000 == 0 {
+					debug!(
+						"txhashset: verify_kernel_signatures: verified {} signatures",
+						kern_count,
+					);
 				}
-			}
-			if n % 20 == 0 {
-				status.on_validation(kern_count, total_kernels, 0, 0);
 			}
 		}
@@ -1353,30 +1355,34 @@ impl<'a> Extension<'a> {
 		let mut proof_count = 0;
 		let total_rproofs = pmmr::n_leaves(self.output_pmmr.unpruned_size());
-		for n in 1..self.output_pmmr.unpruned_size() + 1 {
-			if pmmr::is_leaf(n) {
-				if let Some(out) = self.output_pmmr.get_data(n) {
-					if let Some(rp) = self.rproof_pmmr.get_data(n) {
-						commits.push(out.commit);
-						proofs.push(rp);
-					} else {
-						// TODO - rangeproof not found
-						return Err(ErrorKind::OutputNotFound.into());
-					}
-					proof_count += 1;
+		for pos in self.output_pmmr.leaf_pos_iter() {
+			let output = self.output_pmmr.get_data(pos);
+			let proof = self.rproof_pmmr.get_data(pos);
 
-					if proofs.len() >= 1000 {
-						Output::batch_verify_proofs(&commits, &proofs)?;
-						commits.clear();
-						proofs.clear();
-						debug!(
-							"txhashset: verify_rangeproofs: verified {} rangeproofs",
-							proof_count,
-						);
-					}
+			// Output and corresponding rangeproof *must* exist.
+			// It is invalid for either to be missing and we fail immediately in this case.
+			match (output, proof) {
+				(None, _) => return Err(ErrorKind::OutputNotFound.into()),
+				(_, None) => return Err(ErrorKind::RangeproofNotFound.into()),
+				(Some(output), Some(proof)) => {
+					commits.push(output.commit);
+					proofs.push(proof);
 				}
 			}
-			if n % 20 == 0 {
+
+			proof_count += 1;
+
+			if proofs.len() >= 1_000 {
+				Output::batch_verify_proofs(&commits, &proofs)?;
+				commits.clear();
+				proofs.clear();
+				debug!(
+					"txhashset: verify_rangeproofs: verified {} rangeproofs",
+					proof_count,
+				);
+			}
+
+			if proof_count % 20 == 0 {
 				status.on_validation(0, 0, proof_count, total_rproofs);
 			}
 		}
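Both validation loops above now fail hard on a missing leaf and batch their expensive verification. The accumulate-then-flush shape in isolation (generic standalone sketch; the 1,000 batch size matches the diff, and the trailing flush of a partial batch is assumed to happen after the loop in the real code):

    /// Drain `items`, verifying in batches of 1,000; any missing/bad item aborts.
    fn verify_in_batches<T, E>(
        items: impl Iterator<Item = Result<T, E>>,
        verify_batch: impl Fn(&[T]) -> Result<(), E>,
    ) -> Result<usize, E> {
        let mut buf = Vec::new();
        let mut count = 0;
        for item in items {
            buf.push(item?); // propagate the first failure immediately
            count += 1;
            if buf.len() >= 1_000 {
                verify_batch(&buf)?;
                buf.clear();
            }
        }
        if !buf.is_empty() {
            verify_batch(&buf)?; // flush the final partial batch
        }
        Ok(count)
    }
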
diff --git a/core/src/core/pmmr/backend.rs b/core/src/core/pmmr/backend.rs
index e4e0e9d27..c92dd7f31 100644
--- a/core/src/core/pmmr/backend.rs
+++ b/core/src/core/pmmr/backend.rs
@@ -51,6 +51,9 @@ pub trait Backend<T: PMMRable> {
 	/// (ignoring the remove log).
 	fn get_data_from_file(&self, position: u64) -> Option<T::E>;
 
+	/// Iterator over current (unpruned, unremoved) leaf positions.
+	fn leaf_pos_iter(&self) -> Box<dyn Iterator<Item = u64> + '_>;
+
 	/// Remove Hash by insertion position. An index is also provided so the
 	/// underlying backend can implement some rollback of positions up to a
 	/// given index (practically the index is the height of a block that
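The new trait method returns a boxed trait object because a trait method cannot return a bare `impl Iterator`. The pattern in miniature (toy backend, not grin's):

    trait Backend {
        /// Borrowing iterator over the surviving leaf positions.
        fn leaf_pos_iter(&self) -> Box<dyn Iterator<Item = u64> + '_>;
    }

    struct VecBackend {
        leaves: Vec<u64>,
    }

    impl Backend for VecBackend {
        fn leaf_pos_iter(&self) -> Box<dyn Iterator<Item = u64> + '_> {
            Box::new(self.leaves.iter().copied())
        }
    }
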
diff --git a/core/src/core/pmmr/pmmr.rs b/core/src/core/pmmr/pmmr.rs
index 6df1a46f6..7367f0eef 100644
--- a/core/src/core/pmmr/pmmr.rs
+++ b/core/src/core/pmmr/pmmr.rs
@@ -75,6 +75,11 @@ where
 		ReadonlyPMMR::at(&self.backend, self.last_pos)
 	}
 
+	/// Iterator over current (unpruned, unremoved) leaf positions.
+	pub fn leaf_pos_iter(&self) -> impl Iterator<Item = u64> + '_ {
+		self.backend.leaf_pos_iter()
+	}
+
 	/// Returns a vec of the peaks of this MMR.
 	pub fn peaks(&self) -> Vec<Hash> {
 		let peaks_pos = peaks(self.last_pos);
diff --git a/core/tests/vec_backend.rs b/core/tests/vec_backend.rs
index e7bae7cba..73f5e3a7e 100644
--- a/core/tests/vec_backend.rs
+++ b/core/tests/vec_backend.rs
@@ -104,6 +104,10 @@ impl<T: PMMRable> Backend<T> for VecBackend<T> {
 		Some(data.as_elmt())
 	}
 
+	fn leaf_pos_iter(&self) -> Box<dyn Iterator<Item = u64> + '_> {
+		unimplemented!()
+	}
+
 	fn remove(&mut self, position: u64) -> Result<(), String> {
 		self.remove_list.push(position);
 		Ok(())
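On the PMMR wrapper the same iterator can surface as `impl Iterator`, since it is an inherent method. For callers, the old full-position scan collapses into a direct walk of surviving leaves; roughly (toy type, hypothetical `count_utxos` helper):

    struct ToyPmmr {
        leaf_set: Vec<u64>, // positions that still hold data
    }

    impl ToyPmmr {
        fn leaf_pos_iter(&self) -> impl Iterator<Item = u64> + '_ {
            self.leaf_set.iter().copied()
        }
    }

    fn count_utxos(pmmr: &ToyPmmr) -> usize {
        // Old callers scanned 1..=unpruned_size and tested is_leaf(n) at
        // every position; iterating the leaf set skips pruned subtrees.
        pmmr.leaf_pos_iter().count()
    }
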
compaction finished"); Ok(()) } @@ -797,11 +793,9 @@ impl<'a> Committed for Extension<'a> { fn outputs_committed(&self) -> Vec { let mut commitments = vec![]; - for n in 1..self.output_pmmr.unpruned_size() + 1 { - if pmmr::is_leaf(n) { - if let Some(out) = self.output_pmmr.get_data(n) { - commitments.push(out.commit); - } + for pos in self.output_pmmr.leaf_pos_iter() { + if let Some(out) = self.output_pmmr.get_data(pos) { + commitments.push(out.commit); } } commitments @@ -1259,20 +1253,18 @@ impl<'a> Extension<'a> { pub fn rebuild_index(&self) -> Result<(), Error> { let now = Instant::now(); - let mut count = 0; + self.batch.clear_output_pos()?; - for n in 1..self.output_pmmr.unpruned_size() + 1 { - // non-pruned leaves only - if pmmr::bintree_postorder_height(n) == 0 { - if let Some(out) = self.output_pmmr.get_data(n) { - self.batch.save_output_pos(&out.commit, n)?; - count += 1; - } + let mut count = 0; + for pos in self.output_pmmr.leaf_pos_iter() { + if let Some(out) = self.output_pmmr.get_data(pos) { + self.batch.save_output_pos(&out.commit, pos)?; + count += 1; } } debug!( - "txhashset: rebuild_index ({} UTXOs), took {}s", + "txhashset: rebuild_index: {} UTXOs, took {}s", count, now.elapsed().as_secs(), ); @@ -1325,13 +1317,23 @@ impl<'a> Extension<'a> { let total_kernels = pmmr::n_leaves(self.kernel_pmmr.unpruned_size()); for n in 1..self.kernel_pmmr.unpruned_size() + 1 { if pmmr::is_leaf(n) { - if let Some(kernel) = self.kernel_pmmr.get_data(n) { - kernel.verify()?; - kern_count += 1; + let kernel = self + .kernel_pmmr + .get_data(n) + .ok_or::(ErrorKind::TxKernelNotFound.into())?; + + kernel.verify()?; + kern_count += 1; + + if kern_count % 20 == 0 { + status.on_validation(kern_count, total_kernels, 0, 0); + } + if kern_count % 1_000 == 0 { + debug!( + "txhashset: verify_kernel_signatures: verified {} signatures", + kern_count, + ); } - } - if n % 20 == 0 { - status.on_validation(kern_count, total_kernels, 0, 0); } } @@ -1353,30 +1355,34 @@ impl<'a> Extension<'a> { let mut proof_count = 0; let total_rproofs = pmmr::n_leaves(self.output_pmmr.unpruned_size()); - for n in 1..self.output_pmmr.unpruned_size() + 1 { - if pmmr::is_leaf(n) { - if let Some(out) = self.output_pmmr.get_data(n) { - if let Some(rp) = self.rproof_pmmr.get_data(n) { - commits.push(out.commit); - proofs.push(rp); - } else { - // TODO - rangeproof not found - return Err(ErrorKind::OutputNotFound.into()); - } - proof_count += 1; + for pos in self.output_pmmr.leaf_pos_iter() { + let output = self.output_pmmr.get_data(pos); + let proof = self.rproof_pmmr.get_data(pos); - if proofs.len() >= 1000 { - Output::batch_verify_proofs(&commits, &proofs)?; - commits.clear(); - proofs.clear(); - debug!( - "txhashset: verify_rangeproofs: verified {} rangeproofs", - proof_count, - ); - } + // Output and corresponding rangeproof *must* exist. + // It is invalid for either to be missing and we fail immediately in this case. 
diff --git a/store/src/lmdb.rs b/store/src/lmdb.rs
index be50a2539..7ab1027df 100644
--- a/store/src/lmdb.rs
+++ b/store/src/lmdb.rs
@@ -162,8 +162,8 @@ impl Store {
 		res.to_opt().map(|r| r.is_some()).map_err(From::from)
 	}
 
-	/// Produces an iterator of `Readable` types moving forward from the
-	/// provided key.
+	/// Produces an iterator of (key, value) pairs, where values are `Readable` types
+	/// moving forward from the provided key.
 	pub fn iter<T: ser::Readable>(&self, from: &[u8]) -> Result<SerIterator<T>, Error> {
 		let tx = Arc::new(lmdb::ReadTransaction::new(self.env.clone())?);
 		let cursor = Arc::new(tx.cursor(self.db.clone())?);
@@ -273,9 +273,9 @@ impl<T> Iterator for SerIterator<T>
 where
 	T: ser::Readable,
 {
-	type Item = T;
+	type Item = (Vec<u8>, T);
 
-	fn next(&mut self) -> Option<T> {
+	fn next(&mut self) -> Option<(Vec<u8>, T)> {
 		let access = self.tx.access();
 		let kv = if self.seek {
 			Arc::get_mut(&mut self.cursor).unwrap().next(&access)
@@ -285,7 +285,10 @@ where
 				.unwrap()
 				.seek_range_k(&access, &self.prefix[..])
 		};
-		self.deser_if_prefix_match(kv)
+		match kv {
+			Ok((k, v)) => self.deser_if_prefix_match(k, v),
+			Err(_) => None,
+		}
 	}
 }
 
@@ -293,17 +296,16 @@ impl<T> SerIterator<T>
 where
 	T: ser::Readable,
 {
-	fn deser_if_prefix_match(&self, kv: Result<(&[u8], &[u8]), lmdb::Error>) -> Option<T> {
-		match kv {
-			Ok((k, v)) => {
-				let plen = self.prefix.len();
-				if plen == 0 || k[0..plen] == self.prefix[..] {
-					ser::deserialize(&mut &v[..]).ok()
-				} else {
-					None
-				}
+	fn deser_if_prefix_match(&self, key: &[u8], value: &[u8]) -> Option<(Vec<u8>, T)> {
+		let plen = self.prefix.len();
+		if plen == 0 || key[0..plen] == self.prefix[..] {
+			if let Ok(value) = ser::deserialize(&mut &value[..]) {
+				Some((key.to_vec(), value))
+			} else {
+				None
 			}
-			Err(_) => None,
+		} else {
+			None
 		}
 	}
 }
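Yielding the key alongside the value is what makes `clear_output_pos` possible: the caller can delete entries as it discovers them. A standalone model of a prefix-checked (key, value) iterator (hypothetical `iter_prefix`, a slice standing in for the lmdb cursor; the real SerIterator stops on a deserialization failure rather than skipping, simplified here):

    /// Yield (key, value) pairs whose key starts with `prefix`,
    /// deserializing values with `deser`.
    fn iter_prefix<'a, T>(
        entries: &'a [(Vec<u8>, Vec<u8>)],
        prefix: &'a [u8],
        deser: impl Fn(&[u8]) -> Option<T> + 'a,
    ) -> impl Iterator<Item = (Vec<u8>, T)> + 'a {
        entries
            .iter()
            .filter(move |(k, _)| k.starts_with(prefix))
            .filter_map(move |(k, v)| deser(v).map(|t| (k.clone(), t)))
    }
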
diff --git a/store/src/pmmr.rs b/store/src/pmmr.rs
index 200dbae9c..f57867c46 100644
--- a/store/src/pmmr.rs
+++ b/store/src/pmmr.rs
@@ -21,7 +21,7 @@ use crate::core::core::BlockHeader;
 use crate::core::ser::PMMRable;
 use crate::leaf_set::LeafSet;
 use crate::prune_list::PruneList;
-use crate::types::{prune_noop, DataFile};
+use crate::types::DataFile;
 use croaring::Bitmap;
 use std::path::{Path, PathBuf};
@@ -121,6 +121,17 @@ impl<T: PMMRable> Backend<T> for PMMRBackend<T> {
 		self.get_data_from_file(pos)
 	}
 
+	/// Returns an iterator over all the leaf positions.
+	/// For a prunable PMMR this is an iterator over the leaf_set bitmap.
+	/// For a non-prunable PMMR this is *all* leaves (this is not yet implemented).
+	fn leaf_pos_iter(&self) -> Box<dyn Iterator<Item = u64> + '_> {
+		if self.prunable {
+			Box::new(self.leaf_set.iter())
+		} else {
+			panic!("leaf_pos_iter not implemented for non-prunable PMMR")
+		}
+	}
+
 	/// Rewind the PMMR backend to the given position.
 	fn rewind(&mut self, position: u64, rewind_rm_pos: &Bitmap) -> Result<(), String> {
 		// First rewind the leaf_set with the necessary added and removed positions.
@@ -278,15 +289,7 @@ impl<T: PMMRable> PMMRBackend<T> {
 	/// aligned. The block_marker in the db/index for the particular block
 	/// will have a suitable output_pos. This is used to enforce a horizon
 	/// after which the local node should have all the data to allow rewinding.
-	pub fn check_compact<P>(
-		&mut self,
-		cutoff_pos: u64,
-		rewind_rm_pos: &Bitmap,
-		prune_cb: P,
-	) -> io::Result<bool>
-	where
-		P: Fn(&[u8]),
-	{
+	pub fn check_compact(&mut self, cutoff_pos: u64, rewind_rm_pos: &Bitmap) -> io::Result<bool> {
 		assert!(self.prunable, "Trying to compact a non-prunable PMMR");
 
 		// Paths for tmp hash and data files.
@@ -306,7 +309,7 @@ impl<T: PMMRable> PMMRBackend<T> {
 			});
 
 			self.hash_file
-				.save_prune(&tmp_prune_file_hash, &off_to_rm, &prune_noop)?;
+				.save_prune(&tmp_prune_file_hash, &off_to_rm)?;
 		}
 
 		// 2. Save compact copy of the data file, skipping removed leaves.
@@ -324,7 +327,7 @@ impl<T: PMMRable> PMMRBackend<T> {
 			});
 
 			self.data_file
-				.save_prune(&tmp_prune_file_data, &off_to_rm, prune_cb)?;
+				.save_prune(&tmp_prune_file_data, &off_to_rm)?;
 		}
 
 		// 3. Update the prune list and write to disk.
diff --git a/store/src/types.rs b/store/src/types.rs
index 379749a31..bfe53bf5d 100644
--- a/store/src/types.rs
+++ b/store/src/types.rs
@@ -20,9 +20,6 @@ use std::io::{self, BufWriter, ErrorKind, Read, Write};
 use std::marker;
 use std::path::{Path, PathBuf};
 
-/// A no-op function for doing nothing with some pruned data.
-pub fn prune_noop(_pruned_data: &[u8]) {}
-
 /// Data file (MMR) wrapper around an append only file.
 pub struct DataFile<T> {
 	file: AppendOnlyFile,
@@ -113,16 +110,13 @@ where
 	}
 
 	/// Write the file out to disk, pruning removed elements.
-	pub fn save_prune<F>(&self, target: &str, prune_offs: &[u64], prune_cb: F) -> io::Result<()>
-	where
-		F: Fn(&[u8]),
-	{
+	pub fn save_prune(&self, target: &str, prune_offs: &[u64]) -> io::Result<()> {
 		let prune_offs = prune_offs
 			.iter()
 			.map(|x| x * T::LEN as u64)
 			.collect::<Vec<_>>();
 		self.file
-			.save_prune(target, prune_offs.as_slice(), T::LEN as u64, prune_cb)
+			.save_prune(target, prune_offs.as_slice(), T::LEN as u64)
 	}
 }
@@ -294,15 +288,8 @@ impl AppendOnlyFile {
 	/// Saves a copy of the current file content, skipping data at the provided
 	/// prune indices. The prune Vec must be ordered.
-	pub fn save_prune<T, P>(
-		&self,
-		target: P,
-		prune_offs: &[u64],
-		prune_len: u64,
-		prune_cb: T,
-	) -> io::Result<()>
+	pub fn save_prune<P>(&self, target: P, prune_offs: &[u64], prune_len: u64) -> io::Result<()>
 	where
-		T: Fn(&[u8]),
 		P: AsRef<Path>,
 	{
 		if prune_offs.is_empty() {
@@ -332,8 +319,6 @@ impl AppendOnlyFile {
 				let prune_at = (prune_offs[prune_pos] - read) as usize;
 				if prune_at != buf_start {
 					writer.write_all(&buf[buf_start..prune_at])?;
-				} else {
-					prune_cb(&buf[buf_start..prune_at]);
 				}
 				buf_start = prune_at + (prune_len as usize);
 				if prune_offs.len() > prune_pos + 1 {
diff --git a/store/tests/pmmr.rs b/store/tests/pmmr.rs
index 00abd7a5f..c6585e37a 100644
--- a/store/tests/pmmr.rs
+++ b/store/tests/pmmr.rs
@@ -26,7 +26,6 @@ use crate::core::core::pmmr::{Backend, PMMR};
 use crate::core::ser::{
 	Error, FixedLength, PMMRIndexHashable, PMMRable, Readable, Reader, Writeable, Writer,
 };
-use crate::store::types::prune_noop;
 
 #[test]
 fn pmmr_append() {
@@ -128,9 +127,7 @@ fn pmmr_compact_leaf_sibling() {
 	assert_eq!(backend.get_from_file(1).unwrap(), pos_1_hash);
 
 	// aggressively compact the PMMR files
-	backend
-		.check_compact(1, &Bitmap::create(), &prune_noop)
-		.unwrap();
+	backend.check_compact(1, &Bitmap::create()).unwrap();
 
 	// check pos 1, 2, 3 are in the state we expect after compacting
 	{
@@ -188,9 +185,7 @@ fn pmmr_prune_compact() {
 	}
 
 	// compact
-	backend
-		.check_compact(2, &Bitmap::create(), &prune_noop)
-		.unwrap();
+	backend.check_compact(2, &Bitmap::create()).unwrap();
 
 	// recheck the root and stored data
 	{
@@ -236,9 +231,7 @@ fn pmmr_reload() {
 		backend.sync().unwrap();
 
 		// now check and compact the backend
-		backend
-			.check_compact(1, &Bitmap::create(), &prune_noop)
-			.unwrap();
+		backend.check_compact(1, &Bitmap::create()).unwrap();
 		backend.sync().unwrap();
 
 		// prune another node to force compact to actually do something
@@ -249,9 +242,7 @@ fn pmmr_reload() {
 		}
 		backend.sync().unwrap();
 
-		backend
-			.check_compact(4, &Bitmap::create(), &prune_noop)
-			.unwrap();
+		backend.check_compact(4, &Bitmap::create()).unwrap();
 		backend.sync().unwrap();
 
 		assert_eq!(backend.unpruned_size(), mmr_size);
@@ -351,9 +342,7 @@ fn pmmr_rewind() {
 	}
 
 	// and compact the MMR to remove the pruned elements
-	backend
-		.check_compact(6, &Bitmap::create(), &prune_noop)
-		.unwrap();
+	backend.check_compact(6, &Bitmap::create()).unwrap();
 	backend.sync().unwrap();
 
 	println!("after compacting - ");
@@ -453,9 +442,7 @@ fn pmmr_compact_single_leaves() {
 	backend.sync().unwrap();
 
 	// compact
-	backend
-		.check_compact(2, &Bitmap::create(), &prune_noop)
-		.unwrap();
+	backend.check_compact(2, &Bitmap::create()).unwrap();
 
 	{
 		let mut pmmr: PMMR<'_, TestElem, _> = PMMR::at(&mut backend, mmr_size);
@@ -466,9 +453,7 @@ fn pmmr_compact_single_leaves() {
 	backend.sync().unwrap();
 
 	// compact
-	backend
-		.check_compact(2, &Bitmap::create(), &prune_noop)
-		.unwrap();
+	backend.check_compact(2, &Bitmap::create()).unwrap();
 	}
 
 	teardown(data_dir);
@@ -499,9 +484,7 @@ fn pmmr_compact_entire_peak() {
 	backend.sync().unwrap();
 
 	// compact
-	backend
-		.check_compact(2, &Bitmap::create(), &prune_noop)
-		.unwrap();
+	backend.check_compact(2, &Bitmap::create()).unwrap();
 
 	// now check we have pruned up to and including the peak at pos 7
 	// hash still available in underlying hash file
@@ -587,9 +570,7 @@ fn pmmr_compact_horizon() {
 	}
 
 	// compact
-	backend
-		.check_compact(4, &Bitmap::of(&vec![1, 2]), &prune_noop)
-		.unwrap();
+	backend.check_compact(4, &Bitmap::of(&vec![1, 2])).unwrap();
 	backend.sync().unwrap();
 
 	// check we can read a hash by pos correctly after compaction
@@ -644,9 +625,7 @@ fn pmmr_compact_horizon() {
 		}
 
 		// compact some more
-		backend
-			.check_compact(9, &Bitmap::create(), &prune_noop)
-			.unwrap();
+		backend.check_compact(9, &Bitmap::create()).unwrap();
 	}
 
 	// recheck stored data
@@ -711,9 +690,7 @@ fn compact_twice() {
 	}
 
 	// compact
-	backend
-		.check_compact(2, &Bitmap::create(), &prune_noop)
-		.unwrap();
+	backend.check_compact(2, &Bitmap::create()).unwrap();
 
 	// recheck the root and stored data
 	{
@@ -740,9 +717,7 @@ fn compact_twice() {
 	}
 
 	// compact
-	backend
-		.check_compact(2, &Bitmap::create(), &prune_noop)
-		.unwrap();
+	backend.check_compact(2, &Bitmap::create()).unwrap();
 
 	// recheck the root and stored data
 	{
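With the `prune_noop` callback machinery deleted, `save_prune` reduces to "copy, skipping pruned ranges". The core skip loop as a standalone sketch over in-memory bytes (hypothetical slice-based `save_prune`; the real one streams through buffered file I/O):

    /// Copy `src`, skipping `prune_len` bytes at each offset in
    /// `prune_offs` (offsets must be sorted ascending and in bounds).
    fn save_prune(src: &[u8], prune_offs: &[u64], prune_len: u64) -> Vec<u8> {
        let mut out = Vec::with_capacity(src.len());
        let mut cursor = 0usize;
        for &off in prune_offs {
            out.extend_from_slice(&src[cursor..off as usize]); // keep bytes up to the hole
            cursor = off as usize + prune_len as usize;        // skip the pruned element
        }
        out.extend_from_slice(&src[cursor..]);
        out
    }
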
diff --git a/wallet/src/lmdb_wallet.rs b/wallet/src/lmdb_wallet.rs
index 39eabdea8..6a8493f50 100644
--- a/wallet/src/lmdb_wallet.rs
+++ b/wallet/src/lmdb_wallet.rs
@@ -242,7 +242,7 @@ where
 	}
 
 	fn iter<'a>(&'a self) -> Box<dyn Iterator<Item = OutputData> + 'a> {
-		Box::new(self.db.iter(&[OUTPUT_PREFIX]).unwrap())
+		Box::new(self.db.iter(&[OUTPUT_PREFIX]).unwrap().map(|(_, v)| v))
 	}
 
 	fn get_tx_log_entry(&self, u: &Uuid) -> Result<Option<TxLogEntry>, Error> {
@@ -251,7 +251,12 @@ where
 	}
 
 	fn tx_log_iter<'a>(&'a self) -> Box<dyn Iterator<Item = TxLogEntry> + 'a> {
-		Box::new(self.db.iter(&[TX_LOG_ENTRY_PREFIX]).unwrap())
+		Box::new(
+			self.db
+				.iter(&[TX_LOG_ENTRY_PREFIX])
+				.unwrap()
+				.map(|(_, v)| v),
+		)
 	}
 
 	fn get_private_context(&mut self, slate_id: &[u8]) -> Result<Context, Error> {
@@ -272,7 +277,12 @@ where
 	}
 
 	fn acct_path_iter<'a>(&'a self) -> Box<dyn Iterator<Item = AcctPathMapping> + 'a> {
-		Box::new(self.db.iter(&[ACCOUNT_PATH_MAPPING_PREFIX]).unwrap())
+		Box::new(
+			self.db
+				.iter(&[ACCOUNT_PATH_MAPPING_PREFIX])
+				.unwrap()
+				.map(|(_, v)| v),
+		)
 	}
 
 	fn get_acct_path(&self, label: String) -> Result<Option<AcctPathMapping>, Error> {
@@ -418,7 +428,8 @@ where
 				.as_ref()
 				.unwrap()
 				.iter(&[OUTPUT_PREFIX])
-				.unwrap(),
+				.unwrap()
+				.map(|(_, v)| v),
 		)
 	}
 
@@ -456,7 +467,8 @@ where
 				.as_ref()
 				.unwrap()
 				.iter(&[TX_LOG_ENTRY_PREFIX])
-				.unwrap(),
+				.unwrap()
+				.map(|(_, v)| v),
 		)
 	}
 
@@ -525,7 +537,8 @@ where
 				.as_ref()
 				.unwrap()
 				.iter(&[ACCOUNT_PATH_MAPPING_PREFIX])
-				.unwrap(),
+				.unwrap()
+				.map(|(_, v)| v),
 		)
 	}
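Every existing call site that only wants values adapts with a single `.map(|(_, v)| v)`, as the wallet iterators above do. The shape generically, with a tiny usage example (toy data, hypothetical `values_only` helper):

    fn values_only<K, V>(iter: impl Iterator<Item = (K, V)>) -> impl Iterator<Item = V> {
        iter.map(|(_, v)| v)
    }

    fn main() {
        let kv = vec![(b"a".to_vec(), 1u64), (b"b".to_vec(), 2u64)];
        let vals: Vec<u64> = values_only(kv.into_iter()).collect();
        assert_eq!(vals, vec![1, 2]);
    }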