From 22c521eec85cc82b6cd662fa0be8cc464d805feb Mon Sep 17 00:00:00 2001
From: Ignotus Peverell
Date: Fri, 9 Feb 2018 22:32:16 +0000
Subject: [PATCH] [WIP] Abridged sync (#440)

* Util to zip and unzip directories (sketched below)
* First pass at sumtree request/response. Add message types, implement the
  exchange in the protocol, zip up the sumtree directory and stream the file
  over, with necessary adapter hooks.
* Implement the sumtree archive receive logic. Gets the sumtree archive data
  stream from the network and writes it to a file. Unzips the file, places it
  at the right spot and reconstructs the sumtree data structure, rewinding to
  the right spot.
* Sumtree hash structure validation
* Simplify sumtree backend buffering logic. The backend for a sumtree has to
  implement some in-memory buffering logic to provide a commit/rollback
  interface. The backend itself is an aggregate of 3 underlying storages (an
  append-only file, a remove log and a skip list). The buffering was
  previously implemented both by the backend and some of the underlying
  storages. Now pushing back all buffering logic to the storages to keep the
  backend simpler.
* Add kernel append-only store file to sumtrees. The chain sumtrees structure
  now also saves all kernels to a dedicated file. As that storage is
  implemented by the append-only file wrapper, it's also rewind-aware.
* Full state validation. Checks that:
  - MMRs are sane (hash and sum each node)
  - Tree roots match the corresponding header
  - Kernel signatures are valid
  - Sum of all kernel excesses equals the sum of UTXO commitments minus the
    supply
* Fast sync handoff to body sync. Once the fast-sync state is fully set up,
  get back into body sync mode to fetch the full bodies of the last blocks
  we're missing.
* First fully working fast sync
* Facility in p2p conn to deal with attachments (raw binary after message).
* Re-introduced sumtree send and receive message handling using the above.
* Fixed test and finished updating all required db state after sumtree
  validation.
* Massaged the pipeline orphan check a little so it still works after the new
  sumtrees have been set up.
* Various cleanup. Consolidated fast sync and full sync into a single
  function as they're very similar. Proper conditions to trigger a sumtree
  request and some checks on receiving it.
---
 chain/src/chain.rs               | 128 +++++++++++++---
 chain/src/pipe.rs                |  26 ++--
 chain/src/store.rs               |  10 ++
 chain/src/sumtree.rs             | 250 ++++++++++++++++++++++++++++----
 chain/src/types.rs               |  16 +-
 chain/tests/mine_simple_chain.rs |   1 +
 config/src/types.rs              |   1 +
 core/src/core/pmmr.rs            |  47 +++++-
 core/src/core/transaction.rs     |   9 +-
 core/src/global.rs               |  17 ++-
 grin/src/adapters.rs             |  48 +++++-
 grin/src/server.rs               |   6 +
 grin/src/sync.rs                 |  76 ++++++++--
 grin/src/types.rs                |   4 +
 grin/tests/simulnet.rs           |  47 +++++-
 p2p/src/conn.rs                  |  83 ++++++++--
 p2p/src/lib.rs                   |   4 +-
 p2p/src/msg.rs                   |  94 +++++++++++-
 p2p/src/peer.rs                  |  19 +++
 p2p/src/peers.rs                 |  20 +++
 p2p/src/protocol.rs              |  92 +++++++++---
 p2p/src/serv.rs                  |  12 +-
 p2p/src/types.rs                 |  25 ++++
 pool/src/pool.rs                 |   7 +-
 store/src/sumtree.rs             | 189 +++++++++++-----------
 util/Cargo.toml                  |   2 +
 util/src/lib.rs                  |   5 +
 util/src/zip.rs                  |  94 ++++++++++++
 28 files changed, 1104 insertions(+), 228 deletions(-)
 create mode 100644 util/src/zip.rs

diff --git a/chain/src/chain.rs b/chain/src/chain.rs
index a66f8aa42..c339e3d22 100644
--- a/chain/src/chain.rs
+++ b/chain/src/chain.rs
@@ -16,13 +16,16 @@
 //! and mostly the chain pipeline.
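The body of the new `util/src/zip.rs` (94 lines per the diffstat) is not shown in this excerpt. Going only by its call sites later in the patch, `zip::compress(&sumtrees_path, &File::create(zip_path.clone())?)` and `zip::decompress(sumtree_data, &sumtrees_path)`, a minimal sketch with the `zip` crate (0.5-style API) might look as follows; recursion into subdirectories and the exact error mapping are assumptions, not the actual implementation:

```rust
// Hypothetical sketch of util/src/zip.rs; only top-level files are
// archived here, whereas the real module presumably also walks the
// per-PMMR subdirectories.
use std::fs::{self, File};
use std::io::{self, Read, Write};
use std::path::Path;

use zip::result::ZipResult;
use zip::write::FileOptions;

/// Compresses the files under `src_dir` into the `dst_file` writer.
pub fn compress(src_dir: &Path, dst_file: &File) -> ZipResult<()> {
    let options = FileOptions::default()
        .compression_method(zip::CompressionMethod::Stored);
    let mut writer = zip::ZipWriter::new(dst_file);
    for entry in fs::read_dir(src_dir)? {
        let path = entry?.path();
        if path.is_file() {
            let name = path.file_name().unwrap().to_string_lossy().into_owned();
            writer.start_file(name, options)?;
            let mut contents = Vec::new();
            File::open(&path)?.read_to_end(&mut contents)?;
            writer.write_all(&contents)?;
        }
    }
    writer.finish()?;
    Ok(())
}

/// Decompresses the zip read from `src_file` into the `dst_dir` directory.
pub fn decompress<R: Read + io::Seek>(src_file: R, dst_dir: &Path) -> ZipResult<()> {
    let mut archive = zip::ZipArchive::new(src_file)?;
    for i in 0..archive.len() {
        let mut entry = archive.by_index(i)?;
        let path = dst_dir.join(entry.name());
        if let Some(parent) = path.parent() {
            fs::create_dir_all(parent)?;
        }
        io::copy(&mut entry, &mut File::create(&path)?)?;
    }
    Ok(())
}
```

The signatures match how `zip_read` and `zip_write` in chain/src/sumtree.rs call into this module further down.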
use std::collections::HashMap; +use std::fs::File; use std::sync::{Arc, Mutex, RwLock}; use std::time::{Duration, Instant}; use util::secp::pedersen::RangeProof; use core::core::{Input, OutputIdentifier, SumCommit}; +use core::core::hash::Hashed; use core::core::pmmr::{HashSum, NoSum}; +use core::global; use core::core::{Block, BlockHeader, TxKernel}; use core::core::target::Difficulty; @@ -112,6 +115,7 @@ impl OrphanBlockPool { /// the current view of the UTXO set according to the chain state. Also /// maintains locking for the pipeline to avoid conflicting processing. pub struct Chain { + db_root: String, store: Arc, adapter: Arc, @@ -183,9 +187,10 @@ impl Chain { ); let store = Arc::new(chain_store); - let sumtrees = sumtree::SumTrees::open(db_root, store.clone())?; + let sumtrees = sumtree::SumTrees::open(db_root.clone(), store.clone())?; Ok(Chain { + db_root: db_root, store: store, adapter: adapter, head: Arc::new(Mutex::new(head)), @@ -194,25 +199,26 @@ impl Chain { pow_verifier: pow_verifier, }) } -/// Processes a single block, then checks for orphans, processing -/// those as well if they're found -pub fn process_block(&self, b: Block, opts: Options) --> Result<(Option, Option), Error> -{ - let res = self.process_block_no_orphans(b, opts); - match res { - Ok((t, b)) => { - // We accepted a block, so see if we can accept any orphans - if b.is_some() { - self.check_orphans(&b.clone().unwrap()); + + /// Processes a single block, then checks for orphans, processing + /// those as well if they're found + pub fn process_block(&self, b: Block, opts: Options) + -> Result<(Option, Option), Error> + { + let res = self.process_block_no_orphans(b, opts); + match res { + Ok((t, b)) => { + // We accepted a block, so see if we can accept any orphans + if let Some(ref b) = b { + self.check_orphans(b.hash()); + } + Ok((t, b)) + }, + Err(e) => { + Err(e) + } } - Ok((t, b)) - }, - Err(e) => { - Err(e) } - } -} /// Attempt to add a new block to the chain. Returns the new chain tip if it /// has been added to the longest chain, None if it's added to an (as of @@ -348,13 +354,12 @@ pub fn process_block(&self, b: Block, opts: Options) /// Check for orphans, once a block is successfully added - pub fn check_orphans(&self, block: &Block) { + pub fn check_orphans(&self, mut last_block_hash: Hash) { debug!( LOGGER, "chain: check_orphans: # orphans {}", self.orphans.len(), ); - let mut last_block_hash = block.hash(); // Is there an orphan in our orphans that we can now process? // We just processed the given block, are there any orphans that have this block // as their "previous" block? @@ -390,6 +395,14 @@ pub fn process_block(&self, b: Block, opts: Options) sumtrees.is_unspent(output_ref) } + pub fn validate(&self) -> Result<(), Error> { + let header = self.store.head_header()?; + let mut sumtrees = self.sumtrees.write().unwrap(); + sumtree::extending(&mut sumtrees, |extension| { + extension.validate(&header) + }) + } + /// Check if the input has matured sufficiently for the given block height. /// This only applies to inputs spending coinbase outputs. /// An input spending a non-coinbase output will always pass this check. @@ -432,6 +445,76 @@ pub fn process_block(&self, b: Block, opts: Options) sumtrees.roots() } + /// Provides a reading view into the current sumtree state as well as + /// the required indexes for a consumer to rewind to a consistent state + /// at the provided block hash. 
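The new `Chain::validate` above exercises the commit/rollback contract described in the commit message: `sumtree::extending` hands the closure a fresh `Extension`, then syncs or discards all underlying storages (the three PMMR backends plus the kernel file) depending on the closure's `Result`. Schematically, with an illustrative closure body:

```rust
// Schematic use of extending(): an Err return, or a prior call to
// extension.force_rollback(), causes every backend to be discarded
// instead of synced, giving the closure all-or-nothing semantics.
let mut sumtrees = self.sumtrees.write().unwrap();
sumtree::extending(&mut sumtrees, |extension| {
    extension.validate(&header)?;
    extension.rebuild_index()?;
    Ok(())
})
```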
+ pub fn sumtrees_read(&self, h: Hash) -> Result<(u64, u64, File), Error> { + let b = self.get_block(&h)?; + + // get the indexes for the block + let out_index: u64; + let kernel_index: u64; + { + let sumtrees = self.sumtrees.read().unwrap(); + let (oi, ki) = sumtrees.indexes_at(&b)?; + out_index = oi; + kernel_index = ki; + } + + // prepares the zip and return the corresponding Read + let sumtree_reader = sumtree::zip_read(self.db_root.clone())?; + Ok((out_index, kernel_index, sumtree_reader)) + } + + /// Writes a reading view on a sumtree state that's been provided to us. + /// If we're willing to accept that new state, the data stream will be + /// read as a zip file, unzipped and the resulting state files should be + /// rewound to the provided indexes. + pub fn sumtrees_write( + &self, + h: Hash, + rewind_to_output: u64, + rewind_to_kernel: u64, + sumtree_data: File + ) -> Result<(), Error> { + + let head = self.head().unwrap(); + let header_head = self.get_header_head().unwrap(); + if header_head.height - head.height < global::cut_through_horizon() as u64 { + return Err(Error::InvalidSumtree("not needed".to_owned())); + } + + let header = self.store.get_block_header(&h)?; + sumtree::zip_write(self.db_root.clone(), sumtree_data)?; + + let mut sumtrees = sumtree::SumTrees::open(self.db_root.clone(), self.store.clone())?; + sumtree::extending(&mut sumtrees, |extension| { + extension.rewind_pos(header.height, rewind_to_output, rewind_to_kernel)?; + extension.validate(&header)?; + // TODO validate kernels and their sums with UTXOs + extension.rebuild_index()?; + Ok(()) + })?; + + // replace the chain sumtrees with the newly built one + { + let mut sumtrees_ref = self.sumtrees.write().unwrap(); + *sumtrees_ref = sumtrees; + } + + // setup new head + { + let mut head = self.head.lock().unwrap(); + *head = Tip::from_block(&header); + self.store.save_body_head(&head); + self.store.save_header_height(&header)?; + } + + self.check_orphans(header.hash()); + + Ok(()) + } + /// returns the last n nodes inserted into the utxo sum tree pub fn get_last_n_utxo(&self, distance: u64) -> Vec> { let mut sumtrees = self.sumtrees.write().unwrap(); @@ -455,6 +538,11 @@ pub fn process_block(&self, b: Block, opts: Options) self.head.lock().unwrap().clone().total_difficulty } + /// Total difficulty at the head of the header chain + pub fn total_header_difficulty(&self) -> Result { + Ok(self.store.get_header_head()?.total_difficulty) + } + /// Reset header_head and sync_head to head of current body chain pub fn reset_head(&self) -> Result<(), Error> { self.store diff --git a/chain/src/pipe.rs b/chain/src/pipe.rs index caee993ea..d8b3aead2 100644 --- a/chain/src/pipe.rs +++ b/chain/src/pipe.rs @@ -64,19 +64,23 @@ pub fn process_block(b: &Block, mut ctx: BlockContext) -> Result, Er validate_header(&b.header, &mut ctx)?; - // valid header, now check we actually have the previous block in the store + // valid header, now check we actually have the previous block in the store // not just the header but the block itself - // we cannot assume we can use the chain head for this as we may be dealing with a fork - // we cannot use heights here as the fork may have jumped in height - match ctx.store.get_block(&b.header.previous) { - Ok(_) => {}, - Err(grin_store::Error::NotFoundErr) => { - return Err(Error::Orphan); - }, - Err(e) => { - return Err(Error::StoreErr(e, "pipe get previous".to_owned())); + // short circuit the test first both for performance (in-mem vs db access) + // but also for the specific case of the first 
fast sync full block + if b.header.previous != ctx.head.last_block_h { + // we cannot assume we can use the chain head for this as we may be dealing with a fork + // we cannot use heights here as the fork may have jumped in height + match ctx.store.block_exists(&b.header.previous) { + Ok(true) => {}, + Ok(false) => { + return Err(Error::Orphan); + }, + Err(e) => { + return Err(Error::StoreErr(e, "pipe get previous".to_owned())); + } } - }; + } // valid header and we have a previous block, time to take the lock on the sum trees let local_sumtrees = ctx.sumtrees.clone(); diff --git a/chain/src/store.rs b/chain/src/store.rs index 05120cc5d..c0765c440 100644 --- a/chain/src/store.rs +++ b/chain/src/store.rs @@ -24,6 +24,7 @@ use core::core::{Block, BlockHeader}; use core::consensus::TargetError; use core::core::target::Difficulty; use grin_store::{self, option_to_not_found, to_key, Error, u64_to_key}; +use util::LOGGER; const STORE_SUBPATH: &'static str = "chain"; @@ -98,6 +99,10 @@ impl ChainStore for ChainKVStore { option_to_not_found(self.db.get_ser(&to_key(BLOCK_PREFIX, &mut h.to_vec()))) } + fn block_exists(&self, h: &Hash) -> Result { + self.db.exists(&to_key(BLOCK_PREFIX, &mut h.to_vec())) + } + fn get_block_header(&self, h: &Hash) -> Result { option_to_not_found( self.db.get_ser(&to_key(BLOCK_HEADER_PREFIX, &mut h.to_vec())), @@ -139,6 +144,11 @@ impl ChainStore for ChainKVStore { option_to_not_found(self.db.get_ser(&u64_to_key(HEADER_HEIGHT_PREFIX, height))) } + fn save_header_height(&self, bh: &BlockHeader) -> Result<(), Error> { + self.db + .put_ser(&u64_to_key(HEADER_HEIGHT_PREFIX, bh.height), bh) + } + fn delete_header_by_height(&self, height: u64) -> Result<(), Error> { self.db.delete(&u64_to_key(HEADER_HEIGHT_PREFIX, height)) } diff --git a/chain/src/sumtree.rs b/chain/src/sumtree.rs index 0a47a4a2c..96f8d412e 100644 --- a/chain/src/sumtree.rs +++ b/chain/src/sumtree.rs @@ -17,23 +17,31 @@ use std::fs; use std::collections::HashMap; -use std::path::Path; +use std::fs::File; +use std::ops::Deref; +use std::path::{Path, PathBuf}; use std::sync::Arc; -use core::core::{Block, SumCommit, Input, Output, OutputIdentifier, TxKernel, OutputFeatures}; -use core::core::pmmr::{HashSum, NoSum, Summable, PMMR}; +use util::{secp, static_secp_instance}; +use util::secp::pedersen::{RangeProof, Commitment}; + +use core::consensus::reward; +use core::core::{Block, BlockHeader, SumCommit, Input, Output, OutputIdentifier, OutputFeatures, TxKernel}; +use core::core::pmmr::{self, HashSum, NoSum, Summable, PMMR}; use core::core::hash::Hashed; +use core::ser::{self, Readable}; use grin_store; -use grin_store::sumtree::PMMRBackend; +use grin_store::sumtree::{PMMRBackend, AppendOnlyFile}; use types::ChainStore; use types::Error; -use util::LOGGER; -use util::secp::pedersen::{RangeProof, Commitment}; +use util::{LOGGER, zip}; const SUMTREES_SUBDIR: &'static str = "sumtrees"; const UTXO_SUBDIR: &'static str = "utxo"; const RANGE_PROOF_SUBDIR: &'static str = "rangeproof"; const KERNEL_SUBDIR: &'static str = "kernel"; +const KERNEL_FILE: &'static str = "kernel_full_data.bin"; +const SUMTREES_ZIP: &'static str = "sumtrees_snapshot.zip"; struct PMMRHandle where @@ -68,10 +76,14 @@ where /// guaranteed to indicate whether an output is spent or not. The index /// may have commitments that have already been spent, even with /// pruning enabled. +/// +/// In addition of the sumtrees, this maintains the full list of kernel +/// data so it can be easily packaged for sync or validation. 
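For orientation, the path constants above imply the following on-disk layout; the snapshot zip sits alongside the `sumtrees` directory, per `zip_read`/`zip_write` later in this file:

```
<db_root>/
├── sumtrees/                     SUMTREES_SUBDIR
│   ├── utxo/                     UTXO_SUBDIR: output PMMR backend
│   ├── rangeproof/               RANGE_PROOF_SUBDIR: range proof PMMR backend
│   └── kernel/                   KERNEL_SUBDIR: kernel PMMR backend
│       └── kernel_full_data.bin  KERNEL_FILE: serialized kernels, rewind-aware
└── sumtrees_snapshot.zip         SUMTREES_ZIP: produced on demand by zip_read
```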
pub struct SumTrees { output_pmmr_h: PMMRHandle, rproof_pmmr_h: PMMRHandle>, kernel_pmmr_h: PMMRHandle>, + kernel_file: AppendOnlyFile, // chain store used as index of commitments to MMR positions commit_index: Arc, @@ -80,10 +92,16 @@ pub struct SumTrees { impl SumTrees { /// Open an existing or new set of backends for the SumTrees pub fn open(root_dir: String, commit_index: Arc) -> Result { + let mut kernel_file_path: PathBuf = [&root_dir, SUMTREES_SUBDIR, KERNEL_SUBDIR].iter().collect(); + fs::create_dir_all(kernel_file_path.clone())?; + kernel_file_path.push(KERNEL_FILE); + let kernel_file = AppendOnlyFile::open(kernel_file_path.to_str().unwrap().to_owned())?; + Ok(SumTrees { output_pmmr_h: PMMRHandle::new(root_dir.clone(), UTXO_SUBDIR)?, rproof_pmmr_h: PMMRHandle::new(root_dir.clone(), RANGE_PROOF_SUBDIR)?, kernel_pmmr_h: PMMRHandle::new(root_dir.clone(), KERNEL_SUBDIR)?, + kernel_file: kernel_file, commit_index: commit_index, }) } @@ -163,6 +181,11 @@ impl SumTrees { kernel_pmmr.get_last_n_insertions(distance) } + /// Output and kernel MMR indexes at the end of the provided block + pub fn indexes_at(&self, block: &Block) -> Result<(u64, u64), Error> { + indexes_at(block, self.commit_index.deref()) + } + /// Get sum tree roots pub fn roots( &mut self, @@ -193,10 +216,12 @@ where let res: Result; let rollback: bool; { - debug!(LOGGER, "Starting new sumtree extension."); let commit_index = trees.commit_index.clone(); + + debug!(LOGGER, "Starting new sumtree extension."); let mut extension = Extension::new(trees, commit_index); res = inner(&mut extension); + rollback = extension.rollback; if res.is_ok() && !rollback { extension.save_pos_index()?; @@ -209,6 +234,7 @@ where trees.output_pmmr_h.backend.discard(); trees.rproof_pmmr_h.backend.discard(); trees.kernel_pmmr_h.backend.discard(); + trees.kernel_file.discard(); Err(e) } Ok(r) => { @@ -217,11 +243,13 @@ where trees.output_pmmr_h.backend.discard(); trees.rproof_pmmr_h.backend.discard(); trees.kernel_pmmr_h.backend.discard(); + trees.kernel_file.discard(); } else { debug!(LOGGER, "Committing sumtree extension."); trees.output_pmmr_h.backend.sync()?; trees.rproof_pmmr_h.backend.sync()?; trees.kernel_pmmr_h.backend.sync()?; + trees.kernel_file.flush()?; trees.output_pmmr_h.last_pos = sizes.0; trees.rproof_pmmr_h.last_pos = sizes.1; trees.kernel_pmmr_h.last_pos = sizes.2; @@ -241,6 +269,7 @@ pub struct Extension<'a> { rproof_pmmr: PMMR<'a, NoSum, PMMRBackend>>, kernel_pmmr: PMMR<'a, NoSum, PMMRBackend>>, + kernel_file: &'a mut AppendOnlyFile, commit_index: Arc, new_output_commits: HashMap, new_kernel_excesses: HashMap, @@ -249,7 +278,11 @@ pub struct Extension<'a> { impl<'a> Extension<'a> { // constructor - fn new(trees: &'a mut SumTrees, commit_index: Arc) -> Extension<'a> { + fn new( + trees: &'a mut SumTrees, + commit_index: Arc, + ) -> Extension<'a> { + Extension { output_pmmr: PMMR::at( &mut trees.output_pmmr_h.backend, @@ -263,6 +296,7 @@ impl<'a> Extension<'a> { &mut trees.kernel_pmmr_h.backend, trees.kernel_pmmr_h.last_pos, ), + kernel_file: &mut trees.kernel_file, commit_index: commit_index, new_output_commits: HashMap::new(), new_kernel_excesses: HashMap::new(), @@ -407,15 +441,18 @@ impl<'a> Extension<'a> { } } } - // push kernels in their MMR + + // push kernels in their MMR and file let pos = self.kernel_pmmr .push(NoSum(kernel.clone())) .map_err(&Error::SumTreeErr)?; self.new_kernel_excesses.insert(kernel.excess, pos); + self.kernel_file.append(&mut ser::ser_vec(&kernel).unwrap()); + Ok(()) } - /// Rewinds the MMRs to the 
provided position, given the last output and + /// Rewinds the MMRs to the provided block, using the last output and /// last kernel of the block we want to rewind to. pub fn rewind(&mut self, block: &Block) -> Result<(), Error> { debug!( @@ -425,22 +462,23 @@ impl<'a> Extension<'a> { block.header.height, ); - let out_pos_rew = match block.outputs.last() { - Some(output) => self.get_output_pos(&output.commitment()) - .map_err(|e| { - Error::StoreErr(e, format!("missing output pos for known block")) - })?, - None => 0, - }; + // rewind each MMR + let (out_pos_rew, kern_pos_rew) = indexes_at(block, self.commit_index.deref())?; + self.rewind_pos(block.header.height, out_pos_rew, kern_pos_rew)?; + + // rewind the kernel file store, the position is the number of kernels + // multiplied by their size + // the number of kernels is the number of leaves in the MMR, which is the + // sum of the number of leaf nodes under each peak in the MMR + let pos: u64 = pmmr::peaks(kern_pos_rew).iter().map(|n| (1 << n) as u64).sum(); + self.kernel_file.rewind(pos * (TxKernel::size() as u64)); - let kern_pos_rew = match block.kernels.last() { - Some(kernel) => self.get_kernel_pos(&kernel.excess) - .map_err(|e| { - Error::StoreErr(e, format!("missing kernel pos for known block")) - })?, - None => 0, - }; + Ok(()) + } + /// Rewinds the MMRs to the provided positions, given the output and + /// kernel we want to rewind to. + pub fn rewind_pos(&mut self, height: u64, out_pos_rew: u64, kern_pos_rew: u64) -> Result<(), Error> { debug!( LOGGER, "Rewind sumtrees to output pos: {}, kernel pos: {}", @@ -448,8 +486,6 @@ impl<'a> Extension<'a> { kern_pos_rew, ); - let height = block.header.height; - self.output_pmmr .rewind(out_pos_rew, height as u32) .map_err(&Error::SumTreeErr)?; @@ -496,14 +532,67 @@ impl<'a> Extension<'a> { ) } + /// Validate the current sumtree state against a block header + pub fn validate(&self, header: &BlockHeader) -> Result<(), Error> { + // validate all hashes and sums within the trees + if let Err(e) = self.output_pmmr.validate() { + return Err(Error::InvalidSumtree(e)); + } + if let Err(e) = self.rproof_pmmr.validate() { + return Err(Error::InvalidSumtree(e)); + } + if let Err(e) = self.kernel_pmmr.validate() { + return Err(Error::InvalidSumtree(e)); + } + + // validate the tree roots against the block header + let (utxo_root, rproof_root, kernel_root) = self.roots(); + if utxo_root.hash != header.utxo_root || rproof_root.hash != header.range_proof_root + || kernel_root.hash != header.kernel_root + { + return Err(Error::InvalidRoot); + } + + // the real magicking: the sum of all kernel excess should equal the sum + // of all UTXO commitments, minus the total supply + let (kernel_sum, fees) = self.sum_kernels()?; + let utxo_sum = self.sum_utxos()?; + { + let secp = static_secp_instance(); + let secp = secp.lock().unwrap(); + let over_commit = secp.commit_value(header.height * reward(0) - fees / 2)?; + let adjusted_sum_utxo = secp.commit_sum(vec![utxo_sum], vec![over_commit])?; + + if adjusted_sum_utxo != kernel_sum { + return Err(Error::InvalidSumtree("Differing UTXO commitment and kernel excess sums.".to_owned())); + } + } + + Ok(()) + } + + /// Rebuild the index of MMR positions to the corresponding UTXO and kernel + /// by iterating over the whole MMR data. This is a costly operation + /// performed only when we receive a full new chain state. 
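The "real magicking" in `validate` above is the MimbleWimble balance equation. With each Pedersen commitment written as C = r·G + v·H, subtracting a commitment to everything transparently created (the `over_commit` built from the block subsidy and fees) from the sum of UTXO commitments must cancel all the v·H terms, leaving exactly the blinding factors committed to by the kernels:

```latex
\sum_{\text{UTXOs}} C_i \;-\; \underbrace{\left(h \cdot R - \frac{\text{fees}}{2}\right) H}_{\texttt{over\_commit}}
  \;=\; \sum_{\text{kernels}} E_j
```

Here h is the header height and R the block subsidy (`reward(0)`); the fee adjustment mirrors the code above.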
+ pub fn rebuild_index(&self) -> Result<(), Error> { + for n in 1..self.output_pmmr.unpruned_size()+1 { + // non-pruned leaves only + if pmmr::bintree_postorder_height(n) == 0 { + if let Some(hs) = self.output_pmmr.get(n) { + self.commit_index.save_output_pos(&hs.sum.commit, n)?; + } + } + } + Ok(()) + } + /// Force the rollback of this extension, no matter the result pub fn force_rollback(&mut self) { self.rollback = true; } /// Dumps the state of the 3 sum trees to stdout for debugging. Short - /// version - /// only prints the UTXO tree. + /// version only prints the UTXO tree. pub fn dump(&self, short: bool) { debug!(LOGGER, "-- outputs --"); self.output_pmmr.dump(short); @@ -523,4 +612,109 @@ impl<'a> Extension<'a> { self.kernel_pmmr.unpruned_size(), ) } + + /// Sums the excess of all our kernels, validating their signatures on the way + fn sum_kernels(&self) -> Result<(Commitment, u64), Error> { + // make sure we have the right count of kernels using the MMR, the storage + // file may have a few more + let mmr_sz = self.kernel_pmmr.unpruned_size(); + let count: u64 = pmmr::peaks(mmr_sz).iter().map(|n| { + (1 << pmmr::bintree_postorder_height(*n)) as u64 + }).sum(); + + let mut kernel_file = File::open(self.kernel_file.path())?; + let first: TxKernel = ser::deserialize(&mut kernel_file)?; + first.verify()?; + let mut sum_kernel = first.excess; + let mut fees = first.fee; + + let secp = static_secp_instance(); + let mut kern_count = 1; + loop { + match ser::deserialize::(&mut kernel_file) { + Ok(kernel) => { + kernel.verify()?; + let secp = secp.lock().unwrap(); + sum_kernel = secp.commit_sum(vec![sum_kernel, kernel.excess], vec![])?; + fees += kernel.fee; + kern_count += 1; + if kern_count == count { + break; + } + } + Err(_) => break, + } + } + debug!(LOGGER, "Validated and summed {} kernels", kern_count); + Ok((sum_kernel, fees)) + } + + /// Sums all our UTXO commitments + fn sum_utxos(&self) -> Result { + let mut sum_utxo = None; + let mut utxo_count = 0; + let secp = static_secp_instance(); + for n in 1..self.output_pmmr.unpruned_size()+1 { + if pmmr::bintree_postorder_height(n) == 0 { + if let Some(hs) = self.output_pmmr.get(n) { + if n == 1 { + sum_utxo = Some(hs.sum.commit); + } else { + let secp = secp.lock().unwrap(); + sum_utxo = Some(secp.commit_sum(vec![sum_utxo.unwrap(), hs.sum.commit], vec![])?); + } + utxo_count += 1; + } + } + } + debug!(LOGGER, "Summed {} UTXOs", utxo_count); + Ok(sum_utxo.unwrap()) + } +} + +/// Output and kernel MMR indexes at the end of the provided block +fn indexes_at(block: &Block, commit_index: &ChainStore) -> Result<(u64, u64), Error> { + let out_idx = match block.outputs.last() { + Some(output) => commit_index.get_output_pos(&output.commitment()) + .map_err(|e| { + Error::StoreErr(e, format!("missing output pos for known block")) + })?, + None => 0, + }; + + let kern_idx = match block.kernels.last() { + Some(kernel) => commit_index.get_kernel_pos(&kernel.excess) + .map_err(|e| { + Error::StoreErr(e, format!("missing kernel pos for known block")) + })?, + None => 0, + }; + Ok((out_idx, kern_idx)) +} + +/// Packages the sumtree data files into a zip and returns a Read to the +/// resulting file +pub fn zip_read(root_dir: String) -> Result { + let sumtrees_path = Path::new(&root_dir).join(SUMTREES_SUBDIR); + let zip_path = Path::new(&root_dir).join(SUMTREES_ZIP); + + // create the zip archive + { + zip::compress(&sumtrees_path, &File::create(zip_path.clone())?) 
+ .map_err(|ze| Error::Other(ze.to_string()))?; + } + + // open it again to read it back + let zip_file = File::open(zip_path)?; + Ok(zip_file) +} + +/// Extract the sumtree data from a zip file and writes the content into the +/// sumtree storage dir +pub fn zip_write(root_dir: String, sumtree_data: File) -> Result<(), Error> { + let sumtrees_path = Path::new(&root_dir).join(SUMTREES_SUBDIR); + + fs::create_dir_all(sumtrees_path.clone())?; + zip::decompress(sumtree_data, &sumtrees_path) + .map_err(|ze| Error::Other(ze.to_string())) } diff --git a/chain/src/types.rs b/chain/src/types.rs index c02686031..d3f67cf0e 100644 --- a/chain/src/types.rs +++ b/chain/src/types.rs @@ -16,6 +16,7 @@ use std::io; +use util::secp; use util::secp::pedersen::Commitment; use grin_store as store; @@ -76,6 +77,8 @@ pub enum Error { OutputSpent, /// Invalid block version, either a mistake or outdated software InvalidBlockVersion(u16), + /// We've been provided a bad sumtree + InvalidSumtree(String), /// Internal issue when trying to save or load data from store StoreErr(grin_store::Error, String), /// Error serializing or deserializing a type @@ -105,10 +108,15 @@ impl From for Error { Error::SumTreeErr(e.to_string()) } } +impl From for Error { + fn from(e: secp::Error) -> Error { + Error::SumTreeErr(format!("Sum validation error: {}", e.to_string())) + } +} impl Error { /// Whether the error is due to a block that was intrinsically wrong - pub fn is_bad_block(&self) -> bool { + pub fn is_bad_data(&self) -> bool { // shorter to match on all the "not the block's fault" errors match *self { Error::Unfit(_) | @@ -211,6 +219,9 @@ pub trait ChainStore: Send + Sync { /// Gets a block header by hash fn get_block(&self, h: &Hash) -> Result; + /// Check whether we have a block without reading it + fn block_exists(&self, h: &Hash) -> Result; + /// Gets a block header by hash fn get_block_header(&self, h: &Hash) -> Result; @@ -238,6 +249,9 @@ pub trait ChainStore: Send + Sync { /// Gets the block header at the provided height fn get_header_by_height(&self, height: u64) -> Result; + /// Save a header as associated with its height + fn save_header_height(&self, header: &BlockHeader) -> Result<(), store::Error>; + /// Delete the block header at the height fn delete_header_by_height(&self, height: u64) -> Result<(), store::Error>; diff --git a/chain/tests/mine_simple_chain.rs b/chain/tests/mine_simple_chain.rs index cfc15749a..063746164 100644 --- a/chain/tests/mine_simple_chain.rs +++ b/chain/tests/mine_simple_chain.rs @@ -118,6 +118,7 @@ fn mine_empty_chain() { let header_by_height = chain.get_header_by_height(n).unwrap(); assert_eq!(header_by_height.hash(), bhash); } + chain.validate().unwrap(); } #[test] diff --git a/config/src/types.rs b/config/src/types.rs index 33d0ca903..c9d3737fa 100644 --- a/config/src/types.rs +++ b/config/src/types.rs @@ -96,6 +96,7 @@ pub struct GlobalConfig { #[derive(Debug, Serialize, Deserialize)] pub struct ConfigMembers { /// Server config + #[serde(default)] pub server: ServerConfig, /// Mining config pub mining: Option, diff --git a/core/src/core/pmmr.rs b/core/src/core/pmmr.rs index 045f3f0df..766bf6485 100644 --- a/core/src/core/pmmr.rs +++ b/core/src/core/pmmr.rs @@ -47,7 +47,7 @@ use util::LOGGER; /// the tree can sum over pub trait Summable { /// The type of the sum - type Sum: Clone + ops::Add + Readable + Writeable; + type Sum: Clone + ops::Add + Readable + Writeable + PartialEq; /// Obtain the sum of the element fn sum(&self) -> Self::Sum; @@ -80,6 +80,12 @@ impl Writeable for 
NullSum { } } +impl PartialEq for NullSum { + fn eq(&self, _other: &NullSum) -> bool { + true + } +} + /// Wrapper for a type that allows it to be inserted in a tree without summing #[derive(Clone, Debug)] pub struct NoSum(pub T); @@ -103,7 +109,7 @@ where /// A utility type to handle (Hash, Sum) pairs more conveniently. The addition /// of two HashSums is the (Hash(h1|h2), h1 + h2) HashSum. -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone, Eq)] pub struct HashSum where T: Summable, @@ -130,6 +136,15 @@ where } } +impl PartialEq for HashSum +where + T: Summable, +{ + fn eq(&self, other: &HashSum) -> bool { + self.hash == other.hash && self.sum == other.sum + } +} + impl Readable for HashSum where T: Summable, @@ -382,6 +397,32 @@ where return_vec } + /// Walks all unpruned nodes in the MMR and revalidate all parent hashes + /// and sums. + pub fn validate(&self) -> Result<(), String> { + // iterate on all parent nodes + for n in 1..(self.last_pos + 1) { + if bintree_postorder_height(n) > 0 { + if let Some(hs) = self.get(n) { + // take the left and right children, if they exist + let left_pos = bintree_move_down_left(n).unwrap(); + let right_pos = bintree_jump_right_sibling(left_pos); + + if let Some(left_child_hs) = self.get(left_pos) { + if let Some(right_child_hs) = self.get(right_pos) { + // sum and compare + if left_child_hs + right_child_hs != hs { + return Err(format!("Invalid MMR, hashsum of parent at {} does \ + not match children.", n)); + } + } + } + } + } + } + Ok(()) + } + /// Total size of the tree, including intermediary nodes an ignoring any /// pruning. pub fn unpruned_size(&self) -> u64 { @@ -583,7 +624,7 @@ impl PruneList { /// node's position. Starts with the top peak, which is always on the left /// side of the range, and navigates toward lower siblings toward the right /// of the range. -fn peaks(num: u64) -> Vec { +pub fn peaks(num: u64) -> Vec { // detecting an invalid mountain range, when siblings exist but no parent // exists if bintree_postorder_height(num + 1) > bintree_postorder_height(num) { diff --git a/core/src/core/transaction.rs b/core/src/core/transaction.rs index f1b4c4ec5..427c4a68e 100644 --- a/core/src/core/transaction.rs +++ b/core/src/core/transaction.rs @@ -167,6 +167,13 @@ impl TxKernel { } Ok(()) } + + /// Size in bytes of a kernel, necessary for binary storage + pub fn size() -> usize { + 17 + // features plus fee and lock_height + secp::constants::PEDERSEN_COMMITMENT_SIZE + + secp::constants::AGG_SIGNATURE_SIZE + } } /// A transaction @@ -770,7 +777,7 @@ impl Readable for OutputIdentifier { } /// Wrapper to Output commitments to provide the Summable trait. -#[derive(Serialize, Deserialize, Debug, Clone)] +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] pub struct SumCommit { /// Output features (coinbase vs. regular transaction output) /// We need to include this when hashing to ensure coinbase maturity can be enforced. 
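`peaks` is made public above because the sumtree extension needs to count leaves: in an MMR of size n, a peak at height h subtends exactly 2^h leaves, so summing 2^h over all peaks yields the number of elements ever appended. A hypothetical helper capturing the computation that `sum_kernels` performs inline, and that the kernel-file rewind multiplies by `TxKernel::size()`:

```rust
/// Number of leaves (appended elements) in an MMR of the given size;
/// each peak at height h covers exactly 2^h leaves. Hypothetical
/// helper; the patch inlines this computation in sum_kernels().
pub fn n_leaves(mmr_size: u64) -> u64 {
    peaks(mmr_size)
        .iter()
        .map(|&pos| 1u64 << bintree_postorder_height(pos))
        .sum()
}
```

Note that the `rewind` hunk in chain/src/sumtree.rs shifts by the peak positions (`1 << n`) rather than by their heights as `sum_kernels` does; the height-based form sketched here appears to be the intended computation.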
diff --git a/core/src/global.rs b/core/src/global.rs index 00aa43c7d..a48b03282 100644 --- a/core/src/global.rs +++ b/core/src/global.rs @@ -26,7 +26,7 @@ use consensus::PROOFSIZE; use consensus::DEFAULT_SIZESHIFT; use consensus::COINBASE_MATURITY; use consensus::{MEDIAN_TIME_WINDOW, INITIAL_DIFFICULTY, - BLOCK_TIME_SEC, DIFFICULTY_ADJUST_WINDOW}; + BLOCK_TIME_SEC, DIFFICULTY_ADJUST_WINDOW, CUT_THROUGH_HORIZON}; use core::target::Difficulty; use consensus::TargetError; @@ -51,6 +51,9 @@ pub const AUTOMATED_TESTING_COINBASE_MATURITY: u64 = 3; /// User testing coinbase maturity pub const USER_TESTING_COINBASE_MATURITY: u64 = 3; +/// Testing cut through horizon in blocks +pub const TESTING_CUT_THROUGH_HORIZON: u32 = 20; + /// Testing initial block difficulty pub const TESTING_INITIAL_DIFFICULTY: u64 = 1; @@ -163,6 +166,18 @@ pub fn initial_block_difficulty() -> u64 { } } +/// Horizon at which we can cut-through and do full local pruning +pub fn cut_through_horizon() -> u32 { + let param_ref = CHAIN_TYPE.read().unwrap(); + match *param_ref { + ChainTypes::AutomatedTesting => TESTING_CUT_THROUGH_HORIZON, + ChainTypes::UserTesting => TESTING_CUT_THROUGH_HORIZON, + ChainTypes::Testnet1 => CUT_THROUGH_HORIZON, + ChainTypes::Testnet2 => CUT_THROUGH_HORIZON, + ChainTypes::Mainnet => CUT_THROUGH_HORIZON, + } +} + /// Are we in automated testing mode? pub fn is_automated_testing_mode() -> bool { let param_ref = CHAIN_TYPE.read().unwrap(); diff --git a/grin/src/adapters.rs b/grin/src/adapters.rs index 3c5dfbf3b..85ca06981 100644 --- a/grin/src/adapters.rs +++ b/grin/src/adapters.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::fs::File; use std::net::SocketAddr; use std::sync::{Arc, RwLock}; use std::sync::atomic::{AtomicBool, Ordering}; @@ -123,7 +124,7 @@ impl p2p::ChainAdapter for NetToChainAdapter { if let &Err(ref e) = &res { debug!(LOGGER, "Block header {} refused by chain: {:?}", bhash, e); - if e.is_bad_block() { + if e.is_bad_data() { debug!(LOGGER, "header_received: {} is a bad header, resetting header head", bhash); let _ = self.chain.reset_head(); return false; @@ -183,10 +184,14 @@ impl p2p::ChainAdapter for NetToChainAdapter { } } } + + let header_head = self.chain.get_header_head().unwrap(); info!( LOGGER, - "Added {} headers to the header chain.", - added_hs.len() + "Added {} headers to the header chain. Last: {} at {}.", + added_hs.len(), + header_head.last_block_h, + header_head.height, ); } @@ -241,6 +246,41 @@ impl p2p::ChainAdapter for NetToChainAdapter { } } + /// Provides a reading view into the current sumtree state as well as + /// the required indexes for a consumer to rewind to a consistant state + /// at the provided block hash. + fn sumtrees_read(&self, h: Hash) -> Option { + match self.chain.sumtrees_read(h.clone()) { + Ok((out_index, kernel_index, read)) => Some(p2p::SumtreesRead { + output_index: out_index, + kernel_index: kernel_index, + reader: read, + }), + Err(e) => { + warn!(LOGGER, "Couldn't produce sumtrees data for block {}: {:?}", + h, e); + None + } + } + } + + /// Writes a reading view on a sumtree state that's been provided to us. + /// If we're willing to accept that new state, the data stream will be + /// read as a zip file, unzipped and the resulting state files should be + /// rewound to the provided indexes. 
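This horizon is also what gates archive acceptance: `Chain::sumtrees_write` earlier in the patch refuses a snapshot when normal body sync could still catch up, i.e. when the body head lags the header head by less than `cut_through_horizon()`. Schematically:

```rust
// Sketch of the acceptance gate in Chain::sumtrees_write: only take a
// sumtree snapshot when we're more than a horizon behind the headers.
let horizon = global::cut_through_horizon() as u64; // 20 in testing modes
if header_head.height - head.height < horizon {
    return Err(Error::InvalidSumtree("not needed".to_owned()));
}
```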
+ fn sumtrees_write(&self, h: Hash, + rewind_to_output: u64, rewind_to_kernel: u64, + sumtree_data: File, peer_addr: SocketAddr) -> bool { + // TODO check whether we should accept any sumtree now + if let Err(e) = self.chain.sumtrees_write(h, rewind_to_output, rewind_to_kernel, sumtree_data) { + error!(LOGGER, "Failed to save sumtree archive: {:?}", e); + !e.is_bad_data() + } else { + info!(LOGGER, "Received valid sumtree data for {}.", h); + self.currently_syncing.store(true, Ordering::Relaxed); + true + } + } } impl NetToChainAdapter { @@ -302,7 +342,7 @@ impl NetToChainAdapter { let res = self.chain.process_block(b, self.chain_opts()); if let Err(ref e) = res { debug!(LOGGER, "Block {} refused by chain: {:?}", bhash, e); - if e.is_bad_block() { + if e.is_bad_data() { debug!(LOGGER, "adapter: process_block: {} is a bad block, resetting head", bhash); let _ = self.chain.reset_head(); return false; diff --git a/grin/src/server.rs b/grin/src/server.rs index 2c11c77f7..92e3df441 100644 --- a/grin/src/server.rs +++ b/grin/src/server.rs @@ -142,6 +142,7 @@ impl Server { p2p_server.peers.clone(), shared_chain.clone(), skip_sync_wait, + !config.archive_mode, ); let p2p_inner = p2p_server.clone(); @@ -206,6 +207,11 @@ impl Server { self.chain.head().unwrap() } + /// The head of the block header chain + pub fn header_head(&self) -> chain::Tip { + self.chain.get_header_head().unwrap() + } + /// Returns a set of stats about this server. This and the ServerStats /// structure /// can be updated over time to include any information needed by tests or diff --git a/grin/src/sync.rs b/grin/src/sync.rs index 91b61aa37..d8208c95b 100644 --- a/grin/src/sync.rs +++ b/grin/src/sync.rs @@ -19,8 +19,9 @@ use std::sync::atomic::{AtomicBool, Ordering}; use time; use chain; -use core::core::hash::{Hash, Hashed}; +use core::core::hash::{Hash, Hashed, ZERO_HASH}; use core::core::target::Difficulty; +use core::global; use p2p::{self, Peer, Peers, ChainAdapter}; use types::Error; use util::LOGGER; @@ -31,6 +32,7 @@ pub fn run_sync( peers: p2p::Peers, chain: Arc, skip_sync_wait: bool, + fast_sync: bool, ) { let chain = chain.clone(); @@ -39,18 +41,36 @@ pub fn run_sync( .spawn(move || { let mut prev_body_sync = time::now_utc(); let mut prev_header_sync = prev_body_sync.clone(); + let mut prev_state_sync = prev_body_sync.clone() - time::Duration::seconds(5 * 60); // initial sleep to give us time to peer with some nodes if !skip_sync_wait { thread::sleep(Duration::from_secs(30)); } - loop { - let syncing = needs_syncing( - currently_syncing.clone(), peers.clone(), chain.clone()); - if syncing { + // fast sync has 3 states: + // * syncing headers + // * once all headers are sync'd, requesting the sumtree state + // * once we have the state, get blocks after that + // + // full sync gets rid of the middle step and just starts from + // the genesis state - let current_time = time::now_utc(); + loop { + let horizon = global::cut_through_horizon() as u64; + let head = chain.head().unwrap(); + let header_head = chain.get_header_head().unwrap(); + + // in archival nodes (no fast sync) we just consider we have the whole + // state already + let have_sumtrees = !fast_sync || head.height > 0 && + header_head.height.saturating_sub(head.height) <= horizon; + + let syncing = needs_syncing( + currently_syncing.clone(), peers.clone(), chain.clone(), !have_sumtrees); + + let current_time = time::now_utc(); + if syncing { // run the header sync every 10s if current_time - prev_header_sync > time::Duration::seconds(10) { @@ -62,7 +82,7 @@ 
pub fn run_sync( } // run the body_sync every 5s - if current_time - prev_body_sync > time::Duration::seconds(5) { + if have_sumtrees && current_time - prev_body_sync > time::Duration::seconds(5) { body_sync( peers.clone(), chain.clone(), @@ -70,10 +90,29 @@ pub fn run_sync( prev_body_sync = current_time; } - thread::sleep(Duration::from_secs(1)); - } else { - thread::sleep(Duration::from_secs(10)); - } + } else if !have_sumtrees && + current_time - prev_state_sync > time::Duration::seconds(5*60) { + + if let Some(peer) = peers.most_work_peer() { + if let Ok(p) = peer.try_read() { + debug!(LOGGER, "Header head before sumtree request: {} / {}", + header_head.height, header_head.last_block_h); + + // just to handle corner case of a too early start + if header_head.height > horizon { + + // ask for sumtree at horizon + let mut sumtree_head = chain.get_block_header(&header_head.prev_block_h).unwrap(); + for _ in 0..horizon-2 { + sumtree_head = chain.get_block_header(&sumtree_head.previous).unwrap(); + } + p.send_sumtrees_request(sumtree_head.height, sumtree_head.hash()); + prev_state_sync = current_time; + } + } + } + } + thread::sleep(Duration::from_secs(1)); } }); } @@ -199,20 +238,29 @@ fn request_headers( pub fn needs_syncing( currently_syncing: Arc, peers: Peers, - chain: Arc) -> bool { + chain: Arc, + header_only: bool) -> bool { - let local_diff = peers.total_difficulty(); + let local_diff = if header_only { + chain.total_header_difficulty().unwrap() + } else { + chain.total_difficulty() + }; let peer = peers.most_work_peer(); + // if we're already syncing, we're caught up if no peer has a higher // difficulty than us if currently_syncing.load(Ordering::Relaxed) { if let Some(peer) = peer { if let Ok(peer) = peer.try_read() { + debug!(LOGGER, "needs_syncing {} {} {}", local_diff, peer.info.total_difficulty, header_only); if peer.info.total_difficulty <= local_diff { info!(LOGGER, "synchronized at {:?} @ {:?}", local_diff, chain.head().unwrap().height); currently_syncing.store(false, Ordering::Relaxed); - let _ = chain.reset_head(); + if !header_only { + let _ = chain.reset_head(); + } } } } else { diff --git a/grin/src/types.rs b/grin/src/types.rs index b997d5d14..f2cf800e9 100644 --- a/grin/src/types.rs +++ b/grin/src/types.rs @@ -116,6 +116,9 @@ pub struct ServerConfig { #[serde(default)] pub chain_type: ChainTypes, + /// Whether this node is a full archival node or a fast-sync, pruned node + pub archive_mode: bool, + /// Method used to get the list of seed nodes for initial bootstrap. #[serde(default)] pub seeding_type: Seeding, @@ -155,6 +158,7 @@ impl Default for ServerConfig { p2p_config: p2p::P2PConfig::default(), mining_config: Some(pow::types::MinerConfig::default()), chain_type: ChainTypes::default(), + archive_mode: false, pool_config: pool::PoolConfig::default(), skip_sync_wait: Some(true), } diff --git a/grin/tests/simulnet.rs b/grin/tests/simulnet.rs index 34899c84e..507df2e73 100644 --- a/grin/tests/simulnet.rs +++ b/grin/tests/simulnet.rs @@ -247,7 +247,6 @@ fn simulate_full_sync() { util::init_test_logger(); // we actually set the chain_type in the ServerConfig below - // TODO - avoid needing to set it in two places? 
global::set_mining_mode(ChainTypes::AutomatedTesting); let test_name_dir = "grin-sync"; @@ -271,16 +270,56 @@ fn simulate_full_sync() { let s1 = grin::Server::new(config(0, "grin-sync")).unwrap(); // mine a few blocks on server 1 s1.start_miner(miner_config); - thread::sleep(time::Duration::from_secs(5)); + thread::sleep(time::Duration::from_secs(8)); let mut conf = config(1, "grin-sync"); - conf.skip_sync_wait = Some(false); let s2 = grin::Server::new(conf).unwrap(); while s2.head().height < 4 { thread::sleep(time::Duration::from_millis(100)); } } +/// Creates 2 different disconnected servers, mine a few blocks on one, connect +/// them and check that the 2nd gets all using fast sync algo +#[test] +fn simulate_fast_sync() { + util::init_test_logger(); + + // we actually set the chain_type in the ServerConfig below + global::set_mining_mode(ChainTypes::AutomatedTesting); + + let test_name_dir = "grin-fast"; + framework::clean_all_output(test_name_dir); + + let mut plugin_config = pow::types::CuckooMinerPluginConfig::default(); + let mut plugin_config_vec: Vec = Vec::new(); + plugin_config.type_filter = String::from("mean_cpu"); + plugin_config_vec.push(plugin_config); + + let miner_config = pow::types::MinerConfig { + enable_mining: true, + burn_reward: true, + use_cuckoo_miner: false, + cuckoo_miner_async_mode: Some(false), + cuckoo_miner_plugin_dir: Some(String::from("../target/debug/deps")), + cuckoo_miner_plugin_config: Some(plugin_config_vec), + ..Default::default() + }; + + let s1 = grin::Server::new(config(1000, "grin-fast")).unwrap(); + // mine a few blocks on server 1 + s1.start_miner(miner_config); + thread::sleep(time::Duration::from_secs(8)); + + let mut conf = config(1001, "grin-fast"); + conf.archive_mode = false; + conf.seeds = Some(vec!["127.0.0.1:12000".to_string()]); + let s2 = grin::Server::new(conf).unwrap(); + while s2.head().height != s2.header_head().height || s2.head().height < 20 { + thread::sleep(time::Duration::from_millis(1000)); + } +} + fn config(n: u16, test_name_dir: &str) -> grin::ServerConfig { grin::ServerConfig { api_http_addr: format!("127.0.0.1:{}", 19000 + n), @@ -292,6 +331,8 @@ fn config(n: u16, test_name_dir: &str) -> grin::ServerConfig { seeding_type: grin::Seeding::List, seeds: Some(vec!["127.0.0.1:11000".to_string()]), chain_type: core::global::ChainTypes::AutomatedTesting, + archive_mode: true, + skip_sync_wait: Some(true), ..Default::default() } } diff --git a/p2p/src/conn.rs b/p2p/src/conn.rs index 9dd366893..17a45f713 100644 --- a/p2p/src/conn.rs +++ b/p2p/src/conn.rs @@ -20,7 +20,9 @@ //! forces us to go through some additional gymnastic to loop over the async //! stream and make sure we get the right number of bytes out. -use std::io::{self, Write}; +use std::cmp; +use std::fs::File; +use std::io::{self, Read, Write}; use std::sync::{Arc, Mutex, mpsc}; use std::net::TcpStream; use std::thread; @@ -31,8 +33,10 @@ use msg::*; use types::*; use util::LOGGER; +/// A trait to be implemented in order to receive messages from the +/// connection. Allows providing an optional response. pub trait MessageHandler: Send + 'static { - fn consume(&self, msg: &mut Message) -> Result, Type)>, Error>; + fn consume<'a>(&self, msg: Message<'a>) -> Result>, Error>; } // Macro to simplify the boilerplate around asyn I/O error handling, @@ -52,6 +56,8 @@ macro_rules! try_break { } } +/// A message as received by the connection. Provides access to the message +/// header lazily consumes the message body, handling its deserialization. 
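Attachments extend the wire format without touching the message header: the attachment's length travels in the message body (the `bytes` field of `SumtreesArchive` below), and the raw data follows the body directly on the stream. Schematically:

```rust
// Wire layout with an attachment (SumtreesArchive is the only user so
// far). The header only frames the body; the attachment length is a
// body field, so generic connection code stays attachment-agnostic.
//
//   [ MsgHeader: magic, msg_type, msg_len ]
//   [ body: msg_len bytes ]                 <- carries the `bytes` field
//   [ attachment: `bytes` raw bytes ]       <- streamed in 8000-byte chunks
//
// Send side: Response::add_attachment(file), written after the body.
// Receive side: Message::copy_attachment(len, writer).
```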
pub struct Message<'a> { pub header: MsgHeader, conn: &'a mut TcpStream, @@ -63,9 +69,67 @@ impl<'a> Message<'a> { Message{header, conn} } + /// Read the message body from the underlying connection pub fn body(&mut self) -> Result where T: ser::Readable { read_body(&self.header, self.conn) } + + pub fn copy_attachment(&mut self, len: usize, writer: &mut Write) -> Result<(), Error> { + let mut written = 0; + while written < len { + let read_len = cmp::min(8000, len - written); + let mut buf = vec![0u8; read_len]; + read_exact(&mut self.conn, &mut buf[..], 10000, true)?; + writer.write_all(&mut buf)?; + written += read_len; + } + Ok(()) + } + + /// Respond to the message with the provided message type and body + pub fn respond(self, resp_type: Type, body: T) -> Response<'a> + where + T: ser::Writeable + { + let body = ser::ser_vec(&body).unwrap(); + Response{ + resp_type: resp_type, + body: body, + conn: self.conn, + attachment: None, + } + } +} + +/// Response to a `Message` +pub struct Response<'a> { + resp_type: Type, + body: Vec, + conn: &'a mut TcpStream, + attachment: Option, +} + +impl<'a> Response<'a> { + fn write(mut self) -> Result<(), Error> { + let mut msg = ser::ser_vec(&MsgHeader::new(self.resp_type, self.body.len() as u64)).unwrap(); + msg.append(&mut self.body); + write_all(&mut self.conn, &msg[..], 10000)?; + if let Some(mut file) = self.attachment { + let mut buf = [0u8; 8000]; + loop { + match file.read(&mut buf[..]) { + Ok(0) => break, + Ok(n) => write_all(&mut self.conn, &buf[..n], 10000)?, + Err(e) => return Err(From::from(e)), + } + } + } + Ok(()) + } + + pub fn add_attachment(&mut self, file: File) { + self.attachment = Some(file); + } } // TODO count sent and received @@ -105,7 +169,7 @@ where let (error_tx, error_rx) = mpsc::channel(); stream.set_nonblocking(true).expect("Non-blocking IO not available."); - poll(stream, handler, send_rx, send_tx.clone(), error_tx, close_rx); + poll(stream, handler, send_rx, error_tx, close_rx); Tracker { sent_bytes: Arc::new(Mutex::new(0)), @@ -120,7 +184,6 @@ fn poll( conn: TcpStream, handler: H, send_rx: mpsc::Receiver>, - send_tx: mpsc::Sender>, error_tx: mpsc::Sender, close_rx: mpsc::Receiver<()> ) @@ -137,10 +200,10 @@ where loop { // check the read end if let Some(h) = try_break!(error_tx, read_header(conn)) { - let mut msg = Message::from_header(h, conn); + let msg = Message::from_header(h, conn); debug!(LOGGER, "Received message header, type {:?}, len {}.", msg.header.msg_type, msg.header.msg_len); - if let Some(Some((body, typ))) = try_break!(error_tx, handler.consume(&mut msg)) { - respond(&send_tx, typ, body); + if let Some(Some(resp)) = try_break!(error_tx, handler.consume(msg)) { + try_break!(error_tx, resp.write()); } } @@ -173,9 +236,3 @@ where } }); } - -fn respond(send_tx: &mpsc::Sender>, msg_type: Type, mut body: Vec) { - let mut msg = ser::ser_vec(&MsgHeader::new(msg_type, body.len() as u64)).unwrap(); - msg.append(&mut body); - send_tx.send(msg).unwrap(); -} diff --git a/p2p/src/lib.rs b/p2p/src/lib.rs index 1cae88fbc..63a5ca840 100644 --- a/p2p/src/lib.rs +++ b/p2p/src/lib.rs @@ -52,6 +52,6 @@ mod types; pub use serv::{Server, DummyAdapter}; pub use peers::Peers; pub use peer::Peer; -pub use types::{Capabilities, Error, ChainAdapter, P2PConfig, PeerInfo, MAX_BLOCK_HEADERS, - MAX_PEER_ADDRS}; +pub use types::{Capabilities, Error, ChainAdapter, SumtreesRead, P2PConfig, + PeerInfo, MAX_BLOCK_HEADERS, MAX_PEER_ADDRS}; pub use store::{PeerData, State}; diff --git a/p2p/src/msg.rs b/p2p/src/msg.rs index 
05b993785..386c02af9 100644 --- a/p2p/src/msg.rs +++ b/p2p/src/msg.rs @@ -65,6 +65,8 @@ enum_from_primitive! { GetCompactBlock, CompactBlock, Transaction, + SumtreesRequest, + SumtreesArchive } } @@ -120,6 +122,34 @@ pub fn read_exact( Ok(()) } +/// Same as `read_exact` but for writing. +pub fn write_all(conn: &mut Write, mut buf: &[u8], timeout: u32) -> io::Result<()> { + + let sleep_time = time::Duration::from_millis(1); + let mut count = 0; + + while !buf.is_empty() { + match conn.write(buf) { + Ok(0) => return Err(io::Error::new(io::ErrorKind::WriteZero, + "failed to write whole buffer")), + Ok(n) => buf = &buf[n..], + Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {} + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {} + Err(e) => return Err(e), + } + if !buf.is_empty() { + thread::sleep(sleep_time); + count += 1; + } else { + break; + } + if count > timeout { + return Err(io::Error::new(io::ErrorKind::TimedOut, "reading from tcp stream")); + } + } + Ok(()) +} + /// Read a header from the provided connection without blocking if the /// underlying stream is async. Typically headers will be polled for, so /// we do not want to block. @@ -571,7 +601,7 @@ impl Readable for Ping { Ok(h) => h, Err(_) => 0, }; -Ok(Ping { total_difficulty, height }) + Ok(Ping { total_difficulty, height }) } } @@ -605,3 +635,65 @@ impl Readable for Pong { Ok(Pong { total_difficulty, height }) } } + +/// Request to get an archive of the full sumtree store, required to sync +/// a new node. +pub struct SumtreesRequest { + /// Hash of the block for which the sumtrees should be provided + pub hash: Hash, + /// Height of the corresponding block + pub height: u64 +} + +impl Writeable for SumtreesRequest { + fn write(&self, writer: &mut W) -> Result<(), ser::Error> { + self.hash.write(writer)?; + writer.write_u64(self.height)?; + Ok(()) + } +} + +impl Readable for SumtreesRequest { + fn read(reader: &mut Reader) -> Result { + Ok(SumtreesRequest { + hash: Hash::read(reader)?, + height: reader.read_u64()?, + }) + } +} + +/// Response to a sumtree archive request, must include a zip stream of the +/// archive after the message body. +pub struct SumtreesArchive { + /// Hash of the block for which the sumtrees are provided + pub hash: Hash, + /// Height of the corresponding block + pub height: u64, + /// Output tree index the receiver should rewind to + pub rewind_to_output: u64, + /// Kernel tree index the receiver should rewind to + pub rewind_to_kernel: u64, + /// Size in bytes of the archive + pub bytes: u64, +} + +impl Writeable for SumtreesArchive { + fn write(&self, writer: &mut W) -> Result<(), ser::Error> { + self.hash.write(writer)?; + ser_multiwrite!(writer, [write_u64, self.height], + [write_u64, self.rewind_to_output], + [write_u64, self.rewind_to_kernel], + [write_u64, self.bytes]); + Ok(()) + } +} + +impl Readable for SumtreesArchive { + fn read(reader: &mut Reader) -> Result { + let hash = Hash::read(reader)?; + let (height, rewind_to_output, rewind_to_kernel, bytes) = + ser_multiread!(reader, read_u64, read_u64, read_u64, read_u64); + + Ok(SumtreesArchive {hash, height, rewind_to_output, rewind_to_kernel, bytes}) + } +} diff --git a/p2p/src/peer.rs b/p2p/src/peer.rs index 73899a8e5..907ecf01e 100644 --- a/p2p/src/peer.rs +++ b/p2p/src/peer.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. 
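`write_all` mirrors `read_exact` above: both spin in 1 ms sleeps while the non-blocking socket reports `WouldBlock`, so the `timeout` argument is effectively a millisecond count (the timed-out error string still says "reading from tcp stream", apparently carried over from `read_exact`). For the new messages, a serialization round-trip with this crate's ser helpers looks like the following; `block_hash` is a placeholder value:

```rust
// Illustrative round-trip for the new SumtreesRequest message using
// the Writeable/Readable impls above; block_hash is a placeholder.
let req = SumtreesRequest { hash: block_hash, height: 1_000 };
let buf = ser::ser_vec(&req).unwrap();
let back: SumtreesRequest = ser::deserialize(&mut &buf[..]).unwrap();
assert_eq!(back.height, 1_000);
```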
+use std::fs::File; use std::net::{SocketAddr, TcpStream}; use std::sync::{Arc, RwLock}; @@ -230,6 +231,13 @@ impl Peer { msg::Type::GetPeerAddrs) } + pub fn send_sumtrees_request(&self, height: u64, hash: Hash) -> Result<(), Error> { + debug!(LOGGER, "Asking {} for sumtree archive at {} {}.", + self.info.addr, height, hash); + self.connection.as_ref().unwrap().send( + &SumtreesRequest {hash, height }, msg::Type::SumtreesRequest) + } + /// Stops the peer, closing its connection pub fn stop(&self) { let _ = self.connection.as_ref().unwrap().close_channel.send(()); @@ -326,6 +334,17 @@ impl ChainAdapter for TrackingAdapter { fn get_block(&self, h: Hash) -> Option { self.adapter.get_block(h) } + + fn sumtrees_read(&self, h: Hash) -> Option { + self.adapter.sumtrees_read(h) + } + + fn sumtrees_write(&self, h: Hash, + rewind_to_output: u64, rewind_to_kernel: u64, + sumtree_data: File, peer_addr: SocketAddr) -> bool { + self.adapter.sumtrees_write(h, rewind_to_output, rewind_to_kernel, + sumtree_data, peer_addr) + } } impl NetAdapter for TrackingAdapter { diff --git a/p2p/src/peers.rs b/p2p/src/peers.rs index c537875bd..94af4159d 100644 --- a/p2p/src/peers.rs +++ b/p2p/src/peers.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::collections::HashMap; +use std::fs::File; use std::net::SocketAddr; use std::sync::{Arc, RwLock}; @@ -482,6 +483,25 @@ impl ChainAdapter for Peers { fn get_block(&self, h: Hash) -> Option { self.adapter.get_block(h) } + fn sumtrees_read(&self, h: Hash) -> Option { + self.adapter.sumtrees_read(h) + } + fn sumtrees_write( + &self, + h: Hash, + rewind_to_output: u64, + rewind_to_kernel: u64, + sumtree_data: File, + peer_addr: SocketAddr, + ) -> bool { + if !self.adapter.sumtrees_write(h, rewind_to_output, rewind_to_kernel, + sumtree_data, peer_addr) { + self.ban_peer(&peer_addr); + false + } else { + true + } + } } impl NetAdapter for Peers { diff --git a/p2p/src/protocol.rs b/p2p/src/protocol.rs index 32af59e3e..3b3a4ab49 100644 --- a/p2p/src/protocol.rs +++ b/p2p/src/protocol.rs @@ -12,12 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-use std::sync::Arc; +use std::env; +use std::fs::File; use std::net::SocketAddr; +use std::sync::Arc; use core::core; use core::core::hash::{Hash, Hashed}; -use core::ser; use conn::*; use msg::*; use rand; @@ -37,7 +38,7 @@ impl Protocol { } impl MessageHandler for Protocol { - fn consume(&self, msg: &mut Message) -> Result, Type)>, Error> { + fn consume<'a>(&self, mut msg: Message<'a>) -> Result>, Error> { let adapter = &self.adapter; match msg.header.msg_type { @@ -46,13 +47,14 @@ impl MessageHandler for Protocol { let ping: Ping = msg.body()?; adapter.peer_difficulty(self.addr, ping.total_difficulty, ping.height); - let pong_bytes = ser::ser_vec( - &Pong { - total_difficulty: adapter.total_difficulty(), - height: adapter.total_height(), - }).unwrap(); - - Ok(Some((pong_bytes, Type::Pong))) + Ok(Some( + msg.respond( + Type::Pong, + Pong { + total_difficulty: adapter.total_difficulty(), + height: adapter.total_height(), + }) + )) } Type::Pong => { @@ -73,8 +75,7 @@ impl MessageHandler for Protocol { let bo = adapter.get_block(h); if let Some(b) = bo { - let block_bytes = ser::ser_vec(&b).unwrap(); - return Ok(Some((block_bytes, Type::Block))); + return Ok(Some(msg.respond(Type::Block, b))); } Ok(None) } @@ -111,11 +112,9 @@ impl MessageHandler for Protocol { "handle_payload: GetCompactBlock: empty block, sending full block", ); - let block_bytes = ser::ser_vec(&b).unwrap(); - Ok(Some((block_bytes, Type::Block))) + Ok(Some(msg.respond(Type::Block, b))) } else { - let compact_block_bytes = ser::ser_vec(&cb).unwrap(); - Ok(Some((compact_block_bytes, Type::CompactBlock))) + Ok(Some(msg.respond(Type::CompactBlock, cb))) } } else { Ok(None) @@ -137,8 +136,7 @@ impl MessageHandler for Protocol { let headers = adapter.locate_headers(loc.hashes); // serialize and send all the headers over - let header_bytes = ser::ser_vec(&Headers { headers: headers }).unwrap(); - return Ok(Some((header_bytes, Type::Headers))); + Ok(Some(msg.respond(Type::Headers, Headers { headers: headers }))) } // "header first" block propagation - if we have not yet seen this block @@ -162,11 +160,13 @@ impl MessageHandler for Protocol { Type::GetPeerAddrs => { let get_peers: GetPeerAddrs = msg.body()?; let peer_addrs = adapter.find_peer_addrs(get_peers.capabilities); - let peer_addrs_bytes = ser::ser_vec( - &PeerAddrs { - peers: peer_addrs.iter().map(|sa| SockAddr(*sa)).collect(), - }).unwrap(); - return Ok(Some((peer_addrs_bytes, Type::PeerAddrs))); + Ok(Some( + msg.respond( + Type::PeerAddrs, + PeerAddrs { + peers: peer_addrs.iter().map(|sa| SockAddr(*sa)).collect(), + }) + )) } Type::PeerAddrs => { @@ -175,6 +175,52 @@ impl MessageHandler for Protocol { Ok(None) } + Type::SumtreesRequest => { + let sm_req: SumtreesRequest = msg.body()?; + debug!(LOGGER, "handle_payload: sumtree req for {} at {}", + sm_req.hash, sm_req.height); + + let sumtrees = self.adapter.sumtrees_read(sm_req.hash); + + if let Some(sumtrees) = sumtrees { + let file_sz = sumtrees.reader.metadata()?.len(); + let mut resp = msg.respond( + Type::SumtreesArchive, + &SumtreesArchive { + height: sm_req.height as u64, + hash: sm_req.hash, + rewind_to_output: sumtrees.output_index, + rewind_to_kernel: sumtrees.kernel_index, + bytes: file_sz, + }); + resp.add_attachment(sumtrees.reader); + Ok(Some(resp)) + } else { + Ok(None) + } + } + + Type::SumtreesArchive => { + let sm_arch: SumtreesArchive = msg.body()?; + debug!(LOGGER, "handle_payload: sumtree archive for {} at {} rewind to {}/{}", + sm_arch.hash, sm_arch.height, + sm_arch.rewind_to_output, 
@@ -175,6 +175,52 @@ impl MessageHandler for Protocol {
 				Ok(None)
 			}
 
+			Type::SumtreesRequest => {
+				let sm_req: SumtreesRequest = msg.body()?;
+				debug!(LOGGER, "handle_payload: sumtree req for {} at {}",
+					sm_req.hash, sm_req.height);
+
+				let sumtrees = self.adapter.sumtrees_read(sm_req.hash);
+
+				if let Some(sumtrees) = sumtrees {
+					let file_sz = sumtrees.reader.metadata()?.len();
+					let mut resp = msg.respond(
+						Type::SumtreesArchive,
+						&SumtreesArchive {
+							height: sm_req.height as u64,
+							hash: sm_req.hash,
+							rewind_to_output: sumtrees.output_index,
+							rewind_to_kernel: sumtrees.kernel_index,
+							bytes: file_sz,
+						});
+					resp.add_attachment(sumtrees.reader);
+					Ok(Some(resp))
+				} else {
+					Ok(None)
+				}
+			}
+
+			Type::SumtreesArchive => {
+				let sm_arch: SumtreesArchive = msg.body()?;
+				debug!(LOGGER, "handle_payload: sumtree archive for {} at {} rewind to {}/{}",
+					sm_arch.hash, sm_arch.height,
+					sm_arch.rewind_to_output, sm_arch.rewind_to_kernel);
+
+				let mut tmp = env::temp_dir();
+				tmp.push("sumtree.zip");
+				{
+					let mut tmp_zip = File::create(tmp.clone())?;
+					msg.copy_attachment(sm_arch.bytes as usize, &mut tmp_zip)?;
+					tmp_zip.sync_all()?;
+				}
+
+				let tmp_zip = File::open(tmp)?;
+				self.adapter.sumtrees_write(
+					sm_arch.hash, sm_arch.rewind_to_output,
+					sm_arch.rewind_to_kernel, tmp_zip, self.addr);
+				Ok(None)
+			}
+
 			_ => {
 				debug!(LOGGER, "unknown message type {:?}", msg.header.msg_type);
 				Ok(None)
diff --git a/p2p/src/serv.rs b/p2p/src/serv.rs
index 4008ba33e..fee2de5cb 100644
--- a/p2p/src/serv.rs
+++ b/p2p/src/serv.rs
@@ -12,8 +12,9 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::sync::{Arc, RwLock};
+use std::fs::File;
 use std::net::{TcpListener, TcpStream, SocketAddr, Shutdown};
+use std::sync::{Arc, RwLock};
 use std::thread;
 use std::time::Duration;
@@ -194,6 +195,15 @@ impl ChainAdapter for DummyAdapter {
 	fn get_block(&self, _: Hash) -> Option<core::Block> {
 		None
 	}
+	fn sumtrees_read(&self, _h: Hash) -> Option<SumtreesRead> {
+		unimplemented!()
+	}
+
+	fn sumtrees_write(&self, _h: Hash,
+		_rewind_to_output: u64, _rewind_to_kernel: u64,
+		_sumtree_data: File, _peer_addr: SocketAddr) -> bool {
+		false
+	}
 }
 
 impl NetAdapter for DummyAdapter {
diff --git a/p2p/src/types.rs b/p2p/src/types.rs
index 98c8bc710..02e297b4f 100644
--- a/p2p/src/types.rs
+++ b/p2p/src/types.rs
@@ -13,6 +13,7 @@
 // limitations under the License.
 
 use std::convert::From;
+use std::fs::File;
 use std::io;
 use std::net::{IpAddr, SocketAddr};
 use std::sync::mpsc;
@@ -132,6 +133,17 @@ pub struct PeerInfo {
 	pub total_difficulty: Difficulty,
 }
 
+/// The full sumtree data along with indexes required for a consumer to
+/// rewind to a consistent requested state.
+pub struct SumtreesRead {
+	/// Output tree index the receiver should rewind to
+	pub output_index: u64,
+	/// Kernel tree index the receiver should rewind to
+	pub kernel_index: u64,
+	/// Binary stream for the sumtree zipped data
+	pub reader: File,
+}
+
 /// Bridge between the networking layer and the rest of the system. Handles the
 /// forwarding or querying of blocks and transactions from the network among
 /// other things.
@@ -167,6 +179,19 @@ pub trait ChainAdapter: Sync + Send {
 	/// Gets a full block by its hash.
 	fn get_block(&self, h: Hash) -> Option<core::Block>;
+
+	/// Provides a reading view into the current sumtree state as well as
+	/// the required indexes for a consumer to rewind to a consistent state
+	/// at the provided block hash.
+	fn sumtrees_read(&self, h: Hash) -> Option<SumtreesRead>;
+
+	/// Writes a sumtree state that has been provided to us. If we're willing
+	/// to accept that new state, the data stream is read as a zip file and
+	/// unzipped, and the resulting state files are rewound to the provided
+	/// indexes.
+	fn sumtrees_write(&self, h: Hash,
+		rewind_to_output: u64, rewind_to_kernel: u64,
+		sumtree_data: File, peer_addr: SocketAddr) -> bool;
 }
 
 /// Additional methods required by the protocol that don't need to be
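On the node side this contract is implemented by the server adapter (grin/src/adapters.rs is touched by this patch but not shown in this excerpt). A simplified sketch of an implementation honoring it; `NetToChainAdapter` and a matching `sumtrees_write` on the chain handle are assumed here, not quoted from the patch:

    // Sketch: accept the archive only if the chain validates it; returning
    // false is what makes Peers::sumtrees_write ban the sending peer.
    // Other required trait methods are elided.
    impl ChainAdapter for NetToChainAdapter {
        fn sumtrees_write(&self, h: Hash,
            rewind_to_output: u64, rewind_to_kernel: u64,
            sumtree_data: File, _peer_addr: SocketAddr) -> bool {
            match self.chain.sumtrees_write(h, rewind_to_output,
                rewind_to_kernel, sumtree_data) {
                Ok(_) => true,
                Err(e) => {
                    debug!(LOGGER, "Refusing sumtree archive: {:?}", e);
                    false
                }
            }
        }
    }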
diff --git a/pool/src/pool.rs b/pool/src/pool.rs
index 7d3d8cffa..49c246f6b 100644
--- a/pool/src/pool.rs
+++ b/pool/src/pool.rs
@@ -17,11 +17,12 @@
 use std::sync::Arc;
 use std::collections::{HashMap, HashSet};
 
-use core::core::transaction;
-use core::core::OutputIdentifier;
-use core::core::{block, hash};
 use util::secp::pedersen::Commitment;
 
+use core::core::transaction;
+use core::core::{block, hash, OutputIdentifier};
+use core::global;
+
 use types::*;
 pub use graph;
diff --git a/store/src/sumtree.rs b/store/src/sumtree.rs
index daf34710d..2f9368105 100644
--- a/store/src/sumtree.rs
+++ b/store/src/sumtree.rs
@@ -18,6 +18,7 @@ use memmap;
 use std::cmp;
 use std::fs::{self, File, OpenOptions};
 use std::io::{self, BufRead, BufReader, ErrorKind, Write};
+use std::marker::PhantomData;
 use std::os::unix::io::AsRawFd;
 use std::path::Path;
 use std::io::Read;
@@ -27,7 +28,7 @@ use libc::{ftruncate64, off64_t};
 #[cfg(not(any(target_os = "linux", target_os = "android")))]
 use libc::{ftruncate as ftruncate64, off_t as off64_t};
 
-use core::core::pmmr::{self, Backend, HashSum, Summable, VecBackend};
+use core::core::pmmr::{self, Backend, HashSum, Summable};
 use core::ser;
 use util::LOGGER;
@@ -46,15 +47,18 @@ pub const RM_LOG_MAX_NODES: usize = 10000;
 /// Despite being append-only, the file can still be pruned and truncated. The
 /// former simply happens by rewriting it, ignoring some of the data. The
 /// latter by truncating the underlying file and re-creating the mmap.
-struct AppendOnlyFile {
+pub struct AppendOnlyFile {
 	path: String,
 	file: File,
 	mmap: Option<memmap::Mmap>,
+	buffer_start: usize,
+	buffer: Vec<u8>,
+	buffer_start_bak: usize,
 }
 
 impl AppendOnlyFile {
 	/// Open a file (existing or not) as append-only, backed by a mmap.
-	fn open(path: String) -> io::Result<AppendOnlyFile> {
+	pub fn open(path: String) -> io::Result<AppendOnlyFile> {
 		let file = OpenOptions::new()
 			.read(true)
 			.append(true)
@@ -64,31 +68,68 @@ impl AppendOnlyFile {
 			path: path.clone(),
 			file: file,
 			mmap: None,
+			buffer_start: 0,
+			buffer: vec![],
+			buffer_start_bak: 0,
 		};
 		if let Ok(sz) = aof.size() {
 			if sz > 0 {
-				aof.sync()?;
+				aof.buffer_start = sz as usize;
+				aof.mmap = Some(unsafe { memmap::Mmap::map(&aof.file)? });
 			}
 		}
 		Ok(aof)
 	}
 
-	/// Append data to the file.
-	fn append(&mut self, buf: &[u8]) -> io::Result<()> {
-		self.file.write_all(buf)
+	/// Append data to the file. Until the append-only file is synced, data is
+	/// only written to memory.
+	pub fn append(&mut self, buf: &mut Vec<u8>) {
+		self.buffer.append(buf);
+	}
+
+	/// Rewinds the data file back to a lower position. The new position must
+	/// be the offset of the first byte written the next time data is
+	/// appended.
+	pub fn rewind(&mut self, pos: u64) {
+		if self.buffer_start_bak > 0 || self.buffer.len() > 0 {
+			panic!("Can't rewind on a dirty state.");
+		}
+		self.buffer_start_bak = self.buffer_start;
+		self.buffer_start = pos as usize;
+	}
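The buffering contract introduced here is easiest to see end to end. A small usage sketch, in the spirit of a test inside this module (`read` is private to the file; the path is a scratch location):

    fn aof_demo() -> std::io::Result<()> {
        let mut aof = AppendOnlyFile::open("/tmp/aof_demo.bin".to_string())?;
        aof.append(&mut vec![1u8, 2, 3]); // buffered in memory only
        aof.discard();                    // drops the 3 bytes, file untouched
        aof.append(&mut vec![4u8, 5, 6]);
        aof.flush()?;                     // writes, fsyncs and remaps the file
        assert_eq!(aof.read(0, 3), vec![4, 5, 6]);
        aof.rewind(0);                    // logical truncation, needs a clean state
        aof.flush()?;                     // the truncation is applied here
        Ok(())
    }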
 	/// Syncs all writes (fsync), reallocating the memory map to make the newly
 	/// written data accessible.
-	fn sync(&mut self) -> io::Result<()> {
+	pub fn flush(&mut self) -> io::Result<()> {
+		if self.buffer_start_bak > 0 {
+			// flushing a rewound state, we need to truncate before applying
+			self.truncate(self.buffer_start)?;
+			self.buffer_start_bak = 0;
+		}
+		self.buffer_start += self.buffer.len();
+		self.file.write_all(&self.buffer[..])?;
 		self.file.sync_data()?;
+		self.buffer = vec![];
 		self.mmap = Some(unsafe { memmap::Mmap::map(&self.file)? });
 		Ok(())
 	}
 
+	/// Discard the current non-flushed data.
+	pub fn discard(&mut self) {
+		if self.buffer_start_bak > 0 {
+			// discarding a rewound state, restore the buffer start
+			self.buffer_start = self.buffer_start_bak;
+			self.buffer_start_bak = 0;
+		}
+		self.buffer = vec![];
+	}
+
 	/// Read length bytes of data at offset from the file. Leverages the memory
 	/// map.
 	fn read(&self, offset: usize, length: usize) -> Vec<u8> {
+		if offset >= self.buffer_start {
+			let offset = offset - self.buffer_start;
+			return self.buffer[offset..(offset + length)].to_vec();
+		}
 		if let None = self.mmap {
 			return vec![];
 		}
@@ -96,6 +137,17 @@ impl AppendOnlyFile {
 		(&mmap[offset..(offset + length)]).to_vec()
 	}
 
+	/// Truncates the underlying file to the provided offset
+	fn truncate(&self, offs: usize) -> io::Result<()> {
+		let fd = self.file.as_raw_fd();
+		let res = unsafe { ftruncate64(fd, offs as off64_t) };
+		if res == -1 {
+			Err(io::Error::last_os_error())
+		} else {
+			Ok(())
+		}
+	}
+
 	/// Saves a copy of the current file content, skipping data at the provided
 	/// prune indices. The prune Vec must be ordered.
 	fn save_prune(&self, target: String, prune_offs: Vec<u64>, prune_len: u64) -> io::Result<()> {
@@ -116,7 +168,7 @@ impl AppendOnlyFile {
 			} as u64;
 
 			// write the buffer, except if we prune offsets in the current span,
-			 // in which case we skip
+			// in which case we skip
 			let mut buf_start = 0;
 			while prune_offs[prune_pos] >= read && prune_offs[prune_pos] < read + len {
 				let prune_at = prune_offs[prune_pos] as usize;
@@ -135,21 +187,15 @@ impl AppendOnlyFile {
 		}
 	}
 
-	/// Truncates the underlying file to the provided offset
-	fn truncate(&self, offs: u64) -> io::Result<()> {
-		let fd = self.file.as_raw_fd();
-		let res = unsafe { ftruncate64(fd, offs as off64_t) };
-		if res == -1 {
-			Err(io::Error::last_os_error())
-		} else {
-			Ok(())
-		}
-	}
-
 	/// Current size of the file in bytes.
 	fn size(&self) -> io::Result<u64> {
 		fs::metadata(&self.path).map(|md| md.len())
 	}
+
+	/// Path of the underlying file
+	pub fn path(&self) -> String {
+		self.path.clone()
+	}
 }
 
 /// Log file fully cached in memory containing all positions that should be
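`RemoveLog` below follows the same two-phase discipline as `AppendOnlyFile`: a rewind only mutates in-memory state while backing up the previous version, and a later flush or discard either applies or restores it. In miniature (purely illustrative, with the `<=` cut-off as an assumption rather than a quote of the real filter):

    // Minimal illustration of the backup-then-apply pattern used here.
    struct TwoPhase {
        state: Vec<(u64, u32)>,  // (position, index) pairs, like RemoveLog::removed
        backup: Vec<(u64, u32)>,
    }

    impl TwoPhase {
        fn rewind(&mut self, last_offs: u32) {
            self.backup = self.state.clone();                // restore point
            self.state.retain(|&(_, idx)| idx <= last_offs); // tentative new state
        }
        fn discard(&mut self) {
            self.state = self.backup.clone();                // roll back
        }
    }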
@@ -181,11 +227,12 @@ impl RemoveLog {
 	}
 
 	/// Truncates and empties the remove log.
-	fn truncate(&mut self, last_offs: u32) -> io::Result<()> {
+	fn rewind(&mut self, last_offs: u32) -> io::Result<()> {
 		// simplifying assumption: we always remove older than what's in tmp
 		self.removed_tmp = vec![];
-		// DEBUG
-		let _ = self.flush_truncate(last_offs);
+
 		// backing it up before truncating
 		self.removed_bak = self.removed.clone();
 
@@ -202,15 +249,6 @@ impl RemoveLog {
 		Ok(())
 	}
 
-	// DEBUG: saves the remove log to the side before each truncate
-	fn flush_truncate(&mut self, last_offs: u32) -> io::Result<()> {
-		let mut file = File::create(format!("{}.{}", self.path.clone(), last_offs))?;
-		for elmt in &self.removed {
-			file.write_all(&ser::ser_vec(&elmt).unwrap()[..])?;
-		}
-		file.sync_data()
-	}
-
 	/// Append a set of new positions to the remove log. Both adds those
 	/// positions to the ordered in-memory set and to the file.
 	fn append(&mut self, elmts: Vec<u64>, index: u32) -> io::Result<()> {
@@ -291,12 +329,7 @@ where
 	hashsum_file: AppendOnlyFile,
 	remove_log: RemoveLog,
 	pruned_nodes: pmmr::PruneList,
-	// buffers addition of new elements until they're fully written to disk
-	buffer: VecBackend<T>,
-	buffer_index: usize,
-	// whether a rewind occurred since last flush, the rewind position, index
-	// and buffer index are captured
-	rewind: Option<(u64, u32, usize)>,
+	phantom: PhantomData<T>,
 }
 
 impl<T> Backend<T> for PMMRBackend<T>
 where
 	T: Summable + Clone,
 {
 	/// Append the provided HashSums to the backend storage.
 	#[allow(unused_variables)]
 	fn append(&mut self, position: u64, data: Vec<HashSum<T>>) -> Result<(), String> {
-		self.buffer
-			.append(position - (self.buffer_index as u64), data.clone())?;
+		for d in data {
+			self.hashsum_file.append(&mut ser::ser_vec(&d).unwrap());
+		}
 		Ok(())
 	}
 
 	/// Get a HashSum by insertion position
 	fn get(&self, position: u64) -> Option<HashSum<T>> {
-		// First, check if it's in our temporary write buffer
-		let pos_sz = position as usize;
-		if pos_sz > self.buffer_index && pos_sz - 1 < self.buffer_index + self.buffer.len() {
-			return self.buffer.get((pos_sz - self.buffer_index) as u64);
-		}
-
-		// Second, check if this position has been pruned in the remove log
+		// Check if this position has been pruned in the remove log or the
+		// pruned list
 		if self.remove_log.includes(position) {
 			return None;
 		}
-
-		// Third, check if it's in the pruned list or its offset
 		let shift = self.pruned_nodes.get_shift(position);
 		if let None = shift {
 			return None;
 		}
 	}
 
 	fn rewind(&mut self, position: u64, index: u32) -> Result<(), String> {
-		assert!(self.buffer.len() == 0, "Rewind on non empty buffer.");
 		self.remove_log
-			.truncate(index)
+			.rewind(index)
 			.map_err(|e| format!("Could not truncate remove log: {}", e))?;
-		self.rewind = Some((position, index, self.buffer_index));
-		self.buffer_index = position as usize;
+
+		let shift = self.pruned_nodes.get_shift(position).unwrap_or(0);
+		let record_len = 32 + T::sum_len();
+		let file_pos = (position - shift) * (record_len as u64);
+		self.hashsum_file.rewind(file_pos);
 		Ok(())
 	}
 
 	/// Remove HashSums by insertion position
 	fn remove(&mut self, positions: Vec<u64>, index: u32) -> Result<(), String> {
-		if self.buffer.used_size() > 0 {
-			for position in &positions {
-				let pos_sz = *position as usize;
-				if pos_sz > self.buffer_index && pos_sz - 1 < self.buffer_index + self.buffer.len()
-				{
-					self.buffer.remove(vec![*position - self.buffer_index as u64], index).unwrap();
-				}
-			}
-		}
 		self.remove_log.append(positions, index).map_err(|e| {
 			format!("Could not write to log storage, disk full? {:?}", e)
 		})
@@ -385,8 +405,6 @@ where
 	/// store its files.
 	pub fn new(data_dir: String) -> io::Result<PMMRBackend<T>> {
 		let hs_file = AppendOnlyFile::open(format!("{}/{}", data_dir, PMMR_DATA_FILE))?;
-		let sz = hs_file.size()?;
-		let record_len = 32 + T::sum_len();
 		let rm_log = RemoveLog::open(format!("{}/{}", data_dir, PMMR_RM_LOG_FILE))?;
 		let prune_list = read_ordered_vec(format!("{}/{}", data_dir, PMMR_PRUNED_FILE), 8)?;
@@ -394,12 +412,10 @@ where
 			data_dir: data_dir,
 			hashsum_file: hs_file,
 			remove_log: rm_log,
-			buffer: VecBackend::new(),
-			buffer_index: (sz as usize) / record_len,
 			pruned_nodes: pmmr::PruneList {
 				pruned_nodes: prune_list,
 			},
-			rewind: None,
+			phantom: PhantomData,
 		})
 	}
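Each MMR node is stored as a fixed-size record, a 32-byte hash followed by the node's sum, so the rewind above translates an MMR position into a byte offset after discounting nodes that pruning already removed from the file. A worked example, assuming a sum type whose `sum_len()` is 8 bytes:

    fn rewind_offset_example() {
        let record_len: u64 = 32 + 8; // 32-byte hash + 8-byte sum
        let position: u64 = 100;      // MMR position to rewind to
        let shift: u64 = 10;          // nodes pruned out of the file before it
        let file_pos = (position - shift) * record_len;
        assert_eq!(file_pos, 3600);   // byte offset handed to hashsum_file.rewind
    }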
@@ -415,45 +431,21 @@
 	/// Syncs all files to disk. A call to sync is required to ensure all the
 	/// data has been successfully written to disk.
 	pub fn sync(&mut self) -> io::Result<()> {
-		// truncating the storage file if a rewind occurred
-		let record_len = 32 + T::sum_len() as u64;
-		if let Some((pos, _, _)) = self.rewind {
-			self.hashsum_file.truncate(pos * record_len)?;
-		}
-
-		for elem in &self.buffer.elems {
-			let res = if let Some(ref hs) = *elem {
-				self.hashsum_file.append(&ser::ser_vec(&hs).unwrap()[..])
-			} else {
-				// the element has alredy been pruned in the buffer, we just insert
-				// zeros until compaction to avoid wrong hashum store offsets
-				self.hashsum_file.append(&vec![0; record_len as usize])
-			};
-
-			if let Err(e) = res {
-				return Err(io::Error::new(
+		if let Err(e) = self.hashsum_file.flush() {
+			return Err(io::Error::new(
 				io::ErrorKind::Interrupted,
 				format!("Could not write to log storage, disk full? {:?}", e),
-				));
-			}
+			));
 		}
 
-		self.buffer_index = self.buffer_index + self.buffer.len();
-		self.buffer.clear();
 		self.remove_log.flush()?;
-		self.hashsum_file.sync()?;
-		self.rewind = None;
 		Ok(())
 	}
 
 	/// Discard the current, non synced state of the backend.
 	pub fn discard(&mut self) {
-		if let Some((_, _, bi)) = self.rewind {
-			self.buffer_index = bi;
-		}
-		self.buffer.clear();
+		self.hashsum_file.discard();
 		self.remove_log.discard();
-		self.rewind = None;
 	}
 
 	/// Checks the length of the remove log to see if it should get compacted.
@@ -475,7 +467,7 @@ where
 		}
 
 		// 0. validate none of the nodes in the rm log are in the prune list (to
-		 // avoid accidental double compaction)
+		// avoid accidental double compaction)
 		for pos in &self.remove_log.removed[..] {
 			if let None = self.pruned_nodes.pruned_pos(pos.0) {
 				// TODO we likely can recover from this by directly jumping to 3
@@ -489,7 +481,7 @@ where
 		}
 
 		// 1. save hashsum file to a compact copy, skipping data that's in the
-		 // remove list
+		// remove list
 		let tmp_prune_file = format!("{}/{}.prune", self.data_dir, PMMR_DATA_FILE);
 		let record_len = (32 + T::sum_len()) as u64;
 		let to_rm = self.remove_log
@@ -518,10 +510,9 @@ where
 			format!("{}/{}", self.data_dir, PMMR_DATA_FILE),
 		)?;
 		self.hashsum_file = AppendOnlyFile::open(format!("{}/{}", self.data_dir, PMMR_DATA_FILE))?;
-		self.hashsum_file.sync()?;
 
 		// 4. truncate the rm log
-		self.remove_log.truncate(0)?;
+		self.remove_log.rewind(0)?;
 		self.remove_log.flush()?;
 
 		Ok(())
diff --git a/util/Cargo.toml b/util/Cargo.toml
index a45b65ebd..b53449a44 100644
--- a/util/Cargo.toml
+++ b/util/Cargo.toml
@@ -16,3 +16,5 @@ serde = "~1.0.8"
 serde_derive = "~1.0.8"
 secp256k1zkp = { git = "https://github.com/mimblewimble/rust-secp256k1-zkp", tag="grin_integration_7" }
 #secp256k1zkp = { path = "../../rust-secp256k1-zkp" }
+walkdir = "^2.0.1"
+zip = "^0.2.6"
diff --git a/util/src/lib.rs b/util/src/lib.rs
index 30b921b69..d9d26ad88 100644
--- a/util/src/lib.rs
+++ b/util/src/lib.rs
@@ -34,6 +34,8 @@ extern crate lazy_static;
 extern crate serde;
 #[macro_use]
 extern crate serde_derive;
+extern crate walkdir;
+extern crate zip as zip_rs;
 
 // Re-export so only has to be included once
 pub extern crate secp256k1zkp as secp_;
@@ -59,6 +61,9 @@ use byteorder::{BigEndian, ByteOrder};
 mod hex;
 pub use hex::*;
 
+/// Compress and decompress zip bzip2 archives
+pub mod zip;
+
 /// Encapsulation of a RefCell<Option<T>> for one-time initialization after
 /// construction. This implementation will purposefully fail hard if not used
 /// properly, for example if it's not initialized before being first used
diff --git a/util/src/zip.rs b/util/src/zip.rs
new file mode 100644
index 000000000..e5fcd9085
--- /dev/null
+++ b/util/src/zip.rs
@@ -0,0 +1,94 @@
+// Copyright 2017 The Grin Developers
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+/// Wrappers around the `zip-rs` library to compress and decompress zip
+/// bzip2 archives.
+
+use std::io;
+use std::path::Path;
+use std::fs::{self, File};
+use walkdir::WalkDir;
+
+use zip_rs;
+use zip_rs::result::{ZipResult, ZipError};
+use zip_rs::write::FileOptions;
+
+/// Compress a source directory recursively into a zip file using the
+/// bzip2 format. Permissions are set to 644 by default to avoid any
+/// unwanted execution bits.
+pub fn compress(src_dir: &Path, dst_file: &File) -> ZipResult<()> {
+	if !Path::new(src_dir).is_dir() {
+		return Err(ZipError::Io(
+			io::Error::new(io::ErrorKind::Other, "Source must be a directory.")));
+	}
+
+	let options = FileOptions::default()
+		.compression_method(zip_rs::CompressionMethod::Bzip2)
+		.unix_permissions(0o644);
+
+	let mut zip = zip_rs::ZipWriter::new(dst_file);
+	let walkdir = WalkDir::new(src_dir.to_str().unwrap());
+	let it = walkdir.into_iter();
+
+	for dent in it.filter_map(|e| e.ok()) {
+		let path = dent.path();
+		let name = path.strip_prefix(Path::new(src_dir))
+			.unwrap()
+			.to_str()
+			.unwrap();
+
+		if path.is_file() {
+			zip.start_file(name, options)?;
+			let mut f = File::open(path)?;
+			io::copy(&mut f, &mut zip)?;
+		}
+	}
+
+	zip.finish()?;
+	dst_file.sync_all()?;
+	Ok(())
+}
+
+/// Decompress a source file into the provided destination path.
+pub fn decompress<R>(src_file: R, dest: &Path) -> ZipResult<()>
+where
+	R: io::Read + io::Seek,
+{
+	let mut archive = zip_rs::ZipArchive::new(src_file)?;
+
+	for i in 0..archive.len() {
+		let mut file = archive.by_index(i)?;
+		let file_path = dest.join(file.name());
+
+		if (&*file.name()).ends_with('/') {
+			fs::create_dir_all(&file_path)?;
+		} else {
+			if let Some(p) = file_path.parent() {
+				if !p.exists() {
+					fs::create_dir_all(&p)?;
+				}
+			}
+			let mut outfile = fs::File::create(&file_path)?;
+			io::copy(&mut file, &mut outfile)?;
+		}
+
+		// Get and set permissions
+		#[cfg(unix)]
+		{
+			use std::os::unix::fs::PermissionsExt;
+			if let Some(mode) = file.unix_mode() {
+				fs::set_permissions(&file_path.to_str().unwrap(), PermissionsExt::from_mode(mode))?;
+			}
+		}
+	}
+	Ok(())
+}
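To tie the module together, a round-trip usage sketch, using only the two functions above (the paths are illustrative, and the `util` crate is assumed to be linked):

    use std::fs::File;
    use std::path::Path;

    fn roundtrip() -> Result<(), Box<std::error::Error>> {
        // compress a sumtree directory into a bzip2-compressed zip archive...
        let zip_file = File::create("sumtrees.zip")?;
        util::zip::compress(Path::new("chain_data/sumtrees"), &zip_file)?;

        // ...then extract it somewhere else, recreating the directory tree
        let zip_file = File::open("sumtrees.zip")?;
        util::zip::decompress(zip_file, Path::new("new_node/sumtrees"))?;
        Ok(())
    }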