Fix MMR compaction to take a horizon. Additional tests.

This commit is contained in:
Ignotus Peverell 2018-02-28 21:15:29 +00:00
parent 9e11afe8a2
commit 9fa344383c
No known key found for this signature in database
GPG key ID: 99CD25F39F8F8211
4 changed files with 106 additions and 29 deletions

View file

@ -22,6 +22,7 @@
extern crate byteorder;
extern crate env_logger;
#[macro_use]
extern crate grin_core as core;
extern crate grin_util as util;
extern crate libc;

View file

@ -180,8 +180,8 @@ where
})
}
/// Total size of the PMMR stored by this backend. Only produces the fully
/// sync'd size.
/// Number of elements in the PMMR stored by this backend. Only produces the
/// fully sync'd size.
pub fn unpruned_size(&self) -> io::Result<u64> {
let total_shift = self.pruned_nodes.get_shift(::std::u64::MAX).unwrap();
let record_len = 32;
@ -189,6 +189,19 @@ where
Ok(sz / record_len + total_shift)
}
/// Number of elements in the underlying stored data. Extremely dependent on
/// pruning and compaction.
pub fn data_size(&self) -> io::Result<u64> {
	// Each stored element occupies a fixed-width record of T::len() bytes,
	// so the element count is simply the data file size over the record size.
	let elem_len = T::len() as u64;
	let total_bytes = self.data_file.size()?;
	Ok(total_bytes / elem_len)
}
/// Size of the underlying hashed data. Extremely dependent on pruning
/// and compaction.
pub fn hash_size(&self) -> io::Result<u64> {
	// Hashes are fixed-width 32-byte records in the hash file.
	let total_bytes = self.hash_file.size()?;
	Ok(total_bytes / 32)
}
/// Syncs all files to disk. A call to sync is required to ensure all the
/// data has been successfully written to disk.
pub fn sync(&mut self) -> io::Result<()> {
@ -229,9 +242,15 @@ where
/// to decide whether the remove log has reached its maximum length,
/// otherwise the RM_LOG_MAX_NODES default value is used.
///
/// A cutoff limits compaction on recent data. Provided as an indexed value
/// on pruned data (practically a block height), it forces compaction to
/// ignore any prunable data beyond the cutoff. This is used to enforce
/// a horizon after which the local node should have all the data to allow
/// rewinding.
///
/// TODO whatever is calling this should also clean up the commit to
/// position index in db
pub fn check_compact(&mut self, max_len: usize) -> io::Result<()> {
pub fn check_compact(&mut self, max_len: usize, cutoff_index: u32) -> io::Result<()> {
if !(max_len > 0 && self.rm_log.len() > max_len
|| max_len == 0 && self.rm_log.len() > RM_LOG_MAX_NODES)
{
@ -242,6 +261,7 @@ where
// avoid accidental double compaction)
for pos in &self.rm_log.removed[..] {
if let None = self.pruned_nodes.pruned_pos(pos.0) {
println!("ALREADY PRUNED?");
// TODO we likely can recover from this by directly jumping to 3
error!(
LOGGER,
@ -256,38 +276,38 @@ where
// remove list
let tmp_prune_file_hash = format!("{}/{}.hashprune", self.data_dir, PMMR_HASH_FILE);
let record_len = 32;
let to_rm = self.rm_log
.removed
.iter()
.map(|&(pos, _)| {
let to_rm = filter_map_vec!(self.rm_log.removed, |&(pos, idx)| {
if idx < cutoff_index {
let shift = self.pruned_nodes.get_shift(pos);
(pos - 1 - shift.unwrap()) * record_len
})
.collect();
Some((pos - 1 - shift.unwrap()) * record_len)
} else {
None
}
});
self.hash_file
.save_prune(tmp_prune_file_hash.clone(), to_rm, record_len)?;
// 2. And the same with the data file
let tmp_prune_file_data = format!("{}/{}.dataprune", self.data_dir, PMMR_DATA_FILE);
let record_len = T::len() as u64;
let to_rm = self.rm_log
.removed.clone()
.into_iter()
.filter(|&(pos, _)| pmmr::bintree_postorder_height(pos) == 0)
.map(|(pos, _)| {
let to_rm = filter_map_vec!(self.rm_log.removed, |&(pos, idx)| {
if pmmr::bintree_postorder_height(pos) == 0 && idx < cutoff_index {
let shift = self.pruned_nodes.get_leaf_shift(pos).unwrap();
let pos = pmmr::n_leaves(pos as u64);
(pos - 1 - shift) * record_len
})
.collect();
Some((pos - 1 - shift) * record_len)
} else {
None
}
});
self.data_file
.save_prune(tmp_prune_file_data.clone(), to_rm, record_len)?;
// 3. update the prune list and save it in place
for &(rm_pos, _) in &self.rm_log.removed[..] {
for &(rm_pos, idx) in &self.rm_log.removed[..] {
if idx < cutoff_index {
self.pruned_nodes.add(rm_pos);
}
}
write_vec(
format!("{}/{}", self.data_dir, PMMR_PRUNED_FILE),
&self.pruned_nodes.pruned_nodes,
@ -308,7 +328,11 @@ where
self.data_file = AppendOnlyFile::open(format!("{}/{}", self.data_dir, PMMR_DATA_FILE))?;
// 6. truncate the rm log
self.rm_log.rewind(0)?;
self.rm_log.removed = self.rm_log.removed
.iter()
.filter(|&&(_, idx)| idx >= cutoff_index)
.map(|x| *x)
.collect();
self.rm_log.flush()?;
Ok(())

View file

@ -219,10 +219,6 @@ impl RemoveLog {
pub fn rewind(&mut self, last_offs: u32) -> io::Result<()> {
// simplifying assumption: we always remove older than what's in tmp
self.removed_tmp = vec![];
// backing it up before truncating
self.removed_bak = self.removed.clone();
// backing it up before truncating
self.removed_bak = self.removed.clone();

View file

@ -93,7 +93,7 @@ fn pmmr_prune_compact() {
}
// compact
backend.check_compact(2).unwrap();
backend.check_compact(2, 2).unwrap();
// recheck the root and stored data
{
@ -128,7 +128,7 @@ fn pmmr_reload() {
}
backend.sync().unwrap();
backend.check_compact(1).unwrap();
backend.check_compact(1, 2).unwrap();
backend.sync().unwrap();
assert_eq!(backend.unpruned_size().unwrap(), mmr_size);
@ -187,7 +187,7 @@ fn pmmr_rewind() {
pmmr.prune(1, 1).unwrap();
pmmr.prune(2, 1).unwrap();
}
backend.check_compact(1).unwrap();
backend.check_compact(1, 2).unwrap();
backend.sync().unwrap();
// rewind and check the roots still match
@ -216,6 +216,62 @@ fn pmmr_rewind() {
teardown(data_dir);
}
// Exercises compaction with a cutoff index (horizon): entries pruned with an
// index at or beyond the cutoff must survive compaction on disk so that the
// node retains enough data to rewind, while older prunes are removed.
#[test]
fn pmmr_compact_horizon() {
let (data_dir, elems) = setup("compact_horizon");
// NOTE(review): `root` is captured below but never asserted against in this
// test — consider rechecking it after compaction, or dropping the binding.
let root: Hash;
{
// setup the mmr store with all elements
let mut backend = store::pmmr::PMMRBackend::new(data_dir.to_string()).unwrap();
let mmr_size = load(0, &elems[..], &mut backend);
backend.sync().unwrap();
// save the root
{
let pmmr:PMMR<TestElem, _> = PMMR::at(&mut backend, mmr_size);
root = pmmr.root();
}
// pruning some choice nodes with an increasing block height
// (second prune argument is the index recorded in the remove log)
{
let mut pmmr:PMMR<TestElem, _> = PMMR::at(&mut backend, mmr_size);
pmmr.prune(1, 1).unwrap();
pmmr.prune(2, 2).unwrap();
pmmr.prune(4, 3).unwrap();
pmmr.prune(5, 4).unwrap();
}
backend.sync().unwrap();
// compact with cutoff_index 3: only entries pruned with index < 3
// (positions 1 and 2 above) are eligible for physical removal
backend.check_compact(2, 3).unwrap();
}
// recheck stored data
{
// recreate backend to prove the compacted state survives a restart
let mut backend = store::pmmr::PMMRBackend::<TestElem>::new(data_dir.to_string()).unwrap();
// 9 elements total, minus 2 compacted
assert_eq!(backend.data_size().unwrap(), 7);
// 15 nodes total, 2 pruned and compacted
assert_eq!(backend.hash_size().unwrap(), 13);
// compact some more; cutoff 5 now makes the remaining prunes eligible
backend.check_compact(1, 5).unwrap();
}
// recheck stored data
{
// recreate backend again after the second compaction
let backend = store::pmmr::PMMRBackend::<TestElem>::new(data_dir.to_string()).unwrap();
// 9 elements total, minus 4 compacted
assert_eq!(backend.data_size().unwrap(), 5);
// 15 nodes total, 6 pruned and compacted
assert_eq!(backend.hash_size().unwrap(), 9);
}
teardown(data_dir);
}
fn setup(tag: &str) -> (String, Vec<TestElem>) {
let _ = env_logger::init();
let t = time::get_time();