Simplify (and fix) output_pos cleanup during chain compaction (#2609)

* expose leaf pos iterator
use it for various things in txhashset when iterating over outputs

* fix

* cleanup

* rebuild output_pos index (and clear it out first) when compacting the chain

* fixup tests

* refactor to match on (output, proof) tuple

* add comments to compact() to explain what is going on.

* get rid of some boxing around the leaf_set iterator

* cleanup
This commit is contained in:
Antioch Peverell 2019-02-27 21:02:54 +00:00 committed by GitHub
parent fe9fa51f32
commit 27c43c42a2
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
14 changed files with 269 additions and 205 deletions

View file

@ -935,35 +935,22 @@ impl Chain {
Ok(())
}
fn compact_txhashset(&self) -> Result<(), Error> {
debug!("Starting txhashset compaction...");
{
// Note: We take a lock on the stop_state here and do not release it until
// we have finished processing this chain compaction operation.
let stop_lock = self.stop_state.lock();
if stop_lock.is_stopped() {
return Err(ErrorKind::Stopped.into());
}
let mut txhashset = self.txhashset.write();
txhashset.compact()?;
}
debug!("... finished txhashset compaction.");
Ok(())
}
/// Cleanup old blocks from the db.
/// Determine the cutoff height from the horizon and the current block height.
/// *Only* runs if we are not in archive mode.
fn compact_blocks_db(&self) -> Result<(), Error> {
fn remove_historical_blocks(
&self,
txhashset: &txhashset::TxHashSet,
batch: &mut store::Batch<'_>,
) -> Result<(), Error> {
if self.archive_mode {
return Ok(());
}
let horizon = global::cut_through_horizon() as u64;
let head = self.head()?;
let head = batch.head()?;
let tail = match self.tail() {
let tail = match batch.tail() {
Ok(tail) => tail,
Err(_) => Tip::from_header(&self.genesis),
};
@ -971,7 +958,7 @@ impl Chain {
let cutoff = head.height.saturating_sub(horizon);
debug!(
"compact_blocks_db: head height: {}, tail height: {}, horizon: {}, cutoff: {}",
"remove_historical_blocks: head height: {}, tail height: {}, horizon: {}, cutoff: {}",
head.height, tail.height, horizon, cutoff,
);
@ -981,10 +968,12 @@ impl Chain {
let mut count = 0;
let tail = self.get_header_by_height(head.height - horizon)?;
let mut current = self.get_header_by_height(head.height - horizon - 1)?;
let tail_hash = txhashset.get_header_hash_by_height(head.height - horizon)?;
let tail = batch.get_block_header(&tail_hash)?;
let current_hash = txhashset.get_header_hash_by_height(head.height - horizon - 1)?;
let mut current = batch.get_block_header(&current_hash)?;
let batch = self.store.batch()?;
loop {
// Go to the store directly so we can handle NotFoundErr robustly.
match self.store.get_block(&current.hash()) {
@ -1011,11 +1000,12 @@ impl Chain {
}
}
batch.save_body_tail(&Tip::from_header(&tail))?;
batch.commit()?;
debug!(
"compact_blocks_db: removed {} blocks. tail height: {}",
"remove_historical_blocks: removed {} blocks. tail height: {}",
count, tail.height
);
Ok(())
}
@ -1025,12 +1015,35 @@ impl Chain {
/// * removes historical blocks and associated data from the db (unless archive mode)
///
pub fn compact(&self) -> Result<(), Error> {
self.compact_txhashset()?;
if !self.archive_mode {
self.compact_blocks_db()?;
// Note: We take a lock on the stop_state here and do not release it until
// we have finished processing this chain compaction operation.
// We want to avoid shutting the node down in the middle of compacting the data.
let stop_lock = self.stop_state.lock();
if stop_lock.is_stopped() {
return Err(ErrorKind::Stopped.into());
}
// Take a write lock on the txhashet and start a new writeable db batch.
let mut txhashset = self.txhashset.write();
let mut batch = self.store.batch()?;
// Compact the txhashset itself (rewriting the pruned backend files).
txhashset.compact(&mut batch)?;
// Rebuild our output_pos index in the db based on current UTXO set.
txhashset::extending(&mut txhashset, &mut batch, |extension| {
extension.rebuild_index()?;
Ok(())
})?;
// If we are not in archival mode remove historical blocks from the db.
if !self.archive_mode {
self.remove_historical_blocks(&txhashset, &mut batch)?;
}
// Commit all the above db changes.
batch.commit()?;
Ok(())
}
@ -1143,12 +1156,20 @@ impl Chain {
}
/// Gets the block header at the provided height.
/// Note: This takes a read lock on the txhashset.
/// Note: Takes a read lock on the txhashset.
/// Take care not to call this repeatedly in a tight loop.
pub fn get_header_by_height(&self, height: u64) -> Result<BlockHeader, Error> {
let hash = self.get_header_hash_by_height(height)?;
self.get_block_header(&hash)
}
/// Gets the header hash at the provided height.
/// Note: Takes a read lock on the txhashset.
/// Take care not to call this repeatedly in a tight loop.
fn get_header_hash_by_height(&self, height: u64) -> Result<Hash, Error> {
let txhashset = self.txhashset.read();
let header = txhashset.get_header_by_height(height)?;
Ok(header)
let hash = txhashset.get_header_hash_by_height(height)?;
Ok(hash)
}
/// Gets the block header in which a given output appears in the txhashset.

View file

@ -89,9 +89,15 @@ pub enum ErrorKind {
/// Error validating a Merkle proof (coinbase output)
#[fail(display = "Error validating merkle proof")]
MerkleProof,
/// output not found
/// Output not found
#[fail(display = "Output not found")]
OutputNotFound,
/// Rangeproof not found
#[fail(display = "Rangeproof not found")]
RangeproofNotFound,
/// Tx kernel not found
#[fail(display = "Tx kernel not found")]
TxKernelNotFound,
/// output spent
#[fail(display = "Output is spent")]
OutputSpent,

View file

@ -51,12 +51,13 @@ impl ChainStore {
}
}
#[allow(missing_docs)]
impl ChainStore {
/// The current chain head.
pub fn head(&self) -> Result<Tip, Error> {
option_to_not_found(self.db.get_ser(&vec![HEAD_PREFIX]), "HEAD")
}
/// The current chain "tail" (earliest block in the store).
pub fn tail(&self) -> Result<Tip, Error> {
option_to_not_found(self.db.get_ser(&vec![TAIL_PREFIX]), "TAIL")
}
@ -71,10 +72,12 @@ impl ChainStore {
option_to_not_found(self.db.get_ser(&vec![HEADER_HEAD_PREFIX]), "HEADER_HEAD")
}
/// The "sync" head.
pub fn get_sync_head(&self) -> Result<Tip, Error> {
option_to_not_found(self.db.get_ser(&vec![SYNC_HEAD_PREFIX]), "SYNC_HEAD")
}
/// Get full block.
pub fn get_block(&self, h: &Hash) -> Result<Block, Error> {
option_to_not_found(
self.db.get_ser(&to_key(BLOCK_PREFIX, &mut h.to_vec())),
@ -82,10 +85,12 @@ impl ChainStore {
)
}
/// Does this full block exist?
pub fn block_exists(&self, h: &Hash) -> Result<bool, Error> {
self.db.exists(&to_key(BLOCK_PREFIX, &mut h.to_vec()))
}
/// Get block_sums for the block hash.
pub fn get_block_sums(&self, h: &Hash) -> Result<BlockSums, Error> {
option_to_not_found(
self.db.get_ser(&to_key(BLOCK_SUMS_PREFIX, &mut h.to_vec())),
@ -93,10 +98,12 @@ impl ChainStore {
)
}
/// Get previous header.
pub fn get_previous_header(&self, header: &BlockHeader) -> Result<BlockHeader, Error> {
self.get_block_header(&header.prev_hash)
}
/// Get block header.
pub fn get_block_header(&self, h: &Hash) -> Result<BlockHeader, Error> {
option_to_not_found(
self.db
@ -105,6 +112,7 @@ impl ChainStore {
)
}
/// Get PMMR pos for the given output commitment.
pub fn get_output_pos(&self, commit: &Commitment) -> Result<u64, Error> {
option_to_not_found(
self.db
@ -127,12 +135,13 @@ pub struct Batch<'a> {
db: store::Batch<'a>,
}
#[allow(missing_docs)]
impl<'a> Batch<'a> {
/// The head.
pub fn head(&self) -> Result<Tip, Error> {
option_to_not_found(self.db.get_ser(&vec![HEAD_PREFIX]), "HEAD")
}
/// The tail.
pub fn tail(&self) -> Result<Tip, Error> {
option_to_not_found(self.db.get_ser(&vec![TAIL_PREFIX]), "TAIL")
}
@ -147,27 +156,33 @@ impl<'a> Batch<'a> {
option_to_not_found(self.db.get_ser(&vec![HEADER_HEAD_PREFIX]), "HEADER_HEAD")
}
/// Get "sync" head.
pub fn get_sync_head(&self) -> Result<Tip, Error> {
option_to_not_found(self.db.get_ser(&vec![SYNC_HEAD_PREFIX]), "SYNC_HEAD")
}
/// Save head to db.
pub fn save_head(&self, t: &Tip) -> Result<(), Error> {
self.db.put_ser(&vec![HEAD_PREFIX], t)?;
self.db.put_ser(&vec![HEADER_HEAD_PREFIX], t)
}
/// Save body head to db.
pub fn save_body_head(&self, t: &Tip) -> Result<(), Error> {
self.db.put_ser(&vec![HEAD_PREFIX], t)
}
/// Save body "tail" to db.
pub fn save_body_tail(&self, t: &Tip) -> Result<(), Error> {
self.db.put_ser(&vec![TAIL_PREFIX], t)
}
/// Save header_head to db.
pub fn save_header_head(&self, t: &Tip) -> Result<(), Error> {
self.db.put_ser(&vec![HEADER_HEAD_PREFIX], t)
}
/// Save "sync" head to db.
pub fn save_sync_head(&self, t: &Tip) -> Result<(), Error> {
self.db.put_ser(&vec![SYNC_HEAD_PREFIX], t)
}
@ -192,6 +207,7 @@ impl<'a> Batch<'a> {
)
}
/// Does the block exist?
pub fn block_exists(&self, h: &Hash) -> Result<bool, Error> {
self.db.exists(&to_key(BLOCK_PREFIX, &mut h.to_vec()))
}
@ -225,6 +241,7 @@ impl<'a> Batch<'a> {
Ok(())
}
/// Save block header to db.
pub fn save_block_header(&self, header: &BlockHeader) -> Result<(), Error> {
let hash = header.hash();
@ -235,6 +252,7 @@ impl<'a> Batch<'a> {
Ok(())
}
/// Save output_pos to index.
pub fn save_output_pos(&self, commit: &Commitment, pos: u64) -> Result<(), Error> {
self.db.put_ser(
&to_key(COMMIT_POS_PREFIX, &mut commit.as_ref().to_vec())[..],
@ -242,6 +260,7 @@ impl<'a> Batch<'a> {
)
}
/// Get output_pos from index.
pub fn get_output_pos(&self, commit: &Commitment) -> Result<u64, Error> {
option_to_not_found(
self.db
@ -250,15 +269,21 @@ impl<'a> Batch<'a> {
)
}
pub fn delete_output_pos(&self, commit: &[u8]) -> Result<(), Error> {
self.db
.delete(&to_key(COMMIT_POS_PREFIX, &mut commit.to_vec()))
/// Clear all entries from the output_pos index (must be rebuilt after).
pub fn clear_output_pos(&self) -> Result<(), Error> {
let key = to_key(COMMIT_POS_PREFIX, &mut "".to_string().into_bytes());
for (k, _) in self.db.iter::<u64>(&key).unwrap() {
self.db.delete(&k)?;
}
Ok(())
}
/// Get the previous header.
pub fn get_previous_header(&self, header: &BlockHeader) -> Result<BlockHeader, Error> {
self.get_block_header(&header.prev_hash)
}
/// Get block header.
pub fn get_block_header(&self, h: &Hash) -> Result<BlockHeader, Error> {
option_to_not_found(
self.db
@ -267,6 +292,7 @@ impl<'a> Batch<'a> {
)
}
/// Save the input bitmap for the block.
fn save_block_input_bitmap(&self, bh: &Hash, bm: &Bitmap) -> Result<(), Error> {
self.db.put(
&to_key(BLOCK_INPUT_BITMAP_PREFIX, &mut bh.to_vec())[..],
@ -274,16 +300,19 @@ impl<'a> Batch<'a> {
)
}
/// Delete the block input bitmap.
fn delete_block_input_bitmap(&self, bh: &Hash) -> Result<(), Error> {
self.db
.delete(&to_key(BLOCK_INPUT_BITMAP_PREFIX, &mut bh.to_vec()))
}
/// Save block_sums for the block.
pub fn save_block_sums(&self, h: &Hash, sums: &BlockSums) -> Result<(), Error> {
self.db
.put_ser(&to_key(BLOCK_SUMS_PREFIX, &mut h.to_vec())[..], &sums)
}
/// Get block_sums for the block.
pub fn get_block_sums(&self, h: &Hash) -> Result<BlockSums, Error> {
option_to_not_found(
self.db.get_ser(&to_key(BLOCK_SUMS_PREFIX, &mut h.to_vec())),
@ -291,10 +320,12 @@ impl<'a> Batch<'a> {
)
}
/// Delete the block_sums for the block.
fn delete_block_sums(&self, bh: &Hash) -> Result<(), Error> {
self.db.delete(&to_key(BLOCK_SUMS_PREFIX, &mut bh.to_vec()))
}
/// Build the input bitmap for the given block.
fn build_block_input_bitmap(&self, block: &Block) -> Result<Bitmap, Error> {
let bitmap = block
.inputs()
@ -305,6 +336,7 @@ impl<'a> Batch<'a> {
Ok(bitmap)
}
/// Build and store the input bitmap for the given block.
fn build_and_store_block_input_bitmap(&self, block: &Block) -> Result<Bitmap, Error> {
// Build the bitmap.
let bitmap = self.build_block_input_bitmap(block)?;
@ -315,8 +347,8 @@ impl<'a> Batch<'a> {
Ok(bitmap)
}
// Get the block input bitmap from the db or build the bitmap from
// the full block from the db (if the block is found).
/// Get the block input bitmap from the db or build the bitmap from
/// the full block from the db (if the block is found).
pub fn get_block_input_bitmap(&self, bh: &Hash) -> Result<Bitmap, Error> {
if let Ok(Some(bytes)) = self
.db

View file

@ -33,7 +33,6 @@ use crate::util::{file, secp_static, zip};
use croaring::Bitmap;
use grin_store;
use grin_store::pmmr::{PMMRBackend, PMMR_FILES};
use grin_store::types::prune_noop;
use std::collections::HashSet;
use std::fs::{self, File};
use std::path::{Path, PathBuf};
@ -206,21 +205,27 @@ impl TxHashSet {
.get_last_n_insertions(distance)
}
/// Get the header at the specified height based on the current state of the txhashset.
/// Derives the MMR pos from the height (insertion index) and retrieves the header hash.
/// Looks the header up in the db by hash.
pub fn get_header_by_height(&self, height: u64) -> Result<BlockHeader, Error> {
/// Get the header hash at the specified height based on the current state of the txhashset.
pub fn get_header_hash_by_height(&self, height: u64) -> Result<Hash, Error> {
let pos = pmmr::insertion_to_pmmr_index(height + 1);
let header_pmmr =
ReadonlyPMMR::at(&self.header_pmmr_h.backend, self.header_pmmr_h.last_pos);
if let Some(entry) = header_pmmr.get_data(pos) {
let header = self.commit_index.get_block_header(&entry.hash())?;
Ok(header)
Ok(entry.hash())
} else {
Err(ErrorKind::Other(format!("get header by height")).into())
Err(ErrorKind::Other(format!("get header hash by height")).into())
}
}
/// Get the header at the specified height based on the current state of the txhashset.
/// Derives the MMR pos from the height (insertion index) and retrieves the header hash.
/// Looks the header up in the db by hash.
pub fn get_header_by_height(&self, height: u64) -> Result<BlockHeader, Error> {
let hash = self.get_header_hash_by_height(height)?;
let header = self.commit_index.get_block_header(&hash)?;
Ok(header)
}
/// returns outputs from the given insertion (leaf) index up to the
/// specified limit. Also returns the last index actually populated
pub fn outputs_by_insertion_index(
@ -280,39 +285,30 @@ impl TxHashSet {
}
/// Compact the MMR data files and flush the rm logs
pub fn compact(&mut self) -> Result<(), Error> {
let commit_index = self.commit_index.clone();
let head_header = commit_index.head_header()?;
pub fn compact(&mut self, batch: &mut Batch<'_>) -> Result<(), Error> {
debug!("txhashset: starting compaction...");
let head_header = batch.head_header()?;
let current_height = head_header.height;
// horizon for compacting is based on current_height
let horizon = current_height.saturating_sub(global::cut_through_horizon().into());
let horizon_header = self.get_header_by_height(horizon)?;
let horizon_height = current_height.saturating_sub(global::cut_through_horizon().into());
let horizon_hash = self.get_header_hash_by_height(horizon_height)?;
let horizon_header = batch.get_block_header(&horizon_hash)?;
let batch = self.commit_index.batch()?;
let rewind_rm_pos = input_pos_to_rewind(&horizon_header, &head_header, batch)?;
let rewind_rm_pos = input_pos_to_rewind(&horizon_header, &head_header, &batch)?;
debug!("txhashset: check_compact output mmr backend...");
self.output_pmmr_h
.backend
.check_compact(horizon_header.output_mmr_size, &rewind_rm_pos)?;
{
let clean_output_index = |commit: &[u8]| {
let _ = batch.delete_output_pos(commit);
};
debug!("txhashset: check_compact rangeproof mmr backend...");
self.rproof_pmmr_h
.backend
.check_compact(horizon_header.output_mmr_size, &rewind_rm_pos)?;
self.output_pmmr_h.backend.check_compact(
horizon_header.output_mmr_size,
&rewind_rm_pos,
clean_output_index,
)?;
self.rproof_pmmr_h.backend.check_compact(
horizon_header.output_mmr_size,
&rewind_rm_pos,
&prune_noop,
)?;
}
// Finally commit the batch, saving everything to the db.
batch.commit()?;
debug!("txhashset: ... compaction finished");
Ok(())
}
@ -797,13 +793,11 @@ impl<'a> Committed for Extension<'a> {
fn outputs_committed(&self) -> Vec<Commitment> {
let mut commitments = vec![];
for n in 1..self.output_pmmr.unpruned_size() + 1 {
if pmmr::is_leaf(n) {
if let Some(out) = self.output_pmmr.get_data(n) {
for pos in self.output_pmmr.leaf_pos_iter() {
if let Some(out) = self.output_pmmr.get_data(pos) {
commitments.push(out.commit);
}
}
}
commitments
}
@ -1259,20 +1253,18 @@ impl<'a> Extension<'a> {
pub fn rebuild_index(&self) -> Result<(), Error> {
let now = Instant::now();
let mut count = 0;
self.batch.clear_output_pos()?;
for n in 1..self.output_pmmr.unpruned_size() + 1 {
// non-pruned leaves only
if pmmr::bintree_postorder_height(n) == 0 {
if let Some(out) = self.output_pmmr.get_data(n) {
self.batch.save_output_pos(&out.commit, n)?;
let mut count = 0;
for pos in self.output_pmmr.leaf_pos_iter() {
if let Some(out) = self.output_pmmr.get_data(pos) {
self.batch.save_output_pos(&out.commit, pos)?;
count += 1;
}
}
}
debug!(
"txhashset: rebuild_index ({} UTXOs), took {}s",
"txhashset: rebuild_index: {} UTXOs, took {}s",
count,
now.elapsed().as_secs(),
);
@ -1325,14 +1317,24 @@ impl<'a> Extension<'a> {
let total_kernels = pmmr::n_leaves(self.kernel_pmmr.unpruned_size());
for n in 1..self.kernel_pmmr.unpruned_size() + 1 {
if pmmr::is_leaf(n) {
if let Some(kernel) = self.kernel_pmmr.get_data(n) {
let kernel = self
.kernel_pmmr
.get_data(n)
.ok_or::<Error>(ErrorKind::TxKernelNotFound.into())?;
kernel.verify()?;
kern_count += 1;
}
}
if n % 20 == 0 {
if kern_count % 20 == 0 {
status.on_validation(kern_count, total_kernels, 0, 0);
}
if kern_count % 1_000 == 0 {
debug!(
"txhashset: verify_kernel_signatures: verified {} signatures",
kern_count,
);
}
}
}
debug!(
@ -1353,19 +1355,24 @@ impl<'a> Extension<'a> {
let mut proof_count = 0;
let total_rproofs = pmmr::n_leaves(self.output_pmmr.unpruned_size());
for n in 1..self.output_pmmr.unpruned_size() + 1 {
if pmmr::is_leaf(n) {
if let Some(out) = self.output_pmmr.get_data(n) {
if let Some(rp) = self.rproof_pmmr.get_data(n) {
commits.push(out.commit);
proofs.push(rp);
} else {
// TODO - rangeproof not found
return Err(ErrorKind::OutputNotFound.into());
for pos in self.output_pmmr.leaf_pos_iter() {
let output = self.output_pmmr.get_data(pos);
let proof = self.rproof_pmmr.get_data(pos);
// Output and corresponding rangeproof *must* exist.
// It is invalid for either to be missing and we fail immediately in this case.
match (output, proof) {
(None, _) => return Err(ErrorKind::OutputNotFound.into()),
(_, None) => return Err(ErrorKind::RangeproofNotFound.into()),
(Some(output), Some(proof)) => {
commits.push(output.commit);
proofs.push(proof);
}
}
proof_count += 1;
if proofs.len() >= 1000 {
if proofs.len() >= 1_000 {
Output::batch_verify_proofs(&commits, &proofs)?;
commits.clear();
proofs.clear();
@ -1374,9 +1381,8 @@ impl<'a> Extension<'a> {
proof_count,
);
}
}
}
if n % 20 == 0 {
if proof_count % 20 == 0 {
status.on_validation(0, 0, proof_count, total_rproofs);
}
}

View file

@ -51,6 +51,9 @@ pub trait Backend<T: PMMRable> {
/// (ignoring the remove log).
fn get_data_from_file(&self, position: u64) -> Option<T::E>;
/// Iterator over current (unpruned, unremoved) leaf positions.
fn leaf_pos_iter(&self) -> Box<Iterator<Item = u64> + '_>;
/// Remove Hash by insertion position. An index is also provided so the
/// underlying backend can implement some rollback of positions up to a
/// given index (practically the index is the height of a block that

View file

@ -75,6 +75,11 @@ where
ReadonlyPMMR::at(&self.backend, self.last_pos)
}
/// Iterator over current (unpruned, unremoved) leaf positions.
pub fn leaf_pos_iter(&self) -> impl Iterator<Item = u64> + '_ {
self.backend.leaf_pos_iter()
}
/// Returns a vec of the peaks of this MMR.
pub fn peaks(&self) -> Vec<Hash> {
let peaks_pos = peaks(self.last_pos);

View file

@ -104,6 +104,10 @@ impl<T: PMMRable> Backend<T> for VecBackend<T> {
Some(data.as_elmt())
}
fn leaf_pos_iter(&self) -> Box<Iterator<Item = u64> + '_> {
unimplemented!()
}
fn remove(&mut self, position: u64) -> Result<(), String> {
self.remove_list.push(position);
Ok(())

View file

@ -157,6 +157,7 @@ impl PeerStore {
let mut peers = self
.db
.iter::<PeerData>(&to_key(PEER_PREFIX, &mut "".to_string().into_bytes()))?
.map(|(_, v)| v)
.filter(|p| p.flags == state && p.capabilities.contains(cap))
.collect::<Vec<_>>();
thread_rng().shuffle(&mut peers[..]);
@ -167,7 +168,10 @@ impl PeerStore {
/// Used for /v1/peers/all api endpoint
pub fn all_peers(&self) -> Result<Vec<PeerData>, Error> {
let key = to_key(PEER_PREFIX, &mut "".to_string().into_bytes());
Ok(self.db.iter::<PeerData>(&key)?.collect::<Vec<_>>())
Ok(self.db
.iter::<PeerData>(&key)?
.map(|(_, v)| v)
.collect::<Vec<_>>())
}
/// Convenience method to load a peer data, update its status and save it

View file

@ -199,4 +199,9 @@ impl LeafSet {
pub fn is_empty(&self) -> bool {
self.len() == 0
}
/// Iterator over positionns in the leaf_set (all leaf positions).
pub fn iter(&self) -> impl Iterator<Item = u64> + '_ {
self.bitmap.iter().map(|x| x as u64)
}
}

View file

@ -162,8 +162,8 @@ impl Store {
res.to_opt().map(|r| r.is_some()).map_err(From::from)
}
/// Produces an iterator of `Readable` types moving forward from the
/// provided key.
/// Produces an iterator of (key, value) pairs, where values are `Readable` types
/// moving forward from the provided key.
pub fn iter<T: ser::Readable>(&self, from: &[u8]) -> Result<SerIterator<T>, Error> {
let tx = Arc::new(lmdb::ReadTransaction::new(self.env.clone())?);
let cursor = Arc::new(tx.cursor(self.db.clone())?);
@ -273,9 +273,9 @@ impl<T> Iterator for SerIterator<T>
where
T: ser::Readable,
{
type Item = T;
type Item = (Vec<u8>, T);
fn next(&mut self) -> Option<T> {
fn next(&mut self) -> Option<(Vec<u8>, T)> {
let access = self.tx.access();
let kv = if self.seek {
Arc::get_mut(&mut self.cursor).unwrap().next(&access)
@ -285,7 +285,10 @@ where
.unwrap()
.seek_range_k(&access, &self.prefix[..])
};
self.deser_if_prefix_match(kv)
match kv {
Ok((k, v)) => self.deser_if_prefix_match(k, v),
Err(_) => None,
}
}
}
@ -293,17 +296,16 @@ impl<T> SerIterator<T>
where
T: ser::Readable,
{
fn deser_if_prefix_match(&self, kv: Result<(&[u8], &[u8]), lmdb::Error>) -> Option<T> {
match kv {
Ok((k, v)) => {
fn deser_if_prefix_match(&self, key: &[u8], value: &[u8]) -> Option<(Vec<u8>, T)> {
let plen = self.prefix.len();
if plen == 0 || k[0..plen] == self.prefix[..] {
ser::deserialize(&mut &v[..]).ok()
if plen == 0 || key[0..plen] == self.prefix[..] {
if let Ok(value) = ser::deserialize(&mut &value[..]) {
Some((key.to_vec(), value))
} else {
None
}
} else {
None
}
}
Err(_) => None,
}
}
}

View file

@ -21,7 +21,7 @@ use crate::core::core::BlockHeader;
use crate::core::ser::PMMRable;
use crate::leaf_set::LeafSet;
use crate::prune_list::PruneList;
use crate::types::{prune_noop, DataFile};
use crate::types::DataFile;
use croaring::Bitmap;
use std::path::{Path, PathBuf};
@ -121,6 +121,17 @@ impl<T: PMMRable> Backend<T> for PMMRBackend<T> {
self.get_data_from_file(pos)
}
/// Returns an iterator over all the leaf positions.
/// for a prunable PMMR this is an iterator over the leaf_set bitmap.
/// For a non-prunable PMMR this is *all* leaves (this is not yet implemented).
fn leaf_pos_iter(&self) -> Box<Iterator<Item = u64> + '_> {
if self.prunable {
Box::new(self.leaf_set.iter())
} else {
panic!("leaf_pos_iter not implemented for non-prunable PMMR")
}
}
/// Rewind the PMMR backend to the given position.
fn rewind(&mut self, position: u64, rewind_rm_pos: &Bitmap) -> Result<(), String> {
// First rewind the leaf_set with the necessary added and removed positions.
@ -278,15 +289,7 @@ impl<T: PMMRable> PMMRBackend<T> {
/// aligned. The block_marker in the db/index for the particular block
/// will have a suitable output_pos. This is used to enforce a horizon
/// after which the local node should have all the data to allow rewinding.
pub fn check_compact<P>(
&mut self,
cutoff_pos: u64,
rewind_rm_pos: &Bitmap,
prune_cb: P,
) -> io::Result<bool>
where
P: Fn(&[u8]),
{
pub fn check_compact(&mut self, cutoff_pos: u64, rewind_rm_pos: &Bitmap) -> io::Result<bool> {
assert!(self.prunable, "Trying to compact a non-prunable PMMR");
// Paths for tmp hash and data files.
@ -306,7 +309,7 @@ impl<T: PMMRable> PMMRBackend<T> {
});
self.hash_file
.save_prune(&tmp_prune_file_hash, &off_to_rm, &prune_noop)?;
.save_prune(&tmp_prune_file_hash, &off_to_rm)?;
}
// 2. Save compact copy of the data file, skipping removed leaves.
@ -324,7 +327,7 @@ impl<T: PMMRable> PMMRBackend<T> {
});
self.data_file
.save_prune(&tmp_prune_file_data, &off_to_rm, prune_cb)?;
.save_prune(&tmp_prune_file_data, &off_to_rm)?;
}
// 3. Update the prune list and write to disk.

View file

@ -20,9 +20,6 @@ use std::io::{self, BufWriter, ErrorKind, Read, Write};
use std::marker;
use std::path::{Path, PathBuf};
/// A no-op function for doing nothing with some pruned data.
pub fn prune_noop(_pruned_data: &[u8]) {}
/// Data file (MMR) wrapper around an append only file.
pub struct DataFile<T> {
file: AppendOnlyFile,
@ -113,16 +110,13 @@ where
}
/// Write the file out to disk, pruning removed elements.
pub fn save_prune<F>(&self, target: &str, prune_offs: &[u64], prune_cb: F) -> io::Result<()>
where
F: Fn(&[u8]),
{
pub fn save_prune(&self, target: &str, prune_offs: &[u64]) -> io::Result<()> {
let prune_offs = prune_offs
.iter()
.map(|x| x * T::LEN as u64)
.collect::<Vec<_>>();
self.file
.save_prune(target, prune_offs.as_slice(), T::LEN as u64, prune_cb)
.save_prune(target, prune_offs.as_slice(), T::LEN as u64)
}
}
@ -294,15 +288,8 @@ impl AppendOnlyFile {
/// Saves a copy of the current file content, skipping data at the provided
/// prune indices. The prune Vec must be ordered.
pub fn save_prune<T, P>(
&self,
target: P,
prune_offs: &[u64],
prune_len: u64,
prune_cb: T,
) -> io::Result<()>
pub fn save_prune<P>(&self, target: P, prune_offs: &[u64], prune_len: u64) -> io::Result<()>
where
T: Fn(&[u8]),
P: AsRef<Path>,
{
if prune_offs.is_empty() {
@ -332,8 +319,6 @@ impl AppendOnlyFile {
let prune_at = (prune_offs[prune_pos] - read) as usize;
if prune_at != buf_start {
writer.write_all(&buf[buf_start..prune_at])?;
} else {
prune_cb(&buf[buf_start..prune_at]);
}
buf_start = prune_at + (prune_len as usize);
if prune_offs.len() > prune_pos + 1 {

View file

@ -26,7 +26,6 @@ use crate::core::core::pmmr::{Backend, PMMR};
use crate::core::ser::{
Error, FixedLength, PMMRIndexHashable, PMMRable, Readable, Reader, Writeable, Writer,
};
use crate::store::types::prune_noop;
#[test]
fn pmmr_append() {
@ -128,9 +127,7 @@ fn pmmr_compact_leaf_sibling() {
assert_eq!(backend.get_from_file(1).unwrap(), pos_1_hash);
// aggressively compact the PMMR files
backend
.check_compact(1, &Bitmap::create(), &prune_noop)
.unwrap();
backend.check_compact(1, &Bitmap::create()).unwrap();
// check pos 1, 2, 3 are in the state we expect after compacting
{
@ -188,9 +185,7 @@ fn pmmr_prune_compact() {
}
// compact
backend
.check_compact(2, &Bitmap::create(), &prune_noop)
.unwrap();
backend.check_compact(2, &Bitmap::create()).unwrap();
// recheck the root and stored data
{
@ -236,9 +231,7 @@ fn pmmr_reload() {
backend.sync().unwrap();
// now check and compact the backend
backend
.check_compact(1, &Bitmap::create(), &prune_noop)
.unwrap();
backend.check_compact(1, &Bitmap::create()).unwrap();
backend.sync().unwrap();
// prune another node to force compact to actually do something
@ -249,9 +242,7 @@ fn pmmr_reload() {
}
backend.sync().unwrap();
backend
.check_compact(4, &Bitmap::create(), &prune_noop)
.unwrap();
backend.check_compact(4, &Bitmap::create()).unwrap();
backend.sync().unwrap();
assert_eq!(backend.unpruned_size(), mmr_size);
@ -351,9 +342,7 @@ fn pmmr_rewind() {
}
// and compact the MMR to remove the pruned elements
backend
.check_compact(6, &Bitmap::create(), &prune_noop)
.unwrap();
backend.check_compact(6, &Bitmap::create()).unwrap();
backend.sync().unwrap();
println!("after compacting - ");
@ -453,9 +442,7 @@ fn pmmr_compact_single_leaves() {
backend.sync().unwrap();
// compact
backend
.check_compact(2, &Bitmap::create(), &prune_noop)
.unwrap();
backend.check_compact(2, &Bitmap::create()).unwrap();
{
let mut pmmr: PMMR<'_, TestElem, _> = PMMR::at(&mut backend, mmr_size);
@ -466,9 +453,7 @@ fn pmmr_compact_single_leaves() {
backend.sync().unwrap();
// compact
backend
.check_compact(2, &Bitmap::create(), &prune_noop)
.unwrap();
backend.check_compact(2, &Bitmap::create()).unwrap();
}
teardown(data_dir);
@ -499,9 +484,7 @@ fn pmmr_compact_entire_peak() {
backend.sync().unwrap();
// compact
backend
.check_compact(2, &Bitmap::create(), &prune_noop)
.unwrap();
backend.check_compact(2, &Bitmap::create()).unwrap();
// now check we have pruned up to and including the peak at pos 7
// hash still available in underlying hash file
@ -587,9 +570,7 @@ fn pmmr_compact_horizon() {
}
// compact
backend
.check_compact(4, &Bitmap::of(&vec![1, 2]), &prune_noop)
.unwrap();
backend.check_compact(4, &Bitmap::of(&vec![1, 2])).unwrap();
backend.sync().unwrap();
// check we can read a hash by pos correctly after compaction
@ -644,9 +625,7 @@ fn pmmr_compact_horizon() {
}
// compact some more
backend
.check_compact(9, &Bitmap::create(), &prune_noop)
.unwrap();
backend.check_compact(9, &Bitmap::create()).unwrap();
}
// recheck stored data
@ -711,9 +690,7 @@ fn compact_twice() {
}
// compact
backend
.check_compact(2, &Bitmap::create(), &prune_noop)
.unwrap();
backend.check_compact(2, &Bitmap::create()).unwrap();
// recheck the root and stored data
{
@ -740,9 +717,7 @@ fn compact_twice() {
}
// compact
backend
.check_compact(2, &Bitmap::create(), &prune_noop)
.unwrap();
backend.check_compact(2, &Bitmap::create()).unwrap();
// recheck the root and stored data
{

View file

@ -242,7 +242,7 @@ where
}
fn iter<'a>(&'a self) -> Box<dyn Iterator<Item = OutputData> + 'a> {
Box::new(self.db.iter(&[OUTPUT_PREFIX]).unwrap())
Box::new(self.db.iter(&[OUTPUT_PREFIX]).unwrap().map(|(_, v)| v))
}
fn get_tx_log_entry(&self, u: &Uuid) -> Result<Option<TxLogEntry>, Error> {
@ -251,7 +251,12 @@ where
}
fn tx_log_iter<'a>(&'a self) -> Box<dyn Iterator<Item = TxLogEntry> + 'a> {
Box::new(self.db.iter(&[TX_LOG_ENTRY_PREFIX]).unwrap())
Box::new(
self.db
.iter(&[TX_LOG_ENTRY_PREFIX])
.unwrap()
.map(|(_, v)| v),
)
}
fn get_private_context(&mut self, slate_id: &[u8]) -> Result<Context, Error> {
@ -272,7 +277,12 @@ where
}
fn acct_path_iter<'a>(&'a self) -> Box<dyn Iterator<Item = AcctPathMapping> + 'a> {
Box::new(self.db.iter(&[ACCOUNT_PATH_MAPPING_PREFIX]).unwrap())
Box::new(
self.db
.iter(&[ACCOUNT_PATH_MAPPING_PREFIX])
.unwrap()
.map(|(_, v)| v),
)
}
fn get_acct_path(&self, label: String) -> Result<Option<AcctPathMapping>, Error> {
@ -418,7 +428,8 @@ where
.as_ref()
.unwrap()
.iter(&[OUTPUT_PREFIX])
.unwrap(),
.unwrap()
.map(|(_, v)| v),
)
}
@ -456,7 +467,8 @@ where
.as_ref()
.unwrap()
.iter(&[TX_LOG_ENTRY_PREFIX])
.unwrap(),
.unwrap()
.map(|(_, v)| v),
)
}
@ -525,7 +537,8 @@ where
.as_ref()
.unwrap()
.iter(&[ACCOUNT_PATH_MAPPING_PREFIX])
.unwrap(),
.unwrap()
.map(|(_, v)| v),
)
}