Prunable MMR storage (#112)

* Base MMR storage structures

Implementations of the MMR append-only file structure and its
remove log. The append-only file is backed by a mmap for read
access. The remove log is stored in memory for quick checking
and backed by a simple file to persist it.
* Add PMMR backend buffer, make PMMR Backend mutable
* The Backend trait now has &mut self methods, and an &mut
reference in PMMR. This simplifies the implementation of all
backends by not forcing them to be interior mutable. Slight
drawback is that a backend can't be used directly as long as it's
used by a PMMR instance.
* Introduced a buffer in the PMMR persistent backend to allow
reads before the underlying files are fully flushed. Implemented
with a temporary VecBackend.
* Implement a prune list to use with dense backends
The PruneList is useful when implementing compact backends for a PMMR (for
example a single large byte array or a file). As nodes get pruned and
removed from the backend to free space, the backend will get more compact
but positions of a node within the PMMR will not match positions in the
backend storage anymore. The PruneList accounts for that mismatch and does
the position translation.
* PMMR store compaction
Implement actual pruning of the underlying PMMR storage by
flushing the remove log. This triggers a rewrite of the PMMR nodes
data (hashes and sums), removing pruned nodes. The information of
what has been removed is kept in a prune list and the remove log
is truncated.
* PMMR store pruning tests and fixes
This commit is contained in:
Ignotus Peverell 2017-09-05 05:50:25 +00:00 committed by GitHub
parent 301e9a6e98
commit 405a4bc985
6 changed files with 968 additions and 122 deletions

View file

@ -38,7 +38,7 @@
use std::clone::Clone;
use std::fmt::Debug;
use std::marker::PhantomData;
use std::ops::{self};
use std::ops::{self, Deref};
use core::hash::{Hash, Hashed};
use ser::{self, Readable, Reader, Writeable, Writer};
@ -54,7 +54,7 @@ pub trait Summable {
/// Length of the Sum type when serialized. Can be used as a hint by
/// underlying storages.
fn sum_len(&self) -> usize;
fn sum_len() -> usize;
}
/// An empty sum that takes no space, to store elements that do not need summing
@ -87,7 +87,7 @@ impl<T> Summable for NoSum<T> {
fn sum(&self) -> NullSum {
NullSum
}
fn sum_len(&self) -> usize {
fn sum_len() -> usize {
return 0;
}
}
@ -104,8 +104,8 @@ pub struct HashSum<T> where T: Summable {
impl<T> HashSum<T> where T: Summable + Writeable {
/// Create a hash sum from a summable
pub fn from_summable(idx: u64, elmt: T) -> HashSum<T> {
let hash = Hashed::hash(&elmt);
pub fn from_summable(idx: u64, elmt: &T) -> HashSum<T> {
let hash = Hashed::hash(elmt);
let sum = elmt.sum();
let node_hash = (idx, &sum, hash).hash();
HashSum {
@ -144,12 +144,16 @@ impl<T> ops::Add for HashSum<T> where T: Summable {
/// Storage backend for the MMR, just needs to be indexed by order of insertion.
/// The remove operation can be a no-op for unoptimized backends.
pub trait Backend<T> where T: Summable {
/// Append the provided HashSums to the backend storage.
fn append(&self, data: Vec<HashSum<T>>);
/// Append the provided HashSums to the backend storage. The position of the
/// first element of the Vec in the MMR is provided to help the
/// implementation.
fn append(&mut self, position: u64, data: Vec<HashSum<T>>) -> Result<(), String>;
/// Get a HashSum by insertion position
fn get(&self, position: u64) -> Option<HashSum<T>>;
/// Remove HashSums by insertion position
fn remove(&self, positions: Vec<u64>);
fn remove(&mut self, positions: Vec<u64>) -> Result<(), String>;
}
/// Prunable Merkle Mountain Range implementation. All positions within the tree
@ -159,17 +163,17 @@ pub trait Backend<T> where T: Summable {
/// Heavily relies on navigation operations within a binary tree. In particular,
/// all the implementation needs to keep track of the MMR structure is how far
/// we are in the sequence of nodes making up the MMR.
pub struct PMMR<T, B> where T: Summable, B: Backend<T> {
pub struct PMMR<'a, T, B> where T: Summable, B: 'a + Backend<T> {
last_pos: u64,
backend: B,
backend: &'a mut B,
// only needed for parameterizing Backend
summable: PhantomData<T>,
}
impl<T, B> PMMR<T, B> where T: Summable + Writeable + Debug + Clone, B: Backend<T> {
impl<'a, T, B> PMMR<'a, T, B> where T: Summable + Writeable + Debug + Clone, B: 'a + Backend<T> {
/// Build a new prunable Merkle Mountain Range using the provided backend.
pub fn new(backend: B) -> PMMR<T, B> {
pub fn new(backend: &'a mut B) -> PMMR<T, B> {
PMMR {
last_pos: 0,
backend: backend,
@ -177,6 +181,16 @@ impl<T, B> PMMR<T, B> where T: Summable + Writeable + Debug + Clone, B: Backend<
}
}
/// Build a new prunable Merkle Mountain Range pre-initialized until last_pos
/// with the provided backend.
pub fn at(backend: &'a mut B, last_pos: u64) -> PMMR<T, B> {
PMMR {
last_pos: last_pos,
backend: backend,
summable: PhantomData,
}
}
/// Computes the root of the MMR. Find all the peaks in the current
/// tree and "bags" them to get a single peak.
pub fn root(&self) -> HashSum<T> {
@ -198,7 +212,7 @@ impl<T, B> PMMR<T, B> where T: Summable + Writeable + Debug + Clone, B: Backend<
/// the same time if applicable.
pub fn push(&mut self, elmt: T) -> u64 {
let elmt_pos = self.last_pos + 1;
let mut current_hashsum = HashSum::from_summable(elmt_pos, elmt);
let mut current_hashsum = HashSum::from_summable(elmt_pos, &elmt);
let mut to_append = vec![current_hashsum.clone()];
let mut height = 0;
let mut pos = elmt_pos;
@ -219,7 +233,7 @@ impl<T, B> PMMR<T, B> where T: Summable + Writeable + Debug + Clone, B: Backend<
}
// append all the new nodes and update the MMR index
self.backend.append(to_append);
self.backend.append(elmt_pos, to_append);
self.last_pos = pos;
elmt_pos
}
@ -228,7 +242,7 @@ impl<T, B> PMMR<T, B> where T: Summable + Writeable + Debug + Clone, B: Backend<
/// provide that position and prune, consumers of this API are expected to
/// keep an index of elements to positions in the tree. Prunes parent
/// nodes as well when they become childless.
pub fn prune(&self, position: u64) {
pub fn prune(&mut self, position: u64) {
let prunable_height = bintree_postorder_height(position);
if prunable_height > 0 {
// only leaves can be pruned
@ -240,21 +254,7 @@ impl<T, B> PMMR<T, B> where T: Summable + Writeable + Debug + Clone, B: Backend<
let mut to_prune = vec![];
let mut current = position;
while current+1 < self.last_pos {
let next_height = bintree_postorder_height(current+1);
// compare the node's height to the next height, if the next is higher
// we're on the right hand side of the subtree (otherwise we're on the
// left)
let sibling: u64;
let parent: u64;
if next_height > prunable_height {
sibling = bintree_jump_left_sibling(current);
parent = current + 1;
} else {
sibling = bintree_jump_right_sibling(current);
parent = sibling + 1;
}
let (parent, sibling) = family(current);
if parent > self.last_pos {
// can't prune when our parent isn't here yet
break;
@ -280,6 +280,150 @@ impl<T, B> PMMR<T, B> where T: Summable + Writeable + Debug + Clone, B: Backend<
}
}
/// Simple MMR backend implementation based on a Vector. Pruning does not
/// compact the Vector itself but still frees the reference to the
/// underlying HashSum.
#[derive(Clone)]
pub struct VecBackend<T> where T: Summable + Clone {
/// Nodes in insertion order; pruned positions are set to None (slot kept).
elems: Vec<Option<HashSum<T>>>,
}
impl<T> Backend<T> for VecBackend<T> where T: Summable + Clone {
	/// Appends the provided hashsums to the in-memory vector. The insertion
	/// position is ignored as the vector only grows in insertion order.
	fn append(&mut self, _position: u64, data: Vec<HashSum<T>>) -> Result<(), String> {
		// take ownership of the incoming data instead of cloning every element
		self.elems.extend(data.into_iter().map(Some));
		Ok(())
	}

	/// Gets the hashsum at the provided 1-based insertion position. Returns
	/// None if the position has been pruned (panics if out of range, like the
	/// underlying Vec indexing).
	fn get(&self, position: u64) -> Option<HashSum<T>> {
		// MMR positions start at 1, the vector at 0
		self.elems[(position-1) as usize].clone()
	}

	/// Frees the hashsums at each of the provided 1-based positions.
	fn remove(&mut self, positions: Vec<u64>) -> Result<(), String> {
		for n in positions {
			self.elems[(n-1) as usize] = None
		}
		Ok(())
	}
}
impl<T> VecBackend<T> where T: Summable + Clone {
	/// Instantiates a new, empty VecBackend<T>
	pub fn new() -> VecBackend<T> {
		VecBackend{elems: vec![]}
	}

	/// Current number of HashSum elements in the underlying Vec, not counting
	/// the positions that have been pruned away.
	pub fn used_size(&self) -> usize {
		// count occupied slots directly instead of decrementing a counter
		// (also avoids the needless Deref on the Vec)
		self.elems.iter().filter(|elem| elem.is_some()).count()
	}

	/// Resets the backend, emptying the underlying Vec.
	pub fn clear(&mut self) {
		self.elems = Vec::new();
	}

	/// Total length of the underlying vector, including pruned positions.
	pub fn len(&self) -> usize {
		self.elems.len()
	}
}
/// Maintains a list of previously pruned nodes in PMMR, compacting the list as
/// parents get pruned and allowing checking whether a leaf is pruned. Given
/// a node's position, computes how much it should get shifted given the
/// subtrees that have been pruned before.
///
/// The PruneList is useful when implementing compact backends for a PMMR (for
/// example a single large byte array or a file). As nodes get pruned and
/// removed from the backend to free space, the backend will get more compact
/// but positions of a node within the PMMR will not match positions in the
/// backend storage anymore. The PruneList accounts for that mismatch and does
/// the position translation.
pub struct PruneList {
/// Ordered (ascending) vector of pruned node positions. Each entry stands
/// for the whole pruned subtree rooted at that position.
pub pruned_nodes: Vec<u64>,
}
impl PruneList {
/// Instantiates a new, empty prune list.
pub fn new() -> PruneList {
PruneList{pruned_nodes: vec![]}
}
/// Computes by how many positions a node at pos should be shifted given the
/// number of nodes that have already been pruned before it.
pub fn get_shift(&self, pos: u64) -> Option<u64> {
// get the position where the node at pos would fit in the pruned list, if
// it's already pruned, nothing to skip
match self.pruned_pos(pos) {
None => None,
Some(idx) => {
// skip by the number of elements pruned in the preceding subtrees,
// which is the sum of the size of each subtree
Some(
self.pruned_nodes[0..(idx as usize)]
.iter()
// a pruned subtree rooted at height h contains 2^(h+1) - 1 nodes
.map(|n| (1 << (bintree_postorder_height(*n) + 1)) - 1)
.sum(),
)
}
}
}
/// Push the node at the provided position in the prune list. Compacts the
/// list if pruning the additional node means a parent can get pruned as
/// well.
pub fn add(&mut self, pos: u64) {
let mut current = pos;
loop {
let (parent, sibling) = family(current);
match self.pruned_nodes.binary_search(&sibling) {
Ok(idx) => {
// sibling is already pruned: the full subtree under the parent is
// now gone, replace the sibling entry by the parent and keep
// climbing to see if the parent's sibling is pruned too
self.pruned_nodes.remove(idx);
current = parent;
}
Err(_) => {
// sibling not pruned: record the current node (unless already
// present) at its sorted position and stop compacting
if let Err(idx) = self.pruned_nodes.binary_search(&current) {
self.pruned_nodes.insert(idx, current);
}
break;
}
}
}
}
/// Gets the position a new pruned node should take in the prune list.
/// If the node has already been pruned, either directly or through one of
/// its parents contained in the prune list, returns None.
pub fn pruned_pos(&self, pos: u64) -> Option<usize> {
match self.pruned_nodes.binary_search(&pos) {
Ok(_) => None,
Err(idx) => {
if self.pruned_nodes.len() > idx {
// the node at pos can't be a child of lower position nodes by MMR
// construction but can be a child of the next node, going up parents
// from pos to make sure it's not the case
let next_peak_pos = self.pruned_nodes[idx];
let mut cursor = pos;
loop {
let (parent, _) = family(cursor);
if next_peak_pos == parent {
// pos sits under an already-pruned subtree
return None;
}
if next_peak_pos < parent {
break;
}
cursor = parent;
}
}
Some(idx)
}
}
}
}
/// Gets the postorder traversal index of all peaks in a MMR given the last
/// node's position. Starts with the top peak, which is always on the left
/// side of the range, and navigates toward lower siblings toward the right
@ -310,7 +454,6 @@ fn peaks(num: u64) -> Vec<u64> {
let mut peak = top;
'outer: loop {
peak = bintree_jump_right_sibling(peak);
//println!("peak {}", peak);
while peak > num {
match bintree_move_down_left(peak) {
Some(p) => peak = p,
@ -380,7 +523,7 @@ fn peaks(num: u64) -> Vec<u64> {
/// To get the height of any node (say 1101), we need to travel left in the
/// tree, get the leftmost node and count the ones. To travel left, we just
/// need to subtract the position by it's most significant bit, mins one. For
/// example to get from 1101 to 110 we subtract it by (1000-1) (`13-(8-1)=6`).
/// example to get from 1101 to 110 we subtract it by (1000-1) (`13-(8-1)=5`).
/// Then to get from 110 to 11, we subtract it by (100-1) (`6-(4-1)=3`).
///
/// By applying this operation recursively, until we get a number that, in
@ -389,7 +532,7 @@ fn peaks(num: u64) -> Vec<u64> {
/// nodes are added in a MMR.
///
/// [1] https://github.com/opentimestamps/opentimestamps-server/blob/master/doc/merkle-mountain-range.md
fn bintree_postorder_height(num: u64) -> u64 {
pub fn bintree_postorder_height(num: u64) -> u64 {
let mut h = num;
while !all_ones(h) {
h = bintree_jump_left(h);
@ -397,6 +540,24 @@ fn bintree_postorder_height(num: u64) -> u64 {
most_significant_pos(h) - 1
}
/// Calculates the positions of the parent and sibling of the node at the
/// provided position.
pub fn family(pos: u64) -> (u64, u64) {
	let pos_height = bintree_postorder_height(pos);
	let next_height = bintree_postorder_height(pos + 1);
	if next_height > pos_height {
		// the next node is higher: pos is a right child, its sibling sits to
		// the left and the parent immediately follows pos in postorder
		let left_sibling = bintree_jump_left_sibling(pos);
		(pos + 1, left_sibling)
	} else {
		// pos is a left child: the sibling sits to the right and the parent
		// immediately follows the sibling in postorder
		let right_sibling = bintree_jump_right_sibling(pos);
		(right_sibling + 1, right_sibling)
	}
}
/// Calculates the position of the top-left child of a parent node in the
/// postorder traversal of a full binary tree.
fn bintree_move_down_left(num: u64) -> Option<u64> {
@ -407,7 +568,6 @@ fn bintree_move_down_left(num: u64) -> Option<u64> {
Some(num - (1 << height))
}
/// Calculates the position of the right sibling of a node a subtree in the
/// postorder traversal of a full binary tree.
fn bintree_jump_right_sibling(num: u64) -> u64 {
@ -457,9 +617,6 @@ fn most_significant_pos(num: u64) -> u64 {
mod test {
use super::*;
use core::hash::Hashed;
use std::sync::{Arc, Mutex};
use std::ops::Deref;
#[test]
fn some_all_ones() {
@ -520,8 +677,8 @@ mod test {
self.0[0] as u64 * 0x1000 + self.0[1] as u64 * 0x100 + self.0[2] as u64 * 0x10 +
self.0[3] as u64
}
fn sum_len(&self) -> usize {
4
fn sum_len() -> usize {
8
}
}
@ -534,39 +691,6 @@ mod test {
}
}
#[derive(Clone)]
struct VecBackend {
elems: Arc<Mutex<Vec<Option<HashSum<TestElem>>>>>,
}
impl Backend<TestElem> for VecBackend {
fn append(&self, data: Vec<HashSum<TestElem>>) {
let mut elems = self.elems.lock().unwrap();
elems.append(&mut map_vec!(data, |d| Some(d.clone())));
}
fn get(&self, position: u64) -> Option<HashSum<TestElem>> {
let elems = self.elems.lock().unwrap();
elems[(position-1) as usize].clone()
}
fn remove(&self, positions: Vec<u64>) {
let mut elems = self.elems.lock().unwrap();
for n in positions {
elems[(n-1) as usize] = None
}
}
}
impl VecBackend {
fn used_size(&self) -> usize {
let elems = self.elems.lock().unwrap();
let mut usz = elems.len();
for elem in elems.deref() {
if elem.is_none() {
usz -= 1;
}
}
usz
}
}
#[test]
fn pmmr_push_root() {
let elems = [
@ -581,8 +705,8 @@ mod test {
TestElem([1, 0, 0, 0]),
];
let ba = VecBackend{elems: Arc::new(Mutex::new(vec![]))};
let mut pmmr = PMMR::new(ba.clone());
let mut ba = VecBackend::new();
let mut pmmr = PMMR::new(&mut ba);
// one element
pmmr.push(elems[0]);
@ -594,49 +718,49 @@ mod test {
// two elements
pmmr.push(elems[1]);
let sum2 = HashSum::from_summable(1, elems[0]) + HashSum::from_summable(2, elems[1]);
let sum2 = HashSum::from_summable(1, &elems[0]) + HashSum::from_summable(2, &elems[1]);
assert_eq!(pmmr.root(), sum2);
assert_eq!(pmmr.unpruned_size(), 3);
// three elements
pmmr.push(elems[2]);
let sum3 = sum2.clone() + HashSum::from_summable(4, elems[2]);
let sum3 = sum2.clone() + HashSum::from_summable(4, &elems[2]);
assert_eq!(pmmr.root(), sum3);
assert_eq!(pmmr.unpruned_size(), 4);
// four elements
pmmr.push(elems[3]);
let sum4 = sum2 + (HashSum::from_summable(4, elems[2]) + HashSum::from_summable(5, elems[3]));
let sum4 = sum2 + (HashSum::from_summable(4, &elems[2]) + HashSum::from_summable(5, &elems[3]));
assert_eq!(pmmr.root(), sum4);
assert_eq!(pmmr.unpruned_size(), 7);
// five elements
pmmr.push(elems[4]);
let sum5 = sum4.clone() + HashSum::from_summable(8, elems[4]);
let sum5 = sum4.clone() + HashSum::from_summable(8, &elems[4]);
assert_eq!(pmmr.root(), sum5);
assert_eq!(pmmr.unpruned_size(), 8);
// six elements
pmmr.push(elems[5]);
let sum6 = sum4.clone() + (HashSum::from_summable(8, elems[4]) + HashSum::from_summable(9, elems[5]));
let sum6 = sum4.clone() + (HashSum::from_summable(8, &elems[4]) + HashSum::from_summable(9, &elems[5]));
assert_eq!(pmmr.root(), sum6.clone());
assert_eq!(pmmr.unpruned_size(), 10);
// seven elements
pmmr.push(elems[6]);
let sum7 = sum6 + HashSum::from_summable(11, elems[6]);
let sum7 = sum6 + HashSum::from_summable(11, &elems[6]);
assert_eq!(pmmr.root(), sum7);
assert_eq!(pmmr.unpruned_size(), 11);
// eight elements
pmmr.push(elems[7]);
let sum8 = sum4 + ((HashSum::from_summable(8, elems[4]) + HashSum::from_summable(9, elems[5])) + (HashSum::from_summable(11, elems[6]) + HashSum::from_summable(12, elems[7])));
let sum8 = sum4 + ((HashSum::from_summable(8, &elems[4]) + HashSum::from_summable(9, &elems[5])) + (HashSum::from_summable(11, &elems[6]) + HashSum::from_summable(12, &elems[7])));
assert_eq!(pmmr.root(), sum8);
assert_eq!(pmmr.unpruned_size(), 15);
// nine elements
pmmr.push(elems[8]);
let sum9 = sum8 + HashSum::from_summable(16, elems[8]);
let sum9 = sum8 + HashSum::from_summable(16, &elems[8]);
assert_eq!(pmmr.root(), sum9);
assert_eq!(pmmr.unpruned_size(), 16);
}
@ -655,48 +779,112 @@ mod test {
TestElem([1, 0, 0, 0]),
];
let ba = VecBackend{elems: Arc::new(Mutex::new(vec![]))};
let mut pmmr = PMMR::new(ba.clone());
for elem in &elems[..] {
pmmr.push(*elem);
let orig_root: HashSum<TestElem>;
let sz: u64;
let mut ba = VecBackend::new();
{
let mut pmmr = PMMR::new(&mut ba);
for elem in &elems[..] {
pmmr.push(*elem);
}
orig_root = pmmr.root();
sz = pmmr.unpruned_size();
}
let orig_root = pmmr.root();
let orig_sz = ba.used_size();
// pruning a leaf with no parent should do nothing
pmmr.prune(16);
assert_eq!(orig_root, pmmr.root());
assert_eq!(ba.used_size(), orig_sz);
{
let mut pmmr = PMMR::at(&mut ba, sz);
pmmr.prune(16);
assert_eq!(orig_root, pmmr.root());
}
assert_eq!(ba.used_size(), 16);
// pruning leaves with no shared parent just removes 1 element
pmmr.prune(2);
assert_eq!(orig_root, pmmr.root());
assert_eq!(ba.used_size(), orig_sz - 1);
{
let mut pmmr = PMMR::at(&mut ba, sz);
pmmr.prune(2);
assert_eq!(orig_root, pmmr.root());
}
assert_eq!(ba.used_size(), 15);
pmmr.prune(4);
assert_eq!(orig_root, pmmr.root());
assert_eq!(ba.used_size(), orig_sz - 2);
{
let mut pmmr = PMMR::at(&mut ba, sz);
pmmr.prune(4);
assert_eq!(orig_root, pmmr.root());
}
assert_eq!(ba.used_size(), 14);
// pruning a non-leaf node has no effect
pmmr.prune(3);
assert_eq!(orig_root, pmmr.root());
assert_eq!(ba.used_size(), orig_sz - 2);
{
let mut pmmr = PMMR::at(&mut ba, sz);
pmmr.prune(3);
assert_eq!(orig_root, pmmr.root());
}
assert_eq!(ba.used_size(), 14);
// pruning sibling removes subtree
pmmr.prune(5);
assert_eq!(orig_root, pmmr.root());
assert_eq!(ba.used_size(), orig_sz - 4);
{
let mut pmmr = PMMR::at(&mut ba, sz);
pmmr.prune(5);
assert_eq!(orig_root, pmmr.root());
}
assert_eq!(ba.used_size(), 12);
// pruning all leaves under level >1 removes all subtree
pmmr.prune(1);
assert_eq!(orig_root, pmmr.root());
assert_eq!(ba.used_size(), orig_sz - 7);
{
let mut pmmr = PMMR::at(&mut ba, sz);
pmmr.prune(1);
assert_eq!(orig_root, pmmr.root());
}
assert_eq!(ba.used_size(), 9);
// pruning everything should only leave us the peaks
for n in 1..16 {
pmmr.prune(n);
{
let mut pmmr = PMMR::at(&mut ba, sz);
for n in 1..16 {
pmmr.prune(n);
}
assert_eq!(orig_root, pmmr.root());
}
assert_eq!(orig_root, pmmr.root());
assert_eq!(ba.used_size(), 2);
}
#[test]
fn pmmr_prune_list() {
	let mut prune_list = PruneList::new();

	// pruning a single leaf shifts everything after it by one
	prune_list.add(4);
	assert_eq!(prune_list.pruned_nodes.len(), 1);
	assert_eq!(prune_list.pruned_nodes[0], 4);
	assert_eq!(prune_list.get_shift(5), Some(1));
	assert_eq!(prune_list.get_shift(2), Some(0));
	assert_eq!(prune_list.get_shift(4), None);

	// pruning the sibling leaf compacts both entries into their parent (6)
	prune_list.add(5);
	assert_eq!(prune_list.pruned_nodes.len(), 1);
	assert_eq!(prune_list.pruned_nodes[0], 6);
	assert_eq!(prune_list.get_shift(8), Some(3));
	assert_eq!(prune_list.get_shift(2), Some(0));
	assert_eq!(prune_list.get_shift(5), None);

	// pruning an unrelated leaf just adds a second entry
	prune_list.add(2);
	assert_eq!(prune_list.pruned_nodes.len(), 2);
	assert_eq!(prune_list.pruned_nodes[0], 2);
	assert_eq!(prune_list.get_shift(8), Some(4));
	assert_eq!(prune_list.get_shift(1), Some(0));

	prune_list.add(8);
	prune_list.add(11);
	assert_eq!(prune_list.pruned_nodes.len(), 4);

	// adding 1 cascades the compaction up to the subtree root 7
	prune_list.add(1);
	assert_eq!(prune_list.pruned_nodes.len(), 3);
	assert_eq!(prune_list.pruned_nodes[0], 7);
	assert_eq!(prune_list.get_shift(12), Some(9));

	prune_list.add(12);
	assert_eq!(prune_list.pruned_nodes.len(), 3);
	assert_eq!(prune_list.get_shift(12), None);
	assert_eq!(prune_list.get_shift(9), Some(8));
	assert_eq!(prune_list.get_shift(17), Some(11));
}
}

View file

@ -327,6 +327,30 @@ impl_int!(u32, write_u32, read_u32);
impl_int!(u64, write_u64, read_u64);
impl_int!(i64, write_i64, read_i64);
impl<T> Readable for Vec<T> where T: Readable {
/// Reads elements of type T until the underlying reader signals EOF, then
/// returns whatever was read so far. Any error other than UnexpectedEof is
/// propagated. NOTE(review): an element truncated by EOF mid-read is
/// silently dropped rather than reported — confirm callers only feed this
/// whole, well-aligned streams.
fn read(reader: &mut Reader) -> Result<Vec<T>, Error> {
let mut buf = Vec::new();
loop {
let elem = T::read(reader);
match elem {
Ok(e) => buf.push(e),
// EOF simply means there are no more elements to read
Err(Error::IOErr(ref ioerr)) if ioerr.kind() == io::ErrorKind::UnexpectedEof => break,
Err(e) => return Err(e),
}
}
Ok(buf)
}
}
impl<T> Writeable for Vec<T> where T: Writeable {
	/// Writes every element back to back, in order. No length prefix is
	/// written, so the reading side recovers the count by reading to EOF.
	fn write<W: Writer>(&self, writer: &mut W) -> Result<(), Error> {
		// collect() on an iterator of Result short-circuits on the first Err
		self.iter().map(|elmt| elmt.write(writer)).collect()
	}
}
impl<'a, A: Writeable> Writeable for &'a A {
fn write<W: Writer>(&self, writer: &mut W) -> Result<(), Error> {
Writeable::write(*self, writer)
@ -386,12 +410,6 @@ impl Writeable for [u8; 4] {
}
}
impl Writeable for Vec<u8> {
fn write<W: Writer>(&self, writer: &mut W) -> Result<(), Error> {
writer.write_fixed_bytes(self)
}
}
/// Useful marker trait on types that can be sized byte slices
pub trait AsFixedBytes: Sized + AsRef<[u8]> {
/// The length in bytes

View file

@ -6,6 +6,13 @@ workspace = ".."
[dependencies]
byteorder = "^0.5"
env_logger="^0.3.5"
log = "^0.3"
memmap = { git = "https://github.com/danburkert/memmap-rs" }
rocksdb = "^0.7.0"
grin_core = { path = "../core" }
[dev-dependencies]
env_logger="^0.3.5"
time = "^0.1"

View file

@ -22,8 +22,14 @@
extern crate byteorder;
extern crate grin_core as core;
#[macro_use]
extern crate log;
extern crate env_logger;
extern crate memmap;
extern crate rocksdb;
pub mod sumtree;
const SEP: u8 = ':' as u8;
use std::fmt;
@ -103,7 +109,9 @@ impl Store {
/// Gets a value from the db, provided its key
pub fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Error> {
let db = self.rdb.read().unwrap();
db.get(key).map(|r| r.map(|o| o.to_vec())).map_err(From::from)
db.get(key).map(|r| r.map(|o| o.to_vec())).map_err(
From::from,
)
}
/// Gets a `Readable` value from the db, provided its key. Encapsulates
@ -115,10 +123,11 @@ impl Store {
/// Gets a `Readable` value from the db, provided its key, allowing to
/// extract only partial data. The underlying Readable size must align
/// accordingly. Encapsulates serialization.
pub fn get_ser_limited<T: ser::Readable>(&self,
key: &[u8],
len: usize)
-> Result<Option<T>, Error> {
pub fn get_ser_limited<T: ser::Readable>(
&self,
key: &[u8],
len: usize,
) -> Result<Option<T>, Error> {
let data = try!(self.get(key));
match data {
Some(val) => {
@ -203,14 +212,16 @@ impl<'a> Batch<'a> {
/// An iterator thad produces Readable instances back. Wraps the lower level
/// DBIterator and deserializes the returned values.
pub struct SerIterator<T>
where T: ser::Readable
where
T: ser::Readable,
{
iter: DBIterator,
_marker: PhantomData<T>,
}
impl<T> Iterator for SerIterator<T>
where T: ser::Readable
where
T: ser::Readable,
{
type Item = T;

423
store/src/sumtree.rs Normal file
View file

@ -0,0 +1,423 @@
// Copyright 2017 The Grin Developers
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Implementation of the persistent Backend for the prunable MMR sum-tree.
use memmap;
use std::cmp;
use std::fs::{self, File, OpenOptions};
use std::io::{self, Write, BufReader, BufRead, ErrorKind};
use std::path::Path;
use std::io::Read;
use core::core::pmmr::{self, Summable, Backend, HashSum, VecBackend};
use core::ser;
const PMMR_DATA_FILE: &'static str = "pmmr_dat.bin";
const PMMR_RM_LOG_FILE: &'static str = "pmmr_rm_log.bin";
const PMMR_PRUNED_FILE: &'static str = "pmmr_pruned.bin";
/// Maximum number of nodes in the remove log before it gets flushed
pub const RM_LOG_MAX_NODES: usize = 10000;
/// Wrapper for a file that can be read at any position (random read) but for
/// which writes are append only. Reads are backed by a memory map (mmap(2)),
/// relying on the operating system for fast access and caching. The memory
/// map is reallocated to expand it when new writes are flushed.
struct AppendOnlyFile {
/// Path the file was opened at; used for metadata reads and rewrites.
path: String,
/// Underlying append-only file handle used for writes.
file: File,
/// Read-only memory map over the file content; None until the first sync.
mmap: Option<memmap::Mmap>,
}
impl AppendOnlyFile {
	/// Open a file (existing or not) as append-only, backed by a mmap. An
	/// existing, non-empty file gets memory-mapped right away.
	fn open(path: String) -> io::Result<AppendOnlyFile> {
		let file = OpenOptions::new()
			.read(true)
			.append(true)
			.create(true)
			.open(path.clone())?;
		let mut aof = AppendOnlyFile {
			path: path.clone(),
			file: file,
			mmap: None,
		};
		let file_path = Path::new(&path);
		// the file always exists here thanks to create(true), so gate the
		// initial mmap on actual content (mapping an empty file would fail)
		// and propagate errors instead of silently discarding them
		if file_path.metadata()?.len() > 0 {
			aof.sync()?;
		}
		Ok(aof)
	}

	/// Append data to the file. The data is only visible through the mmap
	/// (and hence through read) after the next sync.
	fn append(&mut self, buf: &[u8]) -> io::Result<()> {
		self.file.write_all(buf)
	}

	/// Syncs all writes (fsync), reallocating the memory map to make the newly
	/// written data accessible.
	fn sync(&mut self) -> io::Result<()> {
		self.file.sync_data()?;
		self.mmap = Some(unsafe {
			memmap::file(&self.file)
				.protection(memmap::Protection::Read)
				.map()?
		});
		Ok(())
	}

	/// Read length bytes of data at offset from the file. Leverages the memory
	/// map. Returns an empty vector if the file has never been synced.
	fn read(&self, offset: usize, length: usize) -> Vec<u8> {
		let mmap = match self.mmap.as_ref() {
			Some(m) => m,
			None => return vec![],
		};
		(&mmap[offset..(offset + length)]).to_vec()
	}

	/// Saves a copy of the current file content to `target`, skipping the data
	/// at each of the provided prune offsets (absolute byte offsets in the
	/// file, each covering prune_len bytes). The prune Vec must be ordered.
	fn save_prune(&self, target: String, prune_offs: Vec<u64>, prune_len: u64) -> io::Result<()> {
		let mut reader = File::open(self.path.clone())?;
		let mut writer = File::create(target)?;

		// align the buffer on prune_len to avoid misalignments
		let mut buf = vec![0; (prune_len * 256) as usize];
		let mut read = 0;
		let mut prune_pos = 0;
		loop {
			// fill our buffer
			let len = match reader.read(&mut buf) {
				Ok(0) => return Ok(()),
				Ok(len) => len,
				Err(ref e) if e.kind() == ErrorKind::Interrupted => continue,
				Err(e) => return Err(e),
			} as u64;

			// write the buffer, except if we prune offsets in the current span,
			// in which case we skip. The bounds check guards against an empty
			// prune list and against running past its last entry, both of
			// which would panic on indexing.
			let mut buf_start = 0;
			while prune_pos < prune_offs.len() && prune_offs[prune_pos] >= read &&
				prune_offs[prune_pos] < read + len
			{
				// prune offsets are absolute in the file; make them relative
				// to the current buffer before slicing into it
				let prune_at = (prune_offs[prune_pos] - read) as usize;
				if prune_at != buf_start {
					writer.write_all(&buf[buf_start..prune_at])?;
				}
				buf_start = prune_at + (prune_len as usize);
				prune_pos += 1;
			}
			writer.write_all(&buf[buf_start..(len as usize)])?;
			read += len;
		}
	}

	/// Current size of the file in bytes.
	fn size(&self) -> io::Result<u64> {
		fs::metadata(&self.path).map(|md| md.len())
	}
}
/// Log file fully cached in memory containing all positions that should be
/// eventually removed from the MMR append-only data file. Allows quick
/// checking of whether a piece of data has been marked for deletion. When the
/// log becomes too long, the MMR backend will actually remove chunks from the
/// MMR data file and truncate the remove log.
struct RemoveLog {
/// Path to the log file; kept so truncate can recreate it.
path: String,
/// Append-only handle new removed positions are written to.
file: File,
// Ordered vector of MMR positions that should get eventually removed.
removed: Vec<u64>,
}
impl RemoveLog {
	/// Open the remove log file. The content of the file will be read in memory
	/// for fast checking.
	fn open(path: String) -> io::Result<RemoveLog> {
		let in_mem_removed = read_ordered_vec(path.clone())?;
		let log_file = OpenOptions::new().append(true).create(true).open(path.clone())?;
		Ok(RemoveLog {
			path: path,
			file: log_file,
			removed: in_mem_removed,
		})
	}

	/// Truncate and empties the remove log.
	fn truncate(&mut self) -> io::Result<()> {
		self.removed = vec![];
		self.file = File::create(self.path.clone())?;
		Ok(())
	}

	/// Append a set of new positions to the remove log. Both adds those
	/// positions to the ordered in-memory set and to the file.
	fn append(&mut self, elmts: Vec<u64>) -> io::Result<()> {
		for elmt in elmts {
			// positions already in the log are silently skipped; new ones are
			// written out and inserted at their sorted slot
			if let Err(idx) = self.removed.binary_search(&elmt) {
				self.file.write_all(&ser::ser_vec(&elmt).unwrap()[..])?;
				self.removed.insert(idx, elmt);
			}
		}
		self.file.sync_data()
	}

	/// Whether the remove log currently includes the provided position.
	fn includes(&self, elmt: u64) -> bool {
		self.removed.binary_search(&elmt).is_ok()
	}

	/// Number of positions stored in the remove log.
	fn len(&self) -> usize {
		self.removed.len()
	}
}
/// PMMR persistent backend implementation. Relies on multiple facilities to
/// handle writing, reading and pruning.
///
/// * A main storage file appends HashSum instances as they come. This
/// AppendOnlyFile is also backed by a mmap for reads.
/// * An in-memory backend buffers the latest batch of writes to ensure the
/// PMMR can always read recent values even if they haven't been flushed to
/// disk yet.
/// * A remove log tracks the positions that need to be pruned from the
/// main storage file.
pub struct PMMRBackend<T>
where
T: Summable + Clone,
{
/// Directory in which all the backend files live.
data_dir: String,
/// Main append-only storage of serialized HashSum records.
hashsum_file: AppendOnlyFile,
/// Positions marked for removal but not yet compacted out of the file.
remove_log: RemoveLog,
/// Positions already compacted away; used to shift read offsets.
pruned_nodes: pmmr::PruneList,
// buffers addition of new elements until they're fully written to disk
buffer: VecBackend<T>,
// number of records already flushed to the main file; the buffer holds
// everything appended past this index
buffer_index: usize,
}
impl<T> Backend<T> for PMMRBackend<T>
where
T: Summable + Clone,
{
/// Append the provided HashSums to the backend storage. Data is written both
/// to the in-memory buffer (so it is readable before the next sync) and to
/// the append-only file.
#[allow(unused_variables)]
fn append(&mut self, position: u64, data: Vec<HashSum<T>>) -> Result<(), String> {
// buffer positions are relative to what's already flushed on disk
self.buffer.append(
position - (self.buffer_index as u64),
data.clone(),
)?;
for hs in data {
if let Err(e) = self.hashsum_file.append(&ser::ser_vec(&hs).unwrap()[..]) {
return Err(format!(
"Could not write to log storage, disk full? {:?}",
e
));
}
}
Ok(())
}
/// Get a HashSum by insertion position
fn get(&self, position: u64) -> Option<HashSum<T>> {
// First, check if it's in our temporary write buffer
// NOTE(review): pos_sz - 1 would underflow for position 0 — positions are
// assumed to start at 1, as everywhere else in the MMR
let pos_sz = position as usize;
if pos_sz - 1 >= self.buffer_index && pos_sz - 1 < self.buffer_index + self.buffer.len() {
return self.buffer.get((pos_sz - self.buffer_index) as u64);
}
// Second, check if this position has been pruned in the remove log
if self.remove_log.includes(position) {
return None;
}
// Third, check if it's in the pruned list or its offset
let shift = self.pruned_nodes.get_shift(position);
if let None = shift {
return None
}
// The MMR starts at 1, our binary backend starts at 0
let pos = position - 1;
// Must be on disk, doing a read at the correct position
// record layout: 32-byte hash followed by the serialized sum
let record_len = 32 + T::sum_len();
// shift accounts for records compacted out of the file before this one
let file_offset = ((pos - shift.unwrap()) as usize) * record_len;
let data = self.hashsum_file.read(file_offset, record_len);
match ser::deserialize(&mut &data[..]) {
Ok(hashsum) => Some(hashsum),
Err(e) => {
// a failed deserialization is logged but surfaced as a plain miss
error!(
"Corrupted storage, could not read an entry from sum tree store: {:?}",
e
);
None
}
}
}
/// Remove HashSums by insertion position. Buffered entries are removed
/// directly; everything else is only recorded in the remove log until the
/// next compaction.
fn remove(&mut self, positions: Vec<u64>) -> Result<(), String> {
if self.buffer.used_size() > 0 {
self.buffer.remove(positions.clone()).unwrap();
}
self.remove_log.append(positions).map_err(|e| {
format!("Could not write to log storage, disk full? {:?}", e)
})
}
}
impl<T> PMMRBackend<T>
where
T: Summable + Clone,
{
/// Instantiates a new PMMR backend that will use the provided directory to
/// store its files.
pub fn new(data_dir: String) -> io::Result<PMMRBackend<T>> {
let hs_file = AppendOnlyFile::open(format!("{}/{}", data_dir, PMMR_DATA_FILE))?;
let sz = hs_file.size()?;
let record_len = 32 + T::sum_len();
let rm_log = RemoveLog::open(format!("{}/{}", data_dir, PMMR_RM_LOG_FILE))?;
let prune_list = read_ordered_vec(format!("{}/{}", data_dir, PMMR_PRUNED_FILE))?;
Ok(PMMRBackend {
data_dir: data_dir,
hashsum_file: hs_file,
remove_log: rm_log,
buffer: VecBackend::new(),
buffer_index: (sz as usize) / record_len,
pruned_nodes: pmmr::PruneList{pruned_nodes: prune_list},
})
}
/// Syncs all files to disk. A call to sync is required to ensure all the
/// data has been successfully written to disk.
pub fn sync(&mut self) -> io::Result<()> {
self.buffer_index = self.buffer_index + self.buffer.len();
self.buffer.clear();
self.hashsum_file.sync()
}
/// Checks the length of the remove log to see if it should get compacted.
/// If so, the remove log is flushed into the pruned list, which itself gets
/// saved, and the main hashsum data file is rewritten, cutting the removed
/// data.
///
/// If a max_len strictly greater than 0 is provided, the value will be used
/// to decide whether the remove log has reached its maximum length,
/// otherwise the RM_LOG_MAX_NODES default value is used.
pub fn check_compact(&mut self, max_len: usize) -> io::Result<()> {
if !(max_len > 0 && self.remove_log.len() > max_len ||
max_len == 0 && self.remove_log.len() > RM_LOG_MAX_NODES) {
return Ok(())
}
// 0. validate none of the nodes in the rm log are in the prune list (to
// avoid accidental double compaction)
for pos in &self.remove_log.removed[..] {
if let None = self.pruned_nodes.pruned_pos(*pos) {
// TODO we likely can recover from this by directly jumping to 3
error!("The remove log contains nodes that are already in the pruned \
list, a previous compaction likely failed.");
return Ok(());
}
}
// 1. save hashsum file to a compact copy, skipping data that's in the
// remove list
let tmp_prune_file = format!("{}/{}.prune", self.data_dir, PMMR_DATA_FILE);
let record_len = (32 + T::sum_len()) as u64;
let to_rm = self.remove_log.removed.iter().map(|pos| {
let shift = self.pruned_nodes.get_shift(*pos);
(*pos - 1 - shift.unwrap()) * record_len
}).collect();
self.hashsum_file.save_prune(tmp_prune_file.clone(), to_rm, record_len)?;
// 2. update the prune list and save it in place
for rm_pos in &self.remove_log.removed[..] {
self.pruned_nodes.add(*rm_pos);
}
write_vec(format!("{}/{}", self.data_dir, PMMR_PRUNED_FILE), &self.pruned_nodes.pruned_nodes)?;
// 3. move the compact copy to the hashsum file and re-open it
fs::rename(tmp_prune_file.clone(), format!("{}/{}", self.data_dir, PMMR_DATA_FILE))?;
self.hashsum_file = AppendOnlyFile::open(format!("{}/{}", self.data_dir, PMMR_DATA_FILE))?;
self.hashsum_file.sync()?;
// 4. truncate the rm log
self.remove_log.truncate()?;
Ok(())
}
}
// Read an ordered vector of scalars from a file.
//
// The file is expected to contain serialized vectors of T. Elements are
// inserted in sorted order and duplicates are skipped, so the result is
// an ordered, deduplicated vector. A missing file yields an empty vector.
fn read_ordered_vec<T>(path: String) -> io::Result<Vec<T>>
	where T: ser::Readable + cmp::Ord {

	let file_path = Path::new(&path);
	let mut ovec = Vec::with_capacity(1000);
	if file_path.exists() {
		let mut file = BufReader::with_capacity(8 * 1000, File::open(path.clone())?);
		loop {
			// need a block to end mutable borrow before consume
			let buf_len = {
				// fill_buf returning an empty slice means EOF
				let buf = file.fill_buf()?;
				if buf.len() == 0 {
					break;
				}
				// NOTE(review): the whole buffered chunk is deserialized in one
				// shot and then fully consumed; this assumes a serialized run of
				// elements never spans a buffer boundary — confirm against the
				// writer's batching before changing the buffer size.
				let elmts_res: Result<Vec<T>, ser::Error> = ser::deserialize(&mut &buf[..]);
				match elmts_res {
					Ok(elmts) => {
						for elmt in elmts {
							// binary_search returns Err(idx) with the insertion
							// point when absent; Ok (already present) is skipped,
							// which deduplicates while keeping order
							if let Err(idx) = ovec.binary_search(&elmt) {
								ovec.insert(idx, elmt);
							}
						}
					}
					Err(_) => {
						return Err(io::Error::new(
							io::ErrorKind::InvalidData,
							format!("Corrupted storage, could not read file at {}", path),
						));
					}
				}
				buf.len()
			};
			file.consume(buf_len);
		}
	}
	Ok(ovec)
}
// Write a vector of serializable elements to the given file path,
// creating the file (or truncating an existing one) first. Serialization
// failures surface as an InvalidInput I/O error naming the path.
fn write_vec<T>(path: String, v: &Vec<T>) -> io::Result<()>
	where T: ser::Writeable {

	let mut file = File::create(&path)?;
	match ser::serialize(&mut file, v) {
		Ok(_) => Ok(()),
		Err(_) => Err(io::Error::new(
			io::ErrorKind::InvalidInput,
			format!("Failed to serialize data when writing to {}", path),
		)),
	}
}

199
store/tests/sumtree.rs Normal file
View file

@ -0,0 +1,199 @@
// Copyright 2017 The Grin Developers
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
extern crate env_logger;
extern crate grin_core as core;
extern crate grin_store as store;
extern crate time;
use std::fs;
use core::ser::*;
use core::core::pmmr::{PMMR, Summable, HashSum, Backend};
use core::core::hash::Hashed;
#[test]
fn sumtree_append() {
	let (data_dir, elems) = setup();
	let mut backend = store::sumtree::PMMRBackend::new(data_dir).unwrap();

	// insert a first batch of 4 elements, then flush to disk
	let mut mmr_size = load(0, &elems[0..4], &mut backend);
	backend.sync().unwrap();

	// insert the remaining elements and flush once more
	mmr_size = load(mmr_size, &elems[4..9], &mut backend);
	backend.sync().unwrap();

	// the first stored node must match one hashed and summed by hand
	let hash = Hashed::hash(&elems[0].clone());
	let sum = elems[0].sum();
	let node_hash = (1 as u64, &sum, hash).hash();
	assert_eq!(
		backend.get(1),
		Some(HashSum {
			hash: node_hash,
			sum: sum,
		})
	);

	// rebuild the expected root bottom-up from the leaf hashsums
	let sum2 = HashSum::from_summable(1, &elems[0]) + HashSum::from_summable(2, &elems[1]);
	let sum4 = sum2 + (HashSum::from_summable(4, &elems[2]) + HashSum::from_summable(5, &elems[3]));
	let sum8 = sum4 +
		((HashSum::from_summable(8, &elems[4]) + HashSum::from_summable(9, &elems[5])) +
			(HashSum::from_summable(11, &elems[6]) + HashSum::from_summable(12, &elems[7])));
	let expected_root = sum8 + HashSum::from_summable(16, &elems[8]);

	{
		let pmmr = PMMR::at(&mut backend, mmr_size);
		assert_eq!(pmmr.root(), expected_root);
	}
}
#[test]
fn sumtree_prune_compact() {
	let (data_dir, elems) = setup();

	// fill the backend with every element and persist it
	let mut backend = store::sumtree::PMMRBackend::new(data_dir).unwrap();
	let mmr_size = load(0, &elems[..], &mut backend);
	backend.sync().unwrap();

	// remember the root before any pruning
	let root = {
		let pmmr = PMMR::at(&mut backend, mmr_size);
		pmmr.root()
	};

	// prune a few choice leaves
	{
		let mut pmmr = PMMR::at(&mut backend, mmr_size);
		pmmr.prune(1);
		pmmr.prune(4);
		pmmr.prune(5);
	}
	backend.sync().unwrap();

	// pruning must not change the root
	{
		let pmmr = PMMR::at(&mut backend, mmr_size);
		assert_eq!(root, pmmr.root());
	}

	// compacting the underlying storage must not change it either
	backend.check_compact(2).unwrap();
	{
		let pmmr = PMMR::at(&mut backend, mmr_size);
		assert_eq!(root, pmmr.root());
	}
}
#[test]
fn sumtree_reload() {
	let (data_dir, elems) = setup();

	let mmr_size: u64;
	let root: HashSum<TestElem>;

	// populate a first backend, then prune and compact so that both the
	// prune list and the remove log hold data when the backend is dropped
	{
		let mut backend = store::sumtree::PMMRBackend::new(data_dir.clone()).unwrap();
		mmr_size = load(0, &elems[..], &mut backend);
		backend.sync().unwrap();

		{
			let mut pmmr = PMMR::at(&mut backend, mmr_size);
			root = pmmr.root();
			pmmr.prune(1);
			pmmr.prune(4);
		}
		backend.sync().unwrap();
		backend.check_compact(1).unwrap();
		backend.sync().unwrap();

		// one more prune so the remove log is non-empty on reload
		{
			let mut pmmr = PMMR::at(&mut backend, mmr_size);
			pmmr.prune(5);
		}
		backend.sync().unwrap();
	}

	// reopen from disk and verify the state survived the round-trip
	{
		let mut backend = store::sumtree::PMMRBackend::new(data_dir).unwrap();
		{
			let pmmr = PMMR::at(&mut backend, mmr_size);
			assert_eq!(root, pmmr.root());
		}
		assert_eq!(backend.get(5), None);
	}
}
// Builds a unique data directory and the fixed set of test elements:
// eight elements counting up in the low limb, plus one with the high
// limb set.
fn setup() -> (String, Vec<TestElem>) {
	let _ = env_logger::init();

	// unique directory per run, derived from the current timestamp
	let now = time::get_time();
	let data_dir = format!("./target/{}.{}", now.sec, now.nsec);
	fs::create_dir_all(data_dir.clone()).unwrap();

	let mut elems: Vec<TestElem> = (1..9).map(|i| TestElem([0, 0, 0, i])).collect();
	elems.push(TestElem([1, 0, 0, 0]));
	(data_dir, elems)
}
// Pushes every element onto the MMR opened at the given position and
// returns the resulting unpruned size.
fn load(pos: u64, elems: &[TestElem],
		backend: &mut store::sumtree::PMMRBackend<TestElem>) -> u64 {

	let mut pmmr = PMMR::at(backend, pos);
	for elem in elems.iter() {
		pmmr.push(elem.clone());
	}
	pmmr.unpruned_size()
}
// Minimal summable element used by the tests: four u32 "limbs".
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
struct TestElem([u32; 4]);
impl Summable for TestElem {
	type Sum = u64;
	fn sum(&self) -> u64 {
		// sums are not allowed to overflow, so we use this simple
		// non-injective "sum" function that will still be homomorphic;
		// folding base-16 over the limbs is identical to
		// a*0x1000 + b*0x100 + c*0x10 + d
		self.0.iter().fold(0u64, |acc, &limb| acc * 0x10 + limb as u64)
	}
	fn sum_len() -> usize {
		8
	}
}
impl Writeable for TestElem {
	fn write<W: Writer>(&self, writer: &mut W) -> Result<(), Error> {
		// serialize the four limbs in order; the final write's result is
		// the method's return value
		for i in 0..3 {
			try!(writer.write_u32(self.0[i]));
		}
		writer.write_u32(self.0[3])
	}
}