From 405a4bc9855ec13edddc846122c737363c98d393 Mon Sep 17 00:00:00 2001 From: Ignotus Peverell Date: Tue, 5 Sep 2017 05:50:25 +0000 Subject: [PATCH] Prunable MMR storage (#112) * Base MMR storage structures Implementations of the MMR append-only file structure and its remove log. The append-only file is backed by a mmap for read access. The remove log is stored in memory for quick checking and backed by a simple file to persist it. * Add PMMR backend buffer, make PMMR Backend mutable * The Backend trait now has &mut self methods, and an &mut reference in PMMR. This simplifies the implementation of all backends by not forcing them to be interior mutable. Slight drawback is that a backend can't be used directly as long as it's used by a PMMR instance. * Introduced a buffer in the PMMR persistent backend to allow reads before the underlying files are fully flushed. Implemented with a temporary VecBackend. * Implement a prune list to use with dense backends The PruneList is useful when implementing compact backends for a PMMR (for example a single large byte array or a file). As nodes get pruned and removed from the backend to free space, the backend will get more compact but positions of a node within the PMMR will not match positions in the backend storage anymore. The PruneList accounts for that mismatch and does the position translation. * PMMR store compaction Implement actual pruning of the underlying PMMR storage by flushing the remove log. This triggers a rewrite of the PMMR nodes data (hashes and sums), removing pruned nodes. The information of what has been removed is kept in a prune list and the remove log is truncated. 
* PMMR store pruning tests and fixes --- core/src/core/pmmr.rs | 406 ++++++++++++++++++++++++++++----------- core/src/ser.rs | 30 ++- store/Cargo.toml | 7 + store/src/lib.rs | 25 ++- store/src/sumtree.rs | 423 +++++++++++++++++++++++++++++++++++++++++ store/tests/sumtree.rs | 199 +++++++++++++++++++ 6 files changed, 968 insertions(+), 122 deletions(-) create mode 100644 store/src/sumtree.rs create mode 100644 store/tests/sumtree.rs diff --git a/core/src/core/pmmr.rs b/core/src/core/pmmr.rs index 275390834..dfdbb7e00 100644 --- a/core/src/core/pmmr.rs +++ b/core/src/core/pmmr.rs @@ -38,7 +38,7 @@ use std::clone::Clone; use std::fmt::Debug; use std::marker::PhantomData; -use std::ops::{self}; +use std::ops::{self, Deref}; use core::hash::{Hash, Hashed}; use ser::{self, Readable, Reader, Writeable, Writer}; @@ -54,7 +54,7 @@ pub trait Summable { /// Length of the Sum type when serialized. Can be used as a hint by /// underlying storages. - fn sum_len(&self) -> usize; + fn sum_len() -> usize; } /// An empty sum that takes no space, to store elements that do not need summing @@ -87,7 +87,7 @@ impl Summable for NoSum { fn sum(&self) -> NullSum { NullSum } - fn sum_len(&self) -> usize { + fn sum_len() -> usize { return 0; } } @@ -104,8 +104,8 @@ pub struct HashSum where T: Summable { impl HashSum where T: Summable + Writeable { /// Create a hash sum from a summable - pub fn from_summable(idx: u64, elmt: T) -> HashSum { - let hash = Hashed::hash(&elmt); + pub fn from_summable(idx: u64, elmt: &T) -> HashSum { + let hash = Hashed::hash(elmt); let sum = elmt.sum(); let node_hash = (idx, &sum, hash).hash(); HashSum { @@ -144,12 +144,16 @@ impl ops::Add for HashSum where T: Summable { /// Storage backend for the MMR, just needs to be indexed by order of insertion. /// The remove operation can be a no-op for unoptimized backends. pub trait Backend where T: Summable { - /// Append the provided HashSums to the backend storage. 
- fn append(&self, data: Vec>); + /// Append the provided HashSums to the backend storage. The position of the + /// first element of the Vec in the MMR is provided to help the + /// implementation. + fn append(&mut self, position: u64, data: Vec>) -> Result<(), String>; + /// Get a HashSum by insertion position fn get(&self, position: u64) -> Option>; + /// Remove HashSums by insertion position - fn remove(&self, positions: Vec); + fn remove(&mut self, positions: Vec) -> Result<(), String>; } /// Prunable Merkle Mountain Range implementation. All positions within the tree @@ -159,17 +163,17 @@ pub trait Backend where T: Summable { /// Heavily relies on navigation operations within a binary tree. In particular, /// all the implementation needs to keep track of the MMR structure is how far /// we are in the sequence of nodes making up the MMR. -pub struct PMMR where T: Summable, B: Backend { +pub struct PMMR<'a, T, B> where T: Summable, B: 'a + Backend { last_pos: u64, - backend: B, + backend: &'a mut B, // only needed for parameterizing Backend summable: PhantomData, } -impl PMMR where T: Summable + Writeable + Debug + Clone, B: Backend { +impl<'a, T, B> PMMR<'a, T, B> where T: Summable + Writeable + Debug + Clone, B: 'a + Backend { /// Build a new prunable Merkle Mountain Range using the provided backend. - pub fn new(backend: B) -> PMMR { + pub fn new(backend: &'a mut B) -> PMMR { PMMR { last_pos: 0, backend: backend, @@ -177,6 +181,16 @@ impl PMMR where T: Summable + Writeable + Debug + Clone, B: Backend< } } + /// Build a new prunable Merkle Mountain Range pre-initialized until last_pos + /// with the provided backend. + pub fn at(backend: &'a mut B, last_pos: u64) -> PMMR { + PMMR { + last_pos: last_pos, + backend: backend, + summable: PhantomData, + } + } + /// Computes the root of the MMR. Find all the peaks in the current /// tree and "bags" them to get a single peak. 
pub fn root(&self) -> HashSum { @@ -198,7 +212,7 @@ impl PMMR where T: Summable + Writeable + Debug + Clone, B: Backend< /// the same time if applicable. pub fn push(&mut self, elmt: T) -> u64 { let elmt_pos = self.last_pos + 1; - let mut current_hashsum = HashSum::from_summable(elmt_pos, elmt); + let mut current_hashsum = HashSum::from_summable(elmt_pos, &elmt); let mut to_append = vec![current_hashsum.clone()]; let mut height = 0; let mut pos = elmt_pos; @@ -219,7 +233,7 @@ impl PMMR where T: Summable + Writeable + Debug + Clone, B: Backend< } // append all the new nodes and update the MMR index - self.backend.append(to_append); + self.backend.append(elmt_pos, to_append); self.last_pos = pos; elmt_pos } @@ -228,7 +242,7 @@ impl PMMR where T: Summable + Writeable + Debug + Clone, B: Backend< /// provide that position and prune, consumers of this API are expected to /// keep an index of elements to positions in the tree. Prunes parent /// nodes as well when they become childless. - pub fn prune(&self, position: u64) { + pub fn prune(&mut self, position: u64) { let prunable_height = bintree_postorder_height(position); if prunable_height > 0 { // only leaves can be pruned @@ -240,21 +254,7 @@ impl PMMR where T: Summable + Writeable + Debug + Clone, B: Backend< let mut to_prune = vec![]; let mut current = position; while current+1 < self.last_pos { - let next_height = bintree_postorder_height(current+1); - - // compare the node's height to the next height, if the next is higher - // we're on the right hand side of the subtree (otherwise we're on the - // left) - let sibling: u64; - let parent: u64; - if next_height > prunable_height { - sibling = bintree_jump_left_sibling(current); - parent = current + 1; - } else { - sibling = bintree_jump_right_sibling(current); - parent = sibling + 1; - } - + let (parent, sibling) = family(current); if parent > self.last_pos { // can't prune when our parent isn't here yet break; @@ -280,6 +280,150 @@ impl PMMR where T: Summable + 
Writeable + Debug + Clone, B: Backend< } } +/// Simple MMR backend implementation based on a Vector. Pruning does not +/// compact the Vector itself but still frees the reference to the +/// underlying HashSum. +#[derive(Clone)] +pub struct VecBackend where T: Summable + Clone { + elems: Vec>>, +} + +impl Backend for VecBackend where T: Summable + Clone { + fn append(&mut self, position: u64, data: Vec>) -> Result<(), String> { + self.elems.append(&mut map_vec!(data, |d| Some(d.clone()))); + Ok(()) + } + fn get(&self, position: u64) -> Option> { + self.elems[(position-1) as usize].clone() + } + fn remove(&mut self, positions: Vec) -> Result<(), String> { + for n in positions { + self.elems[(n-1) as usize] = None + } + Ok(()) + } +} + +impl VecBackend where T: Summable + Clone { + /// Instantiates a new VecBackend + pub fn new() -> VecBackend { + VecBackend{elems: vec![]} + } + + /// Current number of HashSum elements in the underlying Vec. + pub fn used_size(&self) -> usize { + let mut usz = self.elems.len(); + for elem in self.elems.deref() { + if elem.is_none() { + usz -= 1; + } + } + usz + } + + /// Resets the backend, emptying the underlying Vec. + pub fn clear(&mut self) { + self.elems = Vec::new(); + } + + /// Total length of the underlying vector. + pub fn len(&self) -> usize { + self.elems.len() + } +} + +/// Maintains a list of previously pruned nodes in PMMR, compacting the list as +/// parents get pruned and allowing checking whether a leaf is pruned. Given +/// a node's position, computes how much it should get shifted given the +/// subtrees that have been pruned before. +/// +/// The PruneList is useful when implementing compact backends for a PMMR (for +/// example a single large byte array or a file). As nodes get pruned and +/// removed from the backend to free space, the backend will get more compact +/// but positions of a node within the PMMR will not match positions in the +/// backend storage anymore. 
The PruneList accounts for that mismatch and does +/// the position translation. +pub struct PruneList { + pub pruned_nodes: Vec, +} + +impl PruneList { + pub fn new() -> PruneList { + PruneList{pruned_nodes: vec![]} + } + + /// Computes by how many positions a node at pos should be shifted given the + /// number of nodes that have already been pruned before it. + pub fn get_shift(&self, pos: u64) -> Option { + // get the position where the node at pos would fit in the pruned list, if + // it's already pruned, nothing to skip + match self.pruned_pos(pos) { + None => None, + Some(idx) => { + // skip by the number of elements pruned in the preceding subtrees, + // which is the sum of the size of each subtree + Some( + self.pruned_nodes[0..(idx as usize)] + .iter() + .map(|n| (1 << (bintree_postorder_height(*n) + 1)) - 1) + .sum(), + ) + } + } + } + + /// Push the node at the provided position in the prune list. Compacts the + /// list if pruning the additional node means a parent can get pruned as + /// well. + pub fn add(&mut self, pos: u64) { + let mut current = pos; + loop { + let (parent, sibling) = family(current); + match self.pruned_nodes.binary_search(&sibling) { + Ok(idx) => { + self.pruned_nodes.remove(idx); + current = parent; + } + Err(_) => { + if let Err(idx) = self.pruned_nodes.binary_search(&current) { + self.pruned_nodes.insert(idx, current); + } + break; + } + } + } + } + + /// Gets the position a new pruned node should take in the prune list. + /// If the node has already been pruned, either directly or through one of + /// its parents contained in the prune list, returns None. 
+ pub fn pruned_pos(&self, pos: u64) -> Option { + match self.pruned_nodes.binary_search(&pos) { + Ok(_) => None, + Err(idx) => { + if self.pruned_nodes.len() > idx { + // the node at pos can't be a child of lower position nodes by MMR + // construction but can be a child of the next node, going up parents + // from pos to make sure it's not the case + let next_peak_pos = self.pruned_nodes[idx]; + let mut cursor = pos; + loop { + let (parent, _) = family(cursor); + if next_peak_pos == parent { + return None; + } + if next_peak_pos < parent { + break; + } + cursor = parent; + } + } + Some(idx) + } + } + } +} + /// Gets the postorder traversal index of all peaks in a MMR given the last /// node's position. Starts with the top peak, which is always on the left /// side of the range, and navigates toward lower siblings toward the right /// of the range. @@ -310,7 +454,6 @@ fn peaks(num: u64) -> Vec { let mut peak = top; 'outer: loop { peak = bintree_jump_right_sibling(peak); - //println!("peak {}", peak); while peak > num { match bintree_move_down_left(peak) { Some(p) => peak = p, @@ -380,7 +523,7 @@ fn peaks(num: u64) -> Vec { /// To get the height of any node (say 1101), we need to travel left in the /// tree, get the leftmost node and count the ones. To travel left, we just /// need to subtract the position by it's most significant bit, mins one. For -/// example to get from 1101 to 110 we subtract it by (1000-1) (`13-(8-1)=6`). +/// example to get from 1101 to 110 we subtract it by (1000-1) (`13-(8-1)=6`). /// Then to to get 110 to 11, we subtract it by (100-1) ('6-(4-1)=3`). /// /// By applying this operation recursively, until we get a number that, in @@ -389,7 +532,7 @@ fn peaks(num: u64) -> Vec { /// nodes are added in a MMR. 
/// /// [1] https://github.com/opentimestamps/opentimestamps-server/blob/master/doc/merkle-mountain-range.md -fn bintree_postorder_height(num: u64) -> u64 { +pub fn bintree_postorder_height(num: u64) -> u64 { let mut h = num; while !all_ones(h) { h = bintree_jump_left(h); @@ -397,6 +540,24 @@ fn bintree_postorder_height(num: u64) -> u64 { most_significant_pos(h) - 1 } +/// Calculates the positions of the parent and sibling of the node at the +/// provided position. +pub fn family(pos: u64) -> (u64, u64) { + let sibling: u64; + let parent: u64; + + let pos_height = bintree_postorder_height(pos); + let next_height = bintree_postorder_height(pos+1); + if next_height > pos_height { + sibling = bintree_jump_left_sibling(pos); + parent = pos + 1; + } else { + sibling = bintree_jump_right_sibling(pos); + parent = sibling + 1; + } + (parent, sibling) +} + /// Calculates the position of the top-left child of a parent node in the /// postorder traversal of a full binary tree. fn bintree_move_down_left(num: u64) -> Option { @@ -407,7 +568,6 @@ fn bintree_move_down_left(num: u64) -> Option { Some(num - (1 << height)) } - /// Calculates the position of the right sibling of a node a subtree in the /// postorder traversal of a full binary tree. 
fn bintree_jump_right_sibling(num: u64) -> u64 { @@ -457,9 +617,6 @@ fn most_significant_pos(num: u64) -> u64 { mod test { use super::*; use core::hash::Hashed; - use std::sync::{Arc, Mutex}; - use std::ops::Deref; - #[test] fn some_all_ones() { @@ -520,8 +677,8 @@ mod test { self.0[0] as u64 * 0x1000 + self.0[1] as u64 * 0x100 + self.0[2] as u64 * 0x10 + self.0[3] as u64 } - fn sum_len(&self) -> usize { - 4 + fn sum_len() -> usize { + 8 } } @@ -534,39 +691,6 @@ mod test { } } - #[derive(Clone)] - struct VecBackend { - elems: Arc>>>>, - } - impl Backend for VecBackend { - fn append(&self, data: Vec>) { - let mut elems = self.elems.lock().unwrap(); - elems.append(&mut map_vec!(data, |d| Some(d.clone()))); - } - fn get(&self, position: u64) -> Option> { - let elems = self.elems.lock().unwrap(); - elems[(position-1) as usize].clone() - } - fn remove(&self, positions: Vec) { - let mut elems = self.elems.lock().unwrap(); - for n in positions { - elems[(n-1) as usize] = None - } - } - } - impl VecBackend { - fn used_size(&self) -> usize { - let elems = self.elems.lock().unwrap(); - let mut usz = elems.len(); - for elem in elems.deref() { - if elem.is_none() { - usz -= 1; - } - } - usz - } - } - #[test] fn pmmr_push_root() { let elems = [ @@ -581,8 +705,8 @@ mod test { TestElem([1, 0, 0, 0]), ]; - let ba = VecBackend{elems: Arc::new(Mutex::new(vec![]))}; - let mut pmmr = PMMR::new(ba.clone()); + let mut ba = VecBackend::new(); + let mut pmmr = PMMR::new(&mut ba); // one element pmmr.push(elems[0]); @@ -594,49 +718,49 @@ mod test { // two elements pmmr.push(elems[1]); - let sum2 = HashSum::from_summable(1, elems[0]) + HashSum::from_summable(2, elems[1]); + let sum2 = HashSum::from_summable(1, &elems[0]) + HashSum::from_summable(2, &elems[1]); assert_eq!(pmmr.root(), sum2); assert_eq!(pmmr.unpruned_size(), 3); // three elements pmmr.push(elems[2]); - let sum3 = sum2.clone() + HashSum::from_summable(4, elems[2]); + let sum3 = sum2.clone() + HashSum::from_summable(4, 
&elems[2]); assert_eq!(pmmr.root(), sum3); assert_eq!(pmmr.unpruned_size(), 4); // four elements pmmr.push(elems[3]); - let sum4 = sum2 + (HashSum::from_summable(4, elems[2]) + HashSum::from_summable(5, elems[3])); + let sum4 = sum2 + (HashSum::from_summable(4, &elems[2]) + HashSum::from_summable(5, &elems[3])); assert_eq!(pmmr.root(), sum4); assert_eq!(pmmr.unpruned_size(), 7); // five elements pmmr.push(elems[4]); - let sum5 = sum4.clone() + HashSum::from_summable(8, elems[4]); + let sum5 = sum4.clone() + HashSum::from_summable(8, &elems[4]); assert_eq!(pmmr.root(), sum5); assert_eq!(pmmr.unpruned_size(), 8); // six elements pmmr.push(elems[5]); - let sum6 = sum4.clone() + (HashSum::from_summable(8, elems[4]) + HashSum::from_summable(9, elems[5])); + let sum6 = sum4.clone() + (HashSum::from_summable(8, &elems[4]) + HashSum::from_summable(9, &elems[5])); assert_eq!(pmmr.root(), sum6.clone()); assert_eq!(pmmr.unpruned_size(), 10); // seven elements pmmr.push(elems[6]); - let sum7 = sum6 + HashSum::from_summable(11, elems[6]); + let sum7 = sum6 + HashSum::from_summable(11, &elems[6]); assert_eq!(pmmr.root(), sum7); assert_eq!(pmmr.unpruned_size(), 11); // eight elements pmmr.push(elems[7]); - let sum8 = sum4 + ((HashSum::from_summable(8, elems[4]) + HashSum::from_summable(9, elems[5])) + (HashSum::from_summable(11, elems[6]) + HashSum::from_summable(12, elems[7]))); + let sum8 = sum4 + ((HashSum::from_summable(8, &elems[4]) + HashSum::from_summable(9, &elems[5])) + (HashSum::from_summable(11, &elems[6]) + HashSum::from_summable(12, &elems[7]))); assert_eq!(pmmr.root(), sum8); assert_eq!(pmmr.unpruned_size(), 15); // nine elements pmmr.push(elems[8]); - let sum9 = sum8 + HashSum::from_summable(16, elems[8]); + let sum9 = sum8 + HashSum::from_summable(16, &elems[8]); assert_eq!(pmmr.root(), sum9); assert_eq!(pmmr.unpruned_size(), 16); } @@ -655,48 +779,112 @@ mod test { TestElem([1, 0, 0, 0]), ]; - let ba = VecBackend{elems: Arc::new(Mutex::new(vec![]))}; - let mut 
pmmr = PMMR::new(ba.clone()); - for elem in &elems[..] { - pmmr.push(*elem); + let orig_root: HashSum; + let sz: u64; + let mut ba = VecBackend::new(); + { + let mut pmmr = PMMR::new(&mut ba); + for elem in &elems[..] { + pmmr.push(*elem); + } + orig_root = pmmr.root(); + sz = pmmr.unpruned_size(); } - let orig_root = pmmr.root(); - let orig_sz = ba.used_size(); // pruning a leaf with no parent should do nothing - pmmr.prune(16); - assert_eq!(orig_root, pmmr.root()); - assert_eq!(ba.used_size(), orig_sz); + { + let mut pmmr = PMMR::at(&mut ba, sz); + pmmr.prune(16); + assert_eq!(orig_root, pmmr.root()); + } + assert_eq!(ba.used_size(), 16); // pruning leaves with no shared parent just removes 1 element - pmmr.prune(2); - assert_eq!(orig_root, pmmr.root()); - assert_eq!(ba.used_size(), orig_sz - 1); + { + let mut pmmr = PMMR::at(&mut ba, sz); + pmmr.prune(2); + assert_eq!(orig_root, pmmr.root()); + } + assert_eq!(ba.used_size(), 15); - pmmr.prune(4); - assert_eq!(orig_root, pmmr.root()); - assert_eq!(ba.used_size(), orig_sz - 2); + { + let mut pmmr = PMMR::at(&mut ba, sz); + pmmr.prune(4); + assert_eq!(orig_root, pmmr.root()); + } + assert_eq!(ba.used_size(), 14); // pruning a non-leaf node has no effect - pmmr.prune(3); - assert_eq!(orig_root, pmmr.root()); - assert_eq!(ba.used_size(), orig_sz - 2); + { + let mut pmmr = PMMR::at(&mut ba, sz); + pmmr.prune(3); + assert_eq!(orig_root, pmmr.root()); + } + assert_eq!(ba.used_size(), 14); // pruning sibling removes subtree - pmmr.prune(5); - assert_eq!(orig_root, pmmr.root()); - assert_eq!(ba.used_size(), orig_sz - 4); + { + let mut pmmr = PMMR::at(&mut ba, sz); + pmmr.prune(5); + assert_eq!(orig_root, pmmr.root()); + } + assert_eq!(ba.used_size(), 12); // pruning all leaves under level >1 removes all subtree - pmmr.prune(1); - assert_eq!(orig_root, pmmr.root()); - assert_eq!(ba.used_size(), orig_sz - 7); + { + let mut pmmr = PMMR::at(&mut ba, sz); + pmmr.prune(1); + assert_eq!(orig_root, pmmr.root()); + } + 
assert_eq!(ba.used_size(), 9); // pruning everything should only leave us the peaks - for n in 1..16 { - pmmr.prune(n); + { + let mut pmmr = PMMR::at(&mut ba, sz); + for n in 1..16 { + pmmr.prune(n); + } + assert_eq!(orig_root, pmmr.root()); } - assert_eq!(orig_root, pmmr.root()); assert_eq!(ba.used_size(), 2); } + + #[test] + fn pmmr_prune_list() { + let mut pl = PruneList::new(); + pl.add(4); + assert_eq!(pl.pruned_nodes.len(), 1); + assert_eq!(pl.pruned_nodes[0], 4); + assert_eq!(pl.get_shift(5), Some(1)); + assert_eq!(pl.get_shift(2), Some(0)); + assert_eq!(pl.get_shift(4), None); + + pl.add(5); + assert_eq!(pl.pruned_nodes.len(), 1); + assert_eq!(pl.pruned_nodes[0], 6); + assert_eq!(pl.get_shift(8), Some(3)); + assert_eq!(pl.get_shift(2), Some(0)); + assert_eq!(pl.get_shift(5), None); + + pl.add(2); + assert_eq!(pl.pruned_nodes.len(), 2); + assert_eq!(pl.pruned_nodes[0], 2); + assert_eq!(pl.get_shift(8), Some(4)); + assert_eq!(pl.get_shift(1), Some(0)); + + pl.add(8); + pl.add(11); + assert_eq!(pl.pruned_nodes.len(), 4); + + pl.add(1); + assert_eq!(pl.pruned_nodes.len(), 3); + assert_eq!(pl.pruned_nodes[0], 7); + assert_eq!(pl.get_shift(12), Some(9)); + + pl.add(12); + assert_eq!(pl.pruned_nodes.len(), 3); + assert_eq!(pl.get_shift(12), None); + assert_eq!(pl.get_shift(9), Some(8)); + assert_eq!(pl.get_shift(17), Some(11)); + } } diff --git a/core/src/ser.rs b/core/src/ser.rs index 8667b4590..5abc5c1ba 100644 --- a/core/src/ser.rs +++ b/core/src/ser.rs @@ -327,6 +327,30 @@ impl_int!(u32, write_u32, read_u32); impl_int!(u64, write_u64, read_u64); impl_int!(i64, write_i64, read_i64); +impl Readable for Vec where T: Readable { + fn read(reader: &mut Reader) -> Result, Error> { + let mut buf = Vec::new(); + loop { + let elem = T::read(reader); + match elem { + Ok(e) => buf.push(e), + Err(Error::IOErr(ref ioerr)) if ioerr.kind() == io::ErrorKind::UnexpectedEof => break, + Err(e) => return Err(e), + } + } + Ok(buf) + } +} + +impl Writeable for Vec where T: Writeable 
{ + fn write(&self, writer: &mut W) -> Result<(), Error> { + for elmt in self { + elmt.write(writer)?; + } + Ok(()) + } +} + impl<'a, A: Writeable> Writeable for &'a A { fn write(&self, writer: &mut W) -> Result<(), Error> { Writeable::write(*self, writer) @@ -386,12 +410,6 @@ impl Writeable for [u8; 4] { } } -impl Writeable for Vec { - fn write(&self, writer: &mut W) -> Result<(), Error> { - writer.write_fixed_bytes(self) - } -} - /// Useful marker trait on types that can be sized byte slices pub trait AsFixedBytes: Sized + AsRef<[u8]> { /// The length in bytes diff --git a/store/Cargo.toml b/store/Cargo.toml index 2fafe3239..7865182e1 100644 --- a/store/Cargo.toml +++ b/store/Cargo.toml @@ -6,6 +6,13 @@ workspace = ".." [dependencies] byteorder = "^0.5" +env_logger="^0.3.5" +log = "^0.3" +memmap = { git = "https://github.com/danburkert/memmap-rs" } rocksdb = "^0.7.0" grin_core = { path = "../core" } + +[dev-dependencies] +env_logger="^0.3.5" +time = "^0.1" diff --git a/store/src/lib.rs b/store/src/lib.rs index 819b62796..00f561747 100644 --- a/store/src/lib.rs +++ b/store/src/lib.rs @@ -22,8 +22,14 @@ extern crate byteorder; extern crate grin_core as core; +#[macro_use] +extern crate log; +extern crate env_logger; +extern crate memmap; extern crate rocksdb; +pub mod sumtree; + const SEP: u8 = ':' as u8; use std::fmt; @@ -103,7 +109,9 @@ impl Store { /// Gets a value from the db, provided its key pub fn get(&self, key: &[u8]) -> Result>, Error> { let db = self.rdb.read().unwrap(); - db.get(key).map(|r| r.map(|o| o.to_vec())).map_err(From::from) + db.get(key).map(|r| r.map(|o| o.to_vec())).map_err( + From::from, + ) } /// Gets a `Readable` value from the db, provided its key. Encapsulates @@ -115,10 +123,11 @@ impl Store { /// Gets a `Readable` value from the db, provided its key, allowing to /// extract only partial data. The underlying Readable size must align /// accordingly. Encapsulates serialization. 
- pub fn get_ser_limited(&self, - key: &[u8], - len: usize) - -> Result, Error> { + pub fn get_ser_limited( + &self, + key: &[u8], + len: usize, + ) -> Result, Error> { let data = try!(self.get(key)); match data { Some(val) => { @@ -203,14 +212,16 @@ impl<'a> Batch<'a> { /// An iterator thad produces Readable instances back. Wraps the lower level /// DBIterator and deserializes the returned values. pub struct SerIterator - where T: ser::Readable +where + T: ser::Readable, { iter: DBIterator, _marker: PhantomData, } impl Iterator for SerIterator - where T: ser::Readable +where + T: ser::Readable, { type Item = T; diff --git a/store/src/sumtree.rs b/store/src/sumtree.rs new file mode 100644 index 000000000..d88482b60 --- /dev/null +++ b/store/src/sumtree.rs @@ -0,0 +1,423 @@ +// Copyright 2017 The Grin Developers +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Implementation of the persistent Backend for the prunable MMR sum-tree. 
+ +use memmap; + +use std::cmp; +use std::fs::{self, File, OpenOptions}; +use std::io::{self, Write, BufReader, BufRead, ErrorKind}; +use std::path::Path; +use std::io::Read; + +use core::core::pmmr::{self, Summable, Backend, HashSum, VecBackend}; +use core::ser; + +const PMMR_DATA_FILE: &'static str = "pmmr_dat.bin"; +const PMMR_RM_LOG_FILE: &'static str = "pmmr_rm_log.bin"; +const PMMR_PRUNED_FILE: &'static str = "pmmr_pruned.bin"; + +/// Maximum number of nodes in the remove log before it gets flushed +pub const RM_LOG_MAX_NODES: usize = 10000; + +/// Wrapper for a file that can be read at any position (random read) but for +/// which writes are append only. Reads are backed by a memory map (mmap(2)), +/// relying on the operating system for fast access and caching. The memory +/// map is reallocated to expand it when new writes are flushed. +struct AppendOnlyFile { + path: String, + file: File, + mmap: Option, +} + +impl AppendOnlyFile { + /// Open a file (existing or not) as append-only, backed by a mmap. + fn open(path: String) -> io::Result { + let file = OpenOptions::new() + .read(true) + .append(true) + .create(true) + .open(path.clone())?; + let mut aof = AppendOnlyFile { + path: path.clone(), + file: file, + mmap: None, + }; + let file_path = Path::new(&path); + if file_path.exists() { + aof.sync(); + } + Ok(aof) + } + + /// Append data to the file. + fn append(&mut self, buf: &[u8]) -> io::Result<()> { + self.file.write_all(buf) + } + + /// Syncs all writes (fsync), reallocating the memory map to make the newly + /// written data accessible. + fn sync(&mut self) -> io::Result<()> { + self.file.sync_data()?; + self.mmap = Some(unsafe { + memmap::file(&self.file) + .protection(memmap::Protection::Read) + .map()? + }); + Ok(()) + } + + /// Read length bytes of data at offset from the file. Leverages the memory + /// map. 
+ fn read(&self, offset: usize, length: usize) -> Vec { + if let None = self.mmap { + return vec![]; + } + let mmap = self.mmap.as_ref().unwrap(); + (&mmap[offset..(offset + length)]).to_vec() + } + + /// Saves a copy of the current file content, skipping data at the provided + /// prune indices. The prune Vec must be ordered. + fn save_prune(&self, target: String, prune_offs: Vec, prune_len: u64) -> io::Result<()> { + let mut reader = File::open(self.path.clone())?; + let mut writer = File::create(target)?; + + // align the buffer on prune_len to avoid misalignments + let mut buf = vec![0; (prune_len * 256) as usize]; + let mut read = 0; + let mut prune_pos = 0; + loop { + // fill our buffer + let len = match reader.read(&mut buf) { + Ok(0) => return Ok(()), + Ok(len) => len, + Err(ref e) if e.kind() == ErrorKind::Interrupted => continue, + Err(e) => return Err(e), + } as u64; + + // write the buffer, except if we prune offsets in the current span, + // in which case we skip + let mut buf_start = 0; + while prune_offs[prune_pos] >= read && prune_offs[prune_pos] < read + len { + let prune_at = prune_offs[prune_pos] as usize; + if prune_at != buf_start { + writer.write_all(&buf[buf_start..prune_at])?; + } + buf_start = prune_at + (prune_len as usize); + if prune_offs.len() > prune_pos + 1 { + prune_pos += 1; + } else { + break; + } + } + writer.write_all(&mut buf[buf_start..(len as usize)])?; + read += len; + } + } + + /// Current size of the file in bytes. + fn size(&self) -> io::Result { + fs::metadata(&self.path).map(|md| md.len()) + } +} + +/// Log file fully cached in memory containing all positions that should be +/// eventually removed from the MMR append-only data file. Allows quick +/// checking of whether a piece of data has been marked for deletion. When the +/// log becomes too long, the MMR backend will actually remove chunks from the +/// MMR data file and truncate the remove log. 
+struct RemoveLog { + path: String, + file: File, + // Ordered vector of MMR positions that should get eventually removed. + removed: Vec, +} + +impl RemoveLog { + /// Open the remove log file. The content of the file will be read in memory + /// for fast checking. + fn open(path: String) -> io::Result { + let removed = read_ordered_vec(path.clone())?; + let file = OpenOptions::new().append(true).create(true).open(path.clone())?; + Ok(RemoveLog { + path: path, + file: file, + removed: removed, + }) + } + + /// Truncate and empties the remove log. + fn truncate(&mut self) -> io::Result<()> { + self.removed = vec![]; + self.file = File::create(self.path.clone())?; + Ok(()) + } + + /// Append a set of new positions to the remove log. Both adds those + /// positions + /// to the ordered in-memory set and to the file. + fn append(&mut self, elmts: Vec) -> io::Result<()> { + for elmt in elmts { + match self.removed.binary_search(&elmt) { + Ok(_) => continue, + Err(idx) => { + self.file.write_all(&ser::ser_vec(&elmt).unwrap()[..])?; + self.removed.insert(idx, elmt); + } + } + } + self.file.sync_data() + } + + /// Whether the remove log currently includes the provided position. + fn includes(&self, elmt: u64) -> bool { + self.removed.binary_search(&elmt).is_ok() + } + + /// Number of positions stored in the remove log. + fn len(&self) -> usize { + self.removed.len() + } +} + +/// PMMR persistent backend implementation. Relies on multiple facilities to +/// handle writing, reading and pruning. +/// +/// * A main storage file appends HashSum instances as they come. This +/// AppendOnlyFile is also backed by a mmap for reads. +/// * An in-memory backend buffers the latest batch of writes to ensure the +/// PMMR can always read recent values even if they haven't been flushed to +/// disk yet. +/// * A remove log tracks the positions that need to be pruned from the +/// main storage file. 
+pub struct PMMRBackend +where + T: Summable + Clone, +{ + data_dir: String, + hashsum_file: AppendOnlyFile, + remove_log: RemoveLog, + pruned_nodes: pmmr::PruneList, + // buffers addition of new elements until they're fully written to disk + buffer: VecBackend, + buffer_index: usize, +} + +impl Backend for PMMRBackend +where + T: Summable + Clone, +{ + /// Append the provided HashSums to the backend storage. + #[allow(unused_variables)] + fn append(&mut self, position: u64, data: Vec>) -> Result<(), String> { + self.buffer.append( + position - (self.buffer_index as u64), + data.clone(), + )?; + for hs in data { + if let Err(e) = self.hashsum_file.append(&ser::ser_vec(&hs).unwrap()[..]) { + return Err(format!( + "Could not write to log storage, disk full? {:?}", + e + )); + } + } + Ok(()) + } + + /// Get a HashSum by insertion position + fn get(&self, position: u64) -> Option> { + // First, check if it's in our temporary write buffer + let pos_sz = position as usize; + if pos_sz - 1 >= self.buffer_index && pos_sz - 1 < self.buffer_index + self.buffer.len() { + return self.buffer.get((pos_sz - self.buffer_index) as u64); + } + + // Second, check if this position has been pruned in the remove log + if self.remove_log.includes(position) { + return None; + } + + // Third, check if it's in the pruned list or its offset + let shift = self.pruned_nodes.get_shift(position); + if let None = shift { + return None + } + + // The MMR starts at 1, our binary backend starts at 0 + let pos = position - 1; + + // Must be on disk, doing a read at the correct position + let record_len = 32 + T::sum_len(); + let file_offset = ((pos - shift.unwrap()) as usize) * record_len; + let data = self.hashsum_file.read(file_offset, record_len); + match ser::deserialize(&mut &data[..]) { + Ok(hashsum) => Some(hashsum), + Err(e) => { + error!( + "Corrupted storage, could not read an entry from sum tree store: {:?}", + e + ); + None + } + } + } + + /// Remove HashSums by insertion position + fn 
remove(&mut self, positions: Vec) -> Result<(), String> { + if self.buffer.used_size() > 0 { + self.buffer.remove(positions.clone()).unwrap(); + } + self.remove_log.append(positions).map_err(|e| { + format!("Could not write to log storage, disk full? {:?}", e) + }) + } +} + +impl PMMRBackend +where + T: Summable + Clone, +{ + /// Instantiates a new PMMR backend that will use the provided directly to + /// store its files. + pub fn new(data_dir: String) -> io::Result> { + let hs_file = AppendOnlyFile::open(format!("{}/{}", data_dir, PMMR_DATA_FILE))?; + let sz = hs_file.size()?; + let record_len = 32 + T::sum_len(); + let rm_log = RemoveLog::open(format!("{}/{}", data_dir, PMMR_RM_LOG_FILE))?; + let prune_list = read_ordered_vec(format!("{}/{}", data_dir, PMMR_PRUNED_FILE))?; + + Ok(PMMRBackend { + data_dir: data_dir, + hashsum_file: hs_file, + remove_log: rm_log, + buffer: VecBackend::new(), + buffer_index: (sz as usize) / record_len, + pruned_nodes: pmmr::PruneList{pruned_nodes: prune_list}, + }) + } + + /// Syncs all files to disk. A call to sync is required to ensure all the + /// data has been successfully written to disk. + pub fn sync(&mut self) -> io::Result<()> { + self.buffer_index = self.buffer_index + self.buffer.len(); + self.buffer.clear(); + + self.hashsum_file.sync() + } + + /// Checks the length of the remove log to see if it should get compacted. + /// If so, the remove log is flushed into the pruned list, which itself gets + /// saved, and the main hashsum data file is rewritten, cutting the removed + /// data. + /// + /// If a max_len strictly greater than 0 is provided, the value will be used + /// to decide whether the remove log has reached its maximum length, + /// otherwise the RM_LOG_MAX_NODES default value is used. + pub fn check_compact(&mut self, max_len: usize) -> io::Result<()> { + if !(max_len > 0 && self.remove_log.len() > max_len || + max_len == 0 && self.remove_log.len() > RM_LOG_MAX_NODES) { + return Ok(()) + } + + // 0. 
validate none of the nodes in the rm log are in the prune list (to + // avoid accidental double compaction) + for pos in &self.remove_log.removed[..] { + if let None = self.pruned_nodes.pruned_pos(*pos) { + // TODO we likely can recover from this by directly jumping to 3 + error!("The remove log contains nodes that are already in the pruned \ + list, a previous compaction likely failed."); + return Ok(()); + } + } + + // 1. save hashsum file to a compact copy, skipping data that's in the + // remove list + let tmp_prune_file = format!("{}/{}.prune", self.data_dir, PMMR_DATA_FILE); + let record_len = (32 + T::sum_len()) as u64; + let to_rm = self.remove_log.removed.iter().map(|pos| { + let shift = self.pruned_nodes.get_shift(*pos); + (*pos - 1 - shift.unwrap()) * record_len + }).collect(); + self.hashsum_file.save_prune(tmp_prune_file.clone(), to_rm, record_len)?; + + // 2. update the prune list and save it in place + for rm_pos in &self.remove_log.removed[..] { + self.pruned_nodes.add(*rm_pos); + } + write_vec(format!("{}/{}", self.data_dir, PMMR_PRUNED_FILE), &self.pruned_nodes.pruned_nodes)?; + + // 3. move the compact copy to the hashsum file and re-open it + fs::rename(tmp_prune_file.clone(), format!("{}/{}", self.data_dir, PMMR_DATA_FILE))?; + self.hashsum_file = AppendOnlyFile::open(format!("{}/{}", self.data_dir, PMMR_DATA_FILE))?; + self.hashsum_file.sync()?; + + // 4. truncate the rm log + self.remove_log.truncate()?; + + Ok(()) + } +} + +// Read an ordered vector of scalars from a file. 
+fn read_ordered_vec(path: String) -> io::Result> + where T: ser::Readable + cmp::Ord { + + let file_path = Path::new(&path); + let mut ovec = Vec::with_capacity(1000); + if file_path.exists() { + let mut file = BufReader::with_capacity(8 * 1000, File::open(path.clone())?); + loop { + // need a block to end mutable borrow before consume + let buf_len = { + let buf = file.fill_buf()?; + if buf.len() == 0 { + break; + } + let elmts_res: Result, ser::Error> = ser::deserialize(&mut &buf[..]); + match elmts_res { + Ok(elmts) => { + for elmt in elmts { + if let Err(idx) = ovec.binary_search(&elmt) { + ovec.insert(idx, elmt); + } + } + } + Err(_) => { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + format!("Corrupted storage, could not read file at {}", path), + )); + } + } + buf.len() + }; + file.consume(buf_len); + } + } + Ok(ovec) +} + +fn write_vec(path: String, v: &Vec) -> io::Result<()> + where T: ser::Writeable { + + let mut file_path = File::create(&path)?; + ser::serialize(&mut file_path, v).map_err(|_| { + io::Error::new( + io::ErrorKind::InvalidInput, + format!("Failed to serialize data when writing to {}", path)) + })?; + Ok(()) +} diff --git a/store/tests/sumtree.rs b/store/tests/sumtree.rs new file mode 100644 index 000000000..fd4ccc9a7 --- /dev/null +++ b/store/tests/sumtree.rs @@ -0,0 +1,199 @@ +// Copyright 2017 The Grin Developers +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +extern crate env_logger; +extern crate grin_core as core; +extern crate grin_store as store; +extern crate time; + +use std::fs; + +use core::ser::*; +use core::core::pmmr::{PMMR, Summable, HashSum, Backend}; +use core::core::hash::Hashed; + +#[test] +fn sumtree_append() { + let (data_dir, elems) = setup(); + let mut backend = store::sumtree::PMMRBackend::new(data_dir).unwrap(); + + // adding first set of 4 elements and sync + let mut mmr_size = load(0, &elems[0..4], &mut backend); + backend.sync().unwrap(); + + // adding the rest and sync again + mmr_size = load(mmr_size, &elems[4..9], &mut backend); + backend.sync().unwrap(); + + // check the resulting backend store and the computation of the root + let hash = Hashed::hash(&elems[0].clone()); + let sum = elems[0].sum(); + let node_hash = (1 as u64, &sum, hash).hash(); + assert_eq!( + backend.get(1), + Some(HashSum { + hash: node_hash, + sum: sum, + }) + ); + + let sum2 = HashSum::from_summable(1, &elems[0]) + HashSum::from_summable(2, &elems[1]); + let sum4 = sum2 + (HashSum::from_summable(4, &elems[2]) + HashSum::from_summable(5, &elems[3])); + let sum8 = sum4 + + ((HashSum::from_summable(8, &elems[4]) + HashSum::from_summable(9, &elems[5])) + + (HashSum::from_summable(11, &elems[6]) + HashSum::from_summable(12, &elems[7]))); + let sum9 = sum8 + HashSum::from_summable(16, &elems[8]); + + { + let pmmr = PMMR::at(&mut backend, mmr_size); + assert_eq!(pmmr.root(), sum9); + } +} + +#[test] +fn sumtree_prune_compact() { + let (data_dir, elems) = setup(); + + // setup the mmr store with all elements + let mut backend = store::sumtree::PMMRBackend::new(data_dir).unwrap(); + let mmr_size = load(0, &elems[..], &mut backend); + backend.sync().unwrap(); + + // save the root + let root: HashSum; + { + let pmmr = PMMR::at(&mut backend, mmr_size); + root = pmmr.root(); + } + + // pruning some choice nodes + { + let mut pmmr = PMMR::at(&mut backend, mmr_size); + pmmr.prune(1); + pmmr.prune(4); + pmmr.prune(5); + } + 
backend.sync().unwrap(); + + // check the root + { + let pmmr = PMMR::at(&mut backend, mmr_size); + assert_eq!(root, pmmr.root()); + } + + // compact + backend.check_compact(2).unwrap(); + + // recheck the root + { + let pmmr = PMMR::at(&mut backend, mmr_size); + assert_eq!(root, pmmr.root()); + } +} + +#[test] +fn sumtree_reload() { + let (data_dir, elems) = setup(); + + // set everything up with a first backend + let mmr_size: u64; + let root: HashSum; + { + let mut backend = store::sumtree::PMMRBackend::new(data_dir.clone()).unwrap(); + mmr_size = load(0, &elems[..], &mut backend); + backend.sync().unwrap(); + + // save the root and prune some nodes so we have prune data + { + let mut pmmr = PMMR::at(&mut backend, mmr_size); + root = pmmr.root(); + pmmr.prune(1); + pmmr.prune(4); + } + backend.sync().unwrap(); + backend.check_compact(1).unwrap(); + backend.sync().unwrap(); + + // prune some more to get rm log data + { + let mut pmmr = PMMR::at(&mut backend, mmr_size); + pmmr.prune(5); + } + backend.sync().unwrap(); + } + + // create a new backend and check everything is kosher + { + let mut backend = store::sumtree::PMMRBackend::new(data_dir).unwrap(); + { + let pmmr = PMMR::at(&mut backend, mmr_size); + assert_eq!(root, pmmr.root()); + } + assert_eq!(backend.get(5), None); + } +} + +fn setup() -> (String, Vec) { + let _ = env_logger::init(); + let t = time::get_time(); + let data_dir = format!("./target/{}.{}", t.sec, t.nsec); + fs::create_dir_all(data_dir.clone()).unwrap(); + + let elems = vec![ + TestElem([0, 0, 0, 1]), + TestElem([0, 0, 0, 2]), + TestElem([0, 0, 0, 3]), + TestElem([0, 0, 0, 4]), + TestElem([0, 0, 0, 5]), + TestElem([0, 0, 0, 6]), + TestElem([0, 0, 0, 7]), + TestElem([0, 0, 0, 8]), + TestElem([1, 0, 0, 0]), + ]; + (data_dir, elems) +} + +fn load(pos: u64, elems: &[TestElem], + backend: &mut store::sumtree::PMMRBackend) -> u64 { + + let mut pmmr = PMMR::at(backend, pos); + for elem in elems { + pmmr.push(elem.clone()); + } + 
pmmr.unpruned_size() +} + +#[derive(Copy, Clone, Debug, PartialEq, Eq)] +struct TestElem([u32; 4]); +impl Summable for TestElem { + type Sum = u64; + fn sum(&self) -> u64 { + // sums are not allowed to overflow, so we use this simple + // non-injective "sum" function that will still be homomorphic + self.0[0] as u64 * 0x1000 + self.0[1] as u64 * 0x100 + self.0[2] as u64 * 0x10 + + self.0[3] as u64 + } + fn sum_len() -> usize { + 8 + } +} + +impl Writeable for TestElem { + fn write(&self, writer: &mut W) -> Result<(), Error> { + try!(writer.write_u32(self.0[0])); + try!(writer.write_u32(self.0[1])); + try!(writer.write_u32(self.0[2])); + writer.write_u32(self.0[3]) + } +}