Compaction of pruned data for chain data structures ()

* Implementation of compaction for the chain. A single entry point on the chain triggers compaction of all MMRs as well as cleanup of the positional index and full blocks.
* API endpoint, additional tests and more fixes for compaction
* Also prune PMMR metadata, minor bug fix
* PMMR store tests fix
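The "single entry point" mentioned in the first bullet above is the new Chain::compact() method in the chain changes below; callers embedding the chain crate invoke it directly, and HTTP clients reach the same code path through the GET /v1/chain/compact handler added to the API.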
Ignotus Peverell 2018-03-06 17:58:33 +00:00 committed by GitHub
parent 1ab43df807
commit a9d1b76414
14 changed files with 198 additions and 62 deletions

View file

@@ -351,8 +351,8 @@ impl Handler for PeerGetHandler {
}
}
// Status handler. Post a summary of the server status
// GET /v1/status
/// Status handler. Post a summary of the server status
/// GET /v1/status
pub struct StatusHandler {
pub chain: Weak<chain::Chain>,
pub peers: Weak<p2p::Peers>,
@@ -370,8 +370,8 @@ impl Handler for StatusHandler {
}
}
// Chain handler. Get the head details.
// GET /v1/chain
/// Chain handler. Get the head details.
/// GET /v1/chain
pub struct ChainHandler {
pub chain: Weak<chain::Chain>,
}
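(Throughout this file, leading // comments become /// doc comments, which promotes them to rustdoc documentation so the handler structs and their routes appear in the generated crate docs.)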
@@ -388,6 +388,20 @@ impl Handler for ChainHandler {
}
}
/// Chain compaction handler. Trigger a compaction of the chain state to regain
/// storage space.
/// GET /v1/chain/compact
pub struct ChainCompactHandler {
pub chain: Weak<chain::Chain>,
}
impl Handler for ChainCompactHandler {
fn handle(&self, _req: &mut Request) -> IronResult<Response> {
w(&self.chain).compact().unwrap();
Ok(Response::with((status::Ok, "{}")))
}
}
/// Gets block details given either a hash or height.
/// GET /v1/blocks/<hash>
/// GET /v1/blocks/<height>
@@ -581,6 +595,9 @@ pub fn start_rest_apis<T>(
let chain_tip_handler = ChainHandler {
chain: chain.clone(),
};
let chain_compact_handler = ChainCompactHandler {
chain: chain.clone(),
};
let status_handler = StatusHandler {
chain: chain.clone(),
peers: peers.clone(),
@@ -610,6 +627,7 @@ pub fn start_rest_apis<T>(
let route_list = vec![
"get blocks".to_string(),
"get chain".to_string(),
"get chain/compact".to_string(),
"get chain/outputs".to_string(),
"get status".to_string(),
"get txhashset/roots".to_string(),
@@ -624,13 +642,13 @@ pub fn start_rest_apis<T>(
"get peers/connected".to_string(),
"get peers/a.b.c.d".to_string(),
];
// We allow manually banning, like this:
// curl -v -X POST http://127.0.0.1:13413/v1/peers/88.99.251.87:13414/ban
let index_handler = IndexHandler { list: route_list };
let router = router!(
index: get "/" => index_handler,
blocks: get "/blocks/*" => block_handler,
chain_tip: get "/chain" => chain_tip_handler,
chain_compact: get "/chain/compact" => chain_compact_handler,
chain_outputs: get "/chain/outputs/*" => output_handler,
status: get "/status" => status_handler,
txhashset_roots: get "/txhashset/*" => txhashset_handler,
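For manual testing, the new route can be exercised in the same way as the ban example above, e.g. curl -v http://127.0.0.1:13413/v1/chain/compact (assuming the same default API port as in that example); on success the handler responds with 200 and an empty JSON object.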

View file

@@ -162,7 +162,8 @@ impl Chain {
Err(e) => return Err(Error::StoreErr(e, "chain init load head".to_owned())),
};
let mut txhashset = txhashset::TxHashSet::open(db_root.clone(), store.clone(), txhashset_md)?;
let mut txhashset =
txhashset::TxHashSet::open(db_root.clone(), store.clone(), txhashset_md)?;
let head = store.head();
let head = match head {
@@ -172,7 +173,9 @@ impl Chain {
store.save_block(&genesis)?;
store.setup_height(&genesis.header, &tip)?;
if genesis.kernels.len() > 0 {
txhashset::extending(&mut txhashset, |extension| extension.apply_block(&genesis))?;
txhashset::extending(&mut txhashset, |extension| {
extension.apply_block(&genesis)
})?;
}
// saving a new tip based on genesis
@@ -506,7 +509,8 @@ impl Chain {
let header = self.store.get_block_header(&h)?;
txhashset::zip_write(self.db_root.clone(), txhashset_data)?;
let mut txhashset = txhashset::TxHashSet::open(self.db_root.clone(), self.store.clone(), None)?;
let mut txhashset =
txhashset::TxHashSet::open(self.db_root.clone(), self.store.clone(), None)?;
txhashset::extending(&mut txhashset, |extension| {
extension.rewind_pos(header.height, rewind_to_output, rewind_to_kernel)?;
extension.validate(&header)?;
@@ -534,6 +538,47 @@ impl Chain {
Ok(())
}
/// Triggers chain compaction, cleaning up some unnecessary historical
/// information. We introduce a chain depth called horizon, which is
/// typically in the range of a couple days. Before that horizon, this
/// method will:
///
/// * compact the MMR data files and flush the corresponding remove logs
/// * delete old records from the k/v store (older blocks, indexes, etc.)
///
/// This operation can be resource intensive and takes some time to execute.
/// Meanwhile, the chain will not be able to accept new blocks. It should
/// therefore be called judiciously.
pub fn compact(&self) -> Result<(), Error> {
let mut sumtrees = self.txhashset.write().unwrap();
sumtrees.compact()?;
let horizon = global::cut_through_horizon() as u64;
let head = self.head()?;
let mut current = self.store.get_header_by_height(head.height - horizon - 1)?;
loop {
match self.store.get_block(&current.hash()) {
Ok(b) => {
self.store.delete_block(&b.hash())?;
self.store.delete_block_pmmr_file_metadata(&b.hash())?;
}
Err(NotFoundErr) => {
break;
}
Err(e) => return Err(Error::StoreErr(e, "retrieving block to compact".to_owned())),
}
if current.height <= 1 {
break;
}
match self.store.get_block_header(&current.previous) {
Ok(h) => current = h,
Err(NotFoundErr) => break,
Err(e) => return Err(From::from(e)),
}
}
Ok(())
}
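To make the arithmetic above concrete, with illustrative numbers rather than real consensus values: if the head is at height 10,000 and the horizon is 1,000 blocks, the loop starts from the header at height 8,999 (head.height - horizon - 1) and walks parent links back toward the genesis, deleting each stored full block and its PMMR file metadata, stopping once it reaches a block that a previous compaction has already removed (or height 1).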
/// returns the last n nodes inserted into the output sum tree
pub fn get_last_n_output(&self, distance: u64) -> Vec<(Hash, Option<OutputStoreable>)> {
let mut txhashset = self.txhashset.write().unwrap();

View file

@@ -171,6 +171,11 @@ impl ChainStore for ChainKVStore {
)
}
fn delete_output_pos(&self, commit: &[u8]) -> Result<(), Error> {
self.db
.delete(&to_key(COMMIT_POS_PREFIX, &mut commit.to_vec()))
}
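Note that, unlike the neighbouring index methods, this one takes the commitment as a raw byte slice rather than a Commitment: during compaction the caller only has the pruned record's bytes in hand (see the clean_output_index closure in the txhashset changes below).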
fn save_kernel_pos(&self, excess: &Commitment, pos: u64) -> Result<(), Error> {
self.db.put_ser(
&to_key(KERNEL_POS_PREFIX, &mut excess.as_ref().to_vec())[..],

View file

@@ -29,11 +29,13 @@ use core::consensus::reward;
use core::core::{Block, BlockHeader, Input, Output, OutputFeatures, OutputIdentifier,
OutputStoreable, TxKernel};
use core::core::pmmr::{self, MerkleProof, PMMR};
use core::global;
use core::core::hash::{Hash, Hashed};
use core::ser::{self, PMMRable, PMMRIndexHashable};
use core::ser::{self, PMMRIndexHashable, PMMRable};
use grin_store;
use grin_store::pmmr::{PMMRBackend, PMMRFileMetadata};
use grin_store::types::prune_noop;
use types::{ChainStore, Error, PMMRFileMetadataCollection, TxHashSetRoots};
use util::{zip, LOGGER};
@@ -102,7 +104,9 @@ impl TxHashSet {
commit_index: Arc<ChainStore>,
last_file_positions: Option<PMMRFileMetadataCollection>,
) -> Result<TxHashSet, Error> {
let output_file_path: PathBuf = [&root_dir, TXHASHSET_SUBDIR, OUTPUT_SUBDIR].iter().collect();
let output_file_path: PathBuf = [&root_dir, TXHASHSET_SUBDIR, OUTPUT_SUBDIR]
.iter()
.collect();
fs::create_dir_all(output_file_path.clone())?;
let rproof_file_path: PathBuf = [&root_dir, TXHASHSET_SUBDIR, RANGE_PROOF_SUBDIR]
@@ -110,8 +114,9 @@ impl TxHashSet {
.collect();
fs::create_dir_all(rproof_file_path.clone())?;
let kernel_file_path: PathBuf =
[&root_dir, TXHASHSET_SUBDIR, KERNEL_SUBDIR].iter().collect();
let kernel_file_path: PathBuf = [&root_dir, TXHASHSET_SUBDIR, KERNEL_SUBDIR]
.iter()
.collect();
fs::create_dir_all(kernel_file_path.clone())?;
let mut output_md = None;
@@ -203,6 +208,23 @@ impl TxHashSet {
PMMR::at(&mut self.kernel_pmmr_h.backend, self.kernel_pmmr_h.last_pos);
(output_pmmr.root(), rproof_pmmr.root(), kernel_pmmr.root())
}
/// Compact the MMR data files and flush the rm logs
pub fn compact(&mut self) -> Result<(), Error> {
let horizon = global::cut_through_horizon();
let commit_index = self.commit_index.clone();
let clean_output_index = |commit: &[u8]| {
let _ = commit_index.delete_output_pos(commit);
};
let min_rm = (horizon / 10) as usize;
self.output_pmmr_h
.backend
.check_compact(min_rm, horizon, clean_output_index)?;
self.rproof_pmmr_h
.backend
.check_compact(min_rm, horizon, &prune_noop)?;
Ok(())
}
}
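Two details worth noting, using illustrative numbers rather than mainnet values: check_compact only rewrites the underlying files once the remove log holds at least min_rm entries, so with a horizon of 60 at least 6 pruned positions must be pending before any work happens (below that it returns early as a cheap no-op); and the closure passed for the output MMR is meant to delete stale commitment-to-position index entries for the outputs whose data is dropped, while the range-proof MMR just gets prune_noop since there is, presumably, no corresponding index to clean up.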
/// Starts a new unit of work to extend the chain with additional blocks,
@@ -280,7 +302,10 @@ impl<'a> Extension<'a> {
// constructor
fn new(trees: &'a mut TxHashSet, commit_index: Arc<ChainStore>) -> Extension<'a> {
Extension {
output_pmmr: PMMR::at(&mut trees.output_pmmr_h.backend, trees.output_pmmr_h.last_pos),
output_pmmr: PMMR::at(
&mut trees.output_pmmr_h.backend,
trees.output_pmmr_h.last_pos,
),
rproof_pmmr: PMMR::at(
&mut trees.rproof_pmmr_h.backend,
trees.rproof_pmmr_h.last_pos,
@@ -355,7 +380,10 @@ impl<'a> Extension<'a> {
// if not then the input is not being honest about
// what it is attempting to spend...
if output_id_hash != read_hash
|| output_id_hash != read_elem.expect("no output at position").hash_with_index(pos)
|| output_id_hash
!= read_elem
.expect("no output at position")
.hash_with_index(pos)
{
return Err(Error::TxHashSetErr(format!("output pmmr hash mismatch")));
}
@@ -583,6 +611,15 @@ impl<'a> Extension<'a> {
}
}
}
for n in 1..self.kernel_pmmr.unpruned_size() + 1 {
// non-pruned leaves only
if pmmr::bintree_postorder_height(n) == 0 {
if let Some((_, kernel)) = self.kernel_pmmr.get(n, true) {
self.commit_index
.save_kernel_pos(&kernel.expect("not a leaf node").excess, n)?;
}
}
}
Ok(())
}
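The bintree_postorder_height(n) == 0 test above is how leaf positions are recognised: MMR positions are numbered in postorder starting at 1, so the leaves fall at 1, 2, 4, 5, 8, 9, 11, 12, 16, … (the same positions that appear as hash_with_index arguments in the test files further down). A small, self-contained sketch of that rule, using an independent illustrative implementation rather than grin's own, is:

fn postorder_height(mut pos: u64) -> u64 {
    // Height of the node at 1-based postorder position `pos`; a result of 0
    // means `pos` is a leaf. Illustrative reimplementation, not grin's code.
    loop {
        // bit length of pos = smallest h such that a full tree of size 2^h - 1 can contain pos
        let h = 64 - pos.leading_zeros() as u64;
        if pos == (1 << h) - 1 {
            return h - 1; // pos is the root of a full binary tree of size 2^h - 1
        }
        // otherwise re-express pos relative to the left subtree's numbering
        pos -= (1 << (h - 1)) - 1;
    }
}

fn main() {
    let leaves: Vec<u64> = (1..=16).filter(|&p| postorder_height(p) == 0).collect();
    assert_eq!(leaves, vec![1, 2, 4, 5, 8, 9, 11, 12, 16]);
}

Interior positions carry only aggregated hashes, so only the height-0 positions have a kernel to re-index here.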
@@ -668,7 +705,8 @@ impl<'a> Extension<'a> {
sum_output = Some(commit);
} else {
let secp = secp.lock().unwrap();
sum_output = Some(secp.commit_sum(vec![sum_output.unwrap(), commit], vec![])?);
sum_output =
Some(secp.commit_sum(vec![sum_output.unwrap(), commit], vec![])?);
}
output_count += 1;
}

View file

@@ -283,6 +283,9 @@ pub trait ChainStore: Send + Sync {
/// Output MMR. Used as an index for spending and pruning.
fn get_output_pos(&self, commit: &Commitment) -> Result<u64, store::Error>;
/// Deletes the MMR position of an output.
fn delete_output_pos(&self, commit: &[u8]) -> Result<(), store::Error>;
/// Saves the position of a kernel, represented by its excess, in the
/// Kernel MMR. Used as an index for spending and pruning.
fn save_kernel_pos(&self, commit: &Commitment, pos: u64) -> Result<(), store::Error>;

View file

@@ -240,7 +240,7 @@ fn longer_fork() {
}
#[test]
fn spend_in_fork() {
fn spend_in_fork_and_compact() {
util::init_test_logger();
let chain = setup(".grin6");
let prev = chain.head_header().unwrap();
@@ -366,6 +366,18 @@ fn spend_in_fork() {
.is_unspent(&OutputIdentifier::from_output(&tx1.outputs[0]))
.is_err()
);
// add 20 blocks to go past the test horizon
let mut prev = prev_fork;
for n in 0..20 {
let next = prepare_block(&kc, &prev, &chain, 11 + n);
prev = next.header.clone();
chain.process_block(next, chain::Options::SKIP_POW).unwrap();
}
chain.validate().unwrap();
chain.compact().unwrap();
chain.validate().unwrap();
}
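The 20 extra blocks push the fork activity behind the horizon (the horizon used under automated testing is much smaller than mainnet's), so the compact() call actually has old blocks and spent outputs to clean up; running chain.validate() both before and after confirms that compaction does not disturb the state the node still needs.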
fn prepare_block(kc: &Keychain, prev: &BlockHeader, chain: &Chain, diff: u64) -> Block {

View file

@@ -20,7 +20,7 @@ extern crate test;
use rand::Rng;
use test::Bencher;
use core::core::txhashset::{self, TxHashSet, Summable};
use core::core::txhashset::{self, Summable, TxHashSet};
use core::ser::{Error, Writeable, Writer};
#[derive(Copy, Clone, Debug)]

View file

@@ -40,7 +40,7 @@ use std::marker::PhantomData;
use core::hash::{Hash, Hashed};
use ser;
use ser::{Readable, Reader, Writeable, Writer};
use ser::{PMMRable, PMMRIndexHashable};
use ser::{PMMRIndexHashable, PMMRable};
use util;
use util::LOGGER;
@@ -584,7 +584,8 @@ impl PruneList {
}
/// Computes by how many positions a node at pos should be shifted given the
/// number of nodes that have already been pruned before it.
/// number of nodes that have already been pruned before it. Returns None if
/// the position has already been pruned.
pub fn get_shift(&self, pos: u64) -> Option<u64> {
// get the position where the node at pos would fit in the pruned list, if
// it's already pruned, nothing to skip
@@ -605,7 +606,8 @@ impl PruneList {
/// As above, but only returning the number of leaf nodes to skip for a
/// given leaf. Helpful if, for instance, data for each leaf is being stored
/// separately in a continuous flat-file
/// separately in a continuous flat-file. Returns None if the position has
/// already been pruned.
pub fn get_leaf_shift(&self, pos: u64) -> Option<u64> {
// get the position where the node at pos would fit in the pruned list, if
// it's already pruned, nothing to skip
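The None case is what the PMMR store backend now relies on (see the get() changes further down): None from get_leaf_shift means the position itself has been pruned away, whereas Some(shift) merely relocates the read within the compacted flat file.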
@@ -917,7 +919,7 @@ mod test {
use ser::{Error, Readable, Writeable};
use core::{Reader, Writer};
use core::hash::Hash;
use ser::{PMMRable, PMMRIndexHashable};
use ser::{PMMRIndexHashable, PMMRable};
/// Simple MMR backend implementation based on a Vector. Pruning does not
/// compact the Vec itself.
@@ -1170,7 +1172,7 @@ mod test {
fn len() -> usize {
16
}
}
}
impl Writeable for TestElem {
fn write<W: Writer>(&self, writer: &mut W) -> Result<(), Error> {
@@ -1354,8 +1356,8 @@ mod test {
// eight elements
pmmr.push(elems[7]).unwrap();
let sum8 =
sum4 + ((elems[4].hash_with_index(8) + elems[5].hash_with_index(9))
let sum8 = sum4
+ ((elems[4].hash_with_index(8) + elems[5].hash_with_index(9))
+ (elems[6].hash_with_index(11) + elems[7].hash_with_index(12)));
assert_eq!(pmmr.root(), sum8);
assert_eq!(pmmr.unpruned_size(), 15);

View file

@@ -938,8 +938,6 @@ pub struct OutputIdentifier {
pub commit: Commitment,
}
impl OutputIdentifier {
/// Build a new output_identifier.
pub fn new(features: OutputFeatures, commit: &Commitment) -> OutputIdentifier {

View file

@@ -25,7 +25,7 @@ use byteorder::{BigEndian, ByteOrder, ReadBytesExt};
use keychain::{BlindingFactor, Identifier, IDENTIFIER_SIZE};
use consensus;
use consensus::VerifySortOrder;
use core::hash::{Hashed, Hash};
use core::hash::{Hash, Hashed};
use core::transaction::{SwitchCommitHash, SWITCH_COMMIT_HASH_SIZE};
use util::secp::pedersen::Commitment;
use util::secp::pedersen::RangeProof;

View file

@@ -82,7 +82,8 @@ pub fn run_sync(
}
// run the body_sync every 5s
if have_txhashset && current_time - prev_body_sync > time::Duration::seconds(5) {
if have_txhashset && current_time - prev_body_sync > time::Duration::seconds(5)
{
body_sync(peers.clone(), chain.clone());
prev_body_sync = current_time;
}
@@ -107,8 +108,10 @@ pub fn run_sync(
txhashset_head =
chain.get_block_header(&txhashset_head.previous).unwrap();
}
p.send_txhashset_request(txhashset_head.height, txhashset_head.hash())
.unwrap();
p.send_txhashset_request(
txhashset_head.height,
txhashset_head.hash(),
).unwrap();
prev_state_sync = current_time;
}
}

View file

@@ -21,7 +21,7 @@ use core::core::pmmr::{self, Backend};
use core::ser::{self, PMMRable, Readable, Reader, Writeable, Writer};
use core::core::hash::Hash;
use util::LOGGER;
use types::{read_ordered_vec, write_vec, AppendOnlyFile, RemoveLog};
use types::*;
const PMMR_HASH_FILE: &'static str = "pmmr_hash.bin";
const PMMR_DATA_FILE: &'static str = "pmmr_data.bin";
@@ -134,26 +134,23 @@ where
/// Get a Hash by insertion position
fn get(&self, position: u64, include_data: bool) -> Option<(Hash, Option<T>)> {
// Check if this position has been pruned in the remove log or the
// pruned list
// Check if this position has been pruned in the remove log...
if self.rm_log.includes(position) {
return None;
}
// ... or in the prune list
let prune_shift = match self.pruned_nodes.get_leaf_shift(position) {
Some(shift) => shift,
None => return None,
};
let hash_val = self.get_from_file(position);
// TODO - clean this up
if !include_data {
if let Some(hash) = hash_val {
return Some((hash, None));
} else {
return None;
}
return hash_val.map(|hash| (hash, None));
}
// Optionally read flatfile storage to get data element
let flatfile_pos =
pmmr::n_leaves(position) - 1 - self.pruned_nodes.get_leaf_shift(position).unwrap();
let flatfile_pos = pmmr::n_leaves(position) - 1 - prune_shift;
let record_len = T::len();
let file_offset = flatfile_pos as usize * T::len();
let data = self.data_file.read(file_offset, record_len);
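As a worked example with toy numbers: the leaf at MMR position 8 is the 5th leaf, so n_leaves(8) - 1 = 4 and, with nothing pruned, its data starts at byte offset 4 * T::len(); if the first two leaves had already been pruned and compacted away, get_leaf_shift(8) would return 2 and the read would move to record index 2 instead, while a position that was itself pruned short-circuits to None through the match above.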
@@ -312,14 +309,19 @@ where
/// ignore any prunable data beyond the cutoff. This is used to enforce
/// a horizon after which the local node should have all the data to allow
/// rewinding.
///
/// TODO whatever is calling this should also clean up the commit to
/// position index in db
pub fn check_compact(&mut self, max_len: usize, cutoff_index: u32) -> io::Result<()> {
if !(max_len > 0 && self.rm_log.len() > max_len
pub fn check_compact<P>(
&mut self,
max_len: usize,
cutoff_index: u32,
prune_cb: P,
) -> io::Result<bool>
where
P: Fn(&[u8]),
{
if !(max_len > 0 && self.rm_log.len() >= max_len
|| max_len == 0 && self.rm_log.len() > RM_LOG_MAX_NODES)
{
return Ok(());
return Ok(false);
}
// 0. validate none of the nodes in the rm log are in the prune list (to
@@ -332,7 +334,7 @@ where
"The remove log contains nodes that are already in the pruned \
list, a previous compaction likely failed."
);
return Ok(());
return Ok(false);
}
}
@@ -347,7 +349,7 @@ where
None
});
self.hash_file
.save_prune(tmp_prune_file_hash.clone(), to_rm, record_len)?;
.save_prune(tmp_prune_file_hash.clone(), to_rm, record_len, &prune_noop)?;
// 2. And the same with the data file
let tmp_prune_file_data = format!("{}/{}.dataprune", self.data_dir, PMMR_DATA_FILE);
@@ -362,7 +364,7 @@ where
}
});
self.data_file
.save_prune(tmp_prune_file_data.clone(), to_rm, record_len)?;
.save_prune(tmp_prune_file_data.clone(), to_rm, record_len, prune_cb)?;
// 3. update the prune list and save it in place
for &(rm_pos, idx) in &self.rm_log.removed[..] {
@@ -398,6 +400,6 @@ where
.collect();
self.rm_log.flush()?;
Ok(())
Ok(true)
}
}
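Both check_compact here and AppendOnlyFile::save_prune in the store types below are now generic over a prune callback. As a minimal, self-contained sketch of that pattern — illustrative names and toy data, not the commit's API — a pruning routine can take any Fn(&[u8]) and hand the caller each record it drops:

fn prune_records<P>(data: &[u8], record_len: usize, prune_idx: &[usize], prune_cb: P) -> Vec<u8>
where
    P: Fn(&[u8]),
{
    let mut kept = Vec::with_capacity(data.len());
    for (i, rec) in data.chunks(record_len).enumerate() {
        if prune_idx.contains(&i) {
            // let the caller react to the dropped record (e.g. delete an index entry)
            prune_cb(rec);
        } else {
            kept.extend_from_slice(rec);
        }
    }
    kept
}

fn main() {
    let noop = |_: &[u8]| {};
    // two-byte records; prune the second one
    assert_eq!(prune_records(&[1, 2, 3, 4, 5, 6], 2, &[1], noop), vec![1, 2, 5, 6]);
}

In the commit, prune_noop plays the do-nothing role and the chain's clean_output_index closure uses the callback to drop stale commitment-to-position entries; the bool returned by check_compact tells the caller whether any compaction actually ran.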

View file

@@ -28,6 +28,8 @@ use libc::{ftruncate as ftruncate64, off_t as off64_t};
use core::ser;
pub fn prune_noop(_pruned_data: &[u8]) {}
/// Wrapper for a file that can be read at any position (random read) but for
/// which writes are append only. Reads are backed by a memory map (mmap(2)),
/// relying on the operating system for fast access and caching. The memory
@@ -150,12 +152,16 @@ impl AppendOnlyFile {
/// Saves a copy of the current file content, skipping data at the provided
/// prune indices. The prune Vec must be ordered.
pub fn save_prune(
pub fn save_prune<T>(
&self,
target: String,
prune_offs: Vec<u64>,
prune_len: u64,
) -> io::Result<()> {
prune_cb: T,
) -> io::Result<()>
where
T: Fn(&[u8]),
{
let mut reader = File::open(self.path.clone())?;
let mut writer = File::create(target)?;
@@ -179,6 +185,8 @@ impl AppendOnlyFile {
let prune_at = prune_offs[prune_pos] as usize;
if prune_at != buf_start {
writer.write_all(&buf[buf_start..prune_at])?;
} else {
prune_cb(&buf[buf_start..prune_at]);
}
buf_start = prune_at + (prune_len as usize);
if prune_offs.len() > prune_pos + 1 {

View file

@@ -22,6 +22,7 @@ use std::fs;
use core::ser::*;
use core::core::pmmr::{Backend, PMMR};
use core::core::hash::{Hash, Hashed};
use store::types::prune_noop;
#[test]
fn pmmr_append() {
@@ -42,8 +43,9 @@ fn pmmr_append() {
let sum2 = elems[0].hash_with_index(1) + elems[1].hash_with_index(2);
let sum4 = sum2 + (elems[2].hash_with_index(4) + elems[3].hash_with_index(5));
let sum8 = sum4 + ((elems[4].hash_with_index(8) + elems[5].hash_with_index(9))
+ (elems[6].hash_with_index(11) + elems[7].hash_with_index(12)));
let sum8 = sum4
+ ((elems[4].hash_with_index(8) + elems[5].hash_with_index(9))
+ (elems[6].hash_with_index(11) + elems[7].hash_with_index(12)));
let sum9 = sum8 + elems[8].hash_with_index(16);
{
@@ -91,7 +93,7 @@ fn pmmr_prune_compact() {
}
// compact
backend.check_compact(2, 2).unwrap();
backend.check_compact(2, 2, &prune_noop).unwrap();
// recheck the root and stored data
{
@@ -132,7 +134,7 @@ fn pmmr_reload() {
}
backend.sync().unwrap();
backend.check_compact(1, 2).unwrap();
backend.check_compact(1, 2, &prune_noop).unwrap();
backend.sync().unwrap();
assert_eq!(backend.unpruned_size().unwrap(), mmr_size);
@@ -191,7 +193,7 @@ fn pmmr_rewind() {
pmmr.prune(1, 1).unwrap();
pmmr.prune(2, 1).unwrap();
}
backend.check_compact(1, 2).unwrap();
backend.check_compact(1, 2, &prune_noop).unwrap();
backend.sync().unwrap();
// rewind and check the roots still match
@@ -247,7 +249,7 @@ fn pmmr_compact_horizon() {
}
backend.sync().unwrap();
// compact
backend.check_compact(2, 3).unwrap();
backend.check_compact(2, 3, &prune_noop).unwrap();
}
// recheck stored data
@@ -261,7 +263,7 @@ fn pmmr_compact_horizon() {
assert_eq!(backend.hash_size().unwrap(), 13);
// compact some more
backend.check_compact(1, 5).unwrap();
backend.check_compact(1, 5, &prune_noop).unwrap();
}
// recheck stored data