The Header MMR (One MMR To Rule Them All) (#1716)

* header MMR in use within txhashset itself
works with fast sync
not yet in place for initial header sync

* add the (currently unused) sync_head mmr

* use sync MMR during fast sync
rebuild header MMR after we validate full txhashset after download

* support missing header MMR (rebuild as necessary) for legacy nodes

* rustfmt

* comments/docs

* rustfmt

* cleanup DBBackend

* cleanup DBBackend

* cleanup

* rename to HashOnly

* rustfmt

* cleanup backend.append()

* simplify pmmr append api
no need to pass position when appending

* cleanup

* simplify vec_backend to match simpler append api

* rustfmt

* docs/comments

* rustfmt

* cleanup
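The append simplification called out above is the thread running through the backend changes below. As a before/after sketch (signatures copied from the later hunks in this diff):

// Before: callers computed the insertion position, and hashes and
// optional data elements travelled together in one Vec.
fn append(&mut self, position: u64, data: Vec<(Hash, Option<T>)>) -> Result<(), String>;

// After: the backend derives positions itself; the new element's data
// and the freshly computed hashes are passed separately.
fn append(&mut self, data: T, hashes: Vec<Hash>) -> Result<(), String>;

// New hash-only variant backing the header and sync MMRs:
fn append(&mut self, data: Vec<Hash>) -> Result<(), String>;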
Antioch Peverell 2018-10-15 17:16:34 +01:00, committed by GitHub
parent 6f29685daf, commit 8b7a20f8b0
19 changed files with 978 additions and 237 deletions

---

@ -97,9 +97,9 @@ impl TxHashSet {
pub fn from_head(head: Arc<chain::Chain>) -> TxHashSet {
let roots = head.get_txhashset_roots();
TxHashSet {
output_root_hash: roots.0.to_hex(),
range_proof_root_hash: roots.1.to_hex(),
kernel_root_hash: roots.2.to_hex(),
output_root_hash: roots.output_root.to_hex(),
range_proof_root_hash: roots.rproof_root.to_hex(),
kernel_root_hash: roots.kernel_root.to_hex(),
}
}
}

---

@ -35,7 +35,7 @@ use grin_store::Error::NotFoundErr;
use pipe;
use store;
use txhashset;
use types::{ChainAdapter, NoStatus, Options, Tip, TxHashsetWriteStatus};
use types::{ChainAdapter, NoStatus, Options, Tip, TxHashSetRoots, TxHashsetWriteStatus};
use util::secp::pedersen::{Commitment, RangeProof};
use util::LOGGER;
@ -153,6 +153,7 @@ pub struct Chain {
// POW verification function
pow_verifier: fn(&BlockHeader, u8) -> Result<(), pow::Error>,
archive_mode: bool,
genesis: BlockHeader,
}
unsafe impl Sync for Chain {}
@ -178,7 +179,7 @@ impl Chain {
// open the txhashset, creating a new one if necessary
let mut txhashset = txhashset::TxHashSet::open(db_root.clone(), store.clone(), None)?;
setup_head(genesis, store.clone(), &mut txhashset)?;
setup_head(genesis.clone(), store.clone(), &mut txhashset)?;
let head = store.head()?;
debug!(
@ -199,6 +200,7 @@ impl Chain {
verifier_cache,
block_hashes_cache: Arc::new(RwLock::new(LruCache::new(HASHES_CACHE_SIZE))),
archive_mode,
genesis: genesis.header.clone(),
})
}
@ -246,8 +248,7 @@ impl Chain {
Ok(head)
}
Err(e) => {
match e.kind() {
Err(e) => match e.kind() {
ErrorKind::Orphan => {
let block_hash = b.hash();
let orphan = Orphan {
@ -292,8 +293,7 @@ impl Chain {
add_to_hash_cache(b.hash());
Err(ErrorKind::Other(format!("{:?}", e).to_owned()).into())
}
}
}
},
}
}
@ -494,11 +494,15 @@ impl Chain {
Ok((extension.roots(), extension.sizes()))
})?;
// Carefully destructure these correctly...
// TODO - Maybe sizes should be a struct to add some type safety here...
let (_, output_mmr_size, _, kernel_mmr_size) = sizes;
b.header.output_root = roots.output_root;
b.header.range_proof_root = roots.rproof_root;
b.header.kernel_root = roots.kernel_root;
b.header.output_mmr_size = sizes.0;
b.header.kernel_mmr_size = sizes.2;
b.header.output_mmr_size = output_mmr_size;
b.header.kernel_mmr_size = kernel_mmr_size;
Ok(())
}
@ -526,7 +530,7 @@ impl Chain {
}
/// Returns current txhashset roots
pub fn get_txhashset_roots(&self) -> (Hash, Hash, Hash) {
pub fn get_txhashset_roots(&self) -> TxHashSetRoots {
let mut txhashset = self.txhashset.write().unwrap();
txhashset.roots()
}
@ -594,6 +598,40 @@ impl Chain {
Ok(())
}
/// Rebuild the sync MMR based on current header_head.
/// We rebuild the sync MMR when first entering sync mode to ensure we
/// have an MMR we can safely rewind based on the headers received from a peer.
/// TODO - think about how to optimize this.
pub fn rebuild_sync_mmr(&self, head: &Tip) -> Result<(), Error> {
let mut txhashset = self.txhashset.write().unwrap();
let mut batch = self.store.batch()?;
txhashset::sync_extending(&mut txhashset, &mut batch, |extension| {
extension.rebuild(head, &self.genesis)?;
Ok(())
})?;
batch.commit()?;
Ok(())
}
/// Rebuild the header MMR based on current header_head.
/// We rebuild the header MMR after receiving a txhashset from a peer.
/// The txhashset contains output, rangeproof and kernel MMRs but we construct
/// the header MMR locally based on headers from our db.
/// TODO - think about how to optimize this.
fn rebuild_header_mmr(
&self,
head: &Tip,
txhashset: &mut txhashset::TxHashSet,
) -> Result<(), Error> {
let mut batch = self.store.batch()?;
txhashset::header_extending(txhashset, &mut batch, |extension| {
extension.rebuild(head, &self.genesis)?;
Ok(())
})?;
batch.commit()?;
Ok(())
}
/// Writes a reading view on a txhashset state that's been provided to us.
/// If we're willing to accept that new state, the data stream will be
/// read as a zip file, unzipped and the resulting state files should be
@ -621,6 +659,10 @@ impl Chain {
let mut txhashset =
txhashset::TxHashSet::open(self.db_root.clone(), self.store.clone(), Some(&header))?;
// The txhashset.zip contains the output, rangeproof and kernel MMRs.
// We must rebuild the header MMR ourselves based on the headers in our db.
self.rebuild_header_mmr(&Tip::from_block(&header), &mut txhashset)?;
// Validate the full kernel history (kernel MMR root for every block header).
self.validate_kernel_history(&header, &txhashset)?;
@ -983,6 +1025,15 @@ fn setup_head(
// to match the provided block header.
let header = batch.get_block_header(&head.last_block_h)?;
// If we have no header MMR then rebuild as necessary.
// Supports old nodes with no header MMR.
txhashset::header_extending(txhashset, &mut batch, |extension| {
if extension.size() == 0 {
extension.rebuild(&head, &genesis.header)?;
}
Ok(())
})?;
let res = txhashset::extending(txhashset, &mut batch, |extension| {
extension.rewind(&header)?;
extension.validate_roots()?;
@ -1042,17 +1093,15 @@ fn setup_head(
batch.save_head(&tip)?;
batch.setup_height(&genesis.header, &tip)?;
// Apply the genesis block to our empty MMRs.
txhashset::extending(txhashset, &mut batch, |extension| {
extension.apply_block(&genesis)?;
// Save the block_sums to the db for use later.
extension
.batch
.save_block_sums(&genesis.hash(), &BlockSums::default())?;
Ok(())
})?;
// Save the block_sums to the db for use later.
batch.save_block_sums(&genesis.hash(), &BlockSums::default())?;
info!(LOGGER, "chain: init: saved genesis: {:?}", genesis.hash());
}
Err(e) => return Err(ErrorKind::StoreErr(e, "chain init load head".to_owned()))?,

---

@ -91,10 +91,19 @@ pub fn process_block(b: &Block, ctx: &mut BlockContext) -> Result<Option<Tip>, E
// Check if this block is already known due to it being in the current set of orphan blocks.
check_known_orphans(&b.header, ctx)?;
// Check we have *this* block in the store.
// Stop if we have processed this block previously (it is in the store).
// This is more expensive than the earlier check_known() as we hit the store.
check_known_store(&b.header, ctx)?;
}
// Header specific processing.
handle_block_header(&b.header, ctx)?;
{
validate_header(&b.header, ctx)?;
add_block_header(&b.header, ctx)?;
update_header_head(&b.header, ctx)?;
}
// Check if we are processing the "next" block relative to the current chain head.
let head = ctx.batch.head()?;
@ -104,11 +113,6 @@ pub fn process_block(b: &Block, ctx: &mut BlockContext) -> Result<Option<Tip>, E
// * special case where this is the first fast sync full block
// Either way we can proceed (and we know the block is new and unprocessed).
} else {
// Check we have *this* block in the store.
// Stop if we have processed this block previously (it is in the store).
// This is more expensive than the earlier check_known() as we hit the store.
check_known_store(&b.header, ctx)?;
// At this point it looks like this is a new block that we have not yet processed.
// Check we have the *previous* block in the store.
// If we do not then treat this block as an orphan.
@ -128,7 +132,7 @@ pub fn process_block(b: &Block, ctx: &mut BlockContext) -> Result<Option<Tip>, E
if is_next_block(&b.header, &head) {
// No need to rewind if we are processing the next block.
} else {
// Rewind the re-apply blocks on the forked chain to
// Rewind and re-apply blocks on the forked chain to
// put the txhashset in the correct forked state
// (immediately prior to this new block).
rewind_and_apply_fork(b, extension)?;
@ -174,12 +178,8 @@ pub fn process_block(b: &Block, ctx: &mut BlockContext) -> Result<Option<Tip>, E
// Add the newly accepted block and header to our index.
add_block(b, ctx)?;
// Update the chain head (and header_head) if total work is increased.
let res = {
let _ = update_header_head(&b.header, ctx)?;
// Update the chain head if total work is increased.
let res = update_head(b, ctx)?;
res
};
Ok(res)
}
@ -209,8 +209,22 @@ pub fn sync_block_headers(
if !all_known {
for header in headers {
handle_block_header(header, ctx)?;
validate_header(header, ctx)?;
add_block_header(header, ctx)?;
}
let first_header = headers.first().unwrap();
let prev_header = ctx.batch.get_block_header(&first_header.previous)?;
txhashset::sync_extending(&mut ctx.txhashset, &mut ctx.batch, |extension| {
// Optimize this if "next" header
extension.rewind(&prev_header)?;
for header in headers {
extension.apply_header(header)?;
}
Ok(())
})?;
}
// Update header_head (if most work) and sync_head (regardless) in all cases,
@ -231,12 +245,6 @@ pub fn sync_block_headers(
}
}
fn handle_block_header(header: &BlockHeader, ctx: &mut BlockContext) -> Result<(), Error> {
validate_header(header, ctx)?;
add_block_header(header, ctx)?;
Ok(())
}
/// Process block header as part of "header first" block propagation.
/// We validate the header but we do not store it or update header head based
/// on this. We will update these once we get the block back after requesting

---

@ -12,8 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
//! Utility structs to handle the 3 hashtrees (output, range proof,
//! kernel) more conveniently and transactionally.
//! Utility structs to handle the 3 MMRs (output, rangeproof,
//! kernel) along with the overall header MMR conveniently and transactionally.
use std::collections::HashSet;
use std::fs::{self, File};
@ -28,26 +28,47 @@ use util::secp::pedersen::{Commitment, RangeProof};
use core::core::committed::Committed;
use core::core::hash::{Hash, Hashed};
use core::core::merkle_proof::MerkleProof;
use core::core::pmmr::{self, ReadonlyPMMR, RewindablePMMR, PMMR};
use core::core::pmmr::{self, ReadonlyPMMR, RewindablePMMR, DBPMMR, PMMR};
use core::core::{Block, BlockHeader, Input, Output, OutputFeatures, OutputIdentifier, TxKernel};
use core::global;
use core::ser::{PMMRIndexHashable, PMMRable};
use error::{Error, ErrorKind};
use grin_store;
use grin_store::pmmr::{PMMRBackend, PMMR_FILES};
use grin_store::pmmr::{HashOnlyMMRBackend, PMMRBackend, PMMR_FILES};
use grin_store::types::prune_noop;
use store::{Batch, ChainStore};
use txhashset::{RewindableKernelView, UTXOView};
use types::{TxHashSetRoots, TxHashsetWriteStatus};
use types::{Tip, TxHashSetRoots, TxHashsetWriteStatus};
use util::{file, secp_static, zip, LOGGER};
const HEADERHASHSET_SUBDIR: &'static str = "header";
const TXHASHSET_SUBDIR: &'static str = "txhashset";
const HEADER_HEAD_SUBDIR: &'static str = "header_head";
const SYNC_HEAD_SUBDIR: &'static str = "sync_head";
const OUTPUT_SUBDIR: &'static str = "output";
const RANGE_PROOF_SUBDIR: &'static str = "rangeproof";
const KERNEL_SUBDIR: &'static str = "kernel";
const TXHASHSET_ZIP: &'static str = "txhashset_snapshot.zip";
struct HashOnlyMMRHandle {
backend: HashOnlyMMRBackend,
last_pos: u64,
}
impl HashOnlyMMRHandle {
fn new(root_dir: &str, sub_dir: &str, file_name: &str) -> Result<HashOnlyMMRHandle, Error> {
let path = Path::new(root_dir).join(sub_dir).join(file_name);
fs::create_dir_all(path.clone())?;
let backend = HashOnlyMMRBackend::new(path.to_str().unwrap().to_string())?;
let last_pos = backend.unpruned_size()?;
Ok(HashOnlyMMRHandle { backend, last_pos })
}
}
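Putting the constants and this handle together, the on-disk layout under the chain data directory comes out roughly as follows (pmmr_hash.bin is the hash file name defined in the store hunk later in this diff; the prunable MMRs also keep data, leaf set and prune list files, elided here):

<root_dir>/header/header_head/pmmr_hash.bin   (header MMR, hashes only)
<root_dir>/header/sync_head/pmmr_hash.bin     (sync MMR, hashes only)
<root_dir>/txhashset/output/...               (output MMR, prunable)
<root_dir>/txhashset/rangeproof/...           (rangeproof MMR, prunable)
<root_dir>/txhashset/kernel/...               (kernel MMR)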
struct PMMRHandle<T>
where
T: PMMRable,
@ -61,19 +82,17 @@ where
T: PMMRable + ::std::fmt::Debug,
{
fn new(
root_dir: String,
root_dir: &str,
sub_dir: &str,
file_name: &str,
prunable: bool,
header: Option<&BlockHeader>,
) -> Result<PMMRHandle<T>, Error> {
let path = Path::new(&root_dir).join(TXHASHSET_SUBDIR).join(file_name);
let path = Path::new(root_dir).join(sub_dir).join(file_name);
fs::create_dir_all(path.clone())?;
let be = PMMRBackend::new(path.to_str().unwrap().to_string(), prunable, header)?;
let sz = be.unpruned_size()?;
Ok(PMMRHandle {
backend: be,
last_pos: sz,
})
let backend = PMMRBackend::new(path.to_str().unwrap().to_string(), prunable, header)?;
let last_pos = backend.unpruned_size()?;
Ok(PMMRHandle { backend, last_pos })
}
}
@ -86,8 +105,24 @@ where
/// guaranteed to indicate whether an output is spent or not. The index
/// may have commitments that have already been spent, even with
/// pruning enabled.
pub struct TxHashSet {
/// Header MMR to support the header_head chain.
/// This is rewound and applied transactionally with the
/// output, rangeproof and kernel MMRs during an extension or a
/// readonly_extension.
/// It can also be rewound and applied separately via a header_extension.
/// Note: the header MMR is backed by the database and maintains just the hash file.
header_pmmr_h: HashOnlyMMRHandle,
/// Header MMR to support exploratory sync_head.
/// The header_head and sync_head chains can diverge so we need to maintain
/// multiple header MMRs during the sync process.
///
/// Note: this is rewound and applied separately from the other MMRs
/// via a "sync_extension".
/// Note: the sync MMR is backed by the database and maintains just the hash file.
sync_pmmr_h: HashOnlyMMRHandle,
output_pmmr_h: PMMRHandle<OutputIdentifier>,
rproof_pmmr_h: PMMRHandle<RangeProof>,
kernel_pmmr_h: PMMRHandle<TxKernel>,
@ -104,9 +139,33 @@ impl TxHashSet {
header: Option<&BlockHeader>,
) -> Result<TxHashSet, Error> {
Ok(TxHashSet {
output_pmmr_h: PMMRHandle::new(root_dir.clone(), OUTPUT_SUBDIR, true, header)?,
rproof_pmmr_h: PMMRHandle::new(root_dir.clone(), RANGE_PROOF_SUBDIR, true, header)?,
kernel_pmmr_h: PMMRHandle::new(root_dir.clone(), KERNEL_SUBDIR, false, None)?,
header_pmmr_h: HashOnlyMMRHandle::new(
&root_dir,
HEADERHASHSET_SUBDIR,
HEADER_HEAD_SUBDIR,
)?,
sync_pmmr_h: HashOnlyMMRHandle::new(&root_dir, HEADERHASHSET_SUBDIR, SYNC_HEAD_SUBDIR)?,
output_pmmr_h: PMMRHandle::new(
&root_dir,
TXHASHSET_SUBDIR,
OUTPUT_SUBDIR,
true,
header,
)?,
rproof_pmmr_h: PMMRHandle::new(
&root_dir,
TXHASHSET_SUBDIR,
RANGE_PROOF_SUBDIR,
true,
header,
)?,
kernel_pmmr_h: PMMRHandle::new(
&root_dir,
TXHASHSET_SUBDIR,
KERNEL_SUBDIR,
false,
None,
)?,
commit_index,
})
}
@ -186,16 +245,23 @@ impl TxHashSet {
rproof_pmmr.elements_from_insertion_index(start_index, max_count)
}
/// Get sum tree roots
/// TODO: Return data instead of hashes
pub fn roots(&mut self) -> (Hash, Hash, Hash) {
/// Get MMR roots.
pub fn roots(&mut self) -> TxHashSetRoots {
let header_pmmr: DBPMMR<BlockHeader, _> =
DBPMMR::at(&mut self.header_pmmr_h.backend, self.header_pmmr_h.last_pos);
let output_pmmr: PMMR<OutputIdentifier, _> =
PMMR::at(&mut self.output_pmmr_h.backend, self.output_pmmr_h.last_pos);
let rproof_pmmr: PMMR<RangeProof, _> =
PMMR::at(&mut self.rproof_pmmr_h.backend, self.rproof_pmmr_h.last_pos);
let kernel_pmmr: PMMR<TxKernel, _> =
PMMR::at(&mut self.kernel_pmmr_h.backend, self.kernel_pmmr_h.last_pos);
(output_pmmr.root(), rproof_pmmr.root(), kernel_pmmr.root())
TxHashSetRoots {
header_root: header_pmmr.root(),
output_root: output_pmmr.root(),
rproof_root: rproof_pmmr.root(),
kernel_root: kernel_pmmr.root(),
}
}
/// build a new merkle proof for the given position
@ -255,8 +321,6 @@ pub fn extending_readonly<'a, F, T>(trees: &'a mut TxHashSet, inner: F) -> Resul
where
F: FnOnce(&mut Extension) -> Result<T, Error>,
{
let res: Result<T, Error>;
{
let commit_index = trees.commit_index.clone();
let batch = commit_index.batch()?;
@ -265,13 +329,20 @@ where
let header = batch.head_header()?;
trace!(LOGGER, "Starting new txhashset (readonly) extension.");
let res = {
let mut extension = Extension::new(trees, &batch, header);
extension.force_rollback();
res = inner(&mut extension);
}
// TODO - header_mmr may be out ahead via the header_head
// TODO - do we need to handle this via an explicit rewind on the header_mmr?
inner(&mut extension)
};
trace!(LOGGER, "Rollbacking txhashset (readonly) extension.");
trees.header_pmmr_h.backend.discard();
trees.output_pmmr_h.backend.discard();
trees.rproof_pmmr_h.backend.discard();
trees.kernel_pmmr_h.backend.discard();
@ -340,7 +411,7 @@ pub fn extending<'a, F, T>(
where
F: FnOnce(&mut Extension) -> Result<T, Error>,
{
let sizes: (u64, u64, u64);
let sizes: (u64, u64, u64, u64);
let res: Result<T, Error>;
let rollback: bool;
@ -353,6 +424,9 @@ where
let child_batch = batch.child()?;
{
trace!(LOGGER, "Starting new txhashset extension.");
// TODO - header_mmr may be out ahead via the header_head
// TODO - do we need to handle this via an explicit rewind on the header_mmr?
let mut extension = Extension::new(trees, &child_batch, header);
res = inner(&mut extension);
@ -366,6 +440,7 @@ where
LOGGER,
"Error returned, discarding txhashset extension: {}", e
);
trees.header_pmmr_h.backend.discard();
trees.output_pmmr_h.backend.discard();
trees.rproof_pmmr_h.backend.discard();
trees.kernel_pmmr_h.backend.discard();
@ -374,18 +449,21 @@ where
Ok(r) => {
if rollback {
trace!(LOGGER, "Rollbacking txhashset extension. sizes {:?}", sizes);
trees.header_pmmr_h.backend.discard();
trees.output_pmmr_h.backend.discard();
trees.rproof_pmmr_h.backend.discard();
trees.kernel_pmmr_h.backend.discard();
} else {
trace!(LOGGER, "Committing txhashset extension. sizes {:?}", sizes);
child_batch.commit()?;
trees.header_pmmr_h.backend.sync()?;
trees.output_pmmr_h.backend.sync()?;
trees.rproof_pmmr_h.backend.sync()?;
trees.kernel_pmmr_h.backend.sync()?;
trees.output_pmmr_h.last_pos = sizes.0;
trees.rproof_pmmr_h.last_pos = sizes.1;
trees.kernel_pmmr_h.last_pos = sizes.2;
trees.header_pmmr_h.last_pos = sizes.0;
trees.output_pmmr_h.last_pos = sizes.1;
trees.rproof_pmmr_h.last_pos = sizes.2;
trees.kernel_pmmr_h.last_pos = sizes.3;
}
trace!(LOGGER, "TxHashSet extension done.");
@ -394,12 +472,266 @@ where
}
}
/// Start a new sync MMR unit of work. This MMR tracks the sync_head.
/// This is used during header sync to validate batches of headers as they arrive
/// without needing to repeatedly rewind the header MMR that continues to track
/// the header_head as they diverge during sync.
pub fn sync_extending<'a, F, T>(
trees: &'a mut TxHashSet,
batch: &'a mut Batch,
inner: F,
) -> Result<T, Error>
where
F: FnOnce(&mut HeaderExtension) -> Result<T, Error>,
{
let size: u64;
let res: Result<T, Error>;
let rollback: bool;
// We want to use the current sync_head unless
// we explicitly rewind the extension.
let head = batch.get_sync_head()?;
let header = batch.get_block_header(&head.last_block_h)?;
// create a child transaction so if the state is rolled back by itself, all
// index saving can be undone
let child_batch = batch.child()?;
{
trace!(LOGGER, "Starting new txhashset sync_head extension.");
let pmmr = DBPMMR::at(&mut trees.sync_pmmr_h.backend, trees.sync_pmmr_h.last_pos);
let mut extension = HeaderExtension::new(pmmr, &child_batch, header);
res = inner(&mut extension);
rollback = extension.rollback;
size = extension.size();
}
match res {
Err(e) => {
debug!(
LOGGER,
"Error returned, discarding txhashset sync_head extension: {}", e
);
trees.sync_pmmr_h.backend.discard();
Err(e)
}
Ok(r) => {
if rollback {
trace!(
LOGGER,
"Rollbacking txhashset sync_head extension. size {:?}",
size
);
trees.sync_pmmr_h.backend.discard();
} else {
trace!(
LOGGER,
"Committing txhashset sync_head extension. size {:?}",
size
);
child_batch.commit()?;
trees.sync_pmmr_h.backend.sync()?;
trees.sync_pmmr_h.last_pos = size;
}
trace!(LOGGER, "TxHashSet sync_head extension done.");
Ok(r)
}
}
}
/// Start a new header MMR unit of work. This MMR tracks the header_head.
/// This MMR can be extended individually beyond the other (output, rangeproof and kernel) MMRs
/// to allow headers to be validated before we receive the full block data.
pub fn header_extending<'a, F, T>(
trees: &'a mut TxHashSet,
batch: &'a mut Batch,
inner: F,
) -> Result<T, Error>
where
F: FnOnce(&mut HeaderExtension) -> Result<T, Error>,
{
let size: u64;
let res: Result<T, Error>;
let rollback: bool;
// We want to use the current head of the header chain unless
// we explicitly rewind the extension.
let head = batch.header_head()?;
let header = batch.get_block_header(&head.last_block_h)?;
// create a child transaction so if the state is rolled back by itself, all
// index saving can be undone
let child_batch = batch.child()?;
{
trace!(LOGGER, "Starting new txhashset header extension.");
let pmmr = DBPMMR::at(
&mut trees.header_pmmr_h.backend,
trees.header_pmmr_h.last_pos,
);
let mut extension = HeaderExtension::new(pmmr, &child_batch, header);
res = inner(&mut extension);
rollback = extension.rollback;
size = extension.size();
}
match res {
Err(e) => {
debug!(
LOGGER,
"Error returned, discarding txhashset header extension: {}", e
);
trees.header_pmmr_h.backend.discard();
Err(e)
}
Ok(r) => {
if rollback {
trace!(
LOGGER,
"Rollbacking txhashset header extension. size {:?}",
size
);
trees.header_pmmr_h.backend.discard();
} else {
trace!(
LOGGER,
"Committing txhashset header extension. size {:?}",
size
);
child_batch.commit()?;
trees.header_pmmr_h.backend.sync()?;
trees.header_pmmr_h.last_pos = size;
}
trace!(LOGGER, "TxHashSet header extension done.");
Ok(r)
}
}
}
/// A header extension to allow the header MMR to extend beyond the other MMRs individually.
/// This is to allow headers to be validated against the MMR before we have the full block data.
pub struct HeaderExtension<'a> {
header: BlockHeader,
pmmr: DBPMMR<'a, BlockHeader, HashOnlyMMRBackend>,
/// Rollback flag.
rollback: bool,
/// Batch in which the extension occurs, public so it can be used within
/// an `extending` closure. Just be careful using it that way as it will
/// get rolled back with the extension (i.e on a losing fork).
pub batch: &'a Batch<'a>,
}
impl<'a> HeaderExtension<'a> {
fn new(
pmmr: DBPMMR<'a, BlockHeader, HashOnlyMMRBackend>,
batch: &'a Batch,
header: BlockHeader,
) -> HeaderExtension<'a> {
HeaderExtension {
header,
pmmr,
rollback: false,
batch,
}
}
/// Apply a new header to the header MMR extension.
/// This may be either the header MMR or the sync MMR depending on the
/// extension.
pub fn apply_header(&mut self, header: &BlockHeader) -> Result<(), Error> {
self.pmmr
.push(header.clone())
.map_err(&ErrorKind::TxHashSetErr)?;
self.header = header.clone();
Ok(())
}
/// Rewind the header extension to the specified header.
/// Note the close relationship between header height and insertion index.
pub fn rewind(&mut self, header: &BlockHeader) -> Result<(), Error> {
debug!(
LOGGER,
"Rewind header extension to {} at {}",
header.hash(),
header.height
);
let header_pos = pmmr::insertion_to_pmmr_index(header.height + 1);
self.pmmr
.rewind(header_pos)
.map_err(&ErrorKind::TxHashSetErr)?;
// Update our header to reflect the one we rewound to.
self.header = header.clone();
Ok(())
}
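For reference, the height to position mapping used here, insertion_to_pmmr_index(n) = 2(n - 1) - popcount(n - 1) + 1 (its definition appears in the pmmr hunk later in this diff), works out to:

height 0 (genesis) -> insertion 1 -> pos 1
height 1 -> insertion 2 -> pos 2 (pos 3 is their parent)
height 2 -> insertion 3 -> pos 4
height 3 -> insertion 4 -> pos 5 (pos 6 and 7 are parents)
height 4 -> insertion 5 -> pos 8

So rewinding to a header at height h means rewinding the MMR to position insertion_to_pmmr_index(h + 1).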
/// Truncate the header MMR (rewind all the way back to pos 0).
/// Used when rebuilding the header MMR by reapplying all headers
/// including the genesis block header.
pub fn truncate(&mut self) -> Result<(), Error> {
debug!(LOGGER, "Truncating header extension.");
self.pmmr.rewind(0).map_err(&ErrorKind::TxHashSetErr)?;
Ok(())
}
/// The size of the header MMR.
pub fn size(&self) -> u64 {
self.pmmr.unpruned_size()
}
/// TODO - think about how to optimize this.
/// Requires *all* header hashes to be iterated over in ascending order.
pub fn rebuild(&mut self, head: &Tip, genesis: &BlockHeader) -> Result<(), Error> {
debug!(
LOGGER,
"About to rebuild header extension from {:?} to {:?}.",
genesis.hash(),
head.last_block_h,
);
let mut header_hashes = vec![];
let mut current = self.batch.get_block_header(&head.last_block_h)?;
while current.height > 0 {
header_hashes.push(current.hash());
current = self.batch.get_block_header(&current.previous)?;
}
// Include the genesis header as we will re-apply it after truncating the extension.
header_hashes.push(genesis.hash());
header_hashes.reverse();
// Truncate the extension (back to pos 0).
self.truncate()?;
debug!(
LOGGER,
"Re-applying {} headers to extension, from {:?} to {:?}.",
header_hashes.len(),
header_hashes.first().unwrap(),
header_hashes.last().unwrap(),
);
for h in header_hashes {
let header = self.batch.get_block_header(&h)?;
// self.validate_header_root()?;
self.apply_header(&header)?;
}
Ok(())
}
}
/// Allows the application of new blocks on top of the sum trees in a
/// reversible manner within a unit of work provided by the `extending`
/// function.
pub struct Extension<'a> {
header: BlockHeader,
header_pmmr: DBPMMR<'a, BlockHeader, HashOnlyMMRBackend>,
output_pmmr: PMMR<'a, OutputIdentifier, PMMRBackend<OutputIdentifier>>,
rproof_pmmr: PMMR<'a, RangeProof, PMMRBackend<RangeProof>>,
kernel_pmmr: PMMR<'a, TxKernel, PMMRBackend<TxKernel>>,
@ -447,6 +779,10 @@ impl<'a> Extension<'a> {
fn new(trees: &'a mut TxHashSet, batch: &'a Batch, header: BlockHeader) -> Extension<'a> {
Extension {
header,
header_pmmr: DBPMMR::at(
&mut trees.header_pmmr_h.backend,
trees.header_pmmr_h.last_pos,
),
output_pmmr: PMMR::at(
&mut trees.output_pmmr_h.backend,
trees.output_pmmr_h.last_pos,
@ -508,7 +844,16 @@ impl<'a> Extension<'a> {
}
/// Apply a new block to the existing state.
///
/// Applies the following -
/// * header
/// * outputs
/// * inputs
/// * kernels
///
pub fn apply_block(&mut self, b: &Block) -> Result<(), Error> {
self.apply_header(&b.header)?;
for out in b.outputs() {
let pos = self.apply_output(out)?;
// Update the output_pos index for the new output.
@ -606,12 +951,18 @@ impl<'a> Extension<'a> {
Ok(output_pos)
}
/// Push kernel onto MMR (hash and data files).
fn apply_kernel(&mut self, kernel: &TxKernel) -> Result<(), Error> {
// push kernels in their MMR and file
self.kernel_pmmr
.push(kernel.clone())
.map_err(&ErrorKind::TxHashSetErr)?;
Ok(())
}
fn apply_header(&mut self, header: &BlockHeader) -> Result<(), Error> {
self.header_pmmr
.push(header.clone())
.map_err(&ErrorKind::TxHashSetErr)?;
Ok(())
}
@ -653,12 +1004,12 @@ impl<'a> Extension<'a> {
/// Rewinds the MMRs to the provided block, rewinding to the last output pos
/// and last kernel pos of that block.
pub fn rewind(&mut self, block_header: &BlockHeader) -> Result<(), Error> {
trace!(
pub fn rewind(&mut self, header: &BlockHeader) -> Result<(), Error> {
debug!(
LOGGER,
"Rewind to header {} @ {}",
block_header.height,
block_header.hash(),
"Rewind to header {} at {}",
header.hash(),
header.height,
);
// We need to build bitmaps of added and removed output positions
@ -667,16 +1018,19 @@ impl<'a> Extension<'a> {
// undone during rewind).
// Rewound output pos will be removed from the MMR.
// Rewound input (spent) pos will be added back to the MMR.
let rewind_rm_pos = input_pos_to_rewind(block_header, &self.header, &self.batch)?;
let rewind_rm_pos = input_pos_to_rewind(header, &self.header, &self.batch)?;
let header_pos = pmmr::insertion_to_pmmr_index(header.height + 1);
self.rewind_to_pos(
block_header.output_mmr_size,
block_header.kernel_mmr_size,
header_pos,
header.output_mmr_size,
header.kernel_mmr_size,
&rewind_rm_pos,
)?;
// Update our header to reflect the one we rewound to.
self.header = block_header.clone();
self.header = header.clone();
Ok(())
}
@ -685,17 +1039,22 @@ impl<'a> Extension<'a> {
/// kernel we want to rewind to.
fn rewind_to_pos(
&mut self,
header_pos: u64,
output_pos: u64,
kernel_pos: u64,
rewind_rm_pos: &Bitmap,
) -> Result<(), Error> {
trace!(
debug!(
LOGGER,
"Rewind txhashset to output {}, kernel {}",
"txhashset: rewind_to_pos: header {}, output {}, kernel {}",
header_pos,
output_pos,
kernel_pos,
);
self.header_pmmr
.rewind(header_pos)
.map_err(&ErrorKind::TxHashSetErr)?;
self.output_pmmr
.rewind(output_pos, rewind_rm_pos)
.map_err(&ErrorKind::TxHashSetErr)?;
@ -712,13 +1071,23 @@ impl<'a> Extension<'a> {
/// and kernel sum trees.
pub fn roots(&self) -> TxHashSetRoots {
TxHashSetRoots {
header_root: self.header_pmmr.root(),
output_root: self.output_pmmr.root(),
rproof_root: self.rproof_pmmr.root(),
kernel_root: self.kernel_pmmr.root(),
}
}
/// Validate the various MMR roots against the block header.
/// Validate the following MMR roots against the latest header applied -
/// * output
/// * rangeproof
/// * kernel
///
/// Note we do not validate the header MMR roots here as we need to validate
/// a header against the state of the MMR *prior* to applying it as
/// each header commits to the root of the MMR of all previous headers,
/// not including the header itself.
///
pub fn validate_roots(&self) -> Result<(), Error> {
// If we are validating the genesis block then we have no outputs or
// kernels. So we are done here.
@ -727,6 +1096,7 @@ impl<'a> Extension<'a> {
}
let roots = self.roots();
if roots.output_root != self.header.output_root
|| roots.rproof_root != self.header.range_proof_root
|| roots.kernel_root != self.header.kernel_root
@ -737,7 +1107,26 @@ impl<'a> Extension<'a> {
}
}
/// Validate the output and kernel MMR sizes against the block header.
/// Validate the provided header by comparing its "prev_root" to the
/// root of the current header MMR.
///
/// TODO - Implement this once we commit to prev_root in block headers.
///
pub fn validate_header_root(&self, _header: &BlockHeader) -> Result<(), Error> {
if self.header.height == 0 {
return Ok(());
}
let _roots = self.roots();
// TODO - validate once we commit to header MMR root in the header
// (not just previous hash)
// if roots.header_root != header.prev_root
Ok(())
}
/// Validate the header, output and kernel MMR sizes against the block header.
pub fn validate_sizes(&self) -> Result<(), Error> {
// If we are validating the genesis block then we have no outputs or
// kernels. So we are done here.
@ -745,10 +1134,14 @@ impl<'a> Extension<'a> {
return Ok(());
}
let (output_mmr_size, rproof_mmr_size, kernel_mmr_size) = self.sizes();
if output_mmr_size != self.header.output_mmr_size
|| kernel_mmr_size != self.header.kernel_mmr_size
{
let (header_mmr_size, output_mmr_size, rproof_mmr_size, kernel_mmr_size) = self.sizes();
let expected_header_mmr_size = pmmr::insertion_to_pmmr_index(self.header.height + 2) - 1;
if header_mmr_size != expected_header_mmr_size {
Err(ErrorKind::InvalidMMRSize.into())
} else if output_mmr_size != self.header.output_mmr_size {
Err(ErrorKind::InvalidMMRSize.into())
} else if kernel_mmr_size != self.header.kernel_mmr_size {
Err(ErrorKind::InvalidMMRSize.into())
} else if output_mmr_size != rproof_mmr_size {
Err(ErrorKind::InvalidMMRSize.into())
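The expected header MMR size here follows from the same mapping rewind uses: a chain at height h has h + 1 header leaves applied (heights 0 through h), and an MMR with n leaves fills every position up to, but not including, the (n + 1)th insertion point, so its size is insertion_to_pmmr_index(n + 1) - 1 = insertion_to_pmmr_index(h + 2) - 1. At height 3, for example: 4 leaves, size insertion_to_pmmr_index(5) - 1 = 7 (four leaves plus three parents).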
@ -761,6 +1154,9 @@ impl<'a> Extension<'a> {
let now = Instant::now();
// validate all hashes and sums within the trees
if let Err(e) = self.header_pmmr.validate() {
return Err(ErrorKind::InvalidTxHashSet(e).into());
}
if let Err(e) = self.output_pmmr.validate() {
return Err(ErrorKind::InvalidTxHashSet(e).into());
}
@ -773,7 +1169,8 @@ impl<'a> Extension<'a> {
debug!(
LOGGER,
"txhashset: validated the output {}, rproof {}, kernel {} mmrs, took {}s",
"txhashset: validated the header {}, output {}, rproof {}, kernel {} mmrs, took {}s",
self.header_pmmr.unpruned_size(),
self.output_pmmr.unpruned_size(),
self.rproof_pmmr.unpruned_size(),
self.kernel_pmmr.unpruned_size(),
@ -871,8 +1268,9 @@ impl<'a> Extension<'a> {
}
/// Sizes of each of the sum trees
pub fn sizes(&self) -> (u64, u64, u64) {
pub fn sizes(&self) -> (u64, u64, u64, u64) {
(
self.header_pmmr.unpruned_size(),
self.output_pmmr.unpruned_size(),
self.rproof_pmmr.unpruned_size(),
self.kernel_pmmr.unpruned_size(),

---

@ -34,9 +34,11 @@ bitflags! {
}
/// A helper to hold the roots of the txhashset in order to keep them
/// readable
/// readable.
#[derive(Debug)]
pub struct TxHashSetRoots {
/// Header root
pub header_root: Hash,
/// Output root
pub output_root: Hash,
/// Range Proof root

---

@ -34,7 +34,7 @@ use core::{
use global;
use keychain::{self, BlindingFactor};
use pow::{Difficulty, Proof, ProofOfWork};
use ser::{self, Readable, Reader, Writeable, Writer};
use ser::{self, PMMRable, Readable, Reader, Writeable, Writer};
use util::{secp, secp_static, static_secp_instance, LOGGER};
/// Errors thrown by Block validation
@ -196,6 +196,14 @@ impl Default for BlockHeader {
}
}
/// Block header hashes are maintained in the header MMR
/// but we store the data itself in the db.
impl PMMRable for BlockHeader {
fn len() -> usize {
0
}
}
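A zero len here effectively says "no data-file record": only the header's hash is pushed into the hash-only header MMR, while the full header stays in the db keyed by its hash, so there is no fixed-size MMR data record to size. (This reading is inferred from the hash-only backend later in this diff, which maintains a hash file and nothing else.)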
/// Serialization of a block header
impl Writeable for BlockHeader {
fn write<W: Writer>(&self, writer: &mut W) -> Result<(), ser::Error> {

---

@ -53,11 +53,13 @@ impl fmt::Display for Hash {
}
impl Hash {
pub const SIZE: usize = 32;
/// Builds a Hash from a byte vector. If the vector is too short, it will be
/// completed by zeroes. If it's too long, it will be truncated.
pub fn from_vec(v: &[u8]) -> Hash {
let mut h = [0; 32];
let copy_size = min(v.len(), 32);
let mut h = [0; Hash::SIZE];
let copy_size = min(v.len(), Hash::SIZE);
h[..copy_size].copy_from_slice(&v[..copy_size]);
Hash(h)
}

---

@ -18,6 +18,14 @@ use core::hash::Hash;
use core::BlockHeader;
use ser::PMMRable;
/// Simple storage backend for a hash-only MMR (the data itself lives elsewhere, e.g. the db).
pub trait HashOnlyBackend {
/// Append the provided hashes to the backend storage.
fn append(&mut self, data: Vec<Hash>) -> Result<(), String>;
/// Rewind the backend state to the specified position.
fn rewind(&mut self, position: u64) -> Result<(), String>;
/// Get the hash at the specified position.
fn get_hash(&self, position: u64) -> Option<Hash>;
}
/// Storage backend for the MMR, just needs to be indexed by order of insertion.
/// The PMMR itself does not need the Backend to be accurate on the existence
/// of an element (i.e. remove could be a no-op) but layers above can
@ -30,7 +38,7 @@ where
/// associated data element to flatfile storage (for leaf nodes only). The
/// position of the first element of the Vec in the MMR is provided to
/// help the implementation.
fn append(&mut self, position: u64, data: Vec<(Hash, Option<T>)>) -> Result<(), String>;
fn append(&mut self, data: T, hashes: Vec<Hash>) -> Result<(), String>;
/// Rewind the backend state to a previous position, as if all append
/// operations after that had been canceled. Expects a position in the PMMR

---

@ -0,0 +1,173 @@
// Copyright 2018 The Grin Developers
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Database backed MMR.
use std::marker;
use core::hash::Hash;
use core::pmmr::{bintree_postorder_height, is_leaf, peak_map_height, peaks, HashOnlyBackend};
use ser::{PMMRIndexHashable, PMMRable};
/// Database backed MMR.
pub struct DBPMMR<'a, T, B>
where
T: PMMRable,
B: 'a + HashOnlyBackend,
{
/// The last position in the PMMR
last_pos: u64,
/// The backend for this MMR
backend: &'a mut B,
// only needed to parameterise Backend
_marker: marker::PhantomData<T>,
}
impl<'a, T, B> DBPMMR<'a, T, B>
where
T: PMMRable + ::std::fmt::Debug,
B: 'a + HashOnlyBackend,
{
/// Build a new db backed MMR.
pub fn new(backend: &'a mut B) -> DBPMMR<T, B> {
DBPMMR {
last_pos: 0,
backend: backend,
_marker: marker::PhantomData,
}
}
/// Build a new db backed MMR initialized to
/// last_pos with the provided db backend.
pub fn at(backend: &'a mut B, last_pos: u64) -> DBPMMR<T, B> {
DBPMMR {
last_pos: last_pos,
backend: backend,
_marker: marker::PhantomData,
}
}
/// Size of the MMR (the last position in the underlying backend).
pub fn unpruned_size(&self) -> u64 {
self.last_pos
}
/// Is the MMR empty?
pub fn is_empty(&self) -> bool {
self.last_pos == 0
}
/// Rewind the MMR to the specified (leaf) position.
pub fn rewind(&mut self, position: u64) -> Result<(), String> {
// Identify which actual position we should rewind to as the provided
// position is a leaf. We traverse the MMR to include any parent(s) that
// need to be included for the MMR to be valid.
let mut pos = position;
while bintree_postorder_height(pos + 1) > 0 {
pos += 1;
}
self.backend.rewind(pos)?;
self.last_pos = pos;
Ok(())
}
/// Get the hash element at provided position in the MMR.
pub fn get_hash(&self, pos: u64) -> Option<Hash> {
if pos > self.last_pos {
// If we are beyond the rhs of the MMR return None.
None
} else if is_leaf(pos) {
// If we are a leaf then get data from the backend.
self.backend.get_hash(pos)
} else {
// If we are not a leaf then return None as only leaves have data.
None
}
}
/// Push a new element into the MMR. Computes new related peaks at
/// the same time if applicable.
pub fn push(&mut self, elmt: T) -> Result<u64, String> {
let elmt_pos = self.last_pos + 1;
let mut current_hash = elmt.hash_with_index(elmt_pos - 1);
let mut to_append = vec![current_hash];
let mut pos = elmt_pos;
let (peak_map, height) = peak_map_height(pos - 1);
if height != 0 {
return Err(format!("bad mmr size {}", pos - 1));
}
// hash with all immediately preceding peaks, as indicated by peak map
let mut peak = 1;
while (peak_map & peak) != 0 {
let left_sibling = pos + 1 - 2 * peak;
let left_hash = self
.backend
.get_hash(left_sibling)
.ok_or("missing left sibling in tree, should not have been pruned")?;
peak *= 2;
pos += 1;
current_hash = (left_hash, current_hash).hash_with_index(pos - 1);
to_append.push(current_hash);
}
// append all the new nodes and update the MMR index
self.backend.append(to_append)?;
self.last_pos = pos;
Ok(elmt_pos)
}
/// Hashes of the current peaks of the MMR, left to right.
pub fn peaks(&self) -> Vec<Hash> {
let peaks_pos = peaks(self.last_pos);
peaks_pos
.into_iter()
.filter_map(|pi| self.backend.get_hash(pi))
.collect()
}
/// Compute the root of the MMR by bagging the peaks, rightmost first.
pub fn root(&self) -> Hash {
let mut res = None;
for peak in self.peaks().iter().rev() {
res = match res {
None => Some(*peak),
Some(rhash) => Some((*peak, rhash).hash_with_index(self.unpruned_size())),
}
}
res.expect("no root, invalid tree")
}
/// Validate every parent hash in the MMR against its two children.
pub fn validate(&self) -> Result<(), String> {
// iterate on all parent nodes
for n in 1..(self.last_pos + 1) {
let height = bintree_postorder_height(n);
if height > 0 {
if let Some(hash) = self.get_hash(n) {
let left_pos = n - (1 << height);
let right_pos = n - 1;
if let Some(left_child_hs) = self.get_hash(left_pos) {
if let Some(right_child_hs) = self.get_hash(right_pos) {
// hash the two child nodes together with parent_pos and compare
if (left_child_hs, right_child_hs).hash_with_index(n - 1) != hash {
return Err(format!(
"Invalid MMR, hash of parent at {} does \
not match children.",
n
));
}
}
}
}
}
}
Ok(())
}
}
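To make the new DBPMMR concrete, here is a minimal usage sketch against a toy in-memory HashOnlyBackend. MemBackend and the demo flow are illustrative only (not part of this commit); imports are written as they appear from a sibling crate elsewhere in this diff:

use core::core::hash::Hash;
use core::core::pmmr::{DBPMMR, HashOnlyBackend};
use core::core::BlockHeader;

/// Toy backend: every MMR node hash kept in a Vec, position i at index i - 1.
struct MemBackend {
    hashes: Vec<Hash>,
}

impl HashOnlyBackend for MemBackend {
    fn append(&mut self, data: Vec<Hash>) -> Result<(), String> {
        self.hashes.extend(data);
        Ok(())
    }
    fn rewind(&mut self, position: u64) -> Result<(), String> {
        // Positions are dense in a hash-only backend, so position == node count.
        self.hashes.truncate(position as usize);
        Ok(())
    }
    fn get_hash(&self, position: u64) -> Option<Hash> {
        self.hashes.get((position - 1) as usize).cloned()
    }
}

fn demo() -> Result<(), String> {
    let mut backend = MemBackend { hashes: vec![] };
    let mut pmmr = DBPMMR::new(&mut backend);
    // push() returns the new leaf position: 1, then 2 (their parent lands at 3).
    assert_eq!(pmmr.push(BlockHeader::default())?, 1);
    assert_eq!(pmmr.push(BlockHeader::default())?, 2);
    assert_eq!(pmmr.unpruned_size(), 3);
    let _root = pmmr.root();
    // Rewind back to just the first header.
    pmmr.rewind(1)?;
    assert_eq!(pmmr.unpruned_size(), 1);
    Ok(())
}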

---

@ -37,11 +37,13 @@
//! either be a simple Vec or a database.
mod backend;
mod db_pmmr;
mod pmmr;
mod readonly_pmmr;
mod rewindable_pmmr;
pub use self::backend::*;
pub use self::db_pmmr::*;
pub use self::pmmr::*;
pub use self::readonly_pmmr::*;
pub use self::rewindable_pmmr::*;

---

@ -173,7 +173,7 @@ where
let elmt_pos = self.last_pos + 1;
let mut current_hash = elmt.hash_with_index(elmt_pos - 1);
let mut to_append = vec![(current_hash, Some(elmt))];
let mut to_append = vec![current_hash];
let mut pos = elmt_pos;
let (peak_map, height) = peak_map_height(pos - 1);
@ -191,11 +191,11 @@ where
peak *= 2;
pos += 1;
current_hash = (left_hash, current_hash).hash_with_index(pos - 1);
to_append.push((current_hash, None));
to_append.push(current_hash);
}
// append all the new nodes and update the MMR index
self.backend.append(elmt_pos, to_append)?;
self.backend.append(elmt, to_append)?;
self.last_pos = pos;
Ok(elmt_pos)
}
@ -463,6 +463,9 @@ pub fn n_leaves(size: u64) -> u64 {
/// Returns the pmmr index of the nth inserted element
pub fn insertion_to_pmmr_index(mut sz: u64) -> u64 {
if sz == 0 {
return 0;
}
// 1 based pmmrs
sz -= 1;
2 * sz - sz.count_ones() as u64 + 1
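The closed form works because an MMR with n leaves contains 2n - popcount(n) nodes in total (each of its popcount(n) peak subtrees with 2^k leaves contributes 2 * 2^k - 1 nodes). The nth insertion therefore lands immediately after the 2(n - 1) - popcount(n - 1) nodes produced by the first n - 1 insertions. The new sz == 0 guard keeps the function total: previously the sz -= 1 above would underflow for an empty MMR.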

---

@ -19,7 +19,7 @@ use std::marker;
use core::hash::Hash;
use core::pmmr::{bintree_postorder_height, is_leaf, peaks, Backend};
use ser::{PMMRable, PMMRIndexHashable};
use ser::{PMMRIndexHashable, PMMRable};
/// Rewindable (but still readonly) view of a PMMR.
pub struct RewindablePMMR<'a, T, B>

---

@ -438,7 +438,7 @@ fn pmmr_prune() {
}
// First check the initial numbers of elements.
assert_eq!(ba.elems.len(), 16);
assert_eq!(ba.hashes.len(), 16);
assert_eq!(ba.remove_list.len(), 0);
// pruning a leaf with no parent should do nothing
@ -447,7 +447,7 @@ fn pmmr_prune() {
pmmr.prune(16).unwrap();
assert_eq!(orig_root, pmmr.root());
}
assert_eq!(ba.elems.len(), 16);
assert_eq!(ba.hashes.len(), 16);
assert_eq!(ba.remove_list.len(), 1);
// pruning leaves with no shared parent just removes 1 element
@ -456,7 +456,7 @@ fn pmmr_prune() {
pmmr.prune(2).unwrap();
assert_eq!(orig_root, pmmr.root());
}
assert_eq!(ba.elems.len(), 16);
assert_eq!(ba.hashes.len(), 16);
assert_eq!(ba.remove_list.len(), 2);
{
@ -464,7 +464,7 @@ fn pmmr_prune() {
pmmr.prune(4).unwrap();
assert_eq!(orig_root, pmmr.root());
}
assert_eq!(ba.elems.len(), 16);
assert_eq!(ba.hashes.len(), 16);
assert_eq!(ba.remove_list.len(), 3);
// pruning a non-leaf node has no effect
@ -473,7 +473,7 @@ fn pmmr_prune() {
pmmr.prune(3).unwrap_err();
assert_eq!(orig_root, pmmr.root());
}
assert_eq!(ba.elems.len(), 16);
assert_eq!(ba.hashes.len(), 16);
assert_eq!(ba.remove_list.len(), 3);
// TODO - no longer true (leaves only now) - pruning sibling removes subtree
@ -482,7 +482,7 @@ fn pmmr_prune() {
pmmr.prune(5).unwrap();
assert_eq!(orig_root, pmmr.root());
}
assert_eq!(ba.elems.len(), 16);
assert_eq!(ba.hashes.len(), 16);
assert_eq!(ba.remove_list.len(), 4);
// TODO - no longer true (leaves only now) - pruning all leaves under level >1
@ -492,7 +492,7 @@ fn pmmr_prune() {
pmmr.prune(1).unwrap();
assert_eq!(orig_root, pmmr.root());
}
assert_eq!(ba.elems.len(), 16);
assert_eq!(ba.hashes.len(), 16);
assert_eq!(ba.remove_list.len(), 5);
// pruning everything should only leave us with a single peak
@ -503,7 +503,7 @@ fn pmmr_prune() {
}
assert_eq!(orig_root, pmmr.root());
}
assert_eq!(ba.elems.len(), 16);
assert_eq!(ba.hashes.len(), 16);
assert_eq!(ba.remove_list.len(), 9);
}

---

@ -17,7 +17,7 @@ extern crate croaring;
use croaring::Bitmap;
use core::core::hash::Hash;
use core::core::pmmr::Backend;
use core::core::pmmr::{self, Backend};
use core::core::BlockHeader;
use core::ser;
use core::ser::{PMMRable, Readable, Reader, Writeable, Writer};
@ -59,7 +59,8 @@ where
T: PMMRable,
{
/// Backend elements
pub elems: Vec<Option<(Hash, Option<T>)>>,
pub data: Vec<T>,
pub hashes: Vec<Hash>,
/// Positions of removed elements
pub remove_list: Vec<u64>,
}
@ -68,8 +69,9 @@ impl<T> Backend<T> for VecBackend<T>
where
T: PMMRable,
{
fn append(&mut self, _position: u64, data: Vec<(Hash, Option<T>)>) -> Result<(), String> {
self.elems.append(&mut map_vec!(data, |d| Some(d.clone())));
fn append(&mut self, data: T, hashes: Vec<Hash>) -> Result<(), String> {
self.data.push(data);
self.hashes.append(&mut hashes.clone());
Ok(())
}
@ -77,11 +79,7 @@ where
if self.remove_list.contains(&position) {
None
} else {
if let Some(ref elem) = self.elems[(position - 1) as usize] {
Some(elem.0)
} else {
None
}
self.get_from_file(position)
}
}
@ -89,28 +87,19 @@ where
if self.remove_list.contains(&position) {
None
} else {
if let Some(ref elem) = self.elems[(position - 1) as usize] {
elem.1.clone()
} else {
None
}
self.get_data_from_file(position)
}
}
fn get_from_file(&self, position: u64) -> Option<Hash> {
if let Some(ref x) = self.elems[(position - 1) as usize] {
Some(x.0)
} else {
None
}
let hash = &self.hashes[(position - 1) as usize];
Some(hash.clone())
}
fn get_data_from_file(&self, position: u64) -> Option<T> {
if let Some(ref x) = self.elems[(position - 1) as usize] {
x.1.clone()
} else {
None
}
let idx = pmmr::n_leaves(position);
let data = &self.data[(idx - 1) as usize];
Some(data.clone())
}
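The idx computed above relies on a small invariant: for a leaf at MMR position p, n_leaves(p), the number of leaves in an MMR of size p, is exactly that leaf's insertion index, because the leaf is the last node of a size-p MMR. For example, n_leaves(1) = 1, n_leaves(2) = 2, n_leaves(4) = 3 and n_leaves(5) = 4, matching leaf positions 1, 2, 4, 5.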
fn remove(&mut self, position: u64) -> Result<(), String> {
@ -119,7 +108,9 @@ where
}
fn rewind(&mut self, position: u64, _rewind_rm_pos: &Bitmap) -> Result<(), String> {
self.elems = self.elems[0..(position as usize) + 1].to_vec();
let idx = pmmr::n_leaves(position);
self.data = self.data[0..(idx as usize) + 1].to_vec();
self.hashes = self.hashes[0..(position as usize) + 1].to_vec();
Ok(())
}
@ -141,20 +132,9 @@ where
/// Instantiates a new VecBackend<T>
pub fn new() -> VecBackend<T> {
VecBackend {
elems: vec![],
data: vec![],
hashes: vec![],
remove_list: vec![],
}
}
// /// Current number of elements in the underlying Vec.
// pub fn used_size(&self) -> usize {
// let mut usz = self.elems.len();
// for (idx, _) in self.elems.iter().enumerate() {
// let idx = idx as u64;
// if self.remove_list.contains(&idx) {
// usz -= 1;
// }
// }
// usz
// }
}

---

@ -67,7 +67,13 @@ impl HeaderSync {
header_head.hash(),
header_head.height,
);
// Reset sync_head to the same as current header_head.
self.chain.reset_sync_head(&header_head).unwrap();
// Rebuild the sync MMR to match our updated sync_head.
self.chain.rebuild_sync_mmr(&header_head).unwrap();
self.history_locators.clear();
true
}

---

@ -50,31 +50,6 @@ use byteorder::{BigEndian, WriteBytesExt};
pub use lmdb::*;
/// An iterator that produces Readable instances back. Wraps the lower level
/// DBIterator and deserializes the returned values.
// pub struct SerIterator<T>
// where
// T: ser::Readable,
// {
// iter: DBIterator,
// _marker: marker::PhantomData<T>,
// }
//
// impl<T> Iterator for SerIterator<T>
// where
// T: ser::Readable,
// {
// type Item = T;
//
// fn next(&mut self) -> Option<T> {
// let next = self.iter.next();
// next.and_then(|r| {
// let (_, v) = r;
// ser::deserialize(&mut &v[..]).ok()
// })
// }
// }
/// Build a db key from a prefix and a byte vector identifier.
pub fn to_key(prefix: u8, k: &mut Vec<u8>) -> Vec<u8> {
let mut res = Vec::with_capacity(k.len() + 2);

---

@ -18,12 +18,12 @@ use std::{fs, io, marker};
use croaring::Bitmap;
use core::core::hash::{Hash, Hashed};
use core::core::pmmr::{self, family, Backend};
use core::core::pmmr::{self, family, Backend, HashOnlyBackend};
use core::core::BlockHeader;
use core::ser::{self, PMMRable};
use leaf_set::LeafSet;
use prune_list::PruneList;
use types::{prune_noop, AppendOnlyFile};
use types::{prune_noop, AppendOnlyFile, HashFile};
use util::LOGGER;
const PMMR_HASH_FILE: &'static str = "pmmr_hash.bin";
@ -67,19 +67,19 @@ impl<T> Backend<T> for PMMRBackend<T>
where
T: PMMRable + ::std::fmt::Debug,
{
/// Append the provided Hashes to the backend storage.
/// Append the provided data and hashes to the backend storage.
/// Add the new leaf pos to our leaf_set if this is a prunable MMR.
#[allow(unused_variables)]
fn append(&mut self, position: u64, data: Vec<(Hash, Option<T>)>) -> Result<(), String> {
for d in data {
self.hash_file.append(&mut ser::ser_vec(&d.0).unwrap());
if let Some(elem) = d.1 {
self.data_file.append(&mut ser::ser_vec(&elem).unwrap());
fn append(&mut self, data: T, hashes: Vec<Hash>) -> Result<(), String> {
if self.prunable {
// Add the new position to our leaf_set.
let record_len = Hash::SIZE as u64;
let shift = self.prune_list.get_total_shift();
let position = (self.hash_file.size_unsync() / record_len) + shift + 1;
self.leaf_set.add(position);
}
}
self.data_file.append(&mut ser::ser_vec(&data).unwrap());
for ref h in hashes {
self.hash_file.append(&mut ser::ser_vec(h).unwrap());
}
Ok(())
}
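As a quick check of the position arithmetic above, take an unpruned MMR (shift 0) with 32-byte hash records: after the first append the hash file holds one record, so the next call computes 32 / 32 + 0 + 1 = 2, the second leaf's position. That call then writes the leaf hash plus one parent hash, leaving three records, so the third leaf lands at position 3 + 1 = 4, matching the leaf positions 1, 2, 4, 5, ... of a growing MMR. The prune-list shift adds back the positions compacted out of the file so numbering stays in whole-MMR coordinates.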
@ -96,7 +96,7 @@ where
let pos = position - 1;
// Must be on disk, doing a read at the correct position
let hash_record_len = 32;
let hash_record_len = Hash::SIZE;
let file_offset = ((pos - shift) as usize) * hash_record_len;
let data = self.hash_file.read(file_offset, hash_record_len);
match ser::deserialize(&mut &data[..]) {
@ -165,7 +165,7 @@ where
// Rewind the hash file accounting for pruned/compacted pos
let shift = self.prune_list.get_shift(position);
let record_len = 32 as u64;
let record_len = Hash::SIZE as u64;
let file_pos = (position - shift) * record_len;
self.hash_file.rewind(file_pos);
@ -265,7 +265,7 @@ where
pub fn unpruned_size(&self) -> io::Result<u64> {
let total_shift = self.prune_list.get_total_shift();
let record_len = 32;
let record_len = Hash::SIZE as u64;
let sz = self.hash_file.size()?;
Ok(sz / record_len + total_shift)
}
@ -280,7 +280,7 @@ where
/// Size of the underlying hashed data. Extremely dependent on pruning
/// and compaction.
pub fn hash_size(&self) -> io::Result<u64> {
self.hash_file.size().map(|sz| sz / 32)
self.hash_file.size().map(|sz| sz / Hash::SIZE as u64)
}
/// Syncs all files to disk. A call to sync is required to ensure all the
@ -350,7 +350,7 @@ where
// 1. Save compact copy of the hash file, skipping removed data.
{
let record_len = 32;
let record_len = Hash::SIZE as u64;
let off_to_rm = map_vec!(pos_to_rm, |pos| {
let shift = self.prune_list.get_shift(pos.into());
@ -451,6 +451,65 @@ where
}
}
/// Simple MMR Backend for hashes only (data maintained in the db).
pub struct HashOnlyMMRBackend {
/// The hash file underlying this MMR backend.
hash_file: HashFile,
}
impl HashOnlyBackend for HashOnlyMMRBackend {
fn append(&mut self, hashes: Vec<Hash>) -> Result<(), String> {
for ref h in hashes {
self.hash_file
.append(h)
.map_err(|e| format!("Failed to append to backend, {:?}", e))?;
}
Ok(())
}
fn rewind(&mut self, position: u64) -> Result<(), String> {
self.hash_file
.rewind(position)
.map_err(|e| format!("Failed to rewind backend, {:?}", e))?;
Ok(())
}
fn get_hash(&self, position: u64) -> Option<Hash> {
self.hash_file.read(position)
}
}
impl HashOnlyMMRBackend {
/// Instantiates a new hash-only MMR backend.
/// Use the provided dir to store its files.
pub fn new(data_dir: String) -> io::Result<HashOnlyMMRBackend> {
let hash_file = HashFile::open(format!("{}/{}", data_dir, PMMR_HASH_FILE))?;
Ok(HashOnlyMMRBackend { hash_file })
}
/// The unpruned size of this MMR backend.
pub fn unpruned_size(&self) -> io::Result<u64> {
let sz = self.hash_file.size()?;
Ok(sz / Hash::SIZE as u64)
}
/// Discard any pending changes to this MMR backend.
pub fn discard(&mut self) {
self.hash_file.discard();
}
/// Sync pending changes to the backend file on disk.
pub fn sync(&mut self) -> io::Result<()> {
if let Err(e) = self.hash_file.flush() {
return Err(io::Error::new(
io::ErrorKind::Interrupted,
format!("Could not write to hash storage, disk full? {:?}", e),
));
}
Ok(())
}
}
/// Filter remove list to exclude roots.
/// We want to keep roots around so we have hashes for Merkle proofs.
fn removed_excl_roots(removed: Bitmap) -> Bitmap {
@ -459,6 +518,5 @@ fn removed_excl_roots(removed: Bitmap) -> Bitmap {
.filter(|pos| {
let (parent_pos, _) = family(*pos as u64);
removed.contains(parent_pos as u32)
})
.collect()
}).collect()
}

---

@ -134,8 +134,7 @@ impl RemoveLog {
None
}
},
)
.collect()
).collect()
}
}

---

@ -25,11 +25,76 @@ use libc::{ftruncate as ftruncate64, off_t as off64_t};
#[cfg(any(target_os = "linux"))]
use libc::{ftruncate64, off64_t};
use core::core::hash::Hash;
use core::ser;
use util::LOGGER;
/// A no-op function for doing nothing with some pruned data.
pub fn prune_noop(_pruned_data: &[u8]) {}
/// Hash file (MMR) wrapper around an append only file.
pub struct HashFile {
file: AppendOnlyFile,
}
impl HashFile {
/// Open (or create) a hash file at the provided path on disk.
pub fn open(path: String) -> io::Result<HashFile> {
let file = AppendOnlyFile::open(path)?;
Ok(HashFile { file })
}
/// Append a hash to this hash file.
/// Will not be written to disk until flush() is subsequently called.
/// Alternatively discard() may be called to discard any pending changes.
pub fn append(&mut self, hash: &Hash) -> io::Result<()> {
let mut bytes = ser::ser_vec(hash).map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
self.file.append(&mut bytes);
Ok(())
}
/// Read a hash from the hash file by position.
pub fn read(&self, position: u64) -> Option<Hash> {
// The MMR starts at 1, our binary backend starts at 0.
let pos = position - 1;
// Must be on disk, doing a read at the correct position
let file_offset = (pos as usize) * Hash::SIZE;
let data = self.file.read(file_offset, Hash::SIZE);
match ser::deserialize(&mut &data[..]) {
Ok(h) => Some(h),
Err(e) => {
error!(
LOGGER,
"Corrupted storage, could not read an entry from hash file: {:?}", e
);
return None;
}
}
}
/// Rewind the backend file to the specified position.
pub fn rewind(&mut self, position: u64) -> io::Result<()> {
self.file.rewind(position * Hash::SIZE as u64);
Ok(())
}
/// Flush unsynced changes to the hash file to disk.
pub fn flush(&mut self) -> io::Result<()> {
self.file.flush()
}
/// Discard any unsynced changes to the hash file.
pub fn discard(&mut self) {
self.file.discard()
}
/// Size of the hash file in bytes.
pub fn size(&self) -> io::Result<u64> {
self.file.size()
}
}
/// Wrapper for a file that can be read at any position (random read) but for
/// which writes are append only. Reads are backed by a memory map (mmap(2)),
/// relying on the operating system for fast access and caching. The memory
@ -246,6 +311,11 @@ impl AppendOnlyFile {
fs::metadata(&self.path).map(|md| md.len())
}
/// Current size of the (unsynced) file in bytes.
pub fn size_unsync(&self) -> u64 {
(self.buffer_start + self.buffer.len()) as u64
}
/// Path of the underlying file
pub fn path(&self) -> String {
self.path.clone()