diff --git a/chain/src/chain.rs b/chain/src/chain.rs index a66f8aa42..c339e3d22 100644 --- a/chain/src/chain.rs +++ b/chain/src/chain.rs @@ -16,13 +16,16 @@ //! and mostly the chain pipeline. use std::collections::HashMap; +use std::fs::File; use std::sync::{Arc, Mutex, RwLock}; use std::time::{Duration, Instant}; use util::secp::pedersen::RangeProof; use core::core::{Input, OutputIdentifier, SumCommit}; +use core::core::hash::Hashed; use core::core::pmmr::{HashSum, NoSum}; +use core::global; use core::core::{Block, BlockHeader, TxKernel}; use core::core::target::Difficulty; @@ -112,6 +115,7 @@ impl OrphanBlockPool { /// the current view of the UTXO set according to the chain state. Also /// maintains locking for the pipeline to avoid conflicting processing. pub struct Chain { + db_root: String, store: Arc, adapter: Arc, @@ -183,9 +187,10 @@ impl Chain { ); let store = Arc::new(chain_store); - let sumtrees = sumtree::SumTrees::open(db_root, store.clone())?; + let sumtrees = sumtree::SumTrees::open(db_root.clone(), store.clone())?; Ok(Chain { + db_root: db_root, store: store, adapter: adapter, head: Arc::new(Mutex::new(head)), @@ -194,25 +199,26 @@ impl Chain { pow_verifier: pow_verifier, }) } -/// Processes a single block, then checks for orphans, processing -/// those as well if they're found -pub fn process_block(&self, b: Block, opts: Options) --> Result<(Option, Option), Error> -{ - let res = self.process_block_no_orphans(b, opts); - match res { - Ok((t, b)) => { - // We accepted a block, so see if we can accept any orphans - if b.is_some() { - self.check_orphans(&b.clone().unwrap()); + + /// Processes a single block, then checks for orphans, processing + /// those as well if they're found + pub fn process_block(&self, b: Block, opts: Options) + -> Result<(Option, Option), Error> + { + let res = self.process_block_no_orphans(b, opts); + match res { + Ok((t, b)) => { + // We accepted a block, so see if we can accept any orphans + if let Some(ref b) = b { + self.check_orphans(b.hash()); + } + Ok((t, b)) + }, + Err(e) => { + Err(e) + } } - Ok((t, b)) - }, - Err(e) => { - Err(e) } - } -} /// Attempt to add a new block to the chain. Returns the new chain tip if it /// has been added to the longest chain, None if it's added to an (as of @@ -348,13 +354,12 @@ pub fn process_block(&self, b: Block, opts: Options) /// Check for orphans, once a block is successfully added - pub fn check_orphans(&self, block: &Block) { + pub fn check_orphans(&self, mut last_block_hash: Hash) { debug!( LOGGER, "chain: check_orphans: # orphans {}", self.orphans.len(), ); - let mut last_block_hash = block.hash(); // Is there an orphan in our orphans that we can now process? // We just processed the given block, are there any orphans that have this block // as their "previous" block? @@ -390,6 +395,14 @@ pub fn process_block(&self, b: Block, opts: Options) sumtrees.is_unspent(output_ref) } + pub fn validate(&self) -> Result<(), Error> { + let header = self.store.head_header()?; + let mut sumtrees = self.sumtrees.write().unwrap(); + sumtree::extending(&mut sumtrees, |extension| { + extension.validate(&header) + }) + } + /// Check if the input has matured sufficiently for the given block height. /// This only applies to inputs spending coinbase outputs. /// An input spending a non-coinbase output will always pass this check. @@ -432,6 +445,76 @@ pub fn process_block(&self, b: Block, opts: Options) sumtrees.roots() } + /// Provides a reading view into the current sumtree state as well as + /// the required indexes for a consumer to rewind to a consistent state + /// at the provided block hash. + pub fn sumtrees_read(&self, h: Hash) -> Result<(u64, u64, File), Error> { + let b = self.get_block(&h)?; + + // get the indexes for the block + let out_index: u64; + let kernel_index: u64; + { + let sumtrees = self.sumtrees.read().unwrap(); + let (oi, ki) = sumtrees.indexes_at(&b)?; + out_index = oi; + kernel_index = ki; + } + + // prepares the zip and return the corresponding Read + let sumtree_reader = sumtree::zip_read(self.db_root.clone())?; + Ok((out_index, kernel_index, sumtree_reader)) + } + + /// Writes a reading view on a sumtree state that's been provided to us. + /// If we're willing to accept that new state, the data stream will be + /// read as a zip file, unzipped and the resulting state files should be + /// rewound to the provided indexes. + pub fn sumtrees_write( + &self, + h: Hash, + rewind_to_output: u64, + rewind_to_kernel: u64, + sumtree_data: File + ) -> Result<(), Error> { + + let head = self.head().unwrap(); + let header_head = self.get_header_head().unwrap(); + if header_head.height - head.height < global::cut_through_horizon() as u64 { + return Err(Error::InvalidSumtree("not needed".to_owned())); + } + + let header = self.store.get_block_header(&h)?; + sumtree::zip_write(self.db_root.clone(), sumtree_data)?; + + let mut sumtrees = sumtree::SumTrees::open(self.db_root.clone(), self.store.clone())?; + sumtree::extending(&mut sumtrees, |extension| { + extension.rewind_pos(header.height, rewind_to_output, rewind_to_kernel)?; + extension.validate(&header)?; + // TODO validate kernels and their sums with UTXOs + extension.rebuild_index()?; + Ok(()) + })?; + + // replace the chain sumtrees with the newly built one + { + let mut sumtrees_ref = self.sumtrees.write().unwrap(); + *sumtrees_ref = sumtrees; + } + + // setup new head + { + let mut head = self.head.lock().unwrap(); + *head = Tip::from_block(&header); + self.store.save_body_head(&head); + self.store.save_header_height(&header)?; + } + + self.check_orphans(header.hash()); + + Ok(()) + } + /// returns the last n nodes inserted into the utxo sum tree pub fn get_last_n_utxo(&self, distance: u64) -> Vec> { let mut sumtrees = self.sumtrees.write().unwrap(); @@ -455,6 +538,11 @@ pub fn process_block(&self, b: Block, opts: Options) self.head.lock().unwrap().clone().total_difficulty } + /// Total difficulty at the head of the header chain + pub fn total_header_difficulty(&self) -> Result { + Ok(self.store.get_header_head()?.total_difficulty) + } + /// Reset header_head and sync_head to head of current body chain pub fn reset_head(&self) -> Result<(), Error> { self.store diff --git a/chain/src/pipe.rs b/chain/src/pipe.rs index caee993ea..d8b3aead2 100644 --- a/chain/src/pipe.rs +++ b/chain/src/pipe.rs @@ -64,19 +64,23 @@ pub fn process_block(b: &Block, mut ctx: BlockContext) -> Result, Er validate_header(&b.header, &mut ctx)?; - // valid header, now check we actually have the previous block in the store + // valid header, now check we actually have the previous block in the store // not just the header but the block itself - // we cannot assume we can use the chain head for this as we may be dealing with a fork - // we cannot use heights here as the fork may have jumped in height - match ctx.store.get_block(&b.header.previous) { - Ok(_) => {}, - Err(grin_store::Error::NotFoundErr) => { - return Err(Error::Orphan); - }, - Err(e) => { - return Err(Error::StoreErr(e, "pipe get previous".to_owned())); + // short circuit the test first both for performance (in-mem vs db access) + // but also for the specific case of the first fast sync full block + if b.header.previous != ctx.head.last_block_h { + // we cannot assume we can use the chain head for this as we may be dealing with a fork + // we cannot use heights here as the fork may have jumped in height + match ctx.store.block_exists(&b.header.previous) { + Ok(true) => {}, + Ok(false) => { + return Err(Error::Orphan); + }, + Err(e) => { + return Err(Error::StoreErr(e, "pipe get previous".to_owned())); + } } - }; + } // valid header and we have a previous block, time to take the lock on the sum trees let local_sumtrees = ctx.sumtrees.clone(); diff --git a/chain/src/store.rs b/chain/src/store.rs index 05120cc5d..c0765c440 100644 --- a/chain/src/store.rs +++ b/chain/src/store.rs @@ -24,6 +24,7 @@ use core::core::{Block, BlockHeader}; use core::consensus::TargetError; use core::core::target::Difficulty; use grin_store::{self, option_to_not_found, to_key, Error, u64_to_key}; +use util::LOGGER; const STORE_SUBPATH: &'static str = "chain"; @@ -98,6 +99,10 @@ impl ChainStore for ChainKVStore { option_to_not_found(self.db.get_ser(&to_key(BLOCK_PREFIX, &mut h.to_vec()))) } + fn block_exists(&self, h: &Hash) -> Result { + self.db.exists(&to_key(BLOCK_PREFIX, &mut h.to_vec())) + } + fn get_block_header(&self, h: &Hash) -> Result { option_to_not_found( self.db.get_ser(&to_key(BLOCK_HEADER_PREFIX, &mut h.to_vec())), @@ -139,6 +144,11 @@ impl ChainStore for ChainKVStore { option_to_not_found(self.db.get_ser(&u64_to_key(HEADER_HEIGHT_PREFIX, height))) } + fn save_header_height(&self, bh: &BlockHeader) -> Result<(), Error> { + self.db + .put_ser(&u64_to_key(HEADER_HEIGHT_PREFIX, bh.height), bh) + } + fn delete_header_by_height(&self, height: u64) -> Result<(), Error> { self.db.delete(&u64_to_key(HEADER_HEIGHT_PREFIX, height)) } diff --git a/chain/src/sumtree.rs b/chain/src/sumtree.rs index 0a47a4a2c..96f8d412e 100644 --- a/chain/src/sumtree.rs +++ b/chain/src/sumtree.rs @@ -17,23 +17,31 @@ use std::fs; use std::collections::HashMap; -use std::path::Path; +use std::fs::File; +use std::ops::Deref; +use std::path::{Path, PathBuf}; use std::sync::Arc; -use core::core::{Block, SumCommit, Input, Output, OutputIdentifier, TxKernel, OutputFeatures}; -use core::core::pmmr::{HashSum, NoSum, Summable, PMMR}; +use util::{secp, static_secp_instance}; +use util::secp::pedersen::{RangeProof, Commitment}; + +use core::consensus::reward; +use core::core::{Block, BlockHeader, SumCommit, Input, Output, OutputIdentifier, OutputFeatures, TxKernel}; +use core::core::pmmr::{self, HashSum, NoSum, Summable, PMMR}; use core::core::hash::Hashed; +use core::ser::{self, Readable}; use grin_store; -use grin_store::sumtree::PMMRBackend; +use grin_store::sumtree::{PMMRBackend, AppendOnlyFile}; use types::ChainStore; use types::Error; -use util::LOGGER; -use util::secp::pedersen::{RangeProof, Commitment}; +use util::{LOGGER, zip}; const SUMTREES_SUBDIR: &'static str = "sumtrees"; const UTXO_SUBDIR: &'static str = "utxo"; const RANGE_PROOF_SUBDIR: &'static str = "rangeproof"; const KERNEL_SUBDIR: &'static str = "kernel"; +const KERNEL_FILE: &'static str = "kernel_full_data.bin"; +const SUMTREES_ZIP: &'static str = "sumtrees_snapshot.zip"; struct PMMRHandle where @@ -68,10 +76,14 @@ where /// guaranteed to indicate whether an output is spent or not. The index /// may have commitments that have already been spent, even with /// pruning enabled. +/// +/// In addition of the sumtrees, this maintains the full list of kernel +/// data so it can be easily packaged for sync or validation. pub struct SumTrees { output_pmmr_h: PMMRHandle, rproof_pmmr_h: PMMRHandle>, kernel_pmmr_h: PMMRHandle>, + kernel_file: AppendOnlyFile, // chain store used as index of commitments to MMR positions commit_index: Arc, @@ -80,10 +92,16 @@ pub struct SumTrees { impl SumTrees { /// Open an existing or new set of backends for the SumTrees pub fn open(root_dir: String, commit_index: Arc) -> Result { + let mut kernel_file_path: PathBuf = [&root_dir, SUMTREES_SUBDIR, KERNEL_SUBDIR].iter().collect(); + fs::create_dir_all(kernel_file_path.clone())?; + kernel_file_path.push(KERNEL_FILE); + let kernel_file = AppendOnlyFile::open(kernel_file_path.to_str().unwrap().to_owned())?; + Ok(SumTrees { output_pmmr_h: PMMRHandle::new(root_dir.clone(), UTXO_SUBDIR)?, rproof_pmmr_h: PMMRHandle::new(root_dir.clone(), RANGE_PROOF_SUBDIR)?, kernel_pmmr_h: PMMRHandle::new(root_dir.clone(), KERNEL_SUBDIR)?, + kernel_file: kernel_file, commit_index: commit_index, }) } @@ -163,6 +181,11 @@ impl SumTrees { kernel_pmmr.get_last_n_insertions(distance) } + /// Output and kernel MMR indexes at the end of the provided block + pub fn indexes_at(&self, block: &Block) -> Result<(u64, u64), Error> { + indexes_at(block, self.commit_index.deref()) + } + /// Get sum tree roots pub fn roots( &mut self, @@ -193,10 +216,12 @@ where let res: Result; let rollback: bool; { - debug!(LOGGER, "Starting new sumtree extension."); let commit_index = trees.commit_index.clone(); + + debug!(LOGGER, "Starting new sumtree extension."); let mut extension = Extension::new(trees, commit_index); res = inner(&mut extension); + rollback = extension.rollback; if res.is_ok() && !rollback { extension.save_pos_index()?; @@ -209,6 +234,7 @@ where trees.output_pmmr_h.backend.discard(); trees.rproof_pmmr_h.backend.discard(); trees.kernel_pmmr_h.backend.discard(); + trees.kernel_file.discard(); Err(e) } Ok(r) => { @@ -217,11 +243,13 @@ where trees.output_pmmr_h.backend.discard(); trees.rproof_pmmr_h.backend.discard(); trees.kernel_pmmr_h.backend.discard(); + trees.kernel_file.discard(); } else { debug!(LOGGER, "Committing sumtree extension."); trees.output_pmmr_h.backend.sync()?; trees.rproof_pmmr_h.backend.sync()?; trees.kernel_pmmr_h.backend.sync()?; + trees.kernel_file.flush()?; trees.output_pmmr_h.last_pos = sizes.0; trees.rproof_pmmr_h.last_pos = sizes.1; trees.kernel_pmmr_h.last_pos = sizes.2; @@ -241,6 +269,7 @@ pub struct Extension<'a> { rproof_pmmr: PMMR<'a, NoSum, PMMRBackend>>, kernel_pmmr: PMMR<'a, NoSum, PMMRBackend>>, + kernel_file: &'a mut AppendOnlyFile, commit_index: Arc, new_output_commits: HashMap, new_kernel_excesses: HashMap, @@ -249,7 +278,11 @@ pub struct Extension<'a> { impl<'a> Extension<'a> { // constructor - fn new(trees: &'a mut SumTrees, commit_index: Arc) -> Extension<'a> { + fn new( + trees: &'a mut SumTrees, + commit_index: Arc, + ) -> Extension<'a> { + Extension { output_pmmr: PMMR::at( &mut trees.output_pmmr_h.backend, @@ -263,6 +296,7 @@ impl<'a> Extension<'a> { &mut trees.kernel_pmmr_h.backend, trees.kernel_pmmr_h.last_pos, ), + kernel_file: &mut trees.kernel_file, commit_index: commit_index, new_output_commits: HashMap::new(), new_kernel_excesses: HashMap::new(), @@ -407,15 +441,18 @@ impl<'a> Extension<'a> { } } } - // push kernels in their MMR + + // push kernels in their MMR and file let pos = self.kernel_pmmr .push(NoSum(kernel.clone())) .map_err(&Error::SumTreeErr)?; self.new_kernel_excesses.insert(kernel.excess, pos); + self.kernel_file.append(&mut ser::ser_vec(&kernel).unwrap()); + Ok(()) } - /// Rewinds the MMRs to the provided position, given the last output and + /// Rewinds the MMRs to the provided block, using the last output and /// last kernel of the block we want to rewind to. pub fn rewind(&mut self, block: &Block) -> Result<(), Error> { debug!( @@ -425,22 +462,23 @@ impl<'a> Extension<'a> { block.header.height, ); - let out_pos_rew = match block.outputs.last() { - Some(output) => self.get_output_pos(&output.commitment()) - .map_err(|e| { - Error::StoreErr(e, format!("missing output pos for known block")) - })?, - None => 0, - }; + // rewind each MMR + let (out_pos_rew, kern_pos_rew) = indexes_at(block, self.commit_index.deref())?; + self.rewind_pos(block.header.height, out_pos_rew, kern_pos_rew)?; + + // rewind the kernel file store, the position is the number of kernels + // multiplied by their size + // the number of kernels is the number of leaves in the MMR, which is the + // sum of the number of leaf nodes under each peak in the MMR + let pos: u64 = pmmr::peaks(kern_pos_rew).iter().map(|n| (1 << n) as u64).sum(); + self.kernel_file.rewind(pos * (TxKernel::size() as u64)); - let kern_pos_rew = match block.kernels.last() { - Some(kernel) => self.get_kernel_pos(&kernel.excess) - .map_err(|e| { - Error::StoreErr(e, format!("missing kernel pos for known block")) - })?, - None => 0, - }; + Ok(()) + } + /// Rewinds the MMRs to the provided positions, given the output and + /// kernel we want to rewind to. + pub fn rewind_pos(&mut self, height: u64, out_pos_rew: u64, kern_pos_rew: u64) -> Result<(), Error> { debug!( LOGGER, "Rewind sumtrees to output pos: {}, kernel pos: {}", @@ -448,8 +486,6 @@ impl<'a> Extension<'a> { kern_pos_rew, ); - let height = block.header.height; - self.output_pmmr .rewind(out_pos_rew, height as u32) .map_err(&Error::SumTreeErr)?; @@ -496,14 +532,67 @@ impl<'a> Extension<'a> { ) } + /// Validate the current sumtree state against a block header + pub fn validate(&self, header: &BlockHeader) -> Result<(), Error> { + // validate all hashes and sums within the trees + if let Err(e) = self.output_pmmr.validate() { + return Err(Error::InvalidSumtree(e)); + } + if let Err(e) = self.rproof_pmmr.validate() { + return Err(Error::InvalidSumtree(e)); + } + if let Err(e) = self.kernel_pmmr.validate() { + return Err(Error::InvalidSumtree(e)); + } + + // validate the tree roots against the block header + let (utxo_root, rproof_root, kernel_root) = self.roots(); + if utxo_root.hash != header.utxo_root || rproof_root.hash != header.range_proof_root + || kernel_root.hash != header.kernel_root + { + return Err(Error::InvalidRoot); + } + + // the real magicking: the sum of all kernel excess should equal the sum + // of all UTXO commitments, minus the total supply + let (kernel_sum, fees) = self.sum_kernels()?; + let utxo_sum = self.sum_utxos()?; + { + let secp = static_secp_instance(); + let secp = secp.lock().unwrap(); + let over_commit = secp.commit_value(header.height * reward(0) - fees / 2)?; + let adjusted_sum_utxo = secp.commit_sum(vec![utxo_sum], vec![over_commit])?; + + if adjusted_sum_utxo != kernel_sum { + return Err(Error::InvalidSumtree("Differing UTXO commitment and kernel excess sums.".to_owned())); + } + } + + Ok(()) + } + + /// Rebuild the index of MMR positions to the corresponding UTXO and kernel + /// by iterating over the whole MMR data. This is a costly operation + /// performed only when we receive a full new chain state. + pub fn rebuild_index(&self) -> Result<(), Error> { + for n in 1..self.output_pmmr.unpruned_size()+1 { + // non-pruned leaves only + if pmmr::bintree_postorder_height(n) == 0 { + if let Some(hs) = self.output_pmmr.get(n) { + self.commit_index.save_output_pos(&hs.sum.commit, n)?; + } + } + } + Ok(()) + } + /// Force the rollback of this extension, no matter the result pub fn force_rollback(&mut self) { self.rollback = true; } /// Dumps the state of the 3 sum trees to stdout for debugging. Short - /// version - /// only prints the UTXO tree. + /// version only prints the UTXO tree. pub fn dump(&self, short: bool) { debug!(LOGGER, "-- outputs --"); self.output_pmmr.dump(short); @@ -523,4 +612,109 @@ impl<'a> Extension<'a> { self.kernel_pmmr.unpruned_size(), ) } + + /// Sums the excess of all our kernels, validating their signatures on the way + fn sum_kernels(&self) -> Result<(Commitment, u64), Error> { + // make sure we have the right count of kernels using the MMR, the storage + // file may have a few more + let mmr_sz = self.kernel_pmmr.unpruned_size(); + let count: u64 = pmmr::peaks(mmr_sz).iter().map(|n| { + (1 << pmmr::bintree_postorder_height(*n)) as u64 + }).sum(); + + let mut kernel_file = File::open(self.kernel_file.path())?; + let first: TxKernel = ser::deserialize(&mut kernel_file)?; + first.verify()?; + let mut sum_kernel = first.excess; + let mut fees = first.fee; + + let secp = static_secp_instance(); + let mut kern_count = 1; + loop { + match ser::deserialize::(&mut kernel_file) { + Ok(kernel) => { + kernel.verify()?; + let secp = secp.lock().unwrap(); + sum_kernel = secp.commit_sum(vec![sum_kernel, kernel.excess], vec![])?; + fees += kernel.fee; + kern_count += 1; + if kern_count == count { + break; + } + } + Err(_) => break, + } + } + debug!(LOGGER, "Validated and summed {} kernels", kern_count); + Ok((sum_kernel, fees)) + } + + /// Sums all our UTXO commitments + fn sum_utxos(&self) -> Result { + let mut sum_utxo = None; + let mut utxo_count = 0; + let secp = static_secp_instance(); + for n in 1..self.output_pmmr.unpruned_size()+1 { + if pmmr::bintree_postorder_height(n) == 0 { + if let Some(hs) = self.output_pmmr.get(n) { + if n == 1 { + sum_utxo = Some(hs.sum.commit); + } else { + let secp = secp.lock().unwrap(); + sum_utxo = Some(secp.commit_sum(vec![sum_utxo.unwrap(), hs.sum.commit], vec![])?); + } + utxo_count += 1; + } + } + } + debug!(LOGGER, "Summed {} UTXOs", utxo_count); + Ok(sum_utxo.unwrap()) + } +} + +/// Output and kernel MMR indexes at the end of the provided block +fn indexes_at(block: &Block, commit_index: &ChainStore) -> Result<(u64, u64), Error> { + let out_idx = match block.outputs.last() { + Some(output) => commit_index.get_output_pos(&output.commitment()) + .map_err(|e| { + Error::StoreErr(e, format!("missing output pos for known block")) + })?, + None => 0, + }; + + let kern_idx = match block.kernels.last() { + Some(kernel) => commit_index.get_kernel_pos(&kernel.excess) + .map_err(|e| { + Error::StoreErr(e, format!("missing kernel pos for known block")) + })?, + None => 0, + }; + Ok((out_idx, kern_idx)) +} + +/// Packages the sumtree data files into a zip and returns a Read to the +/// resulting file +pub fn zip_read(root_dir: String) -> Result { + let sumtrees_path = Path::new(&root_dir).join(SUMTREES_SUBDIR); + let zip_path = Path::new(&root_dir).join(SUMTREES_ZIP); + + // create the zip archive + { + zip::compress(&sumtrees_path, &File::create(zip_path.clone())?) + .map_err(|ze| Error::Other(ze.to_string()))?; + } + + // open it again to read it back + let zip_file = File::open(zip_path)?; + Ok(zip_file) +} + +/// Extract the sumtree data from a zip file and writes the content into the +/// sumtree storage dir +pub fn zip_write(root_dir: String, sumtree_data: File) -> Result<(), Error> { + let sumtrees_path = Path::new(&root_dir).join(SUMTREES_SUBDIR); + + fs::create_dir_all(sumtrees_path.clone())?; + zip::decompress(sumtree_data, &sumtrees_path) + .map_err(|ze| Error::Other(ze.to_string())) } diff --git a/chain/src/types.rs b/chain/src/types.rs index c02686031..d3f67cf0e 100644 --- a/chain/src/types.rs +++ b/chain/src/types.rs @@ -16,6 +16,7 @@ use std::io; +use util::secp; use util::secp::pedersen::Commitment; use grin_store as store; @@ -76,6 +77,8 @@ pub enum Error { OutputSpent, /// Invalid block version, either a mistake or outdated software InvalidBlockVersion(u16), + /// We've been provided a bad sumtree + InvalidSumtree(String), /// Internal issue when trying to save or load data from store StoreErr(grin_store::Error, String), /// Error serializing or deserializing a type @@ -105,10 +108,15 @@ impl From for Error { Error::SumTreeErr(e.to_string()) } } +impl From for Error { + fn from(e: secp::Error) -> Error { + Error::SumTreeErr(format!("Sum validation error: {}", e.to_string())) + } +} impl Error { /// Whether the error is due to a block that was intrinsically wrong - pub fn is_bad_block(&self) -> bool { + pub fn is_bad_data(&self) -> bool { // shorter to match on all the "not the block's fault" errors match *self { Error::Unfit(_) | @@ -211,6 +219,9 @@ pub trait ChainStore: Send + Sync { /// Gets a block header by hash fn get_block(&self, h: &Hash) -> Result; + /// Check whether we have a block without reading it + fn block_exists(&self, h: &Hash) -> Result; + /// Gets a block header by hash fn get_block_header(&self, h: &Hash) -> Result; @@ -238,6 +249,9 @@ pub trait ChainStore: Send + Sync { /// Gets the block header at the provided height fn get_header_by_height(&self, height: u64) -> Result; + /// Save a header as associated with its height + fn save_header_height(&self, header: &BlockHeader) -> Result<(), store::Error>; + /// Delete the block header at the height fn delete_header_by_height(&self, height: u64) -> Result<(), store::Error>; diff --git a/chain/tests/mine_simple_chain.rs b/chain/tests/mine_simple_chain.rs index cfc15749a..063746164 100644 --- a/chain/tests/mine_simple_chain.rs +++ b/chain/tests/mine_simple_chain.rs @@ -118,6 +118,7 @@ fn mine_empty_chain() { let header_by_height = chain.get_header_by_height(n).unwrap(); assert_eq!(header_by_height.hash(), bhash); } + chain.validate().unwrap(); } #[test] diff --git a/config/src/types.rs b/config/src/types.rs index 33d0ca903..c9d3737fa 100644 --- a/config/src/types.rs +++ b/config/src/types.rs @@ -96,6 +96,7 @@ pub struct GlobalConfig { #[derive(Debug, Serialize, Deserialize)] pub struct ConfigMembers { /// Server config + #[serde(default)] pub server: ServerConfig, /// Mining config pub mining: Option, diff --git a/core/src/core/pmmr.rs b/core/src/core/pmmr.rs index 045f3f0df..766bf6485 100644 --- a/core/src/core/pmmr.rs +++ b/core/src/core/pmmr.rs @@ -47,7 +47,7 @@ use util::LOGGER; /// the tree can sum over pub trait Summable { /// The type of the sum - type Sum: Clone + ops::Add + Readable + Writeable; + type Sum: Clone + ops::Add + Readable + Writeable + PartialEq; /// Obtain the sum of the element fn sum(&self) -> Self::Sum; @@ -80,6 +80,12 @@ impl Writeable for NullSum { } } +impl PartialEq for NullSum { + fn eq(&self, _other: &NullSum) -> bool { + true + } +} + /// Wrapper for a type that allows it to be inserted in a tree without summing #[derive(Clone, Debug)] pub struct NoSum(pub T); @@ -103,7 +109,7 @@ where /// A utility type to handle (Hash, Sum) pairs more conveniently. The addition /// of two HashSums is the (Hash(h1|h2), h1 + h2) HashSum. -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone, Eq)] pub struct HashSum where T: Summable, @@ -130,6 +136,15 @@ where } } +impl PartialEq for HashSum +where + T: Summable, +{ + fn eq(&self, other: &HashSum) -> bool { + self.hash == other.hash && self.sum == other.sum + } +} + impl Readable for HashSum where T: Summable, @@ -382,6 +397,32 @@ where return_vec } + /// Walks all unpruned nodes in the MMR and revalidate all parent hashes + /// and sums. + pub fn validate(&self) -> Result<(), String> { + // iterate on all parent nodes + for n in 1..(self.last_pos + 1) { + if bintree_postorder_height(n) > 0 { + if let Some(hs) = self.get(n) { + // take the left and right children, if they exist + let left_pos = bintree_move_down_left(n).unwrap(); + let right_pos = bintree_jump_right_sibling(left_pos); + + if let Some(left_child_hs) = self.get(left_pos) { + if let Some(right_child_hs) = self.get(right_pos) { + // sum and compare + if left_child_hs + right_child_hs != hs { + return Err(format!("Invalid MMR, hashsum of parent at {} does \ + not match children.", n)); + } + } + } + } + } + } + Ok(()) + } + /// Total size of the tree, including intermediary nodes an ignoring any /// pruning. pub fn unpruned_size(&self) -> u64 { @@ -583,7 +624,7 @@ impl PruneList { /// node's position. Starts with the top peak, which is always on the left /// side of the range, and navigates toward lower siblings toward the right /// of the range. -fn peaks(num: u64) -> Vec { +pub fn peaks(num: u64) -> Vec { // detecting an invalid mountain range, when siblings exist but no parent // exists if bintree_postorder_height(num + 1) > bintree_postorder_height(num) { diff --git a/core/src/core/transaction.rs b/core/src/core/transaction.rs index f1b4c4ec5..427c4a68e 100644 --- a/core/src/core/transaction.rs +++ b/core/src/core/transaction.rs @@ -167,6 +167,13 @@ impl TxKernel { } Ok(()) } + + /// Size in bytes of a kernel, necessary for binary storage + pub fn size() -> usize { + 17 + // features plus fee and lock_height + secp::constants::PEDERSEN_COMMITMENT_SIZE + + secp::constants::AGG_SIGNATURE_SIZE + } } /// A transaction @@ -770,7 +777,7 @@ impl Readable for OutputIdentifier { } /// Wrapper to Output commitments to provide the Summable trait. -#[derive(Serialize, Deserialize, Debug, Clone)] +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] pub struct SumCommit { /// Output features (coinbase vs. regular transaction output) /// We need to include this when hashing to ensure coinbase maturity can be enforced. diff --git a/core/src/global.rs b/core/src/global.rs index 00aa43c7d..a48b03282 100644 --- a/core/src/global.rs +++ b/core/src/global.rs @@ -26,7 +26,7 @@ use consensus::PROOFSIZE; use consensus::DEFAULT_SIZESHIFT; use consensus::COINBASE_MATURITY; use consensus::{MEDIAN_TIME_WINDOW, INITIAL_DIFFICULTY, - BLOCK_TIME_SEC, DIFFICULTY_ADJUST_WINDOW}; + BLOCK_TIME_SEC, DIFFICULTY_ADJUST_WINDOW, CUT_THROUGH_HORIZON}; use core::target::Difficulty; use consensus::TargetError; @@ -51,6 +51,9 @@ pub const AUTOMATED_TESTING_COINBASE_MATURITY: u64 = 3; /// User testing coinbase maturity pub const USER_TESTING_COINBASE_MATURITY: u64 = 3; +/// Testing cut through horizon in blocks +pub const TESTING_CUT_THROUGH_HORIZON: u32 = 20; + /// Testing initial block difficulty pub const TESTING_INITIAL_DIFFICULTY: u64 = 1; @@ -163,6 +166,18 @@ pub fn initial_block_difficulty() -> u64 { } } +/// Horizon at which we can cut-through and do full local pruning +pub fn cut_through_horizon() -> u32 { + let param_ref = CHAIN_TYPE.read().unwrap(); + match *param_ref { + ChainTypes::AutomatedTesting => TESTING_CUT_THROUGH_HORIZON, + ChainTypes::UserTesting => TESTING_CUT_THROUGH_HORIZON, + ChainTypes::Testnet1 => CUT_THROUGH_HORIZON, + ChainTypes::Testnet2 => CUT_THROUGH_HORIZON, + ChainTypes::Mainnet => CUT_THROUGH_HORIZON, + } +} + /// Are we in automated testing mode? pub fn is_automated_testing_mode() -> bool { let param_ref = CHAIN_TYPE.read().unwrap(); diff --git a/grin/src/adapters.rs b/grin/src/adapters.rs index 3c5dfbf3b..85ca06981 100644 --- a/grin/src/adapters.rs +++ b/grin/src/adapters.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::fs::File; use std::net::SocketAddr; use std::sync::{Arc, RwLock}; use std::sync::atomic::{AtomicBool, Ordering}; @@ -123,7 +124,7 @@ impl p2p::ChainAdapter for NetToChainAdapter { if let &Err(ref e) = &res { debug!(LOGGER, "Block header {} refused by chain: {:?}", bhash, e); - if e.is_bad_block() { + if e.is_bad_data() { debug!(LOGGER, "header_received: {} is a bad header, resetting header head", bhash); let _ = self.chain.reset_head(); return false; @@ -183,10 +184,14 @@ impl p2p::ChainAdapter for NetToChainAdapter { } } } + + let header_head = self.chain.get_header_head().unwrap(); info!( LOGGER, - "Added {} headers to the header chain.", - added_hs.len() + "Added {} headers to the header chain. Last: {} at {}.", + added_hs.len(), + header_head.last_block_h, + header_head.height, ); } @@ -241,6 +246,41 @@ impl p2p::ChainAdapter for NetToChainAdapter { } } + /// Provides a reading view into the current sumtree state as well as + /// the required indexes for a consumer to rewind to a consistant state + /// at the provided block hash. + fn sumtrees_read(&self, h: Hash) -> Option { + match self.chain.sumtrees_read(h.clone()) { + Ok((out_index, kernel_index, read)) => Some(p2p::SumtreesRead { + output_index: out_index, + kernel_index: kernel_index, + reader: read, + }), + Err(e) => { + warn!(LOGGER, "Couldn't produce sumtrees data for block {}: {:?}", + h, e); + None + } + } + } + + /// Writes a reading view on a sumtree state that's been provided to us. + /// If we're willing to accept that new state, the data stream will be + /// read as a zip file, unzipped and the resulting state files should be + /// rewound to the provided indexes. + fn sumtrees_write(&self, h: Hash, + rewind_to_output: u64, rewind_to_kernel: u64, + sumtree_data: File, peer_addr: SocketAddr) -> bool { + // TODO check whether we should accept any sumtree now + if let Err(e) = self.chain.sumtrees_write(h, rewind_to_output, rewind_to_kernel, sumtree_data) { + error!(LOGGER, "Failed to save sumtree archive: {:?}", e); + !e.is_bad_data() + } else { + info!(LOGGER, "Received valid sumtree data for {}.", h); + self.currently_syncing.store(true, Ordering::Relaxed); + true + } + } } impl NetToChainAdapter { @@ -302,7 +342,7 @@ impl NetToChainAdapter { let res = self.chain.process_block(b, self.chain_opts()); if let Err(ref e) = res { debug!(LOGGER, "Block {} refused by chain: {:?}", bhash, e); - if e.is_bad_block() { + if e.is_bad_data() { debug!(LOGGER, "adapter: process_block: {} is a bad block, resetting head", bhash); let _ = self.chain.reset_head(); return false; diff --git a/grin/src/server.rs b/grin/src/server.rs index 2c11c77f7..92e3df441 100644 --- a/grin/src/server.rs +++ b/grin/src/server.rs @@ -142,6 +142,7 @@ impl Server { p2p_server.peers.clone(), shared_chain.clone(), skip_sync_wait, + !config.archive_mode, ); let p2p_inner = p2p_server.clone(); @@ -206,6 +207,11 @@ impl Server { self.chain.head().unwrap() } + /// The head of the block header chain + pub fn header_head(&self) -> chain::Tip { + self.chain.get_header_head().unwrap() + } + /// Returns a set of stats about this server. This and the ServerStats /// structure /// can be updated over time to include any information needed by tests or diff --git a/grin/src/sync.rs b/grin/src/sync.rs index 91b61aa37..d8208c95b 100644 --- a/grin/src/sync.rs +++ b/grin/src/sync.rs @@ -19,8 +19,9 @@ use std::sync::atomic::{AtomicBool, Ordering}; use time; use chain; -use core::core::hash::{Hash, Hashed}; +use core::core::hash::{Hash, Hashed, ZERO_HASH}; use core::core::target::Difficulty; +use core::global; use p2p::{self, Peer, Peers, ChainAdapter}; use types::Error; use util::LOGGER; @@ -31,6 +32,7 @@ pub fn run_sync( peers: p2p::Peers, chain: Arc, skip_sync_wait: bool, + fast_sync: bool, ) { let chain = chain.clone(); @@ -39,18 +41,36 @@ pub fn run_sync( .spawn(move || { let mut prev_body_sync = time::now_utc(); let mut prev_header_sync = prev_body_sync.clone(); + let mut prev_state_sync = prev_body_sync.clone() - time::Duration::seconds(5 * 60); // initial sleep to give us time to peer with some nodes if !skip_sync_wait { thread::sleep(Duration::from_secs(30)); } - loop { - let syncing = needs_syncing( - currently_syncing.clone(), peers.clone(), chain.clone()); - if syncing { + // fast sync has 3 states: + // * syncing headers + // * once all headers are sync'd, requesting the sumtree state + // * once we have the state, get blocks after that + // + // full sync gets rid of the middle step and just starts from + // the genesis state - let current_time = time::now_utc(); + loop { + let horizon = global::cut_through_horizon() as u64; + let head = chain.head().unwrap(); + let header_head = chain.get_header_head().unwrap(); + + // in archival nodes (no fast sync) we just consider we have the whole + // state already + let have_sumtrees = !fast_sync || head.height > 0 && + header_head.height.saturating_sub(head.height) <= horizon; + + let syncing = needs_syncing( + currently_syncing.clone(), peers.clone(), chain.clone(), !have_sumtrees); + + let current_time = time::now_utc(); + if syncing { // run the header sync every 10s if current_time - prev_header_sync > time::Duration::seconds(10) { @@ -62,7 +82,7 @@ pub fn run_sync( } // run the body_sync every 5s - if current_time - prev_body_sync > time::Duration::seconds(5) { + if have_sumtrees && current_time - prev_body_sync > time::Duration::seconds(5) { body_sync( peers.clone(), chain.clone(), @@ -70,10 +90,29 @@ pub fn run_sync( prev_body_sync = current_time; } - thread::sleep(Duration::from_secs(1)); - } else { - thread::sleep(Duration::from_secs(10)); - } + } else if !have_sumtrees && + current_time - prev_state_sync > time::Duration::seconds(5*60) { + + if let Some(peer) = peers.most_work_peer() { + if let Ok(p) = peer.try_read() { + debug!(LOGGER, "Header head before sumtree request: {} / {}", + header_head.height, header_head.last_block_h); + + // just to handle corner case of a too early start + if header_head.height > horizon { + + // ask for sumtree at horizon + let mut sumtree_head = chain.get_block_header(&header_head.prev_block_h).unwrap(); + for _ in 0..horizon-2 { + sumtree_head = chain.get_block_header(&sumtree_head.previous).unwrap(); + } + p.send_sumtrees_request(sumtree_head.height, sumtree_head.hash()); + prev_state_sync = current_time; + } + } + } + } + thread::sleep(Duration::from_secs(1)); } }); } @@ -199,20 +238,29 @@ fn request_headers( pub fn needs_syncing( currently_syncing: Arc, peers: Peers, - chain: Arc) -> bool { + chain: Arc, + header_only: bool) -> bool { - let local_diff = peers.total_difficulty(); + let local_diff = if header_only { + chain.total_header_difficulty().unwrap() + } else { + chain.total_difficulty() + }; let peer = peers.most_work_peer(); + // if we're already syncing, we're caught up if no peer has a higher // difficulty than us if currently_syncing.load(Ordering::Relaxed) { if let Some(peer) = peer { if let Ok(peer) = peer.try_read() { + debug!(LOGGER, "needs_syncing {} {} {}", local_diff, peer.info.total_difficulty, header_only); if peer.info.total_difficulty <= local_diff { info!(LOGGER, "synchronized at {:?} @ {:?}", local_diff, chain.head().unwrap().height); currently_syncing.store(false, Ordering::Relaxed); - let _ = chain.reset_head(); + if !header_only { + let _ = chain.reset_head(); + } } } } else { diff --git a/grin/src/types.rs b/grin/src/types.rs index b997d5d14..f2cf800e9 100644 --- a/grin/src/types.rs +++ b/grin/src/types.rs @@ -116,6 +116,9 @@ pub struct ServerConfig { #[serde(default)] pub chain_type: ChainTypes, + /// Whether this node is a full archival node or a fast-sync, pruned node + pub archive_mode: bool, + /// Method used to get the list of seed nodes for initial bootstrap. #[serde(default)] pub seeding_type: Seeding, @@ -155,6 +158,7 @@ impl Default for ServerConfig { p2p_config: p2p::P2PConfig::default(), mining_config: Some(pow::types::MinerConfig::default()), chain_type: ChainTypes::default(), + archive_mode: false, pool_config: pool::PoolConfig::default(), skip_sync_wait: Some(true), } diff --git a/grin/tests/simulnet.rs b/grin/tests/simulnet.rs index 34899c84e..507df2e73 100644 --- a/grin/tests/simulnet.rs +++ b/grin/tests/simulnet.rs @@ -247,7 +247,6 @@ fn simulate_full_sync() { util::init_test_logger(); // we actually set the chain_type in the ServerConfig below - // TODO - avoid needing to set it in two places? global::set_mining_mode(ChainTypes::AutomatedTesting); let test_name_dir = "grin-sync"; @@ -271,16 +270,56 @@ fn simulate_full_sync() { let s1 = grin::Server::new(config(0, "grin-sync")).unwrap(); // mine a few blocks on server 1 s1.start_miner(miner_config); - thread::sleep(time::Duration::from_secs(5)); + thread::sleep(time::Duration::from_secs(8)); let mut conf = config(1, "grin-sync"); - conf.skip_sync_wait = Some(false); let s2 = grin::Server::new(conf).unwrap(); while s2.head().height < 4 { thread::sleep(time::Duration::from_millis(100)); } } +/// Creates 2 different disconnected servers, mine a few blocks on one, connect +/// them and check that the 2nd gets all using fast sync algo +#[test] +fn simulate_fast_sync() { + util::init_test_logger(); + + // we actually set the chain_type in the ServerConfig below + global::set_mining_mode(ChainTypes::AutomatedTesting); + + let test_name_dir = "grin-fast"; + framework::clean_all_output(test_name_dir); + + let mut plugin_config = pow::types::CuckooMinerPluginConfig::default(); + let mut plugin_config_vec: Vec = Vec::new(); + plugin_config.type_filter = String::from("mean_cpu"); + plugin_config_vec.push(plugin_config); + + let miner_config = pow::types::MinerConfig { + enable_mining: true, + burn_reward: true, + use_cuckoo_miner: false, + cuckoo_miner_async_mode: Some(false), + cuckoo_miner_plugin_dir: Some(String::from("../target/debug/deps")), + cuckoo_miner_plugin_config: Some(plugin_config_vec), + ..Default::default() + }; + + let s1 = grin::Server::new(config(1000, "grin-fast")).unwrap(); + // mine a few blocks on server 1 + s1.start_miner(miner_config); + thread::sleep(time::Duration::from_secs(8)); + + let mut conf = config(1001, "grin-fast"); + conf.archive_mode = false; + conf.seeds = Some(vec!["127.0.0.1:12000".to_string()]); + let s2 = grin::Server::new(conf).unwrap(); + while s2.head().height != s2.header_head().height || s2.head().height < 20 { + thread::sleep(time::Duration::from_millis(1000)); + } +} + fn config(n: u16, test_name_dir: &str) -> grin::ServerConfig { grin::ServerConfig { api_http_addr: format!("127.0.0.1:{}", 19000 + n), @@ -292,6 +331,8 @@ fn config(n: u16, test_name_dir: &str) -> grin::ServerConfig { seeding_type: grin::Seeding::List, seeds: Some(vec!["127.0.0.1:11000".to_string()]), chain_type: core::global::ChainTypes::AutomatedTesting, + archive_mode: true, + skip_sync_wait: Some(true), ..Default::default() } } diff --git a/p2p/src/conn.rs b/p2p/src/conn.rs index 9dd366893..17a45f713 100644 --- a/p2p/src/conn.rs +++ b/p2p/src/conn.rs @@ -20,7 +20,9 @@ //! forces us to go through some additional gymnastic to loop over the async //! stream and make sure we get the right number of bytes out. -use std::io::{self, Write}; +use std::cmp; +use std::fs::File; +use std::io::{self, Read, Write}; use std::sync::{Arc, Mutex, mpsc}; use std::net::TcpStream; use std::thread; @@ -31,8 +33,10 @@ use msg::*; use types::*; use util::LOGGER; +/// A trait to be implemented in order to receive messages from the +/// connection. Allows providing an optional response. pub trait MessageHandler: Send + 'static { - fn consume(&self, msg: &mut Message) -> Result, Type)>, Error>; + fn consume<'a>(&self, msg: Message<'a>) -> Result>, Error>; } // Macro to simplify the boilerplate around asyn I/O error handling, @@ -52,6 +56,8 @@ macro_rules! try_break { } } +/// A message as received by the connection. Provides access to the message +/// header lazily consumes the message body, handling its deserialization. pub struct Message<'a> { pub header: MsgHeader, conn: &'a mut TcpStream, @@ -63,9 +69,67 @@ impl<'a> Message<'a> { Message{header, conn} } + /// Read the message body from the underlying connection pub fn body(&mut self) -> Result where T: ser::Readable { read_body(&self.header, self.conn) } + + pub fn copy_attachment(&mut self, len: usize, writer: &mut Write) -> Result<(), Error> { + let mut written = 0; + while written < len { + let read_len = cmp::min(8000, len - written); + let mut buf = vec![0u8; read_len]; + read_exact(&mut self.conn, &mut buf[..], 10000, true)?; + writer.write_all(&mut buf)?; + written += read_len; + } + Ok(()) + } + + /// Respond to the message with the provided message type and body + pub fn respond(self, resp_type: Type, body: T) -> Response<'a> + where + T: ser::Writeable + { + let body = ser::ser_vec(&body).unwrap(); + Response{ + resp_type: resp_type, + body: body, + conn: self.conn, + attachment: None, + } + } +} + +/// Response to a `Message` +pub struct Response<'a> { + resp_type: Type, + body: Vec, + conn: &'a mut TcpStream, + attachment: Option, +} + +impl<'a> Response<'a> { + fn write(mut self) -> Result<(), Error> { + let mut msg = ser::ser_vec(&MsgHeader::new(self.resp_type, self.body.len() as u64)).unwrap(); + msg.append(&mut self.body); + write_all(&mut self.conn, &msg[..], 10000)?; + if let Some(mut file) = self.attachment { + let mut buf = [0u8; 8000]; + loop { + match file.read(&mut buf[..]) { + Ok(0) => break, + Ok(n) => write_all(&mut self.conn, &buf[..n], 10000)?, + Err(e) => return Err(From::from(e)), + } + } + } + Ok(()) + } + + pub fn add_attachment(&mut self, file: File) { + self.attachment = Some(file); + } } // TODO count sent and received @@ -105,7 +169,7 @@ where let (error_tx, error_rx) = mpsc::channel(); stream.set_nonblocking(true).expect("Non-blocking IO not available."); - poll(stream, handler, send_rx, send_tx.clone(), error_tx, close_rx); + poll(stream, handler, send_rx, error_tx, close_rx); Tracker { sent_bytes: Arc::new(Mutex::new(0)), @@ -120,7 +184,6 @@ fn poll( conn: TcpStream, handler: H, send_rx: mpsc::Receiver>, - send_tx: mpsc::Sender>, error_tx: mpsc::Sender, close_rx: mpsc::Receiver<()> ) @@ -137,10 +200,10 @@ where loop { // check the read end if let Some(h) = try_break!(error_tx, read_header(conn)) { - let mut msg = Message::from_header(h, conn); + let msg = Message::from_header(h, conn); debug!(LOGGER, "Received message header, type {:?}, len {}.", msg.header.msg_type, msg.header.msg_len); - if let Some(Some((body, typ))) = try_break!(error_tx, handler.consume(&mut msg)) { - respond(&send_tx, typ, body); + if let Some(Some(resp)) = try_break!(error_tx, handler.consume(msg)) { + try_break!(error_tx, resp.write()); } } @@ -173,9 +236,3 @@ where } }); } - -fn respond(send_tx: &mpsc::Sender>, msg_type: Type, mut body: Vec) { - let mut msg = ser::ser_vec(&MsgHeader::new(msg_type, body.len() as u64)).unwrap(); - msg.append(&mut body); - send_tx.send(msg).unwrap(); -} diff --git a/p2p/src/lib.rs b/p2p/src/lib.rs index 1cae88fbc..63a5ca840 100644 --- a/p2p/src/lib.rs +++ b/p2p/src/lib.rs @@ -52,6 +52,6 @@ mod types; pub use serv::{Server, DummyAdapter}; pub use peers::Peers; pub use peer::Peer; -pub use types::{Capabilities, Error, ChainAdapter, P2PConfig, PeerInfo, MAX_BLOCK_HEADERS, - MAX_PEER_ADDRS}; +pub use types::{Capabilities, Error, ChainAdapter, SumtreesRead, P2PConfig, + PeerInfo, MAX_BLOCK_HEADERS, MAX_PEER_ADDRS}; pub use store::{PeerData, State}; diff --git a/p2p/src/msg.rs b/p2p/src/msg.rs index 05b993785..386c02af9 100644 --- a/p2p/src/msg.rs +++ b/p2p/src/msg.rs @@ -65,6 +65,8 @@ enum_from_primitive! { GetCompactBlock, CompactBlock, Transaction, + SumtreesRequest, + SumtreesArchive } } @@ -120,6 +122,34 @@ pub fn read_exact( Ok(()) } +/// Same as `read_exact` but for writing. +pub fn write_all(conn: &mut Write, mut buf: &[u8], timeout: u32) -> io::Result<()> { + + let sleep_time = time::Duration::from_millis(1); + let mut count = 0; + + while !buf.is_empty() { + match conn.write(buf) { + Ok(0) => return Err(io::Error::new(io::ErrorKind::WriteZero, + "failed to write whole buffer")), + Ok(n) => buf = &buf[n..], + Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {} + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {} + Err(e) => return Err(e), + } + if !buf.is_empty() { + thread::sleep(sleep_time); + count += 1; + } else { + break; + } + if count > timeout { + return Err(io::Error::new(io::ErrorKind::TimedOut, "reading from tcp stream")); + } + } + Ok(()) +} + /// Read a header from the provided connection without blocking if the /// underlying stream is async. Typically headers will be polled for, so /// we do not want to block. @@ -571,7 +601,7 @@ impl Readable for Ping { Ok(h) => h, Err(_) => 0, }; -Ok(Ping { total_difficulty, height }) + Ok(Ping { total_difficulty, height }) } } @@ -605,3 +635,65 @@ impl Readable for Pong { Ok(Pong { total_difficulty, height }) } } + +/// Request to get an archive of the full sumtree store, required to sync +/// a new node. +pub struct SumtreesRequest { + /// Hash of the block for which the sumtrees should be provided + pub hash: Hash, + /// Height of the corresponding block + pub height: u64 +} + +impl Writeable for SumtreesRequest { + fn write(&self, writer: &mut W) -> Result<(), ser::Error> { + self.hash.write(writer)?; + writer.write_u64(self.height)?; + Ok(()) + } +} + +impl Readable for SumtreesRequest { + fn read(reader: &mut Reader) -> Result { + Ok(SumtreesRequest { + hash: Hash::read(reader)?, + height: reader.read_u64()?, + }) + } +} + +/// Response to a sumtree archive request, must include a zip stream of the +/// archive after the message body. +pub struct SumtreesArchive { + /// Hash of the block for which the sumtrees are provided + pub hash: Hash, + /// Height of the corresponding block + pub height: u64, + /// Output tree index the receiver should rewind to + pub rewind_to_output: u64, + /// Kernel tree index the receiver should rewind to + pub rewind_to_kernel: u64, + /// Size in bytes of the archive + pub bytes: u64, +} + +impl Writeable for SumtreesArchive { + fn write(&self, writer: &mut W) -> Result<(), ser::Error> { + self.hash.write(writer)?; + ser_multiwrite!(writer, [write_u64, self.height], + [write_u64, self.rewind_to_output], + [write_u64, self.rewind_to_kernel], + [write_u64, self.bytes]); + Ok(()) + } +} + +impl Readable for SumtreesArchive { + fn read(reader: &mut Reader) -> Result { + let hash = Hash::read(reader)?; + let (height, rewind_to_output, rewind_to_kernel, bytes) = + ser_multiread!(reader, read_u64, read_u64, read_u64, read_u64); + + Ok(SumtreesArchive {hash, height, rewind_to_output, rewind_to_kernel, bytes}) + } +} diff --git a/p2p/src/peer.rs b/p2p/src/peer.rs index 73899a8e5..907ecf01e 100644 --- a/p2p/src/peer.rs +++ b/p2p/src/peer.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::fs::File; use std::net::{SocketAddr, TcpStream}; use std::sync::{Arc, RwLock}; @@ -230,6 +231,13 @@ impl Peer { msg::Type::GetPeerAddrs) } + pub fn send_sumtrees_request(&self, height: u64, hash: Hash) -> Result<(), Error> { + debug!(LOGGER, "Asking {} for sumtree archive at {} {}.", + self.info.addr, height, hash); + self.connection.as_ref().unwrap().send( + &SumtreesRequest {hash, height }, msg::Type::SumtreesRequest) + } + /// Stops the peer, closing its connection pub fn stop(&self) { let _ = self.connection.as_ref().unwrap().close_channel.send(()); @@ -326,6 +334,17 @@ impl ChainAdapter for TrackingAdapter { fn get_block(&self, h: Hash) -> Option { self.adapter.get_block(h) } + + fn sumtrees_read(&self, h: Hash) -> Option { + self.adapter.sumtrees_read(h) + } + + fn sumtrees_write(&self, h: Hash, + rewind_to_output: u64, rewind_to_kernel: u64, + sumtree_data: File, peer_addr: SocketAddr) -> bool { + self.adapter.sumtrees_write(h, rewind_to_output, rewind_to_kernel, + sumtree_data, peer_addr) + } } impl NetAdapter for TrackingAdapter { diff --git a/p2p/src/peers.rs b/p2p/src/peers.rs index c537875bd..94af4159d 100644 --- a/p2p/src/peers.rs +++ b/p2p/src/peers.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::collections::HashMap; +use std::fs::File; use std::net::SocketAddr; use std::sync::{Arc, RwLock}; @@ -482,6 +483,25 @@ impl ChainAdapter for Peers { fn get_block(&self, h: Hash) -> Option { self.adapter.get_block(h) } + fn sumtrees_read(&self, h: Hash) -> Option { + self.adapter.sumtrees_read(h) + } + fn sumtrees_write( + &self, + h: Hash, + rewind_to_output: u64, + rewind_to_kernel: u64, + sumtree_data: File, + peer_addr: SocketAddr, + ) -> bool { + if !self.adapter.sumtrees_write(h, rewind_to_output, rewind_to_kernel, + sumtree_data, peer_addr) { + self.ban_peer(&peer_addr); + false + } else { + true + } + } } impl NetAdapter for Peers { diff --git a/p2p/src/protocol.rs b/p2p/src/protocol.rs index 32af59e3e..3b3a4ab49 100644 --- a/p2p/src/protocol.rs +++ b/p2p/src/protocol.rs @@ -12,12 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::sync::Arc; +use std::env; +use std::fs::File; use std::net::SocketAddr; +use std::sync::Arc; use core::core; use core::core::hash::{Hash, Hashed}; -use core::ser; use conn::*; use msg::*; use rand; @@ -37,7 +38,7 @@ impl Protocol { } impl MessageHandler for Protocol { - fn consume(&self, msg: &mut Message) -> Result, Type)>, Error> { + fn consume<'a>(&self, mut msg: Message<'a>) -> Result>, Error> { let adapter = &self.adapter; match msg.header.msg_type { @@ -46,13 +47,14 @@ impl MessageHandler for Protocol { let ping: Ping = msg.body()?; adapter.peer_difficulty(self.addr, ping.total_difficulty, ping.height); - let pong_bytes = ser::ser_vec( - &Pong { - total_difficulty: adapter.total_difficulty(), - height: adapter.total_height(), - }).unwrap(); - - Ok(Some((pong_bytes, Type::Pong))) + Ok(Some( + msg.respond( + Type::Pong, + Pong { + total_difficulty: adapter.total_difficulty(), + height: adapter.total_height(), + }) + )) } Type::Pong => { @@ -73,8 +75,7 @@ impl MessageHandler for Protocol { let bo = adapter.get_block(h); if let Some(b) = bo { - let block_bytes = ser::ser_vec(&b).unwrap(); - return Ok(Some((block_bytes, Type::Block))); + return Ok(Some(msg.respond(Type::Block, b))); } Ok(None) } @@ -111,11 +112,9 @@ impl MessageHandler for Protocol { "handle_payload: GetCompactBlock: empty block, sending full block", ); - let block_bytes = ser::ser_vec(&b).unwrap(); - Ok(Some((block_bytes, Type::Block))) + Ok(Some(msg.respond(Type::Block, b))) } else { - let compact_block_bytes = ser::ser_vec(&cb).unwrap(); - Ok(Some((compact_block_bytes, Type::CompactBlock))) + Ok(Some(msg.respond(Type::CompactBlock, cb))) } } else { Ok(None) @@ -137,8 +136,7 @@ impl MessageHandler for Protocol { let headers = adapter.locate_headers(loc.hashes); // serialize and send all the headers over - let header_bytes = ser::ser_vec(&Headers { headers: headers }).unwrap(); - return Ok(Some((header_bytes, Type::Headers))); + Ok(Some(msg.respond(Type::Headers, Headers { headers: headers }))) } // "header first" block propagation - if we have not yet seen this block @@ -162,11 +160,13 @@ impl MessageHandler for Protocol { Type::GetPeerAddrs => { let get_peers: GetPeerAddrs = msg.body()?; let peer_addrs = adapter.find_peer_addrs(get_peers.capabilities); - let peer_addrs_bytes = ser::ser_vec( - &PeerAddrs { - peers: peer_addrs.iter().map(|sa| SockAddr(*sa)).collect(), - }).unwrap(); - return Ok(Some((peer_addrs_bytes, Type::PeerAddrs))); + Ok(Some( + msg.respond( + Type::PeerAddrs, + PeerAddrs { + peers: peer_addrs.iter().map(|sa| SockAddr(*sa)).collect(), + }) + )) } Type::PeerAddrs => { @@ -175,6 +175,52 @@ impl MessageHandler for Protocol { Ok(None) } + Type::SumtreesRequest => { + let sm_req: SumtreesRequest = msg.body()?; + debug!(LOGGER, "handle_payload: sumtree req for {} at {}", + sm_req.hash, sm_req.height); + + let sumtrees = self.adapter.sumtrees_read(sm_req.hash); + + if let Some(sumtrees) = sumtrees { + let file_sz = sumtrees.reader.metadata()?.len(); + let mut resp = msg.respond( + Type::SumtreesArchive, + &SumtreesArchive { + height: sm_req.height as u64, + hash: sm_req.hash, + rewind_to_output: sumtrees.output_index, + rewind_to_kernel: sumtrees.kernel_index, + bytes: file_sz, + }); + resp.add_attachment(sumtrees.reader); + Ok(Some(resp)) + } else { + Ok(None) + } + } + + Type::SumtreesArchive => { + let sm_arch: SumtreesArchive = msg.body()?; + debug!(LOGGER, "handle_payload: sumtree archive for {} at {} rewind to {}/{}", + sm_arch.hash, sm_arch.height, + sm_arch.rewind_to_output, sm_arch.rewind_to_kernel); + + let mut tmp = env::temp_dir(); + tmp.push("sumtree.zip"); + { + let mut tmp_zip = File::create(tmp.clone())?; + msg.copy_attachment(sm_arch.bytes as usize, &mut tmp_zip)?; + tmp_zip.sync_all()?; + } + + let tmp_zip = File::open(tmp)?; + self.adapter.sumtrees_write( + sm_arch.hash, sm_arch.rewind_to_output, + sm_arch.rewind_to_kernel, tmp_zip, self.addr); + Ok(None) + } + _ => { debug!(LOGGER, "unknown message type {:?}", msg.header.msg_type); Ok(None) diff --git a/p2p/src/serv.rs b/p2p/src/serv.rs index 4008ba33e..fee2de5cb 100644 --- a/p2p/src/serv.rs +++ b/p2p/src/serv.rs @@ -12,8 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::sync::{Arc, RwLock}; +use std::fs::File; use std::net::{TcpListener, TcpStream, SocketAddr, Shutdown}; +use std::sync::{Arc, RwLock}; use std::thread; use std::time::Duration; @@ -194,6 +195,15 @@ impl ChainAdapter for DummyAdapter { fn get_block(&self, _: Hash) -> Option { None } + fn sumtrees_read(&self, _h: Hash) -> Option { + unimplemented!() + } + + fn sumtrees_write(&self, _h: Hash, + _rewind_to_output: u64, _rewind_to_kernel: u64, + _sumtree_data: File, _peer_addr: SocketAddr) -> bool { + false + } } impl NetAdapter for DummyAdapter { diff --git a/p2p/src/types.rs b/p2p/src/types.rs index 98c8bc710..02e297b4f 100644 --- a/p2p/src/types.rs +++ b/p2p/src/types.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::convert::From; +use std::fs::File; use std::io; use std::net::{IpAddr, SocketAddr}; use std::sync::mpsc; @@ -132,6 +133,17 @@ pub struct PeerInfo { pub total_difficulty: Difficulty, } +/// The full sumtree data along with indexes required for a consumer to +/// rewind to a consistant requested state. +pub struct SumtreesRead { + /// Output tree index the receiver should rewind to + pub output_index: u64, + /// Kernel tree index the receiver should rewind to + pub kernel_index: u64, + /// Binary stream for the sumtree zipped data + pub reader: File, +} + /// Bridge between the networking layer and the rest of the system. Handles the /// forwarding or querying of blocks and transactions from the network among /// other things. @@ -167,6 +179,19 @@ pub trait ChainAdapter: Sync + Send { /// Gets a full block by its hash. fn get_block(&self, h: Hash) -> Option; + + /// Provides a reading view into the current sumtree state as well as + /// the required indexes for a consumer to rewind to a consistant state + /// at the provided block hash. + fn sumtrees_read(&self, h: Hash) -> Option; + + /// Writes a reading view on a sumtree state that's been provided to us. + /// If we're willing to accept that new state, the data stream will be + /// read as a zip file, unzipped and the resulting state files should be + /// rewound to the provided indexes. + fn sumtrees_write(&self, h: Hash, + rewind_to_output: u64, rewind_to_kernel: u64, + sumtree_data: File, peer_addr: SocketAddr) -> bool; } /// Additional methods required by the protocol that don't need to be diff --git a/pool/src/pool.rs b/pool/src/pool.rs index 7d3d8cffa..49c246f6b 100644 --- a/pool/src/pool.rs +++ b/pool/src/pool.rs @@ -17,11 +17,12 @@ use std::sync::Arc; use std::collections::{HashMap, HashSet}; -use core::core::transaction; -use core::core::OutputIdentifier; -use core::core::{block, hash}; use util::secp::pedersen::Commitment; +use core::core::transaction; +use core::core::{block, hash, OutputIdentifier}; +use core::global; + use types::*; pub use graph; diff --git a/store/src/sumtree.rs b/store/src/sumtree.rs index daf34710d..2f9368105 100644 --- a/store/src/sumtree.rs +++ b/store/src/sumtree.rs @@ -18,6 +18,7 @@ use memmap; use std::cmp; use std::fs::{self, File, OpenOptions}; use std::io::{self, BufRead, BufReader, ErrorKind, Write}; +use std::marker::PhantomData; use std::os::unix::io::AsRawFd; use std::path::Path; use std::io::Read; @@ -27,7 +28,7 @@ use libc::{ftruncate64, off64_t}; #[cfg(not(any(target_os = "linux", target_os = "android")))] use libc::{ftruncate as ftruncate64, off_t as off64_t}; -use core::core::pmmr::{self, Backend, HashSum, Summable, VecBackend}; +use core::core::pmmr::{self, Backend, HashSum, Summable}; use core::ser; use util::LOGGER; @@ -46,15 +47,18 @@ pub const RM_LOG_MAX_NODES: usize = 10000; /// Despite being append-only, the file can still be pruned and truncated. The /// former simply happens by rewriting it, ignoring some of the data. The /// latter by truncating the underlying file and re-creating the mmap. -struct AppendOnlyFile { +pub struct AppendOnlyFile { path: String, file: File, mmap: Option, + buffer_start: usize, + buffer: Vec, + buffer_start_bak: usize, } impl AppendOnlyFile { /// Open a file (existing or not) as append-only, backed by a mmap. - fn open(path: String) -> io::Result { + pub fn open(path: String) -> io::Result { let file = OpenOptions::new() .read(true) .append(true) @@ -64,31 +68,68 @@ impl AppendOnlyFile { path: path.clone(), file: file, mmap: None, + buffer_start: 0, + buffer: vec![], + buffer_start_bak: 0, }; if let Ok(sz) = aof.size() { if sz > 0 { - aof.sync()?; + aof.buffer_start = sz as usize; + aof.mmap = Some(unsafe { memmap::Mmap::map(&aof.file)? }); } } Ok(aof) } - /// Append data to the file. - fn append(&mut self, buf: &[u8]) -> io::Result<()> { - self.file.write_all(buf) + /// Append data to the file. Until the append-only file is synced, data is + /// only written to memory. + pub fn append(&mut self, buf: &mut Vec) { + self.buffer.append(buf); + } + + /// Rewinds the data file back to a lower position. The new position needs + /// to be the one of the first byte the next time data is appended. + pub fn rewind(&mut self, pos: u64) { + if self.buffer_start_bak > 0 || self.buffer.len() > 0 { + panic!("Can't rewind on a dirty state."); + } + self.buffer_start_bak = self.buffer_start; + self.buffer_start = pos as usize; } /// Syncs all writes (fsync), reallocating the memory map to make the newly /// written data accessible. - fn sync(&mut self) -> io::Result<()> { + pub fn flush(&mut self) -> io::Result<()> { + if self.buffer_start_bak > 0 { + // flushing a rewound state, we need to truncate before applying + self.truncate(self.buffer_start)?; + self.buffer_start_bak = 0; + } + self.buffer_start += self.buffer.len(); + self.file.write(&self.buffer[..])?; self.file.sync_data()?; + self.buffer = vec![]; self.mmap = Some(unsafe { memmap::Mmap::map(&self.file)? }); Ok(()) } + /// Discard the current non-flushed data. + pub fn discard(&mut self) { + if self.buffer_start_bak > 0 { + // discarding a rewound state, restore the buffer start + self.buffer_start = self.buffer_start_bak; + self.buffer_start_bak = 0; + } + self.buffer = vec![]; + } + /// Read length bytes of data at offset from the file. Leverages the memory /// map. fn read(&self, offset: usize, length: usize) -> Vec { + if offset >= self.buffer_start { + let offset = offset - self.buffer_start; + return self.buffer[offset..(offset+length)].to_vec(); + } if let None = self.mmap { return vec![]; } @@ -96,6 +137,17 @@ impl AppendOnlyFile { (&mmap[offset..(offset + length)]).to_vec() } + /// Truncates the underlying file to the provided offset + fn truncate(&self, offs: usize) -> io::Result<()> { + let fd = self.file.as_raw_fd(); + let res = unsafe { ftruncate64(fd, offs as off64_t) }; + if res == -1 { + Err(io::Error::last_os_error()) + } else { + Ok(()) + } + } + /// Saves a copy of the current file content, skipping data at the provided /// prune indices. The prune Vec must be ordered. fn save_prune(&self, target: String, prune_offs: Vec, prune_len: u64) -> io::Result<()> { @@ -116,7 +168,7 @@ impl AppendOnlyFile { } as u64; // write the buffer, except if we prune offsets in the current span, - // in which case we skip + // in which case we skip let mut buf_start = 0; while prune_offs[prune_pos] >= read && prune_offs[prune_pos] < read + len { let prune_at = prune_offs[prune_pos] as usize; @@ -135,21 +187,15 @@ impl AppendOnlyFile { } } - /// Truncates the underlying file to the provided offset - fn truncate(&self, offs: u64) -> io::Result<()> { - let fd = self.file.as_raw_fd(); - let res = unsafe { ftruncate64(fd, offs as off64_t) }; - if res == -1 { - Err(io::Error::last_os_error()) - } else { - Ok(()) - } - } - /// Current size of the file in bytes. fn size(&self) -> io::Result { fs::metadata(&self.path).map(|md| md.len()) } + + /// Path of the underlying file + pub fn path(&self) -> String { + self.path.clone() + } } /// Log file fully cached in memory containing all positions that should be @@ -181,11 +227,12 @@ impl RemoveLog { } /// Truncate and empties the remove log. - fn truncate(&mut self, last_offs: u32) -> io::Result<()> { + fn rewind(&mut self, last_offs: u32) -> io::Result<()> { // simplifying assumption: we always remove older than what's in tmp self.removed_tmp = vec![]; - // DEBUG - let _ = self.flush_truncate(last_offs); + + // backing it up before truncating + self.removed_bak = self.removed.clone(); // backing it up before truncating self.removed_bak = self.removed.clone(); @@ -202,15 +249,6 @@ impl RemoveLog { Ok(()) } - // DEBUG: saves the remove log to the side before each truncate - fn flush_truncate(&mut self, last_offs: u32) -> io::Result<()> { - let mut file = File::create(format!("{}.{}", self.path.clone(), last_offs))?; - for elmt in &self.removed { - file.write_all(&ser::ser_vec(&elmt).unwrap()[..])?; - } - file.sync_data() - } - /// Append a set of new positions to the remove log. Both adds those /// positions the ordered in-memory set and to the file. fn append(&mut self, elmts: Vec, index: u32) -> io::Result<()> { @@ -291,12 +329,7 @@ where hashsum_file: AppendOnlyFile, remove_log: RemoveLog, pruned_nodes: pmmr::PruneList, - // buffers addition of new elements until they're fully written to disk - buffer: VecBackend, - buffer_index: usize, - // whether a rewind occurred since last flush, the rewind position, index - // and buffer index are captured - rewind: Option<(u64, u32, usize)>, + phantom: PhantomData, } impl Backend for PMMRBackend @@ -306,25 +339,19 @@ where /// Append the provided HashSums to the backend storage. #[allow(unused_variables)] fn append(&mut self, position: u64, data: Vec>) -> Result<(), String> { - self.buffer - .append(position - (self.buffer_index as u64), data.clone())?; + for d in data { + self.hashsum_file.append(&mut ser::ser_vec(&d).unwrap()); + } Ok(()) } /// Get a HashSum by insertion position fn get(&self, position: u64) -> Option> { - // First, check if it's in our temporary write buffer - let pos_sz = position as usize; - if pos_sz > self.buffer_index && pos_sz - 1 < self.buffer_index + self.buffer.len() { - return self.buffer.get((pos_sz - self.buffer_index) as u64); - } - - // Second, check if this position has been pruned in the remove log + // Check if this position has been pruned in the remove log or the + // pruned list if self.remove_log.includes(position) { return None; } - - // Third, check if it's in the pruned list or its offset let shift = self.pruned_nodes.get_shift(position); if let None = shift { return None; @@ -351,26 +378,19 @@ where } fn rewind(&mut self, position: u64, index: u32) -> Result<(), String> { - assert!(self.buffer.len() == 0, "Rewind on non empty buffer."); self.remove_log - .truncate(index) + .rewind(index) .map_err(|e| format!("Could not truncate remove log: {}", e))?; - self.rewind = Some((position, index, self.buffer_index)); - self.buffer_index = position as usize; + + let shift = self.pruned_nodes.get_shift(position).unwrap_or(0); + let record_len = 32 + T::sum_len(); + let file_pos = (position - shift) * (record_len as u64); + self.hashsum_file.rewind(file_pos); Ok(()) } /// Remove HashSums by insertion position fn remove(&mut self, positions: Vec, index: u32) -> Result<(), String> { - if self.buffer.used_size() > 0 { - for position in &positions { - let pos_sz = *position as usize; - if pos_sz > self.buffer_index && pos_sz - 1 < self.buffer_index + self.buffer.len() - { - self.buffer.remove(vec![*position - self.buffer_index as u64], index).unwrap(); - } - } - } self.remove_log.append(positions, index).map_err(|e| { format!("Could not write to log storage, disk full? {:?}", e) }) @@ -385,8 +405,6 @@ where /// store its files. pub fn new(data_dir: String) -> io::Result> { let hs_file = AppendOnlyFile::open(format!("{}/{}", data_dir, PMMR_DATA_FILE))?; - let sz = hs_file.size()?; - let record_len = 32 + T::sum_len(); let rm_log = RemoveLog::open(format!("{}/{}", data_dir, PMMR_RM_LOG_FILE))?; let prune_list = read_ordered_vec(format!("{}/{}", data_dir, PMMR_PRUNED_FILE), 8)?; @@ -394,12 +412,10 @@ where data_dir: data_dir, hashsum_file: hs_file, remove_log: rm_log, - buffer: VecBackend::new(), - buffer_index: (sz as usize) / record_len, pruned_nodes: pmmr::PruneList { pruned_nodes: prune_list, }, - rewind: None, + phantom: PhantomData, }) } @@ -415,45 +431,21 @@ where /// Syncs all files to disk. A call to sync is required to ensure all the /// data has been successfully written to disk. pub fn sync(&mut self) -> io::Result<()> { - // truncating the storage file if a rewind occurred - let record_len = 32 + T::sum_len() as u64; - if let Some((pos, _, _)) = self.rewind { - self.hashsum_file.truncate(pos * record_len)?; - } - - for elem in &self.buffer.elems { - let res = if let Some(ref hs) = *elem { - self.hashsum_file.append(&ser::ser_vec(&hs).unwrap()[..]) - } else { - // the element has alredy been pruned in the buffer, we just insert - // zeros until compaction to avoid wrong hashum store offsets - self.hashsum_file.append(&vec![0; record_len as usize]) - }; - - if let Err(e) = res { - return Err(io::Error::new( + if let Err(e) = self.hashsum_file.flush() { + return Err(io::Error::new( io::ErrorKind::Interrupted, format!("Could not write to log storage, disk full? {:?}", e), - )); - } + )); } - self.buffer_index = self.buffer_index + self.buffer.len(); - self.buffer.clear(); self.remove_log.flush()?; - self.hashsum_file.sync()?; - self.rewind = None; Ok(()) } /// Discard the current, non synced state of the backend. pub fn discard(&mut self) { - if let Some((_, _, bi)) = self.rewind { - self.buffer_index = bi; - } - self.buffer.clear(); + self.hashsum_file.discard(); self.remove_log.discard(); - self.rewind = None; } /// Checks the length of the remove log to see if it should get compacted. @@ -475,7 +467,7 @@ where } // 0. validate none of the nodes in the rm log are in the prune list (to - // avoid accidental double compaction) + // avoid accidental double compaction) for pos in &self.remove_log.removed[..] { if let None = self.pruned_nodes.pruned_pos(pos.0) { // TODO we likely can recover from this by directly jumping to 3 @@ -489,7 +481,7 @@ where } // 1. save hashsum file to a compact copy, skipping data that's in the - // remove list + // remove list let tmp_prune_file = format!("{}/{}.prune", self.data_dir, PMMR_DATA_FILE); let record_len = (32 + T::sum_len()) as u64; let to_rm = self.remove_log @@ -518,10 +510,9 @@ where format!("{}/{}", self.data_dir, PMMR_DATA_FILE), )?; self.hashsum_file = AppendOnlyFile::open(format!("{}/{}", self.data_dir, PMMR_DATA_FILE))?; - self.hashsum_file.sync()?; // 4. truncate the rm log - self.remove_log.truncate(0)?; + self.remove_log.rewind(0)?; self.remove_log.flush()?; Ok(()) diff --git a/util/Cargo.toml b/util/Cargo.toml index a45b65ebd..b53449a44 100644 --- a/util/Cargo.toml +++ b/util/Cargo.toml @@ -16,3 +16,5 @@ serde = "~1.0.8" serde_derive = "~1.0.8" secp256k1zkp = { git = "https://github.com/mimblewimble/rust-secp256k1-zkp", tag="grin_integration_7" } #secp256k1zkp = { path = "../../rust-secp256k1-zkp" } +walkdir = "^2.0.1" +zip = "^0.2.6" diff --git a/util/src/lib.rs b/util/src/lib.rs index 30b921b69..d9d26ad88 100644 --- a/util/src/lib.rs +++ b/util/src/lib.rs @@ -34,6 +34,8 @@ extern crate lazy_static; extern crate serde; #[macro_use] extern crate serde_derive; +extern crate walkdir; +extern crate zip as zip_rs; // Re-export so only has to be included once pub extern crate secp256k1zkp as secp_; @@ -59,6 +61,9 @@ use byteorder::{BigEndian, ByteOrder}; mod hex; pub use hex::*; +/// Compress and decompress zip bz2 archives +pub mod zip; + /// Encapsulation of a RefCell> for one-time initialization after /// construction. This implementation will purposefully fail hard if not used /// properly, for example if it's not initialized before being first used diff --git a/util/src/zip.rs b/util/src/zip.rs new file mode 100644 index 000000000..e5fcd9085 --- /dev/null +++ b/util/src/zip.rs @@ -0,0 +1,94 @@ +// Copyright 2017 The Grin Developers +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/// Wrappers around the `zip-rs` library to compress and decompress zip +/// bzip2 archives. + +use std::io; +use std::path::Path; +use std::fs::{self, File}; +use walkdir::WalkDir; + +use zip_rs; +use zip_rs::result::{ZipResult, ZipError}; +use zip_rs::write::FileOptions; + +/// Compress a source directory recursively into a zip file using the +/// bzip2 format. Permissions are set to 644 by default to avoid any +/// unwanted execution bits. +pub fn compress(src_dir: &Path, dst_file: &File) -> ZipResult<()> { + if !Path::new(src_dir).is_dir() { + return Err(ZipError::Io( + io::Error::new(io::ErrorKind::Other, "Source must be a directory."))); + } + + let options = FileOptions::default() + .compression_method(zip_rs::CompressionMethod::Bzip2) + .unix_permissions(0o644); + + let mut zip = zip_rs::ZipWriter::new(dst_file); + let walkdir = WalkDir::new(src_dir.to_str().unwrap()); + let it = walkdir.into_iter(); + + for dent in it.filter_map(|e| e.ok()) { + let path = dent.path(); + let name = path.strip_prefix(Path::new(src_dir)) + .unwrap() + .to_str() + .unwrap(); + + if path.is_file() { + zip.start_file(name, options)?; + let mut f = File::open(path)?; + io::copy(&mut f, &mut zip)?; + } + } + + zip.finish()?; + dst_file.sync_all()?; + Ok(()) +} + +/// Decompress a source file into the provided destination path. +pub fn decompress(src_file: R, dest: &Path) -> ZipResult<()> where R: io::Read + io::Seek { + let mut archive = zip_rs::ZipArchive::new(src_file)?; + + for i in 0..archive.len() { + let mut file = archive.by_index(i)?; + let file_path = dest.join(file.name()); + + if (&*file.name()).ends_with('/') { + fs::create_dir_all(&file_path)?; + } else { + if let Some(p) = file_path.parent() { + if !p.exists() { + fs::create_dir_all(&p)?; + } + } + let mut outfile = fs::File::create(&file_path)?; + io::copy(&mut file, &mut outfile)?; + } + + // Get and Set permissions + #[cfg(unix)] + { + use std::os::unix::fs::PermissionsExt; + if let Some(mode) = file.unix_mode() { + fs::set_permissions(&file_path.to_str().unwrap(), PermissionsExt::from_mode(mode))?; + } + } + + } + Ok(()) +}