diff --git a/chain/src/chain.rs b/chain/src/chain.rs index c9c75b0c3..c0a393b79 100644 --- a/chain/src/chain.rs +++ b/chain/src/chain.rs @@ -151,11 +151,22 @@ impl Chain { let chain_store = store::ChainKVStore::new(db_root.clone())?; let store = Arc::new(chain_store); - let mut sumtrees = sumtree::SumTrees::open(db_root.clone(), store.clone())?; // check if we have a head in store, otherwise the genesis block is it - let head = match store.head() { - Ok(tip) => tip, + let head = store.head(); + let sumtree_md = match head { + Ok(h) => { + Some(store.get_block_pmmr_file_metadata(&h.last_block_h)?) + }, + Err(NotFoundErr) => None, + Err(e) => return Err(Error::StoreErr(e, "chain init load head".to_owned())), + }; + + let mut sumtrees = sumtree::SumTrees::open(db_root.clone(), store.clone(), sumtree_md)?; + + let head = store.head(); + let head = match head { + Ok(h) => h, Err(NotFoundErr) => { let tip = Tip::new(genesis.hash()); store.save_block(&genesis)?; @@ -175,6 +186,7 @@ impl Chain { genesis.header.nonce, genesis.header.pow, ); + pipe::save_pmmr_metadata(&tip, &sumtrees, store.clone())?; tip } Err(e) => return Err(Error::StoreErr(e, "chain init load head".to_owned())), @@ -507,7 +519,7 @@ impl Chain { let header = self.store.get_block_header(&h)?; sumtree::zip_write(self.db_root.clone(), sumtree_data)?; - let mut sumtrees = sumtree::SumTrees::open(self.db_root.clone(), self.store.clone())?; + let mut sumtrees = sumtree::SumTrees::open(self.db_root.clone(), self.store.clone(), None)?; sumtree::extending(&mut sumtrees, |extension| { extension.rewind_pos(header.height, rewind_to_output, rewind_to_kernel)?; extension.validate(&header)?; @@ -636,8 +648,13 @@ impl Chain { /// Check whether we have a block without reading it pub fn block_exists(&self, h: Hash) -> Result<bool, Error> { - self.store - .block_exists(&h) + self.store.block_exists(&h) .map_err(|e| Error::StoreErr(e, "chain block exists".to_owned())) } + + /// Retrieve the file index metadata for a given block + pub fn get_block_pmmr_file_metadata(&self, h: &Hash) -> Result<PMMRFileMetadataCollection, Error> { + self.store.get_block_pmmr_file_metadata(h) + .map_err(|e| Error::StoreErr(e, "retrieve block pmmr metadata".to_owned())) + } } diff --git a/chain/src/pipe.rs b/chain/src/pipe.rs index 58c3cc5c5..d0c7b047e 100644 --- a/chain/src/pipe.rs +++ b/chain/src/pipe.rs @@ -93,7 +93,7 @@ pub fn process_block(b: &Block, mut ctx: BlockContext) -> Result<Option<Tip>, Er // start a chain extension unit of work dependent on the success of the // internal validation and saving operations - sumtree::extending(&mut sumtrees, |mut extension| { + let result = sumtree::extending(&mut sumtrees, |mut extension| { validate_block(b, &mut ctx, &mut extension)?; debug!( LOGGER, @@ -108,7 +108,25 @@ pub fn process_block(b: &Block, mut ctx: BlockContext) -> Result<Option<Tip>, Er extension.force_rollback(); } Ok(h) - }) + }); + + match result { + Ok(t) => { + save_pmmr_metadata(&Tip::from_block(&b.header), &sumtrees, ctx.store.clone())?; + Ok(t) + }, + Err(e) => Err(e), + } +} + +/// Save pmmr index location for a given block +pub fn save_pmmr_metadata(t: &Tip, sumtrees: &sumtree::SumTrees, store: Arc<ChainStore>) -> Result<(), Error> { + // Save pmmr file metadata for this block + let block_file_md = sumtrees.last_file_metadata(); + store + .save_block_pmmr_file_metadata(&t.last_block_h, &block_file_md) + .map_err(|e| Error::StoreErr(e, "saving pmmr file metadata".to_owned()))?; + Ok(()) } /// Process the block header. diff --git a/chain/src/store.rs b/chain/src/store.rs index 9cb76a51b..e0eff72fd 100644 --- a/chain/src/store.rs +++ b/chain/src/store.rs @@ -35,6 +35,7 @@ const SYNC_HEAD_PREFIX: u8 = 's' as u8; const HEADER_HEIGHT_PREFIX: u8 = '8' as u8; const COMMIT_POS_PREFIX: u8 = 'c' as u8; const KERNEL_POS_PREFIX: u8 = 'k' as u8; +const BLOCK_PMMR_FILE_METADATA_PREFIX: u8 = 'p' as u8; /// An implementation of the ChainStore trait backed by a simple key-value /// store. @@ -186,6 +187,24 @@ impl ChainStore for ChainKVStore { ) } + fn save_block_pmmr_file_metadata(&self, h:&Hash, md: &PMMRFileMetadataCollection) -> Result<(), Error> { + self.db.put_ser( + &to_key(BLOCK_PMMR_FILE_METADATA_PREFIX, &mut h.to_vec())[..], + &md, + ) + } + + fn get_block_pmmr_file_metadata(&self, h: &Hash) -> Result<PMMRFileMetadataCollection, Error>{ + option_to_not_found( + self.db + .get_ser(&to_key(BLOCK_PMMR_FILE_METADATA_PREFIX, &mut h.to_vec())), + ) + } + + fn delete_block_pmmr_file_metadata(&self, h: &Hash) -> Result<(), Error> { + self.db.delete(&to_key(BLOCK_PMMR_FILE_METADATA_PREFIX, &mut h.to_vec())[..]) + } + /// Maintain consistency of the "header_by_height" index by traversing back /// through the current chain and updating "header_by_height" until we reach /// a block_header that is consistent with its height (everything prior to diff --git a/chain/src/sumtree.rs b/chain/src/sumtree.rs index 39b52665a..bc50b01b3 100644 --- a/chain/src/sumtree.rs +++ b/chain/src/sumtree.rs @@ -33,8 +33,8 @@ use core::core::hash::{Hash, Hashed}; use core::ser::{self, PMMRable}; use grin_store; -use grin_store::pmmr::PMMRBackend; -use types::{ChainStore, SumTreeRoots, Error}; +use grin_store::pmmr::{PMMRBackend, PMMRFileMetadata}; +use types::{ChainStore, SumTreeRoots, PMMRFileMetadataCollection, Error}; use util::{LOGGER, zip}; const SUMTREES_SUBDIR: &'static str = "sumtrees"; @@ -55,16 +55,21 @@ impl<T> PMMRHandle<T> where T: PMMRable, { - fn new(root_dir: String, file_name: &str) -> Result<PMMRHandle<T>, Error> { + fn new(root_dir: String, file_name: &str, index_md: Option<PMMRFileMetadata>) -> Result<PMMRHandle<T>, Error> { let path = Path::new(&root_dir).join(SUMTREES_SUBDIR).join(file_name); fs::create_dir_all(path.clone())?; - let be = PMMRBackend::new(path.to_str().unwrap().to_string())?; + let be = PMMRBackend::new(path.to_str().unwrap().to_string(), index_md)?; let sz = be.unpruned_size()?; Ok(PMMRHandle { backend: be, last_pos: sz, }) } + + /// Return last written positions of hash file and data file + pub fn last_file_positions(&self) -> PMMRFileMetadata { + self.backend.last_file_positions() + } } /// An easy to manipulate structure holding the 3 sum trees necessary to @@ -88,7 +93,10 @@ pub struct SumTrees { impl SumTrees { /// Open an existing or new set of backends for the SumTrees - pub fn open(root_dir: String, commit_index: Arc<ChainStore>) -> Result<SumTrees, Error> { + pub fn open(root_dir: String, + commit_index: Arc<ChainStore>, + last_file_positions: Option<PMMRFileMetadataCollection> + ) -> Result<SumTrees, Error> { let utxo_file_path: PathBuf = [&root_dir, SUMTREES_SUBDIR, UTXO_SUBDIR].iter().collect(); fs::create_dir_all(utxo_file_path.clone())?; @@ -99,10 +107,20 @@ impl SumTrees { let kernel_file_path: PathBuf = [&root_dir, SUMTREES_SUBDIR, KERNEL_SUBDIR].iter().collect(); fs::create_dir_all(kernel_file_path.clone())?; + let mut utxo_md = None; + let mut rproof_md = None; + let mut kernel_md = None; + + if let Some(p) = last_file_positions { + utxo_md = Some(p.utxo_file_md); + rproof_md = Some(p.rproof_file_md); + kernel_md = Some(p.kernel_file_md); + } + Ok(SumTrees { - utxo_pmmr_h: PMMRHandle::new(root_dir.clone(), UTXO_SUBDIR)?, - rproof_pmmr_h: PMMRHandle::new(root_dir.clone(), RANGE_PROOF_SUBDIR)?, - kernel_pmmr_h: PMMRHandle::new(root_dir.clone(), KERNEL_SUBDIR)?, + utxo_pmmr_h: PMMRHandle::new(root_dir.clone(), UTXO_SUBDIR, utxo_md)?, + rproof_pmmr_h: PMMRHandle::new(root_dir.clone(), RANGE_PROOF_SUBDIR, rproof_md)?, + kernel_pmmr_h: PMMRHandle::new(root_dir.clone(), KERNEL_SUBDIR, kernel_md)?, commit_index: commit_index, }) } @@ -157,7 +175,15 @@ impl SumTrees { indexes_at(block, self.commit_index.deref()) } - + /// Last file positions of UTXO set.. hash file,data file + pub fn last_file_metadata(&self) -> PMMRFileMetadataCollection { + PMMRFileMetadataCollection::new( + self.utxo_pmmr_h.last_file_positions(), + self.rproof_pmmr_h.last_file_positions(), + self.kernel_pmmr_h.last_file_positions() + ) + } + /// Get sum tree roots /// TODO: Return data instead of hashes pub fn roots( @@ -451,8 +477,7 @@ impl<'a> Extension<'a> { /// Rewinds the MMRs to the provided positions, given the output and /// kernel we want to rewind to. pub fn rewind_pos(&mut self, height: u64, out_pos_rew: u64, kern_pos_rew: u64) -> Result<(), Error> { - debug!( - LOGGER, + debug!(LOGGER, "Rewind sumtrees to output pos: {}, kernel pos: {}", out_pos_rew, kern_pos_rew, @@ -487,7 +512,6 @@ impl<'a> Extension<'a> { } } - /// Current root hashes and sums (if applicable) for the UTXO, range proof /// and kernel sum trees. pub fn roots( diff --git a/chain/src/types.rs b/chain/src/types.rs index 464fb35dc..ea77be74e 100644 --- a/chain/src/types.rs +++ b/chain/src/types.rs @@ -23,8 +23,9 @@ use grin_store as store; use core::core::{Block, BlockHeader, block, transaction}; use core::core::hash::{Hash, Hashed}; use core::core::target::Difficulty; -use core::ser; +use core::ser::{self, Readable, Writeable, Reader, Writer}; use grin_store; +use grin_store::pmmr::PMMRFileMetadata; bitflags! { /// Options for block validation @@ -290,12 +291,73 @@ pub trait ChainStore: Send + Sync { /// UTXO MMR. Used as an index for spending and pruning. fn get_kernel_pos(&self, commit: &Commitment) -> Result<u64, store::Error>; + /// Saves information about the last written PMMR file positions for each committed block + fn save_block_pmmr_file_metadata(&self, h: &Hash, md: &PMMRFileMetadataCollection) -> Result<(), store::Error>; + + /// Retrieves stored pmmr file metadata information for a given block + fn get_block_pmmr_file_metadata(&self, h: &Hash) -> Result<PMMRFileMetadataCollection, store::Error>; + + /// Delete stored pmmr file metadata information for a given block + fn delete_block_pmmr_file_metadata(&self, h: &Hash) -> Result<(), store::Error>; + /// Saves the provided block header at the corresponding height. Also check /// the consistency of the height chain in store by assuring previous /// headers are also at their respective heights. fn setup_height(&self, bh: &BlockHeader, old_tip: &Tip) -> Result<(), store::Error>; } +/// Single serializable struct to hold metadata about all PMMR file position for a given block +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] +pub struct PMMRFileMetadataCollection { + /// file metadata for the utxo file + pub utxo_file_md: PMMRFileMetadata, + /// file metadata for the rangeproof file + pub rproof_file_md: PMMRFileMetadata, + /// file metadata for the kernel file + pub kernel_file_md: PMMRFileMetadata +} + +impl Writeable for PMMRFileMetadataCollection { + fn write<W: Writer>(&self, writer: &mut W) -> Result<(), ser::Error> { + self.utxo_file_md.write(writer)?; + self.rproof_file_md.write(writer)?; + self.kernel_file_md.write(writer)?; + Ok(()) + } +} + +impl Readable for PMMRFileMetadataCollection { + fn read(reader: &mut Reader) -> Result<PMMRFileMetadataCollection, ser::Error> { + Ok(PMMRFileMetadataCollection { + utxo_file_md : PMMRFileMetadata::read(reader)?, + rproof_file_md : PMMRFileMetadata::read(reader)?, + kernel_file_md : PMMRFileMetadata::read(reader)?, + }) + } +} + +impl PMMRFileMetadataCollection { + /// Return empty with all file positions = 0 + pub fn empty() -> PMMRFileMetadataCollection { + PMMRFileMetadataCollection { + utxo_file_md: PMMRFileMetadata::empty(), + rproof_file_md: PMMRFileMetadata::empty(), + kernel_file_md: PMMRFileMetadata::empty(), + } + } + + /// Helper to create a new collection + pub fn new(utxo_md: PMMRFileMetadata, + rproof_md: PMMRFileMetadata, + kernel_md: PMMRFileMetadata) -> PMMRFileMetadataCollection { + PMMRFileMetadataCollection { + utxo_file_md : utxo_md, + rproof_file_md: rproof_md, + kernel_file_md: kernel_md, + } + } +} + /// Bridge between the chain pipeline and the rest of the system. Handles /// downstream processing of valid blocks by the rest of the system, most /// importantly the broadcasting of blocks to our peers. diff --git a/chain/tests/data_file_integrity.rs b/chain/tests/data_file_integrity.rs new file mode 100644 index 000000000..d0b15bb43 --- /dev/null +++ b/chain/tests/data_file_integrity.rs @@ -0,0 +1,168 @@ +// Copyright 2017 The Grin Developers +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +extern crate env_logger; +extern crate grin_chain as chain; +extern crate grin_core as core; +extern crate grin_keychain as keychain; +extern crate grin_pow as pow; +extern crate grin_util as util; +extern crate rand; +extern crate time; + +use std::fs; +use std::sync::Arc; + +use chain::Chain; +use chain::types::*; +use core::core::{Block, BlockHeader, Transaction}; +use core::core::hash::Hashed; +use core::core::target::Difficulty; +use core::{consensus, genesis}; +use core::global; +use core::global::ChainTypes; + +use keychain::Keychain; + +use pow::{cuckoo, types, MiningWorker}; + +fn clean_output_dir(dir_name: &str) { + let _ = fs::remove_dir_all(dir_name); +} + +fn setup(dir_name: &str) -> Chain { + util::init_test_logger(); + clean_output_dir(dir_name); + global::set_mining_mode(ChainTypes::AutomatedTesting); + let genesis_block = pow::mine_genesis_block(None).unwrap(); + chain::Chain::init( + dir_name.to_string(), + Arc::new(NoopAdapter {}), + genesis_block, + pow::verify_size, + ).unwrap() +} + +fn reload_chain(dir_name: &str) -> Chain { + chain::Chain::init( + dir_name.to_string(), + Arc::new(NoopAdapter {}), + genesis::genesis_dev(), + pow::verify_size, + ).unwrap() +} + +#[test] +fn data_files() { + let chain_dir = ".grin_df"; + //new block so chain references should be freed + { + let chain = setup(chain_dir); + let keychain = Keychain::from_random_seed().unwrap(); + + // mine and add a few blocks + let mut miner_config = types::MinerConfig { + enable_mining: true, + burn_reward: true, + ..Default::default() + }; + miner_config.cuckoo_miner_plugin_dir = Some(String::from("../target/debug/deps")); + + let mut cuckoo_miner = cuckoo::Miner::new( + consensus::EASINESS, + global::sizeshift() as u32, + global::proofsize(), + ); + for n in 1..4 { + let prev = chain.head_header().unwrap(); + let difficulty = consensus::next_difficulty(chain.difficulty_iter()).unwrap(); + let pk = keychain.derive_key_id(n as u32).unwrap(); + let mut b = core::core::Block::new( + &prev, + vec![], + &keychain, + &pk, + difficulty.clone(), + ).unwrap(); + b.header.timestamp = prev.timestamp + time::Duration::seconds(60); + + b.header.difficulty = difficulty.clone(); // TODO: overwrite here? really? + chain.set_sumtree_roots(&mut b, false).unwrap(); + + pow::pow_size( + &mut cuckoo_miner, + &mut b.header, + difficulty, + global::sizeshift() as u32, + ).unwrap(); + + let prev_bhash = b.header.previous; + let bhash = b.hash(); + chain.process_block(b.clone(), chain::Options::MINE).unwrap(); + + let head = Tip::from_block(&b.header); + + // Check we have indexes for the last block and the block previous + let cur_pmmr_md = chain.get_block_pmmr_file_metadata(&head.last_block_h) + .expect("block pmmr file data doesn't exist"); + let pref_pmmr_md = chain.get_block_pmmr_file_metadata(&head.prev_block_h) + .expect("previous block pmmr file data doesn't exist"); + + + println!("Cur_pmmr_md: {:?}", cur_pmmr_md); + chain.validate().unwrap(); + } + } + // Now reload the chain, should have valid indices + { + let chain = reload_chain(chain_dir); + chain.validate().unwrap(); + } +} + +fn prepare_block(kc: &Keychain, prev: &BlockHeader, chain: &Chain, diff: u64) -> Block { + let mut b = prepare_block_nosum(kc, prev, diff, vec![]); + chain.set_sumtree_roots(&mut b, false).unwrap(); + b +} + +fn prepare_block_tx(kc: &Keychain, prev: &BlockHeader, chain: &Chain, diff: u64, txs: Vec<&Transaction>) -> Block { + let mut b = prepare_block_nosum(kc, prev, diff, txs); + chain.set_sumtree_roots(&mut b, false).unwrap(); + b +} + +fn prepare_fork_block(kc: &Keychain, prev: &BlockHeader, chain: &Chain, diff: u64) -> Block { + let mut b = prepare_block_nosum(kc, prev, diff, vec![]); + chain.set_sumtree_roots(&mut b, true).unwrap(); + b +} + +fn prepare_fork_block_tx(kc: &Keychain, prev: &BlockHeader, chain: &Chain, diff: u64, txs: Vec<&Transaction>) -> Block { + let mut b = prepare_block_nosum(kc, prev, diff, txs); + chain.set_sumtree_roots(&mut b, true).unwrap(); + b +} + +fn prepare_block_nosum(kc: &Keychain, prev: &BlockHeader, diff: u64, txs: Vec<&Transaction>) -> Block { + let key_id = kc.derive_key_id(diff as u32).unwrap(); + + let mut b = match core::core::Block::new(prev, txs, kc, &key_id, Difficulty::from_num(diff)) { + Err(e) => panic!("{:?}",e), + Ok(b) => b + }; + b.header.timestamp = prev.timestamp + time::Duration::seconds(60); + b.header.total_difficulty = Difficulty::from_num(diff); + b +} diff --git a/chain/tests/store_indices.rs b/chain/tests/store_indices.rs index 139a7f419..7d77575ae 100644 --- a/chain/tests/store_indices.rs +++ b/chain/tests/store_indices.rs @@ -58,7 +58,8 @@ fn test_various_store_indices() { let block_hash = block.hash(); chain_store.save_block(&block).unwrap(); - chain_store.setup_height(&block.header, &Tip::from_block(&block.header)).unwrap(); + chain_store.setup_height(&block.header, + &Tip::from_block(&block.header)).unwrap(); let block_header = chain_store.get_block_header(&block_hash).unwrap(); assert_eq!(block_header.hash(), block_hash); diff --git a/store/Cargo.toml b/store/Cargo.toml index af1151ef9..99ff71754 100644 --- a/store/Cargo.toml +++ b/store/Cargo.toml @@ -7,6 +7,8 @@ workspace = ".." [dependencies] byteorder = "^1.0" env_logger="^0.3.5" +serde = "~1.0.8" +serde_derive = "~1.0.8" slog = { version = "^2.0.12", features = ["max_level_trace", "release_max_level_trace"] } libc = "^0.2" memmap = { git = "https://github.com/danburkert/memmap-rs", tag="0.6.0" } diff --git a/store/src/lib.rs b/store/src/lib.rs index f15e3ba49..70a40027a 100644 --- a/store/src/lib.rs +++ b/store/src/lib.rs @@ -28,9 +28,13 @@ extern crate grin_util as util; extern crate libc; extern crate memmap; extern crate rocksdb; +extern crate serde; +#[macro_use] +extern crate serde_derive; #[macro_use] extern crate slog; + pub mod pmmr; pub mod types; diff --git a/store/src/pmmr.rs b/store/src/pmmr.rs index b7d432f98..0e924dd69 100644 --- a/store/src/pmmr.rs +++ b/store/src/pmmr.rs @@ -18,7 +18,7 @@ use std::io::{self}; use std::marker::PhantomData; use core::core::pmmr::{self, Backend}; -use core::ser::{self, PMMRable}; +use core::ser::{self, PMMRable, Readable, Writeable, Reader, Writer}; use core::core::hash::Hash; use util::LOGGER; use types::{AppendOnlyFile, RemoveLog, read_ordered_vec, write_vec}; @@ -31,6 +31,42 @@ const PMMR_PRUNED_FILE: &'static str = "pmmr_pruned.bin"; /// Maximum number of nodes in the remove log before it gets flushed pub const RM_LOG_MAX_NODES: usize = 10000; +/// Metadata for the PMMR backend's AppendOnlyFile, which can be serialized and stored +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] +pub struct PMMRFileMetadata { + /// last written index of the hash file + pub last_hash_file_pos: u64, + /// last written index of the data file + pub last_data_file_pos: u64, +} + +impl Writeable for PMMRFileMetadata { + fn write<W: Writer>(&self, writer: &mut W) -> Result<(), ser::Error> { + writer.write_u64(self.last_hash_file_pos)?; + writer.write_u64(self.last_data_file_pos)?; + Ok(()) + } +} + +impl Readable for PMMRFileMetadata { + fn read(reader: &mut Reader) -> Result<PMMRFileMetadata, ser::Error> { + Ok(PMMRFileMetadata { + last_hash_file_pos: reader.read_u64()?, + last_data_file_pos: reader.read_u64()?, + }) + } +} + +impl PMMRFileMetadata { + /// Return fields with all positions = 0 + pub fn empty() -> PMMRFileMetadata { + PMMRFileMetadata { + last_hash_file_pos: 0, + last_data_file_pos: 0, + } + } +} + /// PMMR persistent backend implementation. Relies on multiple facilities to /// handle writing, reading and pruning. /// @@ -177,11 +213,15 @@ where { /// Instantiates a new PMMR backend that will use the provided directly to /// store its files. - pub fn new(data_dir: String) -> io::Result<PMMRBackend<T>> { - let hash_file = AppendOnlyFile::open(format!("{}/{}", data_dir, PMMR_HASH_FILE))?; + pub fn new(data_dir: String, file_md: Option<PMMRFileMetadata>) -> io::Result<PMMRBackend<T>> { + let (hash_to_pos, data_to_pos) = match file_md { + Some(m) => (m.last_hash_file_pos, m.last_data_file_pos), + None => (0,0) + }; + let hash_file = AppendOnlyFile::open(format!("{}/{}", data_dir, PMMR_HASH_FILE), hash_to_pos)?; let rm_log = RemoveLog::open(format!("{}/{}", data_dir, PMMR_RM_LOG_FILE))?; let prune_list = read_ordered_vec(format!("{}/{}", data_dir, PMMR_PRUNED_FILE), 8)?; - let data_file = AppendOnlyFile::open(format!("{}/{}", data_dir, PMMR_DATA_FILE))?; + let data_file = AppendOnlyFile::open(format!("{}/{}", data_dir, PMMR_DATA_FILE), data_to_pos)?; Ok(PMMRBackend { data_dir: data_dir, @@ -248,6 +288,14 @@ where self.get_data_file_path() } + /// Return last written buffer positions for the hash file and the data file + pub fn last_file_positions(&self) -> PMMRFileMetadata { + PMMRFileMetadata { + last_hash_file_pos: self.hash_file.last_buffer_pos() as u64, + last_data_file_pos: self.data_file.last_buffer_pos() as u64 + } + } + /// Checks the length of the remove log to see if it should get compacted. /// If so, the remove log is flushed into the pruned list, which itself gets /// saved, and the main hashsum data file is rewritten, cutting the removed @@ -276,7 +324,6 @@ where // avoid accidental double compaction) for pos in &self.rm_log.removed[..] { if let None = self.pruned_nodes.pruned_pos(pos.0) { - println!("ALREADY PRUNED?"); // TODO we likely can recover from this by directly jumping to 3 error!( LOGGER, @@ -333,14 +380,14 @@ where tmp_prune_file_hash.clone(), format!("{}/{}", self.data_dir, PMMR_HASH_FILE), )?; - self.hash_file = AppendOnlyFile::open(format!("{}/{}", self.data_dir, PMMR_HASH_FILE))?; + self.hash_file = AppendOnlyFile::open(format!("{}/{}", self.data_dir, PMMR_HASH_FILE), 0)?; // 5. and the same with the data file fs::rename( tmp_prune_file_data.clone(), format!("{}/{}", self.data_dir, PMMR_DATA_FILE), )?; - self.data_file = AppendOnlyFile::open(format!("{}/{}", self.data_dir, PMMR_DATA_FILE))?; + self.data_file = AppendOnlyFile::open(format!("{}/{}", self.data_dir, PMMR_DATA_FILE), 0)?; // 6. truncate the rm log self.rm_log.removed = self.rm_log.removed diff --git a/store/src/types.rs b/store/src/types.rs index 60a4eca84..46bac7c57 100644 --- a/store/src/types.rs +++ b/store/src/types.rs @@ -42,12 +42,13 @@ pub struct AppendOnlyFile { mmap: Option<memmap::Mmap>, buffer_start: usize, buffer: Vec<u8>, - buffer_start_bak: usize, + buffer_start_bak: usize } impl AppendOnlyFile { - /// Open a file (existing or not) as append-only, backed by a mmap. - pub fn open(path: String) -> io::Result<AppendOnlyFile> { + /// Open a file (existing or not) as append-only, backed by a mmap. Sets + /// the last written pos to to_pos if > 0, otherwise the end of the file + pub fn open(path: String, to_pos: u64) -> io::Result<AppendOnlyFile> { let file = OpenOptions::new() .read(true) .append(true) @@ -62,8 +63,12 @@ impl AppendOnlyFile { buffer_start_bak: 0, }; if let Ok(sz) = aof.size() { - if sz > 0 { - aof.buffer_start = sz as usize; + let mut buf_start = sz; + if to_pos > 0 && to_pos <= buf_start { + buf_start = to_pos; + } + if buf_start > 0 { + aof.buffer_start = buf_start as usize; aof.mmap = Some(unsafe { memmap::Mmap::map(&aof.file)? }); } } @@ -96,12 +101,18 @@ impl AppendOnlyFile { } self.buffer_start += self.buffer.len(); self.file.write(&self.buffer[..])?; - self.file.sync_data()?; + self.file.sync_all()?; self.buffer = vec![]; self.mmap = Some(unsafe { memmap::Mmap::map(&self.file)? }); Ok(()) } + /// Returns the last position (in bytes), taking into account whether data + /// has been rewound + pub fn last_buffer_pos(&self) -> usize { + self.buffer_start + } + /// Discard the current non-flushed data. pub fn discard(&mut self) { if self.buffer_start_bak > 0 { diff --git a/store/tests/pmmr.rs b/store/tests/pmmr.rs index 4e3b53322..117ba8b96 100644 --- a/store/tests/pmmr.rs +++ b/store/tests/pmmr.rs @@ -26,7 +26,7 @@ use core::core::hash::{Hash, Hashed}; #[test] fn pmmr_append() { let (data_dir, elems) = setup("append"); - let mut backend = store::pmmr::PMMRBackend::new(data_dir.to_string()).unwrap(); + let mut backend = store::pmmr::PMMRBackend::new(data_dir.to_string(), None).unwrap(); // adding first set of 4 elements and sync let mut mmr_size = load(0, &elems[0..4], &mut backend); @@ -64,7 +64,7 @@ fn pmmr_prune_compact() { let (data_dir, elems) = setup("prune_compact"); // setup the mmr store with all elements - let mut backend = store::pmmr::PMMRBackend::new(data_dir.to_string()).unwrap(); + let mut backend = store::pmmr::PMMRBackend::new(data_dir.to_string(), None).unwrap(); let mmr_size = load(0, &elems[..], &mut backend); backend.sync().unwrap(); @@ -114,7 +114,7 @@ fn pmmr_reload() { let mmr_size: u64; let root: Hash; { - let mut backend = store::pmmr::PMMRBackend::new(data_dir.to_string()).unwrap(); + let mut backend = store::pmmr::PMMRBackend::new(data_dir.to_string(), None).unwrap(); mmr_size = load(0, &elems[..], &mut backend); backend.sync().unwrap(); @@ -144,7 +144,7 @@ fn pmmr_reload() { // create a new backend and check everything is kosher { let mut backend:store::pmmr::PMMRBackend<TestElem> = - store::pmmr::PMMRBackend::new(data_dir.to_string()).unwrap(); + store::pmmr::PMMRBackend::new(data_dir.to_string(), None).unwrap(); assert_eq!(backend.unpruned_size().unwrap(), mmr_size); { let pmmr:PMMR<TestElem, _> = PMMR::at(&mut backend, mmr_size); @@ -159,7 +159,7 @@ fn pmmr_reload() { #[test] fn pmmr_rewind() { let (data_dir, elems) = setup("rewind"); - let mut backend = store::pmmr::PMMRBackend::new(data_dir.clone()).unwrap(); + let mut backend = store::pmmr::PMMRBackend::new(data_dir.clone(), None).unwrap(); // adding elements and keeping the corresponding root let mut mmr_size = load(0, &elems[0..4], &mut backend); @@ -223,7 +223,7 @@ fn pmmr_compact_horizon() { let root: Hash; { // setup the mmr store with all elements - let mut backend = store::pmmr::PMMRBackend::new(data_dir.to_string()).unwrap(); + let mut backend = store::pmmr::PMMRBackend::new(data_dir.to_string(), None).unwrap(); let mmr_size = load(0, &elems[..], &mut backend); backend.sync().unwrap(); @@ -249,7 +249,7 @@ fn pmmr_compact_horizon() { // recheck stored data { // recreate backend - let mut backend = store::pmmr::PMMRBackend::<TestElem>::new(data_dir.to_string()).unwrap(); + let mut backend = store::pmmr::PMMRBackend::<TestElem>::new(data_dir.to_string(), None).unwrap(); // 9 elements total, minus 2 compacted assert_eq!(backend.data_size().unwrap(), 7); // 15 nodes total, 2 pruned and compacted @@ -262,7 +262,7 @@ fn pmmr_compact_horizon() { // recheck stored data { // recreate backend - let backend = store::pmmr::PMMRBackend::<TestElem>::new(data_dir.to_string()).unwrap(); + let backend = store::pmmr::PMMRBackend::<TestElem>::new(data_dir.to_string(), None).unwrap(); // 9 elements total, minus 4 compacted assert_eq!(backend.data_size().unwrap(), 5); // 15 nodes total, 6 pruned and compacted