[WIP] Abridged sync (#440)

* Util to zip and unzip directories
* First pass at sumtree request/response. Add message types, implement the exchange in the protocol, zip up the sumtree directory and stream the file over, with necessary adapter hooks.
* Implement the sumtree archive receive logic. Gets the sumtree archive data stream from the network and writes it to a file. Unzips the file, places it at the right spot and reconstructs the sumtree data structure, rewinding to the right spot.
* Sumtree hash structure validation
* Simplify sumtree backend buffering logic. The backend for a sumtree has to implement some in-memory buffering to provide a commit/rollback interface. The backend itself is an aggregate of 3 underlying storages (an append-only file, a remove log and a skip list). The buffering was previously implemented both by the backend and some of the underlying storages; all buffering logic is now pushed back to the storages to keep the backend simpler (sketched below, after this list).
* Add kernel append-only store file to sumtrees. The chain sumtrees structure now also saves all kernels to a dedicated file. As that storage is implemented by the append-only file wrapper, it's also rewind-aware.
* Full state validation. Checks that:
    - MMRs are sane (hash and sum each node)
    - Tree roots match the corresponding header
    - Kernel signatures are valid
    - Sum of all kernel excesses equals the sum of UTXO commitments minus the supply (see the toy model after this list)
* Fast sync handoff to body sync. Once the fast-sync state is fully set up, get back into body sync mode to get the full bodies of the last blocks we're missing.
* First fully working fast sync
* Facility in p2p conn to deal with attachments (raw binary after message).
* Re-introduced sumtree send and receive message handling using the above.
* Fixed test and finished updating all required db state after sumtree validation.
* Massaged the pipeline orphan check a little so it still works after the new sumtrees have been set up.
* Various cleanup. Consolidated fast sync and full sync into a single function as they're very similar. Proper conditions to trigger a sumtree request and some checks on receiving it.
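
Two sketches to make the above concrete. First, a minimal model of the commit/rollback buffering now pushed down into the storages, simplified from the AppendOnlyFile changes further down (the names and the exact split are illustrative, not the code as merged):

use std::fs::{File, OpenOptions};
use std::io::{self, Write};

// A storage that buffers appends in memory until explicitly flushed,
// giving its caller a commit (flush) / rollback (discard) interface.
struct BufferedFile {
    file: File,
    buffer: Vec<u8>,
}

impl BufferedFile {
    fn open(path: &str) -> io::Result<BufferedFile> {
        let file = OpenOptions::new().create(true).append(true).open(path)?;
        Ok(BufferedFile { file, buffer: vec![] })
    }

    // Buffered write: nothing touches the file yet.
    fn append(&mut self, buf: &[u8]) {
        self.buffer.extend_from_slice(buf);
    }

    // Commit: persist everything buffered since the last flush.
    fn flush(&mut self) -> io::Result<()> {
        self.file.write_all(&self.buffer)?;
        self.file.sync_data()?;
        self.buffer.clear();
        Ok(())
    }

    // Rollback: drop everything buffered since the last flush.
    fn discard(&mut self) {
        self.buffer.clear();
    }
}

With all three storages exposing this shape, the sumtree backend only needs to fan out flush or discard, as the extending() changes in the sumtree diff show.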
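Second, a toy model of the final validation check, with plain integer pairs standing in for Pedersen commitments (illustration only, not real crypto; all values are made up):

// A commitment r*G + v*H is modeled as the pair (blind, value);
// commit_sum is componentwise addition/subtraction, mirroring secp.
#[derive(Clone, Copy, Debug, PartialEq)]
struct Commit { blind: i64, value: i64 }

fn commit(blind: i64, value: i64) -> Commit { Commit { blind, value } }
fn commit_value(value: i64) -> Commit { commit(0, value) }
fn commit_sum(pos: &[Commit], neg: &[Commit]) -> Commit {
    let tally = |cs: &[Commit]| {
        cs.iter().fold(commit(0, 0), |a, c| commit(a.blind + c.blind, a.value + c.value))
    };
    let (p, n) = (tally(pos), tally(neg));
    commit(p.blind - n.blind, p.value - n.value)
}

fn main() {
    // three unspent outputs worth 100 in total (the whole supply), and
    // kernels whose excesses carry all the blinding factors: 11 + 12 = 11 + 7 + 5
    let utxos = [commit(11, 50), commit(7, 30), commit(5, 20)];
    let kernels = [commit(11, 0), commit(12, 0)];
    let supply = 100;

    let utxo_sum = commit_sum(&utxos, &[]);
    let kernel_sum = commit_sum(&kernels, &[]);
    // the check in Extension::validate: sum(UTXOs) - commit(supply) == sum(excesses)
    assert_eq!(commit_sum(&[utxo_sum], &[commit_value(supply)]), kernel_sum);
}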
Ignotus Peverell 2018-02-09 22:32:16 +00:00 committed by GitHub
parent 75f721039e
commit 22c521eec8
28 changed files with 1104 additions and 228 deletions


@ -16,13 +16,16 @@
//! and mostly the chain pipeline.
use std::collections::HashMap;
use std::fs::File;
use std::sync::{Arc, Mutex, RwLock};
use std::time::{Duration, Instant};
use util::secp::pedersen::RangeProof;
use core::core::{Input, OutputIdentifier, SumCommit};
use core::core::hash::Hashed;
use core::core::pmmr::{HashSum, NoSum};
use core::global;
use core::core::{Block, BlockHeader, TxKernel};
use core::core::target::Difficulty;
@ -112,6 +115,7 @@ impl OrphanBlockPool {
/// the current view of the UTXO set according to the chain state. Also
/// maintains locking for the pipeline to avoid conflicting processing.
pub struct Chain {
db_root: String,
store: Arc<ChainStore>,
adapter: Arc<ChainAdapter>,
@ -183,9 +187,10 @@ impl Chain {
);
let store = Arc::new(chain_store);
let sumtrees = sumtree::SumTrees::open(db_root, store.clone())?;
let sumtrees = sumtree::SumTrees::open(db_root.clone(), store.clone())?;
Ok(Chain {
db_root: db_root,
store: store,
adapter: adapter,
head: Arc::new(Mutex::new(head)),
@ -194,25 +199,26 @@ impl Chain {
pow_verifier: pow_verifier,
})
}
/// Processes a single block, then checks for orphans, processing
/// those as well if they're found
pub fn process_block(&self, b: Block, opts: Options)
-> Result<(Option<Tip>, Option<Block>), Error>
{
let res = self.process_block_no_orphans(b, opts);
match res {
Ok((t, b)) => {
// We accepted a block, so see if we can accept any orphans
if b.is_some() {
self.check_orphans(&b.clone().unwrap());
/// Processes a single block, then checks for orphans, processing
/// those as well if they're found
pub fn process_block(&self, b: Block, opts: Options)
-> Result<(Option<Tip>, Option<Block>), Error>
{
let res = self.process_block_no_orphans(b, opts);
match res {
Ok((t, b)) => {
// We accepted a block, so see if we can accept any orphans
if let Some(ref b) = b {
self.check_orphans(b.hash());
}
Ok((t, b))
},
Err(e) => {
Err(e)
}
}
Ok((t, b))
},
Err(e) => {
Err(e)
}
}
}
/// Attempt to add a new block to the chain. Returns the new chain tip if it
/// has been added to the longest chain, None if it's added to an (as of
@ -348,13 +354,12 @@ pub fn process_block(&self, b: Block, opts: Options)
/// Check for orphans, once a block is successfully added
pub fn check_orphans(&self, block: &Block) {
pub fn check_orphans(&self, mut last_block_hash: Hash) {
debug!(
LOGGER,
"chain: check_orphans: # orphans {}",
self.orphans.len(),
);
let mut last_block_hash = block.hash();
// Is there an orphan in our orphans that we can now process?
// We just processed the given block, are there any orphans that have this block
// as their "previous" block?
@ -390,6 +395,14 @@ pub fn process_block(&self, b: Block, opts: Options)
sumtrees.is_unspent(output_ref)
}
pub fn validate(&self) -> Result<(), Error> {
let header = self.store.head_header()?;
let mut sumtrees = self.sumtrees.write().unwrap();
sumtree::extending(&mut sumtrees, |extension| {
extension.validate(&header)
})
}
/// Check if the input has matured sufficiently for the given block height.
/// This only applies to inputs spending coinbase outputs.
/// An input spending a non-coinbase output will always pass this check.
@ -432,6 +445,76 @@ pub fn process_block(&self, b: Block, opts: Options)
sumtrees.roots()
}
/// Provides a reading view into the current sumtree state as well as
/// the required indexes for a consumer to rewind to a consistent state
/// at the provided block hash.
pub fn sumtrees_read(&self, h: Hash) -> Result<(u64, u64, File), Error> {
let b = self.get_block(&h)?;
// get the indexes for the block
let out_index: u64;
let kernel_index: u64;
{
let sumtrees = self.sumtrees.read().unwrap();
let (oi, ki) = sumtrees.indexes_at(&b)?;
out_index = oi;
kernel_index = ki;
}
// prepares the zip and return the corresponding Read
let sumtree_reader = sumtree::zip_read(self.db_root.clone())?;
Ok((out_index, kernel_index, sumtree_reader))
}
/// Writes a reading view on a sumtree state that's been provided to us.
/// If we're willing to accept that new state, the data stream will be
/// read as a zip file, unzipped and the resulting state files should be
/// rewound to the provided indexes.
pub fn sumtrees_write(
&self,
h: Hash,
rewind_to_output: u64,
rewind_to_kernel: u64,
sumtree_data: File
) -> Result<(), Error> {
let head = self.head().unwrap();
let header_head = self.get_header_head().unwrap();
if header_head.height - head.height < global::cut_through_horizon() as u64 {
return Err(Error::InvalidSumtree("not needed".to_owned()));
}
let header = self.store.get_block_header(&h)?;
sumtree::zip_write(self.db_root.clone(), sumtree_data)?;
let mut sumtrees = sumtree::SumTrees::open(self.db_root.clone(), self.store.clone())?;
sumtree::extending(&mut sumtrees, |extension| {
extension.rewind_pos(header.height, rewind_to_output, rewind_to_kernel)?;
extension.validate(&header)?;
// TODO validate kernels and their sums with UTXOs
extension.rebuild_index()?;
Ok(())
})?;
// replace the chain sumtrees with the newly built one
{
let mut sumtrees_ref = self.sumtrees.write().unwrap();
*sumtrees_ref = sumtrees;
}
// setup new head
{
let mut head = self.head.lock().unwrap();
*head = Tip::from_block(&header);
self.store.save_body_head(&head);
self.store.save_header_height(&header)?;
}
self.check_orphans(header.hash());
Ok(())
}
/// returns the last n nodes inserted into the utxo sum tree
pub fn get_last_n_utxo(&self, distance: u64) -> Vec<HashSum<SumCommit>> {
let mut sumtrees = self.sumtrees.write().unwrap();
@ -455,6 +538,11 @@ pub fn process_block(&self, b: Block, opts: Options)
self.head.lock().unwrap().clone().total_difficulty
}
/// Total difficulty at the head of the header chain
pub fn total_header_difficulty(&self) -> Result<Difficulty, Error> {
Ok(self.store.get_header_head()?.total_difficulty)
}
/// Reset header_head and sync_head to head of current body chain
pub fn reset_head(&self) -> Result<(), Error> {
self.store


@ -64,19 +64,23 @@ pub fn process_block(b: &Block, mut ctx: BlockContext) -> Result<Option<Tip>, Er
validate_header(&b.header, &mut ctx)?;
// valid header, now check we actually have the previous block in the store
// valid header, now check we actually have the previous block in the store
// not just the header but the block itself
// we cannot assume we can use the chain head for this as we may be dealing with a fork
// we cannot use heights here as the fork may have jumped in height
match ctx.store.get_block(&b.header.previous) {
Ok(_) => {},
Err(grin_store::Error::NotFoundErr) => {
return Err(Error::Orphan);
},
Err(e) => {
return Err(Error::StoreErr(e, "pipe get previous".to_owned()));
// short circuit the test first both for performance (in-mem vs db access)
// and for the specific case of the first fast sync full block
if b.header.previous != ctx.head.last_block_h {
// we cannot assume we can use the chain head for this as we may be dealing with a fork
// we cannot use heights here as the fork may have jumped in height
match ctx.store.block_exists(&b.header.previous) {
Ok(true) => {},
Ok(false) => {
return Err(Error::Orphan);
},
Err(e) => {
return Err(Error::StoreErr(e, "pipe get previous".to_owned()));
}
}
};
}
// valid header and we have a previous block, time to take the lock on the sum trees
let local_sumtrees = ctx.sumtrees.clone();


@ -24,6 +24,7 @@ use core::core::{Block, BlockHeader};
use core::consensus::TargetError;
use core::core::target::Difficulty;
use grin_store::{self, option_to_not_found, to_key, Error, u64_to_key};
use util::LOGGER;
const STORE_SUBPATH: &'static str = "chain";
@ -98,6 +99,10 @@ impl ChainStore for ChainKVStore {
option_to_not_found(self.db.get_ser(&to_key(BLOCK_PREFIX, &mut h.to_vec())))
}
fn block_exists(&self, h: &Hash) -> Result<bool, Error> {
self.db.exists(&to_key(BLOCK_PREFIX, &mut h.to_vec()))
}
fn get_block_header(&self, h: &Hash) -> Result<BlockHeader, Error> {
option_to_not_found(
self.db.get_ser(&to_key(BLOCK_HEADER_PREFIX, &mut h.to_vec())),
@ -139,6 +144,11 @@ impl ChainStore for ChainKVStore {
option_to_not_found(self.db.get_ser(&u64_to_key(HEADER_HEIGHT_PREFIX, height)))
}
fn save_header_height(&self, bh: &BlockHeader) -> Result<(), Error> {
self.db
.put_ser(&u64_to_key(HEADER_HEIGHT_PREFIX, bh.height), bh)
}
fn delete_header_by_height(&self, height: u64) -> Result<(), Error> {
self.db.delete(&u64_to_key(HEADER_HEIGHT_PREFIX, height))
}


@ -17,23 +17,31 @@
use std::fs;
use std::collections::HashMap;
use std::path::Path;
use std::fs::File;
use std::ops::Deref;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use core::core::{Block, SumCommit, Input, Output, OutputIdentifier, TxKernel, OutputFeatures};
use core::core::pmmr::{HashSum, NoSum, Summable, PMMR};
use util::{secp, static_secp_instance};
use util::secp::pedersen::{RangeProof, Commitment};
use core::consensus::reward;
use core::core::{Block, BlockHeader, SumCommit, Input, Output, OutputIdentifier, OutputFeatures, TxKernel};
use core::core::pmmr::{self, HashSum, NoSum, Summable, PMMR};
use core::core::hash::Hashed;
use core::ser::{self, Readable};
use grin_store;
use grin_store::sumtree::PMMRBackend;
use grin_store::sumtree::{PMMRBackend, AppendOnlyFile};
use types::ChainStore;
use types::Error;
use util::LOGGER;
use util::secp::pedersen::{RangeProof, Commitment};
use util::{LOGGER, zip};
const SUMTREES_SUBDIR: &'static str = "sumtrees";
const UTXO_SUBDIR: &'static str = "utxo";
const RANGE_PROOF_SUBDIR: &'static str = "rangeproof";
const KERNEL_SUBDIR: &'static str = "kernel";
const KERNEL_FILE: &'static str = "kernel_full_data.bin";
const SUMTREES_ZIP: &'static str = "sumtrees_snapshot.zip";
struct PMMRHandle<T>
where
@ -68,10 +76,14 @@ where
/// guaranteed to indicate whether an output is spent or not. The index
/// may have commitments that have already been spent, even with
/// pruning enabled.
///
/// In addition to the sumtrees, this maintains the full list of kernel
/// data so it can be easily packaged for sync or validation.
pub struct SumTrees {
output_pmmr_h: PMMRHandle<SumCommit>,
rproof_pmmr_h: PMMRHandle<NoSum<RangeProof>>,
kernel_pmmr_h: PMMRHandle<NoSum<TxKernel>>,
kernel_file: AppendOnlyFile,
// chain store used as index of commitments to MMR positions
commit_index: Arc<ChainStore>,
@ -80,10 +92,16 @@ pub struct SumTrees {
impl SumTrees {
/// Open an existing or new set of backends for the SumTrees
pub fn open(root_dir: String, commit_index: Arc<ChainStore>) -> Result<SumTrees, Error> {
let mut kernel_file_path: PathBuf = [&root_dir, SUMTREES_SUBDIR, KERNEL_SUBDIR].iter().collect();
fs::create_dir_all(kernel_file_path.clone())?;
kernel_file_path.push(KERNEL_FILE);
let kernel_file = AppendOnlyFile::open(kernel_file_path.to_str().unwrap().to_owned())?;
Ok(SumTrees {
output_pmmr_h: PMMRHandle::new(root_dir.clone(), UTXO_SUBDIR)?,
rproof_pmmr_h: PMMRHandle::new(root_dir.clone(), RANGE_PROOF_SUBDIR)?,
kernel_pmmr_h: PMMRHandle::new(root_dir.clone(), KERNEL_SUBDIR)?,
kernel_file: kernel_file,
commit_index: commit_index,
})
}
@ -163,6 +181,11 @@ impl SumTrees {
kernel_pmmr.get_last_n_insertions(distance)
}
/// Output and kernel MMR indexes at the end of the provided block
pub fn indexes_at(&self, block: &Block) -> Result<(u64, u64), Error> {
indexes_at(block, self.commit_index.deref())
}
/// Get sum tree roots
pub fn roots(
&mut self,
@ -193,10 +216,12 @@ where
let res: Result<T, Error>;
let rollback: bool;
{
debug!(LOGGER, "Starting new sumtree extension.");
let commit_index = trees.commit_index.clone();
debug!(LOGGER, "Starting new sumtree extension.");
let mut extension = Extension::new(trees, commit_index);
res = inner(&mut extension);
rollback = extension.rollback;
if res.is_ok() && !rollback {
extension.save_pos_index()?;
@ -209,6 +234,7 @@ where
trees.output_pmmr_h.backend.discard();
trees.rproof_pmmr_h.backend.discard();
trees.kernel_pmmr_h.backend.discard();
trees.kernel_file.discard();
Err(e)
}
Ok(r) => {
@ -217,11 +243,13 @@ where
trees.output_pmmr_h.backend.discard();
trees.rproof_pmmr_h.backend.discard();
trees.kernel_pmmr_h.backend.discard();
trees.kernel_file.discard();
} else {
debug!(LOGGER, "Committing sumtree extension.");
trees.output_pmmr_h.backend.sync()?;
trees.rproof_pmmr_h.backend.sync()?;
trees.kernel_pmmr_h.backend.sync()?;
trees.kernel_file.flush()?;
trees.output_pmmr_h.last_pos = sizes.0;
trees.rproof_pmmr_h.last_pos = sizes.1;
trees.kernel_pmmr_h.last_pos = sizes.2;
@ -241,6 +269,7 @@ pub struct Extension<'a> {
rproof_pmmr: PMMR<'a, NoSum<RangeProof>, PMMRBackend<NoSum<RangeProof>>>,
kernel_pmmr: PMMR<'a, NoSum<TxKernel>, PMMRBackend<NoSum<TxKernel>>>,
kernel_file: &'a mut AppendOnlyFile,
commit_index: Arc<ChainStore>,
new_output_commits: HashMap<Commitment, u64>,
new_kernel_excesses: HashMap<Commitment, u64>,
@ -249,7 +278,11 @@ pub struct Extension<'a> {
impl<'a> Extension<'a> {
// constructor
fn new(trees: &'a mut SumTrees, commit_index: Arc<ChainStore>) -> Extension<'a> {
fn new(
trees: &'a mut SumTrees,
commit_index: Arc<ChainStore>,
) -> Extension<'a> {
Extension {
output_pmmr: PMMR::at(
&mut trees.output_pmmr_h.backend,
@ -263,6 +296,7 @@ impl<'a> Extension<'a> {
&mut trees.kernel_pmmr_h.backend,
trees.kernel_pmmr_h.last_pos,
),
kernel_file: &mut trees.kernel_file,
commit_index: commit_index,
new_output_commits: HashMap::new(),
new_kernel_excesses: HashMap::new(),
@ -407,15 +441,18 @@ impl<'a> Extension<'a> {
}
}
}
// push kernels in their MMR
// push kernels in their MMR and file
let pos = self.kernel_pmmr
.push(NoSum(kernel.clone()))
.map_err(&Error::SumTreeErr)?;
self.new_kernel_excesses.insert(kernel.excess, pos);
self.kernel_file.append(&mut ser::ser_vec(&kernel).unwrap());
Ok(())
}
/// Rewinds the MMRs to the provided position, given the last output and
/// Rewinds the MMRs to the provided block, using the last output and
/// last kernel of the block we want to rewind to.
pub fn rewind(&mut self, block: &Block) -> Result<(), Error> {
debug!(
@ -425,22 +462,23 @@ impl<'a> Extension<'a> {
block.header.height,
);
let out_pos_rew = match block.outputs.last() {
Some(output) => self.get_output_pos(&output.commitment())
.map_err(|e| {
Error::StoreErr(e, format!("missing output pos for known block"))
})?,
None => 0,
};
// rewind each MMR
let (out_pos_rew, kern_pos_rew) = indexes_at(block, self.commit_index.deref())?;
self.rewind_pos(block.header.height, out_pos_rew, kern_pos_rew)?;
// rewind the kernel file store, the position is the number of kernels
// multiplied by their size
// the number of kernels is the number of leaves in the MMR, which is the
// sum of the number of leaf nodes under each peak in the MMR
let pos: u64 = pmmr::peaks(kern_pos_rew).iter().map(|n| (1 << n) as u64).sum();
self.kernel_file.rewind(pos * (TxKernel::size() as u64));
let kern_pos_rew = match block.kernels.last() {
Some(kernel) => self.get_kernel_pos(&kernel.excess)
.map_err(|e| {
Error::StoreErr(e, format!("missing kernel pos for known block"))
})?,
None => 0,
};
Ok(())
}
/// Rewinds the MMRs to the provided positions, given the output and
/// kernel we want to rewind to.
pub fn rewind_pos(&mut self, height: u64, out_pos_rew: u64, kern_pos_rew: u64) -> Result<(), Error> {
debug!(
LOGGER,
"Rewind sumtrees to output pos: {}, kernel pos: {}",
@ -448,8 +486,6 @@ impl<'a> Extension<'a> {
kern_pos_rew,
);
let height = block.header.height;
self.output_pmmr
.rewind(out_pos_rew, height as u32)
.map_err(&Error::SumTreeErr)?;
@ -496,14 +532,67 @@ impl<'a> Extension<'a> {
)
}
/// Validate the current sumtree state against a block header
pub fn validate(&self, header: &BlockHeader) -> Result<(), Error> {
// validate all hashes and sums within the trees
if let Err(e) = self.output_pmmr.validate() {
return Err(Error::InvalidSumtree(e));
}
if let Err(e) = self.rproof_pmmr.validate() {
return Err(Error::InvalidSumtree(e));
}
if let Err(e) = self.kernel_pmmr.validate() {
return Err(Error::InvalidSumtree(e));
}
// validate the tree roots against the block header
let (utxo_root, rproof_root, kernel_root) = self.roots();
if utxo_root.hash != header.utxo_root || rproof_root.hash != header.range_proof_root
|| kernel_root.hash != header.kernel_root
{
return Err(Error::InvalidRoot);
}
// the real magicking: the sum of all kernel excess should equal the sum
// of all UTXO commitments, minus the total supply
let (kernel_sum, fees) = self.sum_kernels()?;
let utxo_sum = self.sum_utxos()?;
{
let secp = static_secp_instance();
let secp = secp.lock().unwrap();
let over_commit = secp.commit_value(header.height * reward(0) - fees / 2)?;
let adjusted_sum_utxo = secp.commit_sum(vec![utxo_sum], vec![over_commit])?;
if adjusted_sum_utxo != kernel_sum {
return Err(Error::InvalidSumtree("Differing UTXO commitment and kernel excess sums.".to_owned()));
}
}
Ok(())
}
/// Rebuild the index of MMR positions to the corresponding UTXO and kernel
/// by iterating over the whole MMR data. This is a costly operation
/// performed only when we receive a full new chain state.
pub fn rebuild_index(&self) -> Result<(), Error> {
for n in 1..self.output_pmmr.unpruned_size()+1 {
// non-pruned leaves only
if pmmr::bintree_postorder_height(n) == 0 {
if let Some(hs) = self.output_pmmr.get(n) {
self.commit_index.save_output_pos(&hs.sum.commit, n)?;
}
}
}
Ok(())
}
/// Force the rollback of this extension, no matter the result
pub fn force_rollback(&mut self) {
self.rollback = true;
}
/// Dumps the state of the 3 sum trees to stdout for debugging. Short
/// version
/// only prints the UTXO tree.
/// version only prints the UTXO tree.
pub fn dump(&self, short: bool) {
debug!(LOGGER, "-- outputs --");
self.output_pmmr.dump(short);
@ -523,4 +612,109 @@ impl<'a> Extension<'a> {
self.kernel_pmmr.unpruned_size(),
)
}
/// Sums the excess of all our kernels, validating their signatures on the way
fn sum_kernels(&self) -> Result<(Commitment, u64), Error> {
// make sure we have the right count of kernels using the MMR; the storage
// file may have a few more
let mmr_sz = self.kernel_pmmr.unpruned_size();
let count: u64 = pmmr::peaks(mmr_sz).iter().map(|n| {
(1 << pmmr::bintree_postorder_height(*n)) as u64
}).sum();
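// e.g. an MMR with unpruned size 10 has peaks at positions 7 and 10
// (heights 2 and 1), so it holds (1 << 2) + (1 << 1) = 6 leaves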
let mut kernel_file = File::open(self.kernel_file.path())?;
let first: TxKernel = ser::deserialize(&mut kernel_file)?;
first.verify()?;
let mut sum_kernel = first.excess;
let mut fees = first.fee;
let secp = static_secp_instance();
let mut kern_count = 1;
loop {
match ser::deserialize::<TxKernel>(&mut kernel_file) {
Ok(kernel) => {
kernel.verify()?;
let secp = secp.lock().unwrap();
sum_kernel = secp.commit_sum(vec![sum_kernel, kernel.excess], vec![])?;
fees += kernel.fee;
kern_count += 1;
if kern_count == count {
break;
}
}
Err(_) => break,
}
}
debug!(LOGGER, "Validated and summed {} kernels", kern_count);
Ok((sum_kernel, fees))
}
/// Sums all our UTXO commitments
fn sum_utxos(&self) -> Result<Commitment, Error> {
let mut sum_utxo = None;
let mut utxo_count = 0;
let secp = static_secp_instance();
for n in 1..self.output_pmmr.unpruned_size()+1 {
if pmmr::bintree_postorder_height(n) == 0 {
if let Some(hs) = self.output_pmmr.get(n) {
if n == 1 {
sum_utxo = Some(hs.sum.commit);
} else {
let secp = secp.lock().unwrap();
sum_utxo = Some(secp.commit_sum(vec![sum_utxo.unwrap(), hs.sum.commit], vec![])?);
}
utxo_count += 1;
}
}
}
debug!(LOGGER, "Summed {} UTXOs", utxo_count);
Ok(sum_utxo.unwrap())
}
}
/// Output and kernel MMR indexes at the end of the provided block
fn indexes_at(block: &Block, commit_index: &ChainStore) -> Result<(u64, u64), Error> {
let out_idx = match block.outputs.last() {
Some(output) => commit_index.get_output_pos(&output.commitment())
.map_err(|e| {
Error::StoreErr(e, format!("missing output pos for known block"))
})?,
None => 0,
};
let kern_idx = match block.kernels.last() {
Some(kernel) => commit_index.get_kernel_pos(&kernel.excess)
.map_err(|e| {
Error::StoreErr(e, format!("missing kernel pos for known block"))
})?,
None => 0,
};
Ok((out_idx, kern_idx))
}
/// Packages the sumtree data files into a zip and returns a Read to the
/// resulting file
pub fn zip_read(root_dir: String) -> Result<File, Error> {
let sumtrees_path = Path::new(&root_dir).join(SUMTREES_SUBDIR);
let zip_path = Path::new(&root_dir).join(SUMTREES_ZIP);
// create the zip archive
{
zip::compress(&sumtrees_path, &File::create(zip_path.clone())?)
.map_err(|ze| Error::Other(ze.to_string()))?;
}
// open it again to read it back
let zip_file = File::open(zip_path)?;
Ok(zip_file)
}
/// Extracts the sumtree data from a zip file and writes the content into the
/// sumtree storage dir
pub fn zip_write(root_dir: String, sumtree_data: File) -> Result<(), Error> {
let sumtrees_path = Path::new(&root_dir).join(SUMTREES_SUBDIR);
fs::create_dir_all(sumtrees_path.clone())?;
zip::decompress(sumtree_data, &sumtrees_path)
.map_err(|ze| Error::Other(ze.to_string()))
}
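
The util::zip compress/decompress helpers used above live outside this diff. A minimal sketch of what they could look like with the zip crate, assuming a flat sumtree directory (the crate API details and error mapping here are assumptions, not the code from util):

use std::fs::{self, File};
use std::io;
use std::path::Path;
use zip::write::FileOptions;

// Zip every regular file at the top level of src_dir into dst_file.
pub fn compress(src_dir: &Path, dst_file: &File) -> zip::result::ZipResult<()> {
    let mut writer = zip::ZipWriter::new(dst_file);
    for entry in fs::read_dir(src_dir)? {
        let path = entry?.path();
        if path.is_file() {
            let name = path.file_name().unwrap().to_string_lossy().into_owned();
            writer.start_file(name, FileOptions::default())?;
            io::copy(&mut File::open(&path)?, &mut writer)?;
        }
    }
    writer.finish()?;
    Ok(())
}

// Unpack an archive into dst_dir. Entry names are trusted here; a real
// implementation should sanitize them against path traversal.
pub fn decompress<R: io::Read + io::Seek>(src: R, dst_dir: &Path) -> zip::result::ZipResult<()> {
    let mut archive = zip::ZipArchive::new(src)?;
    for i in 0..archive.len() {
        let mut entry = archive.by_index(i)?;
        let mut out = File::create(dst_dir.join(entry.name()))?;
        io::copy(&mut entry, &mut out)?;
    }
    Ok(())
}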


@ -16,6 +16,7 @@
use std::io;
use util::secp;
use util::secp::pedersen::Commitment;
use grin_store as store;
@ -76,6 +77,8 @@ pub enum Error {
OutputSpent,
/// Invalid block version, either a mistake or outdated software
InvalidBlockVersion(u16),
/// We've been provided a bad sumtree
InvalidSumtree(String),
/// Internal issue when trying to save or load data from store
StoreErr(grin_store::Error, String),
/// Error serializing or deserializing a type
@ -105,10 +108,15 @@ impl From<io::Error> for Error {
Error::SumTreeErr(e.to_string())
}
}
impl From<secp::Error> for Error {
fn from(e: secp::Error) -> Error {
Error::SumTreeErr(format!("Sum validation error: {}", e.to_string()))
}
}
impl Error {
/// Whether the error is due to a block that was intrinsically wrong
pub fn is_bad_block(&self) -> bool {
pub fn is_bad_data(&self) -> bool {
// shorter to match on all the "not the block's fault" errors
match *self {
Error::Unfit(_) |
@ -211,6 +219,9 @@ pub trait ChainStore: Send + Sync {
/// Gets a full block by hash
fn get_block(&self, h: &Hash) -> Result<Block, store::Error>;
/// Check whether we have a block without reading it
fn block_exists(&self, h: &Hash) -> Result<bool, store::Error>;
/// Gets a block header by hash
fn get_block_header(&self, h: &Hash) -> Result<BlockHeader, store::Error>;
@ -238,6 +249,9 @@ pub trait ChainStore: Send + Sync {
/// Gets the block header at the provided height
fn get_header_by_height(&self, height: u64) -> Result<BlockHeader, store::Error>;
/// Save a header as associated with its height
fn save_header_height(&self, header: &BlockHeader) -> Result<(), store::Error>;
/// Delete the block header at the height
fn delete_header_by_height(&self, height: u64) -> Result<(), store::Error>;


@ -118,6 +118,7 @@ fn mine_empty_chain() {
let header_by_height = chain.get_header_by_height(n).unwrap();
assert_eq!(header_by_height.hash(), bhash);
}
chain.validate().unwrap();
}
#[test]


@ -96,6 +96,7 @@ pub struct GlobalConfig {
#[derive(Debug, Serialize, Deserialize)]
pub struct ConfigMembers {
/// Server config
#[serde(default)]
pub server: ServerConfig,
/// Mining config
pub mining: Option<MinerConfig>,


@ -47,7 +47,7 @@ use util::LOGGER;
/// the tree can sum over
pub trait Summable {
/// The type of the sum
type Sum: Clone + ops::Add<Output = Self::Sum> + Readable + Writeable;
type Sum: Clone + ops::Add<Output = Self::Sum> + Readable + Writeable + PartialEq;
/// Obtain the sum of the element
fn sum(&self) -> Self::Sum;
@ -80,6 +80,12 @@ impl Writeable for NullSum {
}
}
impl PartialEq for NullSum {
fn eq(&self, _other: &NullSum) -> bool {
true
}
}
/// Wrapper for a type that allows it to be inserted in a tree without summing
#[derive(Clone, Debug)]
pub struct NoSum<T>(pub T);
@ -103,7 +109,7 @@ where
/// A utility type to handle (Hash, Sum) pairs more conveniently. The addition
/// of two HashSums is the (Hash(h1|h2), h1 + h2) HashSum.
#[derive(Debug, Clone, PartialEq, Eq)]
#[derive(Debug, Clone, Eq)]
pub struct HashSum<T>
where
T: Summable,
@ -130,6 +136,15 @@ where
}
}
impl<T> PartialEq for HashSum<T>
where
T: Summable,
{
fn eq(&self, other: &HashSum<T>) -> bool {
self.hash == other.hash && self.sum == other.sum
}
}
impl<T> Readable for HashSum<T>
where
T: Summable,
@ -382,6 +397,32 @@ where
return_vec
}
/// Walks all unpruned nodes in the MMR and revalidate all parent hashes
/// and sums.
pub fn validate(&self) -> Result<(), String> {
// iterate on all parent nodes
for n in 1..(self.last_pos + 1) {
if bintree_postorder_height(n) > 0 {
if let Some(hs) = self.get(n) {
// take the left and right children, if they exist
let left_pos = bintree_move_down_left(n).unwrap();
let right_pos = bintree_jump_right_sibling(left_pos);
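// e.g. the parent at position 7 has its children at positions 3 and 6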
if let Some(left_child_hs) = self.get(left_pos) {
if let Some(right_child_hs) = self.get(right_pos) {
// sum and compare
if left_child_hs + right_child_hs != hs {
return Err(format!("Invalid MMR, hashsum of parent at {} does \
not match children.", n));
}
}
}
}
}
}
Ok(())
}
/// Total size of the tree, including intermediary nodes and ignoring any
/// pruning.
pub fn unpruned_size(&self) -> u64 {
@ -583,7 +624,7 @@ impl PruneList {
/// node's position. Starts with the top peak, which is always on the left
/// side of the range, and navigates toward lower siblings toward the right
/// of the range.
fn peaks(num: u64) -> Vec<u64> {
pub fn peaks(num: u64) -> Vec<u64> {
// detecting an invalid mountain range, when siblings exist but no parent
// exists
if bintree_postorder_height(num + 1) > bintree_postorder_height(num) {


@ -167,6 +167,13 @@ impl TxKernel {
}
Ok(())
}
/// Size in bytes of a kernel, necessary for binary storage
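/// With the usual secp sizes (33-byte Pedersen commitment, 64-byte aggregate
/// signature) that comes to 17 + 33 + 64 = 114 bytes per kernel on disk.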
pub fn size() -> usize {
17 + // features plus fee and lock_height
secp::constants::PEDERSEN_COMMITMENT_SIZE +
secp::constants::AGG_SIGNATURE_SIZE
}
}
/// A transaction
@ -770,7 +777,7 @@ impl Readable for OutputIdentifier {
}
/// Wrapper to Output commitments to provide the Summable trait.
#[derive(Serialize, Deserialize, Debug, Clone)]
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
pub struct SumCommit {
/// Output features (coinbase vs. regular transaction output)
/// We need to include this when hashing to ensure coinbase maturity can be enforced.


@ -26,7 +26,7 @@ use consensus::PROOFSIZE;
use consensus::DEFAULT_SIZESHIFT;
use consensus::COINBASE_MATURITY;
use consensus::{MEDIAN_TIME_WINDOW, INITIAL_DIFFICULTY,
BLOCK_TIME_SEC, DIFFICULTY_ADJUST_WINDOW};
BLOCK_TIME_SEC, DIFFICULTY_ADJUST_WINDOW, CUT_THROUGH_HORIZON};
use core::target::Difficulty;
use consensus::TargetError;
@ -51,6 +51,9 @@ pub const AUTOMATED_TESTING_COINBASE_MATURITY: u64 = 3;
/// User testing coinbase maturity
pub const USER_TESTING_COINBASE_MATURITY: u64 = 3;
/// Testing cut through horizon in blocks
pub const TESTING_CUT_THROUGH_HORIZON: u32 = 20;
/// Testing initial block difficulty
pub const TESTING_INITIAL_DIFFICULTY: u64 = 1;
@ -163,6 +166,18 @@ pub fn initial_block_difficulty() -> u64 {
}
}
/// Horizon at which we can cut-through and do full local pruning
pub fn cut_through_horizon() -> u32 {
let param_ref = CHAIN_TYPE.read().unwrap();
match *param_ref {
ChainTypes::AutomatedTesting => TESTING_CUT_THROUGH_HORIZON,
ChainTypes::UserTesting => TESTING_CUT_THROUGH_HORIZON,
ChainTypes::Testnet1 => CUT_THROUGH_HORIZON,
ChainTypes::Testnet2 => CUT_THROUGH_HORIZON,
ChainTypes::Mainnet => CUT_THROUGH_HORIZON,
}
}
/// Are we in automated testing mode?
pub fn is_automated_testing_mode() -> bool {
let param_ref = CHAIN_TYPE.read().unwrap();


@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fs::File;
use std::net::SocketAddr;
use std::sync::{Arc, RwLock};
use std::sync::atomic::{AtomicBool, Ordering};
@ -123,7 +124,7 @@ impl p2p::ChainAdapter for NetToChainAdapter {
if let &Err(ref e) = &res {
debug!(LOGGER, "Block header {} refused by chain: {:?}", bhash, e);
if e.is_bad_block() {
if e.is_bad_data() {
debug!(LOGGER, "header_received: {} is a bad header, resetting header head", bhash);
let _ = self.chain.reset_head();
return false;
@ -183,10 +184,14 @@ impl p2p::ChainAdapter for NetToChainAdapter {
}
}
}
let header_head = self.chain.get_header_head().unwrap();
info!(
LOGGER,
"Added {} headers to the header chain.",
added_hs.len()
"Added {} headers to the header chain. Last: {} at {}.",
added_hs.len(),
header_head.last_block_h,
header_head.height,
);
}
@ -241,6 +246,41 @@ impl p2p::ChainAdapter for NetToChainAdapter {
}
}
/// Provides a reading view into the current sumtree state as well as
/// the required indexes for a consumer to rewind to a consistent state
/// at the provided block hash.
fn sumtrees_read(&self, h: Hash) -> Option<p2p::SumtreesRead> {
match self.chain.sumtrees_read(h.clone()) {
Ok((out_index, kernel_index, read)) => Some(p2p::SumtreesRead {
output_index: out_index,
kernel_index: kernel_index,
reader: read,
}),
Err(e) => {
warn!(LOGGER, "Couldn't produce sumtrees data for block {}: {:?}",
h, e);
None
}
}
}
/// Writes a reading view on a sumtree state that's been provided to us.
/// If we're willing to accept that new state, the data stream will be
/// read as a zip file, unzipped and the resulting state files should be
/// rewound to the provided indexes.
fn sumtrees_write(&self, h: Hash,
rewind_to_output: u64, rewind_to_kernel: u64,
sumtree_data: File, peer_addr: SocketAddr) -> bool {
// TODO check whether we should accept any sumtree now
if let Err(e) = self.chain.sumtrees_write(h, rewind_to_output, rewind_to_kernel, sumtree_data) {
error!(LOGGER, "Failed to save sumtree archive: {:?}", e);
!e.is_bad_data()
} else {
info!(LOGGER, "Received valid sumtree data for {}.", h);
self.currently_syncing.store(true, Ordering::Relaxed);
true
}
}
}
impl NetToChainAdapter {
@ -302,7 +342,7 @@ impl NetToChainAdapter {
let res = self.chain.process_block(b, self.chain_opts());
if let Err(ref e) = res {
debug!(LOGGER, "Block {} refused by chain: {:?}", bhash, e);
if e.is_bad_block() {
if e.is_bad_data() {
debug!(LOGGER, "adapter: process_block: {} is a bad block, resetting head", bhash);
let _ = self.chain.reset_head();
return false;


@ -142,6 +142,7 @@ impl Server {
p2p_server.peers.clone(),
shared_chain.clone(),
skip_sync_wait,
!config.archive_mode,
);
let p2p_inner = p2p_server.clone();
@ -206,6 +207,11 @@ impl Server {
self.chain.head().unwrap()
}
/// The head of the block header chain
pub fn header_head(&self) -> chain::Tip {
self.chain.get_header_head().unwrap()
}
/// Returns a set of stats about this server. This and the ServerStats
/// structure
/// can be updated over time to include any information needed by tests or


@ -19,8 +19,9 @@ use std::sync::atomic::{AtomicBool, Ordering};
use time;
use chain;
use core::core::hash::{Hash, Hashed};
use core::core::hash::{Hash, Hashed, ZERO_HASH};
use core::core::target::Difficulty;
use core::global;
use p2p::{self, Peer, Peers, ChainAdapter};
use types::Error;
use util::LOGGER;
@ -31,6 +32,7 @@ pub fn run_sync(
peers: p2p::Peers,
chain: Arc<chain::Chain>,
skip_sync_wait: bool,
fast_sync: bool,
) {
let chain = chain.clone();
@ -39,18 +41,36 @@ pub fn run_sync(
.spawn(move || {
let mut prev_body_sync = time::now_utc();
let mut prev_header_sync = prev_body_sync.clone();
let mut prev_state_sync = prev_body_sync.clone() - time::Duration::seconds(5 * 60);
// initial sleep to give us time to peer with some nodes
if !skip_sync_wait {
thread::sleep(Duration::from_secs(30));
}
loop {
let syncing = needs_syncing(
currently_syncing.clone(), peers.clone(), chain.clone());
if syncing {
// fast sync has 3 states:
// * syncing headers
// * once all headers are sync'd, requesting the sumtree state
// * once we have the state, get blocks after that
//
// full sync gets rid of the middle step and just starts from
// the genesis state
let current_time = time::now_utc();
loop {
let horizon = global::cut_through_horizon() as u64;
let head = chain.head().unwrap();
let header_head = chain.get_header_head().unwrap();
// in archival nodes (no fast sync) we just consider we have the whole
// state already
let have_sumtrees = !fast_sync || head.height > 0 &&
header_head.height.saturating_sub(head.height) <= horizon;
let syncing = needs_syncing(
currently_syncing.clone(), peers.clone(), chain.clone(), !have_sumtrees);
let current_time = time::now_utc();
if syncing {
// run the header sync every 10s
if current_time - prev_header_sync > time::Duration::seconds(10) {
@ -62,7 +82,7 @@ pub fn run_sync(
}
// run the body_sync every 5s
if current_time - prev_body_sync > time::Duration::seconds(5) {
if have_sumtrees && current_time - prev_body_sync > time::Duration::seconds(5) {
body_sync(
peers.clone(),
chain.clone(),
@ -70,10 +90,29 @@ pub fn run_sync(
prev_body_sync = current_time;
}
thread::sleep(Duration::from_secs(1));
} else {
thread::sleep(Duration::from_secs(10));
}
} else if !have_sumtrees &&
current_time - prev_state_sync > time::Duration::seconds(5*60) {
if let Some(peer) = peers.most_work_peer() {
if let Ok(p) = peer.try_read() {
debug!(LOGGER, "Header head before sumtree request: {} / {}",
header_head.height, header_head.last_block_h);
// just to handle corner case of a too early start
if header_head.height > horizon {
// ask for sumtree at horizon
let mut sumtree_head = chain.get_block_header(&header_head.prev_block_h).unwrap();
for _ in 0..horizon-2 {
sumtree_head = chain.get_block_header(&sumtree_head.previous).unwrap();
}
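// e.g. with horizon 20 and the header chain at height 100, this walks
// back to height 81 and requests the sumtree archive as of that block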
p.send_sumtrees_request(sumtree_head.height, sumtree_head.hash());
prev_state_sync = current_time;
}
}
}
}
thread::sleep(Duration::from_secs(1));
}
});
}
@ -199,20 +238,29 @@ fn request_headers(
pub fn needs_syncing(
currently_syncing: Arc<AtomicBool>,
peers: Peers,
chain: Arc<chain::Chain>) -> bool {
chain: Arc<chain::Chain>,
header_only: bool) -> bool {
let local_diff = peers.total_difficulty();
let local_diff = if header_only {
chain.total_header_difficulty().unwrap()
} else {
chain.total_difficulty()
};
let peer = peers.most_work_peer();
// if we're already syncing, we're caught up if no peer has a higher
// difficulty than us
if currently_syncing.load(Ordering::Relaxed) {
if let Some(peer) = peer {
if let Ok(peer) = peer.try_read() {
debug!(LOGGER, "needs_syncing {} {} {}", local_diff, peer.info.total_difficulty, header_only);
if peer.info.total_difficulty <= local_diff {
info!(LOGGER, "synchronized at {:?} @ {:?}", local_diff, chain.head().unwrap().height);
currently_syncing.store(false, Ordering::Relaxed);
let _ = chain.reset_head();
if !header_only {
let _ = chain.reset_head();
}
}
}
} else {


@ -116,6 +116,9 @@ pub struct ServerConfig {
#[serde(default)]
pub chain_type: ChainTypes,
/// Whether this node is a full archival node or a fast-sync, pruned node
pub archive_mode: bool,
/// Method used to get the list of seed nodes for initial bootstrap.
#[serde(default)]
pub seeding_type: Seeding,
@ -155,6 +158,7 @@ impl Default for ServerConfig {
p2p_config: p2p::P2PConfig::default(),
mining_config: Some(pow::types::MinerConfig::default()),
chain_type: ChainTypes::default(),
archive_mode: false,
pool_config: pool::PoolConfig::default(),
skip_sync_wait: Some(true),
}


@ -247,7 +247,6 @@ fn simulate_full_sync() {
util::init_test_logger();
// we actually set the chain_type in the ServerConfig below
// TODO - avoid needing to set it in two places?
global::set_mining_mode(ChainTypes::AutomatedTesting);
let test_name_dir = "grin-sync";
@ -271,16 +270,56 @@ fn simulate_full_sync() {
let s1 = grin::Server::new(config(0, "grin-sync")).unwrap();
// mine a few blocks on server 1
s1.start_miner(miner_config);
thread::sleep(time::Duration::from_secs(5));
thread::sleep(time::Duration::from_secs(8));
let mut conf = config(1, "grin-sync");
conf.skip_sync_wait = Some(false);
let s2 = grin::Server::new(conf).unwrap();
while s2.head().height < 4 {
thread::sleep(time::Duration::from_millis(100));
}
}
/// Creates 2 different disconnected servers, mines a few blocks on one, connects
/// them and checks that the 2nd catches up using the fast sync algo
#[test]
fn simulate_fast_sync() {
util::init_test_logger();
// we actually set the chain_type in the ServerConfig below
global::set_mining_mode(ChainTypes::AutomatedTesting);
let test_name_dir = "grin-fast";
framework::clean_all_output(test_name_dir);
let mut plugin_config = pow::types::CuckooMinerPluginConfig::default();
let mut plugin_config_vec: Vec<pow::types::CuckooMinerPluginConfig> = Vec::new();
plugin_config.type_filter = String::from("mean_cpu");
plugin_config_vec.push(plugin_config);
let miner_config = pow::types::MinerConfig {
enable_mining: true,
burn_reward: true,
use_cuckoo_miner: false,
cuckoo_miner_async_mode: Some(false),
cuckoo_miner_plugin_dir: Some(String::from("../target/debug/deps")),
cuckoo_miner_plugin_config: Some(plugin_config_vec),
..Default::default()
};
let s1 = grin::Server::new(config(1000, "grin-fast")).unwrap();
// mine a few blocks on server 1
s1.start_miner(miner_config);
thread::sleep(time::Duration::from_secs(8));
let mut conf = config(1001, "grin-fast");
conf.archive_mode = false;
conf.seeds = Some(vec!["127.0.0.1:12000".to_string()]);
let s2 = grin::Server::new(conf).unwrap();
while s2.head().height != s2.header_head().height || s2.head().height < 20 {
thread::sleep(time::Duration::from_millis(1000));
}
}
fn config(n: u16, test_name_dir: &str) -> grin::ServerConfig {
grin::ServerConfig {
api_http_addr: format!("127.0.0.1:{}", 19000 + n),
@ -292,6 +331,8 @@ fn config(n: u16, test_name_dir: &str) -> grin::ServerConfig {
seeding_type: grin::Seeding::List,
seeds: Some(vec!["127.0.0.1:11000".to_string()]),
chain_type: core::global::ChainTypes::AutomatedTesting,
archive_mode: true,
skip_sync_wait: Some(true),
..Default::default()
}
}


@ -20,7 +20,9 @@
//! forces us to go through some additional gymnastics to loop over the async
//! stream and make sure we get the right number of bytes out.
use std::io::{self, Write};
use std::cmp;
use std::fs::File;
use std::io::{self, Read, Write};
use std::sync::{Arc, Mutex, mpsc};
use std::net::TcpStream;
use std::thread;
@ -31,8 +33,10 @@ use msg::*;
use types::*;
use util::LOGGER;
/// A trait to be implemented in order to receive messages from the
/// connection. Allows providing an optional response.
pub trait MessageHandler: Send + 'static {
fn consume(&self, msg: &mut Message) -> Result<Option<(Vec<u8>, Type)>, Error>;
fn consume<'a>(&self, msg: Message<'a>) -> Result<Option<Response<'a>>, Error>;
}
// Macro to simplify the boilerplate around asyn I/O error handling,
@ -52,6 +56,8 @@ macro_rules! try_break {
}
}
/// A message as received by the connection. Provides access to the message
/// header and lazily consumes the message body, handling its deserialization.
pub struct Message<'a> {
pub header: MsgHeader,
conn: &'a mut TcpStream,
@ -63,9 +69,67 @@ impl<'a> Message<'a> {
Message{header, conn}
}
/// Read the message body from the underlying connection
pub fn body<T>(&mut self) -> Result<T, Error> where T: ser::Readable {
read_body(&self.header, self.conn)
}
pub fn copy_attachment(&mut self, len: usize, writer: &mut Write) -> Result<(), Error> {
let mut written = 0;
while written < len {
let read_len = cmp::min(8000, len - written);
let mut buf = vec![0u8; read_len];
read_exact(&mut self.conn, &mut buf[..], 10000, true)?;
writer.write_all(&mut buf)?;
written += read_len;
}
Ok(())
}
/// Respond to the message with the provided message type and body
pub fn respond<T>(self, resp_type: Type, body: T) -> Response<'a>
where
T: ser::Writeable
{
let body = ser::ser_vec(&body).unwrap();
Response{
resp_type: resp_type,
body: body,
conn: self.conn,
attachment: None,
}
}
}
/// Response to a `Message`
pub struct Response<'a> {
resp_type: Type,
body: Vec<u8>,
conn: &'a mut TcpStream,
attachment: Option<File>,
}
impl<'a> Response<'a> {
fn write(mut self) -> Result<(), Error> {
let mut msg = ser::ser_vec(&MsgHeader::new(self.resp_type, self.body.len() as u64)).unwrap();
msg.append(&mut self.body);
write_all(&mut self.conn, &msg[..], 10000)?;
if let Some(mut file) = self.attachment {
let mut buf = [0u8; 8000];
loop {
match file.read(&mut buf[..]) {
Ok(0) => break,
Ok(n) => write_all(&mut self.conn, &buf[..n], 10000)?,
Err(e) => return Err(From::from(e)),
}
}
}
Ok(())
}
pub fn add_attachment(&mut self, file: File) {
self.attachment = Some(file);
}
}
// TODO count sent and received
@ -105,7 +169,7 @@ where
let (error_tx, error_rx) = mpsc::channel();
stream.set_nonblocking(true).expect("Non-blocking IO not available.");
poll(stream, handler, send_rx, send_tx.clone(), error_tx, close_rx);
poll(stream, handler, send_rx, error_tx, close_rx);
Tracker {
sent_bytes: Arc::new(Mutex::new(0)),
@ -120,7 +184,6 @@ fn poll<H>(
conn: TcpStream,
handler: H,
send_rx: mpsc::Receiver<Vec<u8>>,
send_tx: mpsc::Sender<Vec<u8>>,
error_tx: mpsc::Sender<Error>,
close_rx: mpsc::Receiver<()>
)
@ -137,10 +200,10 @@ where
loop {
// check the read end
if let Some(h) = try_break!(error_tx, read_header(conn)) {
let mut msg = Message::from_header(h, conn);
let msg = Message::from_header(h, conn);
debug!(LOGGER, "Received message header, type {:?}, len {}.", msg.header.msg_type, msg.header.msg_len);
if let Some(Some((body, typ))) = try_break!(error_tx, handler.consume(&mut msg)) {
respond(&send_tx, typ, body);
if let Some(Some(resp)) = try_break!(error_tx, handler.consume(msg)) {
try_break!(error_tx, resp.write());
}
}
@ -173,9 +236,3 @@ where
}
});
}
fn respond(send_tx: &mpsc::Sender<Vec<u8>>, msg_type: Type, mut body: Vec<u8>) {
let mut msg = ser::ser_vec(&MsgHeader::new(msg_type, body.len() as u64)).unwrap();
msg.append(&mut body);
send_tx.send(msg).unwrap();
}


@ -52,6 +52,6 @@ mod types;
pub use serv::{Server, DummyAdapter};
pub use peers::Peers;
pub use peer::Peer;
pub use types::{Capabilities, Error, ChainAdapter, P2PConfig, PeerInfo, MAX_BLOCK_HEADERS,
MAX_PEER_ADDRS};
pub use types::{Capabilities, Error, ChainAdapter, SumtreesRead, P2PConfig,
PeerInfo, MAX_BLOCK_HEADERS, MAX_PEER_ADDRS};
pub use store::{PeerData, State};


@ -65,6 +65,8 @@ enum_from_primitive! {
GetCompactBlock,
CompactBlock,
Transaction,
SumtreesRequest,
SumtreesArchive
}
}
@ -120,6 +122,34 @@ pub fn read_exact(
Ok(())
}
/// Same as `read_exact` but for writing.
pub fn write_all(conn: &mut Write, mut buf: &[u8], timeout: u32) -> io::Result<()> {
let sleep_time = time::Duration::from_millis(1);
let mut count = 0;
while !buf.is_empty() {
match conn.write(buf) {
Ok(0) => return Err(io::Error::new(io::ErrorKind::WriteZero,
"failed to write whole buffer")),
Ok(n) => buf = &buf[n..],
Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {}
Err(e) => return Err(e),
}
if !buf.is_empty() {
thread::sleep(sleep_time);
count += 1;
} else {
break;
}
if count > timeout {
return Err(io::Error::new(io::ErrorKind::TimedOut, "writing to tcp stream"));
}
}
Ok(())
}
/// Read a header from the provided connection without blocking if the
/// underlying stream is async. Typically headers will be polled for, so
/// we do not want to block.
@ -571,7 +601,7 @@ impl Readable for Ping {
Ok(h) => h,
Err(_) => 0,
};
Ok(Ping { total_difficulty, height })
Ok(Ping { total_difficulty, height })
}
}
@ -605,3 +635,65 @@ impl Readable for Pong {
Ok(Pong { total_difficulty, height })
}
}
/// Request to get an archive of the full sumtree store, required to sync
/// a new node.
pub struct SumtreesRequest {
/// Hash of the block for which the sumtrees should be provided
pub hash: Hash,
/// Height of the corresponding block
pub height: u64
}
impl Writeable for SumtreesRequest {
fn write<W: Writer>(&self, writer: &mut W) -> Result<(), ser::Error> {
self.hash.write(writer)?;
writer.write_u64(self.height)?;
Ok(())
}
}
impl Readable for SumtreesRequest {
fn read(reader: &mut Reader) -> Result<SumtreesRequest, ser::Error> {
Ok(SumtreesRequest {
hash: Hash::read(reader)?,
height: reader.read_u64()?,
})
}
}
/// Response to a sumtree archive request, must include a zip stream of the
/// archive after the message body.
pub struct SumtreesArchive {
/// Hash of the block for which the sumtrees are provided
pub hash: Hash,
/// Height of the corresponding block
pub height: u64,
/// Output tree index the receiver should rewind to
pub rewind_to_output: u64,
/// Kernel tree index the receiver should rewind to
pub rewind_to_kernel: u64,
/// Size in bytes of the archive
pub bytes: u64,
}
impl Writeable for SumtreesArchive {
fn write<W: Writer>(&self, writer: &mut W) -> Result<(), ser::Error> {
self.hash.write(writer)?;
ser_multiwrite!(writer, [write_u64, self.height],
[write_u64, self.rewind_to_output],
[write_u64, self.rewind_to_kernel],
[write_u64, self.bytes]);
Ok(())
}
}
impl Readable for SumtreesArchive {
fn read(reader: &mut Reader) -> Result<SumtreesArchive, ser::Error> {
let hash = Hash::read(reader)?;
let (height, rewind_to_output, rewind_to_kernel, bytes) =
ser_multiread!(reader, read_u64, read_u64, read_u64, read_u64);
Ok(SumtreesArchive {hash, height, rewind_to_output, rewind_to_kernel, bytes})
}
}
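
On the wire a SumtreesArchive is thus the standard message header and the five fields above, immediately followed by the advertised number of raw zip bytes; the receiver drains exactly that many bytes through Message::copy_attachment rather than deserializing them.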


@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fs::File;
use std::net::{SocketAddr, TcpStream};
use std::sync::{Arc, RwLock};
@ -230,6 +231,13 @@ impl Peer {
msg::Type::GetPeerAddrs)
}
pub fn send_sumtrees_request(&self, height: u64, hash: Hash) -> Result<(), Error> {
debug!(LOGGER, "Asking {} for sumtree archive at {} {}.",
self.info.addr, height, hash);
self.connection.as_ref().unwrap().send(
&SumtreesRequest {hash, height }, msg::Type::SumtreesRequest)
}
/// Stops the peer, closing its connection
pub fn stop(&self) {
let _ = self.connection.as_ref().unwrap().close_channel.send(());
@ -326,6 +334,17 @@ impl ChainAdapter for TrackingAdapter {
fn get_block(&self, h: Hash) -> Option<core::Block> {
self.adapter.get_block(h)
}
fn sumtrees_read(&self, h: Hash) -> Option<SumtreesRead> {
self.adapter.sumtrees_read(h)
}
fn sumtrees_write(&self, h: Hash,
rewind_to_output: u64, rewind_to_kernel: u64,
sumtree_data: File, peer_addr: SocketAddr) -> bool {
self.adapter.sumtrees_write(h, rewind_to_output, rewind_to_kernel,
sumtree_data, peer_addr)
}
}
impl NetAdapter for TrackingAdapter {


@ -13,6 +13,7 @@
// limitations under the License.
use std::collections::HashMap;
use std::fs::File;
use std::net::SocketAddr;
use std::sync::{Arc, RwLock};
@ -482,6 +483,25 @@ impl ChainAdapter for Peers {
fn get_block(&self, h: Hash) -> Option<core::Block> {
self.adapter.get_block(h)
}
fn sumtrees_read(&self, h: Hash) -> Option<SumtreesRead> {
self.adapter.sumtrees_read(h)
}
fn sumtrees_write(
&self,
h: Hash,
rewind_to_output: u64,
rewind_to_kernel: u64,
sumtree_data: File,
peer_addr: SocketAddr,
) -> bool {
if !self.adapter.sumtrees_write(h, rewind_to_output, rewind_to_kernel,
sumtree_data, peer_addr) {
self.ban_peer(&peer_addr);
false
} else {
true
}
}
}
impl NetAdapter for Peers {


@ -12,12 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use std::env;
use std::fs::File;
use std::net::SocketAddr;
use std::sync::Arc;
use core::core;
use core::core::hash::{Hash, Hashed};
use core::ser;
use conn::*;
use msg::*;
use rand;
@ -37,7 +38,7 @@ impl Protocol {
}
impl MessageHandler for Protocol {
fn consume(&self, msg: &mut Message) -> Result<Option<(Vec<u8>, Type)>, Error> {
fn consume<'a>(&self, mut msg: Message<'a>) -> Result<Option<Response<'a>>, Error> {
let adapter = &self.adapter;
match msg.header.msg_type {
@ -46,13 +47,14 @@ impl MessageHandler for Protocol {
let ping: Ping = msg.body()?;
adapter.peer_difficulty(self.addr, ping.total_difficulty, ping.height);
let pong_bytes = ser::ser_vec(
&Pong {
total_difficulty: adapter.total_difficulty(),
height: adapter.total_height(),
}).unwrap();
Ok(Some((pong_bytes, Type::Pong)))
Ok(Some(
msg.respond(
Type::Pong,
Pong {
total_difficulty: adapter.total_difficulty(),
height: adapter.total_height(),
})
))
}
Type::Pong => {
@ -73,8 +75,7 @@ impl MessageHandler for Protocol {
let bo = adapter.get_block(h);
if let Some(b) = bo {
let block_bytes = ser::ser_vec(&b).unwrap();
return Ok(Some((block_bytes, Type::Block)));
return Ok(Some(msg.respond(Type::Block, b)));
}
Ok(None)
}
@ -111,11 +112,9 @@ impl MessageHandler for Protocol {
"handle_payload: GetCompactBlock: empty block, sending full block",
);
let block_bytes = ser::ser_vec(&b).unwrap();
Ok(Some((block_bytes, Type::Block)))
Ok(Some(msg.respond(Type::Block, b)))
} else {
let compact_block_bytes = ser::ser_vec(&cb).unwrap();
Ok(Some((compact_block_bytes, Type::CompactBlock)))
Ok(Some(msg.respond(Type::CompactBlock, cb)))
}
} else {
Ok(None)
@ -137,8 +136,7 @@ impl MessageHandler for Protocol {
let headers = adapter.locate_headers(loc.hashes);
// serialize and send all the headers over
let header_bytes = ser::ser_vec(&Headers { headers: headers }).unwrap();
return Ok(Some((header_bytes, Type::Headers)));
Ok(Some(msg.respond(Type::Headers, Headers { headers: headers })))
}
// "header first" block propagation - if we have not yet seen this block
@ -162,11 +160,13 @@ impl MessageHandler for Protocol {
Type::GetPeerAddrs => {
let get_peers: GetPeerAddrs = msg.body()?;
let peer_addrs = adapter.find_peer_addrs(get_peers.capabilities);
let peer_addrs_bytes = ser::ser_vec(
&PeerAddrs {
peers: peer_addrs.iter().map(|sa| SockAddr(*sa)).collect(),
}).unwrap();
return Ok(Some((peer_addrs_bytes, Type::PeerAddrs)));
Ok(Some(
msg.respond(
Type::PeerAddrs,
PeerAddrs {
peers: peer_addrs.iter().map(|sa| SockAddr(*sa)).collect(),
})
))
}
Type::PeerAddrs => {
@ -175,6 +175,52 @@ impl MessageHandler for Protocol {
Ok(None)
}
Type::SumtreesRequest => {
let sm_req: SumtreesRequest = msg.body()?;
debug!(LOGGER, "handle_payload: sumtree req for {} at {}",
sm_req.hash, sm_req.height);
let sumtrees = self.adapter.sumtrees_read(sm_req.hash);
if let Some(sumtrees) = sumtrees {
let file_sz = sumtrees.reader.metadata()?.len();
let mut resp = msg.respond(
Type::SumtreesArchive,
&SumtreesArchive {
height: sm_req.height as u64,
hash: sm_req.hash,
rewind_to_output: sumtrees.output_index,
rewind_to_kernel: sumtrees.kernel_index,
bytes: file_sz,
});
resp.add_attachment(sumtrees.reader);
Ok(Some(resp))
} else {
Ok(None)
}
}
Type::SumtreesArchive => {
let sm_arch: SumtreesArchive = msg.body()?;
debug!(LOGGER, "handle_payload: sumtree archive for {} at {} rewind to {}/{}",
sm_arch.hash, sm_arch.height,
sm_arch.rewind_to_output, sm_arch.rewind_to_kernel);
let mut tmp = env::temp_dir();
tmp.push("sumtree.zip");
{
let mut tmp_zip = File::create(tmp.clone())?;
msg.copy_attachment(sm_arch.bytes as usize, &mut tmp_zip)?;
tmp_zip.sync_all()?;
}
let tmp_zip = File::open(tmp)?;
self.adapter.sumtrees_write(
sm_arch.hash, sm_arch.rewind_to_output,
sm_arch.rewind_to_kernel, tmp_zip, self.addr);
Ok(None)
}
_ => {
debug!(LOGGER, "unknown message type {:?}", msg.header.msg_type);
Ok(None)


@ -12,8 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::{Arc, RwLock};
use std::fs::File;
use std::net::{TcpListener, TcpStream, SocketAddr, Shutdown};
use std::sync::{Arc, RwLock};
use std::thread;
use std::time::Duration;
@ -194,6 +195,15 @@ impl ChainAdapter for DummyAdapter {
fn get_block(&self, _: Hash) -> Option<core::Block> {
None
}
fn sumtrees_read(&self, _h: Hash) -> Option<SumtreesRead> {
unimplemented!()
}
fn sumtrees_write(&self, _h: Hash,
_rewind_to_output: u64, _rewind_to_kernel: u64,
_sumtree_data: File, _peer_addr: SocketAddr) -> bool {
false
}
}
impl NetAdapter for DummyAdapter {


@ -13,6 +13,7 @@
// limitations under the License.
use std::convert::From;
use std::fs::File;
use std::io;
use std::net::{IpAddr, SocketAddr};
use std::sync::mpsc;
@ -132,6 +133,17 @@ pub struct PeerInfo {
pub total_difficulty: Difficulty,
}
/// The full sumtree data along with indexes required for a consumer to
/// rewind to a consistent requested state.
pub struct SumtreesRead {
/// Output tree index the receiver should rewind to
pub output_index: u64,
/// Kernel tree index the receiver should rewind to
pub kernel_index: u64,
/// Binary stream for the sumtree zipped data
pub reader: File,
}
/// Bridge between the networking layer and the rest of the system. Handles the
/// forwarding or querying of blocks and transactions from the network among
/// other things.
@ -167,6 +179,19 @@ pub trait ChainAdapter: Sync + Send {
/// Gets a full block by its hash.
fn get_block(&self, h: Hash) -> Option<core::Block>;
/// Provides a reading view into the current sumtree state as well as
/// the required indexes for a consumer to rewind to a consistent state
/// at the provided block hash.
fn sumtrees_read(&self, h: Hash) -> Option<SumtreesRead>;
/// Writes a reading view on a sumtree state that's been provided to us.
/// If we're willing to accept that new state, the data stream will be
/// read as a zip file, unzipped and the resulting state files should be
/// rewound to the provided indexes.
fn sumtrees_write(&self, h: Hash,
rewind_to_output: u64, rewind_to_kernel: u64,
sumtree_data: File, peer_addr: SocketAddr) -> bool;
}
/// Additional methods required by the protocol that don't need to be


@ -17,11 +17,12 @@
use std::sync::Arc;
use std::collections::{HashMap, HashSet};
use core::core::transaction;
use core::core::OutputIdentifier;
use core::core::{block, hash};
use util::secp::pedersen::Commitment;
use core::core::transaction;
use core::core::{block, hash, OutputIdentifier};
use core::global;
use types::*;
pub use graph;


@ -18,6 +18,7 @@ use memmap;
use std::cmp;
use std::fs::{self, File, OpenOptions};
use std::io::{self, BufRead, BufReader, ErrorKind, Write};
use std::marker::PhantomData;
use std::os::unix::io::AsRawFd;
use std::path::Path;
use std::io::Read;
@ -27,7 +28,7 @@ use libc::{ftruncate64, off64_t};
#[cfg(not(any(target_os = "linux", target_os = "android")))]
use libc::{ftruncate as ftruncate64, off_t as off64_t};
use core::core::pmmr::{self, Backend, HashSum, Summable, VecBackend};
use core::core::pmmr::{self, Backend, HashSum, Summable};
use core::ser;
use util::LOGGER;
@ -46,15 +47,18 @@ pub const RM_LOG_MAX_NODES: usize = 10000;
/// Despite being append-only, the file can still be pruned and truncated. The
/// former simply happens by rewriting it, ignoring some of the data. The
/// latter by truncating the underlying file and re-creating the mmap.
struct AppendOnlyFile {
pub struct AppendOnlyFile {
path: String,
file: File,
mmap: Option<memmap::Mmap>,
buffer_start: usize,
buffer: Vec<u8>,
buffer_start_bak: usize,
}
impl AppendOnlyFile {
/// Open a file (existing or not) as append-only, backed by a mmap.
pub fn open(path: String) -> io::Result<AppendOnlyFile> {
let file = OpenOptions::new()
.read(true)
.append(true)
@ -64,31 +68,68 @@ impl AppendOnlyFile {
path: path.clone(),
file: file,
mmap: None,
buffer_start: 0,
buffer: vec![],
buffer_start_bak: 0,
};
if let Ok(sz) = aof.size() {
if sz > 0 {
aof.buffer_start = sz as usize;
aof.mmap = Some(unsafe { memmap::Mmap::map(&aof.file)? });
}
}
Ok(aof)
}
/// Append data to the file. Until the append-only file is synced, data is
/// only written to memory.
pub fn append(&mut self, buf: &mut Vec<u8>) {
self.buffer.append(buf);
}
/// Rewinds the data file back to a lower position. The new position must
/// be that of the first byte written the next time data is appended.
pub fn rewind(&mut self, pos: u64) {
if self.buffer_start_bak > 0 || self.buffer.len() > 0 {
panic!("Can't rewind on a dirty state.");
}
self.buffer_start_bak = self.buffer_start;
self.buffer_start = pos as usize;
}
/// Syncs all writes (fsync), reallocating the memory map to make the newly
/// written data accessible.
pub fn flush(&mut self) -> io::Result<()> {
if self.buffer_start_bak > 0 {
// flushing a rewound state, we need to truncate before applying
self.truncate(self.buffer_start)?;
self.buffer_start_bak = 0;
}
self.buffer_start += self.buffer.len();
self.file.write_all(&self.buffer[..])?;
self.file.sync_data()?;
self.buffer = vec![];
self.mmap = Some(unsafe { memmap::Mmap::map(&self.file)? });
Ok(())
}
/// Discard the current non-flushed data.
pub fn discard(&mut self) {
if self.buffer_start_bak > 0 {
// discarding a rewound state, restore the buffer start
self.buffer_start = self.buffer_start_bak;
self.buffer_start_bak = 0;
}
self.buffer = vec![];
}
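Taken together this gives the append-only file a small commit/rollback contract. A minimal usage sketch, assuming a fresh file (path illustrative):
// Sketch: append is buffered, flush commits, rewind + discard backs out.
let mut aof = AppendOnlyFile::open("/tmp/aof_demo".to_string())?;
aof.append(&mut vec![1u8, 2, 3]); // in memory only at this point
aof.flush()?; // fsync + remap; data is now readable through the mmap
aof.rewind(0); // schedule truncation back to the start of the file
aof.discard(); // back out instead, restoring the pre-rewind position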
/// Read length bytes of data at offset from the file. Leverages the memory
/// map.
fn read(&self, offset: usize, length: usize) -> Vec<u8> {
if offset >= self.buffer_start {
let offset = offset - self.buffer_start;
return self.buffer[offset..(offset+length)].to_vec();
}
if let None = self.mmap {
return vec![];
}
@ -96,6 +137,17 @@ impl AppendOnlyFile {
(&mmap[offset..(offset + length)]).to_vec()
}
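Note the consequence of the buffer check above: data appended but not yet flushed is served straight from the in-memory buffer. A sketch, assuming a fresh file so buffer_start is 0 (read is module-private, so this only works from within the module):
// Sketch: reads at or past buffer_start never touch the mmap.
let mut aof = AppendOnlyFile::open("/tmp/aof_read_demo".to_string())?;
aof.append(&mut vec![42u8; 16]);
assert_eq!(aof.read(0, 16), vec![42u8; 16]); // served pre-flush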
/// Truncates the underlying file to the provided offset
fn truncate(&self, offs: usize) -> io::Result<()> {
let fd = self.file.as_raw_fd();
let res = unsafe { ftruncate64(fd, offs as off64_t) };
if res == -1 {
Err(io::Error::last_os_error())
} else {
Ok(())
}
}
/// Saves a copy of the current file content, skipping data at the provided
/// prune indices. The prune Vec must be ordered.
fn save_prune(&self, target: String, prune_offs: Vec<u64>, prune_len: u64) -> io::Result<()> {
@ -116,7 +168,7 @@ impl AppendOnlyFile {
} as u64;
// write the buffer, except if we prune offsets in the current span,
// in which case we skip
let mut buf_start = 0;
while prune_offs[prune_pos] >= read && prune_offs[prune_pos] < read + len {
let prune_at = prune_offs[prune_pos] as usize;
@ -135,21 +187,15 @@ impl AppendOnlyFile {
}
}
/// Current size of the file in bytes.
fn size(&self) -> io::Result<u64> {
fs::metadata(&self.path).map(|md| md.len())
}
/// Path of the underlying file
pub fn path(&self) -> String {
self.path.clone()
}
}
/// Log file fully cached in memory containing all positions that should be
@ -181,11 +227,12 @@ impl RemoveLog {
}
/// Truncates and empties the remove log.
fn rewind(&mut self, last_offs: u32) -> io::Result<()> {
// simplifying assumption: we always remove older than what's in tmp
self.removed_tmp = vec![];
// backing it up before truncating
self.removed_bak = self.removed.clone();
@ -202,15 +249,6 @@ impl RemoveLog {
Ok(())
}
/// Append a set of new positions to the remove log. Adds those positions
/// both to the ordered in-memory set and to the file.
fn append(&mut self, elmts: Vec<u64>, index: u32) -> io::Result<()> {
@ -291,12 +329,7 @@ where
hashsum_file: AppendOnlyFile,
remove_log: RemoveLog,
pruned_nodes: pmmr::PruneList,
phantom: PhantomData<T>,
}
impl<T> Backend<T> for PMMRBackend<T>
@ -306,25 +339,19 @@ where
/// Append the provided HashSums to the backend storage.
#[allow(unused_variables)]
fn append(&mut self, position: u64, data: Vec<HashSum<T>>) -> Result<(), String> {
for d in data {
self.hashsum_file.append(&mut ser::ser_vec(&d).unwrap());
}
Ok(())
}
/// Get a HashSum by insertion position
fn get(&self, position: u64) -> Option<HashSum<T>> {
// Check if this position has been pruned in the remove log or the
// pruned list
if self.remove_log.includes(position) {
return None;
}
let shift = self.pruned_nodes.get_shift(position);
if let None = shift {
return None;
@ -351,26 +378,19 @@ where
}
fn rewind(&mut self, position: u64, index: u32) -> Result<(), String> {
self.remove_log
.rewind(index)
.map_err(|e| format!("Could not truncate remove log: {}", e))?;
let shift = self.pruned_nodes.get_shift(position).unwrap_or(0);
let record_len = 32 + T::sum_len();
let file_pos = (position - shift) * (record_len as u64);
self.hashsum_file.rewind(file_pos);
Ok(())
}
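The offset arithmetic deserves a worked example: each record is 32 hash bytes plus T::sum_len() sum bytes, and nodes pruned before the rewind position shift the file position left. Assuming a 32-byte sum for illustration:
// Illustration only: rewinding to MMR position 10 with 2 pruned nodes.
let record_len = 32 + 32; // 32-byte hash + assumed 32-byte sum
let shift = 2; // as returned by pruned_nodes.get_shift(10)
let file_pos = (10 - shift) * record_len; // byte 512 in the hashsum file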
/// Remove HashSums by insertion position
fn remove(&mut self, positions: Vec<u64>, index: u32) -> Result<(), String> {
self.remove_log.append(positions, index).map_err(|e| {
format!("Could not write to log storage, disk full? {:?}", e)
})
@ -385,8 +405,6 @@ where
/// store its files.
pub fn new(data_dir: String) -> io::Result<PMMRBackend<T>> {
let hs_file = AppendOnlyFile::open(format!("{}/{}", data_dir, PMMR_DATA_FILE))?;
let rm_log = RemoveLog::open(format!("{}/{}", data_dir, PMMR_RM_LOG_FILE))?;
let prune_list = read_ordered_vec(format!("{}/{}", data_dir, PMMR_PRUNED_FILE), 8)?;
@ -394,12 +412,10 @@ where
data_dir: data_dir,
hashsum_file: hs_file,
remove_log: rm_log,
pruned_nodes: pmmr::PruneList {
pruned_nodes: prune_list,
},
phantom: PhantomData,
})
}
@ -415,45 +431,21 @@ where
/// Syncs all files to disk. A call to sync is required to ensure all the
/// data has been successfully written to disk.
pub fn sync(&mut self) -> io::Result<()> {
if let Err(e) = self.hashsum_file.flush() {
return Err(io::Error::new(
io::ErrorKind::Interrupted,
format!("Could not write to log storage, disk full? {:?}", e),
));
}
self.remove_log.flush()?;
Ok(())
}
/// Discard the current, non synced state of the backend.
pub fn discard(&mut self) {
self.hashsum_file.discard();
self.remove_log.discard();
}
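With all buffering pushed down into the storages, a caller of the backend sees a plain commit/rollback pair; roughly (the batch helper is a hypothetical stand-in):
// Sketch: commit on success, roll back the non-synced state on failure.
match apply_batch_of_appends(&mut backend) {
Ok(_) => backend.sync()?, // flush hashsum file and remove log
Err(_) => backend.discard(), // drop buffers, restore the rewind marker
}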
/// Checks the length of the remove log to see if it should get compacted.
@ -475,7 +467,7 @@ where
}
// 0. validate none of the nodes in the rm log are in the prune list (to
// avoid accidental double compaction)
for pos in &self.remove_log.removed[..] {
if let None = self.pruned_nodes.pruned_pos(pos.0) {
// TODO we likely can recover from this by directly jumping to 3
@ -489,7 +481,7 @@ where
}
// 1. save hashsum file to a compact copy, skipping data that's in the
// remove list
let tmp_prune_file = format!("{}/{}.prune", self.data_dir, PMMR_DATA_FILE);
let record_len = (32 + T::sum_len()) as u64;
let to_rm = self.remove_log
@ -518,10 +510,9 @@ where
format!("{}/{}", self.data_dir, PMMR_DATA_FILE),
)?;
self.hashsum_file = AppendOnlyFile::open(format!("{}/{}", self.data_dir, PMMR_DATA_FILE))?;
// 4. truncate the rm log
self.remove_log.rewind(0)?;
self.remove_log.flush()?;
Ok(())

View file

@ -16,3 +16,5 @@ serde = "~1.0.8"
serde_derive = "~1.0.8"
secp256k1zkp = { git = "https://github.com/mimblewimble/rust-secp256k1-zkp", tag="grin_integration_7" }
#secp256k1zkp = { path = "../../rust-secp256k1-zkp" }
walkdir = "^2.0.1"
zip = "^0.2.6"

View file

@ -34,6 +34,8 @@ extern crate lazy_static;
extern crate serde;
#[macro_use]
extern crate serde_derive;
extern crate walkdir;
extern crate zip as zip_rs;
// Re-export so it only has to be included once
pub extern crate secp256k1zkp as secp_;
@ -59,6 +61,9 @@ use byteorder::{BigEndian, ByteOrder};
mod hex;
pub use hex::*;
/// Compress and decompress bzip2 zip archives
pub mod zip;
/// Encapsulation of a RefCell<Option<T>> for one-time initialization after
/// construction. This implementation will purposefully fail hard if not used
/// properly, for example if it's not initialized before being first used

94 util/src/zip.rs Normal file
View file

@ -0,0 +1,94 @@
// Copyright 2017 The Grin Developers
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Wrappers around the `zip-rs` library to compress and decompress zip
//! bzip2 archives.
use std::io;
use std::path::Path;
use std::fs::{self, File};
use walkdir::WalkDir;
use zip_rs;
use zip_rs::result::{ZipResult, ZipError};
use zip_rs::write::FileOptions;
/// Compress a source directory recursively into a zip file using the
/// bzip2 format. Permissions are set to 644 by default to avoid any
/// unwanted execution bits.
pub fn compress(src_dir: &Path, dst_file: &File) -> ZipResult<()> {
if !src_dir.is_dir() {
return Err(ZipError::Io(
io::Error::new(io::ErrorKind::Other, "Source must be a directory.")));
}
let options = FileOptions::default()
.compression_method(zip_rs::CompressionMethod::Bzip2)
.unix_permissions(0o644);
let mut zip = zip_rs::ZipWriter::new(dst_file);
let walkdir = WalkDir::new(src_dir);
let it = walkdir.into_iter();
for dent in it.filter_map(|e| e.ok()) {
let path = dent.path();
let name = path.strip_prefix(src_dir)
.unwrap()
.to_str()
.unwrap();
if path.is_file() {
zip.start_file(name, options)?;
let mut f = File::open(path)?;
io::copy(&mut f, &mut zip)?;
}
}
zip.finish()?;
dst_file.sync_all()?;
Ok(())
}
/// Decompress a source file into the provided destination path.
pub fn decompress<R>(src_file: R, dest: &Path) -> ZipResult<()> where R: io::Read + io::Seek {
let mut archive = zip_rs::ZipArchive::new(src_file)?;
for i in 0..archive.len() {
let mut file = archive.by_index(i)?;
let file_path = dest.join(file.name());
if (&*file.name()).ends_with('/') {
fs::create_dir_all(&file_path)?;
} else {
if let Some(p) = file_path.parent() {
if !p.exists() {
fs::create_dir_all(&p)?;
}
}
let mut outfile = fs::File::create(&file_path)?;
io::copy(&mut file, &mut outfile)?;
}
// Get and set permissions
#[cfg(unix)]
{
use std::os::unix::fs::PermissionsExt;
if let Some(mode) = file.unix_mode() {
fs::set_permissions(&file_path, fs::Permissions::from_mode(mode))?;
}
}
}
Ok(())
}
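A short usage sketch for the two helpers (paths are illustrative):
// Sketch: round-trip a directory through a bzip2 zip archive.
use std::fs::File;
use std::path::Path;
let dst = File::create("/tmp/sumtrees.zip")?;
zip::compress(Path::new("chain/sumtrees"), &dst)?;
let src = File::open("/tmp/sumtrees.zip")?;
zip::decompress(src, Path::new("/tmp/restored"))?;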