PMMR Block to File positions metadata storage ()

* adding file position index data accessable to the chain, and allowing for storage of such within db

* missing file

* restart files at last recorded position in stored file metadata

* just use tip to store last pmmr index information

* error handling

* test fix
This commit is contained in:
Yeastplume 2018-03-03 09:08:36 +00:00 committed by GitHub
parent cc12798d7a
commit 4c34c9ab52
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
12 changed files with 416 additions and 43 deletions

View file

@ -151,11 +151,22 @@ impl Chain {
let chain_store = store::ChainKVStore::new(db_root.clone())?;
let store = Arc::new(chain_store);
let mut sumtrees = sumtree::SumTrees::open(db_root.clone(), store.clone())?;
// check if we have a head in store, otherwise the genesis block is it
let head = match store.head() {
Ok(tip) => tip,
let head = store.head();
let sumtree_md = match head {
Ok(h) => {
Some(store.get_block_pmmr_file_metadata(&h.last_block_h)?)
},
Err(NotFoundErr) => None,
Err(e) => return Err(Error::StoreErr(e, "chain init load head".to_owned())),
};
let mut sumtrees = sumtree::SumTrees::open(db_root.clone(), store.clone(), sumtree_md)?;
let head = store.head();
let head = match head {
Ok(h) => h,
Err(NotFoundErr) => {
let tip = Tip::new(genesis.hash());
store.save_block(&genesis)?;
@ -175,6 +186,7 @@ impl Chain {
genesis.header.nonce,
genesis.header.pow,
);
pipe::save_pmmr_metadata(&tip, &sumtrees, store.clone())?;
tip
}
Err(e) => return Err(Error::StoreErr(e, "chain init load head".to_owned())),
@ -507,7 +519,7 @@ impl Chain {
let header = self.store.get_block_header(&h)?;
sumtree::zip_write(self.db_root.clone(), sumtree_data)?;
let mut sumtrees = sumtree::SumTrees::open(self.db_root.clone(), self.store.clone())?;
let mut sumtrees = sumtree::SumTrees::open(self.db_root.clone(), self.store.clone(), None)?;
sumtree::extending(&mut sumtrees, |extension| {
extension.rewind_pos(header.height, rewind_to_output, rewind_to_kernel)?;
extension.validate(&header)?;
@ -636,8 +648,13 @@ impl Chain {
/// Check whether we have a block without reading it
pub fn block_exists(&self, h: Hash) -> Result<bool, Error> {
self.store
.block_exists(&h)
self.store.block_exists(&h)
.map_err(|e| Error::StoreErr(e, "chain block exists".to_owned()))
}
/// Retrieve the file index metadata for a given block
pub fn get_block_pmmr_file_metadata(&self, h: &Hash) -> Result<PMMRFileMetadataCollection, Error> {
self.store.get_block_pmmr_file_metadata(h)
.map_err(|e| Error::StoreErr(e, "retrieve block pmmr metadata".to_owned()))
}
}

View file

@ -93,7 +93,7 @@ pub fn process_block(b: &Block, mut ctx: BlockContext) -> Result<Option<Tip>, Er
// start a chain extension unit of work dependent on the success of the
// internal validation and saving operations
sumtree::extending(&mut sumtrees, |mut extension| {
let result = sumtree::extending(&mut sumtrees, |mut extension| {
validate_block(b, &mut ctx, &mut extension)?;
debug!(
LOGGER,
@ -108,7 +108,25 @@ pub fn process_block(b: &Block, mut ctx: BlockContext) -> Result<Option<Tip>, Er
extension.force_rollback();
}
Ok(h)
})
});
match result {
Ok(t) => {
save_pmmr_metadata(&Tip::from_block(&b.header), &sumtrees, ctx.store.clone())?;
Ok(t)
},
Err(e) => Err(e),
}
}
/// Save pmmr index location for a given block
pub fn save_pmmr_metadata(t: &Tip, sumtrees: &sumtree::SumTrees, store: Arc<ChainStore>) -> Result<(), Error> {
// Save pmmr file metadata for this block
let block_file_md = sumtrees.last_file_metadata();
store
.save_block_pmmr_file_metadata(&t.last_block_h, &block_file_md)
.map_err(|e| Error::StoreErr(e, "saving pmmr file metadata".to_owned()))?;
Ok(())
}
/// Process the block header.

View file

@ -35,6 +35,7 @@ const SYNC_HEAD_PREFIX: u8 = 's' as u8;
const HEADER_HEIGHT_PREFIX: u8 = '8' as u8;
const COMMIT_POS_PREFIX: u8 = 'c' as u8;
const KERNEL_POS_PREFIX: u8 = 'k' as u8;
const BLOCK_PMMR_FILE_METADATA_PREFIX: u8 = 'p' as u8;
/// An implementation of the ChainStore trait backed by a simple key-value
/// store.
@ -186,6 +187,24 @@ impl ChainStore for ChainKVStore {
)
}
fn save_block_pmmr_file_metadata(&self, h:&Hash, md: &PMMRFileMetadataCollection) -> Result<(), Error> {
self.db.put_ser(
&to_key(BLOCK_PMMR_FILE_METADATA_PREFIX, &mut h.to_vec())[..],
&md,
)
}
fn get_block_pmmr_file_metadata(&self, h: &Hash) -> Result<PMMRFileMetadataCollection, Error>{
option_to_not_found(
self.db
.get_ser(&to_key(BLOCK_PMMR_FILE_METADATA_PREFIX, &mut h.to_vec())),
)
}
fn delete_block_pmmr_file_metadata(&self, h: &Hash) -> Result<(), Error> {
self.db.delete(&to_key(BLOCK_PMMR_FILE_METADATA_PREFIX, &mut h.to_vec())[..])
}
/// Maintain consistency of the "header_by_height" index by traversing back
/// through the current chain and updating "header_by_height" until we reach
/// a block_header that is consistent with its height (everything prior to

View file

@ -33,8 +33,8 @@ use core::core::hash::{Hash, Hashed};
use core::ser::{self, PMMRable};
use grin_store;
use grin_store::pmmr::PMMRBackend;
use types::{ChainStore, SumTreeRoots, Error};
use grin_store::pmmr::{PMMRBackend, PMMRFileMetadata};
use types::{ChainStore, SumTreeRoots, PMMRFileMetadataCollection, Error};
use util::{LOGGER, zip};
const SUMTREES_SUBDIR: &'static str = "sumtrees";
@ -55,16 +55,21 @@ impl<T> PMMRHandle<T>
where
T: PMMRable,
{
fn new(root_dir: String, file_name: &str) -> Result<PMMRHandle<T>, Error> {
fn new(root_dir: String, file_name: &str, index_md: Option<PMMRFileMetadata>) -> Result<PMMRHandle<T>, Error> {
let path = Path::new(&root_dir).join(SUMTREES_SUBDIR).join(file_name);
fs::create_dir_all(path.clone())?;
let be = PMMRBackend::new(path.to_str().unwrap().to_string())?;
let be = PMMRBackend::new(path.to_str().unwrap().to_string(), index_md)?;
let sz = be.unpruned_size()?;
Ok(PMMRHandle {
backend: be,
last_pos: sz,
})
}
/// Return last written positions of hash file and data file
pub fn last_file_positions(&self) -> PMMRFileMetadata {
self.backend.last_file_positions()
}
}
/// An easy to manipulate structure holding the 3 sum trees necessary to
@ -88,7 +93,10 @@ pub struct SumTrees {
impl SumTrees {
/// Open an existing or new set of backends for the SumTrees
pub fn open(root_dir: String, commit_index: Arc<ChainStore>) -> Result<SumTrees, Error> {
pub fn open(root_dir: String,
commit_index: Arc<ChainStore>,
last_file_positions: Option<PMMRFileMetadataCollection>
) -> Result<SumTrees, Error> {
let utxo_file_path: PathBuf = [&root_dir, SUMTREES_SUBDIR, UTXO_SUBDIR].iter().collect();
fs::create_dir_all(utxo_file_path.clone())?;
@ -99,10 +107,20 @@ impl SumTrees {
let kernel_file_path: PathBuf = [&root_dir, SUMTREES_SUBDIR, KERNEL_SUBDIR].iter().collect();
fs::create_dir_all(kernel_file_path.clone())?;
let mut utxo_md = None;
let mut rproof_md = None;
let mut kernel_md = None;
if let Some(p) = last_file_positions {
utxo_md = Some(p.utxo_file_md);
rproof_md = Some(p.rproof_file_md);
kernel_md = Some(p.kernel_file_md);
}
Ok(SumTrees {
utxo_pmmr_h: PMMRHandle::new(root_dir.clone(), UTXO_SUBDIR)?,
rproof_pmmr_h: PMMRHandle::new(root_dir.clone(), RANGE_PROOF_SUBDIR)?,
kernel_pmmr_h: PMMRHandle::new(root_dir.clone(), KERNEL_SUBDIR)?,
utxo_pmmr_h: PMMRHandle::new(root_dir.clone(), UTXO_SUBDIR, utxo_md)?,
rproof_pmmr_h: PMMRHandle::new(root_dir.clone(), RANGE_PROOF_SUBDIR, rproof_md)?,
kernel_pmmr_h: PMMRHandle::new(root_dir.clone(), KERNEL_SUBDIR, kernel_md)?,
commit_index: commit_index,
})
}
@ -157,7 +175,15 @@ impl SumTrees {
indexes_at(block, self.commit_index.deref())
}
/// Last file positions of UTXO set.. hash file,data file
pub fn last_file_metadata(&self) -> PMMRFileMetadataCollection {
PMMRFileMetadataCollection::new(
self.utxo_pmmr_h.last_file_positions(),
self.rproof_pmmr_h.last_file_positions(),
self.kernel_pmmr_h.last_file_positions()
)
}
/// Get sum tree roots
/// TODO: Return data instead of hashes
pub fn roots(
@ -451,8 +477,7 @@ impl<'a> Extension<'a> {
/// Rewinds the MMRs to the provided positions, given the output and
/// kernel we want to rewind to.
pub fn rewind_pos(&mut self, height: u64, out_pos_rew: u64, kern_pos_rew: u64) -> Result<(), Error> {
debug!(
LOGGER,
debug!(LOGGER,
"Rewind sumtrees to output pos: {}, kernel pos: {}",
out_pos_rew,
kern_pos_rew,
@ -487,7 +512,6 @@ impl<'a> Extension<'a> {
}
}
/// Current root hashes and sums (if applicable) for the UTXO, range proof
/// and kernel sum trees.
pub fn roots(

View file

@ -23,8 +23,9 @@ use grin_store as store;
use core::core::{Block, BlockHeader, block, transaction};
use core::core::hash::{Hash, Hashed};
use core::core::target::Difficulty;
use core::ser;
use core::ser::{self, Readable, Writeable, Reader, Writer};
use grin_store;
use grin_store::pmmr::PMMRFileMetadata;
bitflags! {
/// Options for block validation
@ -290,12 +291,73 @@ pub trait ChainStore: Send + Sync {
/// UTXO MMR. Used as an index for spending and pruning.
fn get_kernel_pos(&self, commit: &Commitment) -> Result<u64, store::Error>;
/// Saves information about the last written PMMR file positions for each committed block
fn save_block_pmmr_file_metadata(&self, h: &Hash, md: &PMMRFileMetadataCollection) -> Result<(), store::Error>;
/// Retrieves stored pmmr file metadata information for a given block
fn get_block_pmmr_file_metadata(&self, h: &Hash) -> Result<PMMRFileMetadataCollection, store::Error>;
/// Delete stored pmmr file metadata information for a given block
fn delete_block_pmmr_file_metadata(&self, h: &Hash) -> Result<(), store::Error>;
/// Saves the provided block header at the corresponding height. Also check
/// the consistency of the height chain in store by assuring previous
/// headers are also at their respective heights.
fn setup_height(&self, bh: &BlockHeader, old_tip: &Tip) -> Result<(), store::Error>;
}
/// Single serializable struct to hold metadata about all PMMR file position for a given block
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
pub struct PMMRFileMetadataCollection {
/// file metadata for the utxo file
pub utxo_file_md: PMMRFileMetadata,
/// file metadata for the rangeproof file
pub rproof_file_md: PMMRFileMetadata,
/// file metadata for the kernel file
pub kernel_file_md: PMMRFileMetadata
}
impl Writeable for PMMRFileMetadataCollection {
fn write<W: Writer>(&self, writer: &mut W) -> Result<(), ser::Error> {
self.utxo_file_md.write(writer)?;
self.rproof_file_md.write(writer)?;
self.kernel_file_md.write(writer)?;
Ok(())
}
}
impl Readable for PMMRFileMetadataCollection {
fn read(reader: &mut Reader) -> Result<PMMRFileMetadataCollection, ser::Error> {
Ok(PMMRFileMetadataCollection {
utxo_file_md : PMMRFileMetadata::read(reader)?,
rproof_file_md : PMMRFileMetadata::read(reader)?,
kernel_file_md : PMMRFileMetadata::read(reader)?,
})
}
}
impl PMMRFileMetadataCollection {
/// Return empty with all file positions = 0
pub fn empty() -> PMMRFileMetadataCollection {
PMMRFileMetadataCollection {
utxo_file_md: PMMRFileMetadata::empty(),
rproof_file_md: PMMRFileMetadata::empty(),
kernel_file_md: PMMRFileMetadata::empty(),
}
}
/// Helper to create a new collection
pub fn new(utxo_md: PMMRFileMetadata,
rproof_md: PMMRFileMetadata,
kernel_md: PMMRFileMetadata) -> PMMRFileMetadataCollection {
PMMRFileMetadataCollection {
utxo_file_md : utxo_md,
rproof_file_md: rproof_md,
kernel_file_md: kernel_md,
}
}
}
/// Bridge between the chain pipeline and the rest of the system. Handles
/// downstream processing of valid blocks by the rest of the system, most
/// importantly the broadcasting of blocks to our peers.

View file

@ -0,0 +1,168 @@
// Copyright 2017 The Grin Developers
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
extern crate env_logger;
extern crate grin_chain as chain;
extern crate grin_core as core;
extern crate grin_keychain as keychain;
extern crate grin_pow as pow;
extern crate grin_util as util;
extern crate rand;
extern crate time;
use std::fs;
use std::sync::Arc;
use chain::Chain;
use chain::types::*;
use core::core::{Block, BlockHeader, Transaction};
use core::core::hash::Hashed;
use core::core::target::Difficulty;
use core::{consensus, genesis};
use core::global;
use core::global::ChainTypes;
use keychain::Keychain;
use pow::{cuckoo, types, MiningWorker};
fn clean_output_dir(dir_name: &str) {
let _ = fs::remove_dir_all(dir_name);
}
fn setup(dir_name: &str) -> Chain {
util::init_test_logger();
clean_output_dir(dir_name);
global::set_mining_mode(ChainTypes::AutomatedTesting);
let genesis_block = pow::mine_genesis_block(None).unwrap();
chain::Chain::init(
dir_name.to_string(),
Arc::new(NoopAdapter {}),
genesis_block,
pow::verify_size,
).unwrap()
}
fn reload_chain(dir_name: &str) -> Chain {
chain::Chain::init(
dir_name.to_string(),
Arc::new(NoopAdapter {}),
genesis::genesis_dev(),
pow::verify_size,
).unwrap()
}
#[test]
fn data_files() {
let chain_dir = ".grin_df";
//new block so chain references should be freed
{
let chain = setup(chain_dir);
let keychain = Keychain::from_random_seed().unwrap();
// mine and add a few blocks
let mut miner_config = types::MinerConfig {
enable_mining: true,
burn_reward: true,
..Default::default()
};
miner_config.cuckoo_miner_plugin_dir = Some(String::from("../target/debug/deps"));
let mut cuckoo_miner = cuckoo::Miner::new(
consensus::EASINESS,
global::sizeshift() as u32,
global::proofsize(),
);
for n in 1..4 {
let prev = chain.head_header().unwrap();
let difficulty = consensus::next_difficulty(chain.difficulty_iter()).unwrap();
let pk = keychain.derive_key_id(n as u32).unwrap();
let mut b = core::core::Block::new(
&prev,
vec![],
&keychain,
&pk,
difficulty.clone(),
).unwrap();
b.header.timestamp = prev.timestamp + time::Duration::seconds(60);
b.header.difficulty = difficulty.clone(); // TODO: overwrite here? really?
chain.set_sumtree_roots(&mut b, false).unwrap();
pow::pow_size(
&mut cuckoo_miner,
&mut b.header,
difficulty,
global::sizeshift() as u32,
).unwrap();
let prev_bhash = b.header.previous;
let bhash = b.hash();
chain.process_block(b.clone(), chain::Options::MINE).unwrap();
let head = Tip::from_block(&b.header);
// Check we have indexes for the last block and the block previous
let cur_pmmr_md = chain.get_block_pmmr_file_metadata(&head.last_block_h)
.expect("block pmmr file data doesn't exist");
let pref_pmmr_md = chain.get_block_pmmr_file_metadata(&head.prev_block_h)
.expect("previous block pmmr file data doesn't exist");
println!("Cur_pmmr_md: {:?}", cur_pmmr_md);
chain.validate().unwrap();
}
}
// Now reload the chain, should have valid indices
{
let chain = reload_chain(chain_dir);
chain.validate().unwrap();
}
}
fn prepare_block(kc: &Keychain, prev: &BlockHeader, chain: &Chain, diff: u64) -> Block {
let mut b = prepare_block_nosum(kc, prev, diff, vec![]);
chain.set_sumtree_roots(&mut b, false).unwrap();
b
}
fn prepare_block_tx(kc: &Keychain, prev: &BlockHeader, chain: &Chain, diff: u64, txs: Vec<&Transaction>) -> Block {
let mut b = prepare_block_nosum(kc, prev, diff, txs);
chain.set_sumtree_roots(&mut b, false).unwrap();
b
}
fn prepare_fork_block(kc: &Keychain, prev: &BlockHeader, chain: &Chain, diff: u64) -> Block {
let mut b = prepare_block_nosum(kc, prev, diff, vec![]);
chain.set_sumtree_roots(&mut b, true).unwrap();
b
}
fn prepare_fork_block_tx(kc: &Keychain, prev: &BlockHeader, chain: &Chain, diff: u64, txs: Vec<&Transaction>) -> Block {
let mut b = prepare_block_nosum(kc, prev, diff, txs);
chain.set_sumtree_roots(&mut b, true).unwrap();
b
}
fn prepare_block_nosum(kc: &Keychain, prev: &BlockHeader, diff: u64, txs: Vec<&Transaction>) -> Block {
let key_id = kc.derive_key_id(diff as u32).unwrap();
let mut b = match core::core::Block::new(prev, txs, kc, &key_id, Difficulty::from_num(diff)) {
Err(e) => panic!("{:?}",e),
Ok(b) => b
};
b.header.timestamp = prev.timestamp + time::Duration::seconds(60);
b.header.total_difficulty = Difficulty::from_num(diff);
b
}

View file

@ -58,7 +58,8 @@ fn test_various_store_indices() {
let block_hash = block.hash();
chain_store.save_block(&block).unwrap();
chain_store.setup_height(&block.header, &Tip::from_block(&block.header)).unwrap();
chain_store.setup_height(&block.header,
&Tip::from_block(&block.header)).unwrap();
let block_header = chain_store.get_block_header(&block_hash).unwrap();
assert_eq!(block_header.hash(), block_hash);

View file

@ -7,6 +7,8 @@ workspace = ".."
[dependencies]
byteorder = "^1.0"
env_logger="^0.3.5"
serde = "~1.0.8"
serde_derive = "~1.0.8"
slog = { version = "^2.0.12", features = ["max_level_trace", "release_max_level_trace"] }
libc = "^0.2"
memmap = { git = "https://github.com/danburkert/memmap-rs", tag="0.6.0" }

View file

@ -28,9 +28,13 @@ extern crate grin_util as util;
extern crate libc;
extern crate memmap;
extern crate rocksdb;
extern crate serde;
#[macro_use]
extern crate serde_derive;
#[macro_use]
extern crate slog;
pub mod pmmr;
pub mod types;

View file

@ -18,7 +18,7 @@ use std::io::{self};
use std::marker::PhantomData;
use core::core::pmmr::{self, Backend};
use core::ser::{self, PMMRable};
use core::ser::{self, PMMRable, Readable, Writeable, Reader, Writer};
use core::core::hash::Hash;
use util::LOGGER;
use types::{AppendOnlyFile, RemoveLog, read_ordered_vec, write_vec};
@ -31,6 +31,42 @@ const PMMR_PRUNED_FILE: &'static str = "pmmr_pruned.bin";
/// Maximum number of nodes in the remove log before it gets flushed
pub const RM_LOG_MAX_NODES: usize = 10000;
/// Metadata for the PMMR backend's AppendOnlyFile, which can be serialized and stored
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
pub struct PMMRFileMetadata {
/// last written index of the hash file
pub last_hash_file_pos: u64,
/// last written index of the data file
pub last_data_file_pos: u64,
}
impl Writeable for PMMRFileMetadata {
fn write<W: Writer>(&self, writer: &mut W) -> Result<(), ser::Error> {
writer.write_u64(self.last_hash_file_pos)?;
writer.write_u64(self.last_data_file_pos)?;
Ok(())
}
}
impl Readable for PMMRFileMetadata {
fn read(reader: &mut Reader) -> Result<PMMRFileMetadata, ser::Error> {
Ok(PMMRFileMetadata {
last_hash_file_pos: reader.read_u64()?,
last_data_file_pos: reader.read_u64()?,
})
}
}
impl PMMRFileMetadata {
/// Return fields with all positions = 0
pub fn empty() -> PMMRFileMetadata {
PMMRFileMetadata {
last_hash_file_pos: 0,
last_data_file_pos: 0,
}
}
}
/// PMMR persistent backend implementation. Relies on multiple facilities to
/// handle writing, reading and pruning.
///
@ -177,11 +213,15 @@ where
{
/// Instantiates a new PMMR backend that will use the provided directly to
/// store its files.
pub fn new(data_dir: String) -> io::Result<PMMRBackend<T>> {
let hash_file = AppendOnlyFile::open(format!("{}/{}", data_dir, PMMR_HASH_FILE))?;
pub fn new(data_dir: String, file_md: Option<PMMRFileMetadata>) -> io::Result<PMMRBackend<T>> {
let (hash_to_pos, data_to_pos) = match file_md {
Some(m) => (m.last_hash_file_pos, m.last_data_file_pos),
None => (0,0)
};
let hash_file = AppendOnlyFile::open(format!("{}/{}", data_dir, PMMR_HASH_FILE), hash_to_pos)?;
let rm_log = RemoveLog::open(format!("{}/{}", data_dir, PMMR_RM_LOG_FILE))?;
let prune_list = read_ordered_vec(format!("{}/{}", data_dir, PMMR_PRUNED_FILE), 8)?;
let data_file = AppendOnlyFile::open(format!("{}/{}", data_dir, PMMR_DATA_FILE))?;
let data_file = AppendOnlyFile::open(format!("{}/{}", data_dir, PMMR_DATA_FILE), data_to_pos)?;
Ok(PMMRBackend {
data_dir: data_dir,
@ -248,6 +288,14 @@ where
self.get_data_file_path()
}
/// Return last written buffer positions for the hash file and the data file
pub fn last_file_positions(&self) -> PMMRFileMetadata {
PMMRFileMetadata {
last_hash_file_pos: self.hash_file.last_buffer_pos() as u64,
last_data_file_pos: self.data_file.last_buffer_pos() as u64
}
}
/// Checks the length of the remove log to see if it should get compacted.
/// If so, the remove log is flushed into the pruned list, which itself gets
/// saved, and the main hashsum data file is rewritten, cutting the removed
@ -276,7 +324,6 @@ where
// avoid accidental double compaction)
for pos in &self.rm_log.removed[..] {
if let None = self.pruned_nodes.pruned_pos(pos.0) {
println!("ALREADY PRUNED?");
// TODO we likely can recover from this by directly jumping to 3
error!(
LOGGER,
@ -333,14 +380,14 @@ where
tmp_prune_file_hash.clone(),
format!("{}/{}", self.data_dir, PMMR_HASH_FILE),
)?;
self.hash_file = AppendOnlyFile::open(format!("{}/{}", self.data_dir, PMMR_HASH_FILE))?;
self.hash_file = AppendOnlyFile::open(format!("{}/{}", self.data_dir, PMMR_HASH_FILE), 0)?;
// 5. and the same with the data file
fs::rename(
tmp_prune_file_data.clone(),
format!("{}/{}", self.data_dir, PMMR_DATA_FILE),
)?;
self.data_file = AppendOnlyFile::open(format!("{}/{}", self.data_dir, PMMR_DATA_FILE))?;
self.data_file = AppendOnlyFile::open(format!("{}/{}", self.data_dir, PMMR_DATA_FILE), 0)?;
// 6. truncate the rm log
self.rm_log.removed = self.rm_log.removed

View file

@ -42,12 +42,13 @@ pub struct AppendOnlyFile {
mmap: Option<memmap::Mmap>,
buffer_start: usize,
buffer: Vec<u8>,
buffer_start_bak: usize,
buffer_start_bak: usize
}
impl AppendOnlyFile {
/// Open a file (existing or not) as append-only, backed by a mmap.
pub fn open(path: String) -> io::Result<AppendOnlyFile> {
/// Open a file (existing or not) as append-only, backed by a mmap. Sets
/// the last written pos to to_pos if > 0, otherwise the end of the file
pub fn open(path: String, to_pos: u64) -> io::Result<AppendOnlyFile> {
let file = OpenOptions::new()
.read(true)
.append(true)
@ -62,8 +63,12 @@ impl AppendOnlyFile {
buffer_start_bak: 0,
};
if let Ok(sz) = aof.size() {
if sz > 0 {
aof.buffer_start = sz as usize;
let mut buf_start = sz;
if to_pos > 0 && to_pos <= buf_start {
buf_start = to_pos;
}
if buf_start > 0 {
aof.buffer_start = buf_start as usize;
aof.mmap = Some(unsafe { memmap::Mmap::map(&aof.file)? });
}
}
@ -96,12 +101,18 @@ impl AppendOnlyFile {
}
self.buffer_start += self.buffer.len();
self.file.write(&self.buffer[..])?;
self.file.sync_data()?;
self.file.sync_all()?;
self.buffer = vec![];
self.mmap = Some(unsafe { memmap::Mmap::map(&self.file)? });
Ok(())
}
/// Returns the last position (in bytes), taking into account whether data
/// has been rewound
pub fn last_buffer_pos(&self) -> usize {
self.buffer_start
}
/// Discard the current non-flushed data.
pub fn discard(&mut self) {
if self.buffer_start_bak > 0 {

View file

@ -26,7 +26,7 @@ use core::core::hash::{Hash, Hashed};
#[test]
fn pmmr_append() {
let (data_dir, elems) = setup("append");
let mut backend = store::pmmr::PMMRBackend::new(data_dir.to_string()).unwrap();
let mut backend = store::pmmr::PMMRBackend::new(data_dir.to_string(), None).unwrap();
// adding first set of 4 elements and sync
let mut mmr_size = load(0, &elems[0..4], &mut backend);
@ -64,7 +64,7 @@ fn pmmr_prune_compact() {
let (data_dir, elems) = setup("prune_compact");
// setup the mmr store with all elements
let mut backend = store::pmmr::PMMRBackend::new(data_dir.to_string()).unwrap();
let mut backend = store::pmmr::PMMRBackend::new(data_dir.to_string(), None).unwrap();
let mmr_size = load(0, &elems[..], &mut backend);
backend.sync().unwrap();
@ -114,7 +114,7 @@ fn pmmr_reload() {
let mmr_size: u64;
let root: Hash;
{
let mut backend = store::pmmr::PMMRBackend::new(data_dir.to_string()).unwrap();
let mut backend = store::pmmr::PMMRBackend::new(data_dir.to_string(), None).unwrap();
mmr_size = load(0, &elems[..], &mut backend);
backend.sync().unwrap();
@ -144,7 +144,7 @@ fn pmmr_reload() {
// create a new backend and check everything is kosher
{
let mut backend:store::pmmr::PMMRBackend<TestElem> =
store::pmmr::PMMRBackend::new(data_dir.to_string()).unwrap();
store::pmmr::PMMRBackend::new(data_dir.to_string(), None).unwrap();
assert_eq!(backend.unpruned_size().unwrap(), mmr_size);
{
let pmmr:PMMR<TestElem, _> = PMMR::at(&mut backend, mmr_size);
@ -159,7 +159,7 @@ fn pmmr_reload() {
#[test]
fn pmmr_rewind() {
let (data_dir, elems) = setup("rewind");
let mut backend = store::pmmr::PMMRBackend::new(data_dir.clone()).unwrap();
let mut backend = store::pmmr::PMMRBackend::new(data_dir.clone(), None).unwrap();
// adding elements and keeping the corresponding root
let mut mmr_size = load(0, &elems[0..4], &mut backend);
@ -223,7 +223,7 @@ fn pmmr_compact_horizon() {
let root: Hash;
{
// setup the mmr store with all elements
let mut backend = store::pmmr::PMMRBackend::new(data_dir.to_string()).unwrap();
let mut backend = store::pmmr::PMMRBackend::new(data_dir.to_string(), None).unwrap();
let mmr_size = load(0, &elems[..], &mut backend);
backend.sync().unwrap();
@ -249,7 +249,7 @@ fn pmmr_compact_horizon() {
// recheck stored data
{
// recreate backend
let mut backend = store::pmmr::PMMRBackend::<TestElem>::new(data_dir.to_string()).unwrap();
let mut backend = store::pmmr::PMMRBackend::<TestElem>::new(data_dir.to_string(), None).unwrap();
// 9 elements total, minus 2 compacted
assert_eq!(backend.data_size().unwrap(), 7);
// 15 nodes total, 2 pruned and compacted
@ -262,7 +262,7 @@ fn pmmr_compact_horizon() {
// recheck stored data
{
// recreate backend
let backend = store::pmmr::PMMRBackend::<TestElem>::new(data_dir.to_string()).unwrap();
let backend = store::pmmr::PMMRBackend::<TestElem>::new(data_dir.to_string(), None).unwrap();
// 9 elements total, minus 4 compacted
assert_eq!(backend.data_size().unwrap(), 5);
// 15 nodes total, 6 pruned and compacted