Split header MMR (and sync MMR) out from txhashset (#3004)

* wip

* sync sort of works now

* get rid of the deadlock during compaction
there is *always* a deadlock in there when we make changes like this...

* cleanup how we rebuild the sync MMR on init

* cleanup rewind logic

* roll the "fix invalid root" changes into this PR

* move rebuild_height_pos_index into txhashset
and pass in header_pmmr for header lookups

* cleanup and remember to setup sync head on init

* cleanup unnecessary ref muts

* rebuild_height_pos_index when writing txhashset
This commit is contained in:
Antioch Peverell 2019-09-07 00:28:26 +01:00 committed by Gary Yu
parent d55ebe8a2b
commit 383985292c
12 changed files with 437 additions and 663 deletions

View file

@ -29,7 +29,7 @@ use crate::error::{Error, ErrorKind};
use crate::pipe;
use crate::store;
use crate::txhashset;
use crate::txhashset::TxHashSet;
use crate::txhashset::{PMMRHandle, TxHashSet};
use crate::types::{
BlockStatus, ChainAdapter, NoStatus, Options, OutputMMRPosition, Tip, TxHashSetRoots,
TxHashsetWriteStatus,
@ -150,6 +150,8 @@ pub struct Chain {
adapter: Arc<dyn ChainAdapter + Send + Sync>,
orphans: Arc<OrphanBlockPool>,
txhashset: Arc<RwLock<txhashset::TxHashSet>>,
header_pmmr: Arc<RwLock<txhashset::PMMRHandle<BlockHeader>>>,
sync_pmmr: Arc<RwLock<txhashset::PMMRHandle<BlockHeader>>>,
verifier_cache: Arc<RwLock<dyn VerifierCache>>,
// POW verification function
pow_verifier: fn(&BlockHeader) -> Result<(), pow::Error>,
@ -174,7 +176,17 @@ impl Chain {
// open the txhashset, creating a new one if necessary
let mut txhashset = txhashset::TxHashSet::open(db_root.clone(), store.clone(), None)?;
setup_head(&genesis, &store, &mut txhashset)?;
let mut header_pmmr =
PMMRHandle::new(&db_root, "header", "header_head", false, true, None)?;
let mut sync_pmmr = PMMRHandle::new(&db_root, "header", "sync_head", false, true, None)?;
setup_head(
&genesis,
&store,
&mut header_pmmr,
&mut sync_pmmr,
&mut txhashset,
)?;
Chain::log_heads(&store)?;
Ok(Chain {
@ -183,6 +195,8 @@ impl Chain {
adapter,
orphans: Arc::new(OrphanBlockPool::new()),
txhashset: Arc::new(RwLock::new(txhashset)),
header_pmmr: Arc::new(RwLock::new(header_pmmr)),
sync_pmmr: Arc::new(RwLock::new(sync_pmmr)),
pow_verifier,
verifier_cache,
archive_mode,
@ -190,6 +204,11 @@ impl Chain {
})
}
/// Return our shared header MMR handle.
pub fn header_pmmr(&self) -> Arc<RwLock<PMMRHandle<BlockHeader>>> {
self.header_pmmr.clone()
}
/// Return our shared txhashset instance.
pub fn txhashset(&self) -> Arc<RwLock<TxHashSet>> {
self.txhashset.clone()
@ -277,9 +296,10 @@ impl Chain {
/// or false if it has added to a fork (or orphan?).
fn process_block_single(&self, b: Block, opts: Options) -> Result<Option<Tip>, Error> {
let (maybe_new_head, prev_head) = {
let mut header_pmmr = self.header_pmmr.write();
let mut txhashset = self.txhashset.write();
let batch = self.store.batch()?;
let mut ctx = self.new_ctx(opts, batch, &mut txhashset)?;
let mut ctx = self.new_ctx(opts, batch, &mut header_pmmr, &mut txhashset)?;
let prev_head = ctx.batch.head()?;
@ -355,9 +375,10 @@ impl Chain {
/// Note: This will update header MMR and corresponding header_head
/// if total work increases (on the header chain).
pub fn process_block_header(&self, bh: &BlockHeader, opts: Options) -> Result<(), Error> {
let mut header_pmmr = self.header_pmmr.write();
let mut txhashset = self.txhashset.write();
let batch = self.store.batch()?;
let mut ctx = self.new_ctx(opts, batch, &mut txhashset)?;
let mut ctx = self.new_ctx(opts, batch, &mut header_pmmr, &mut txhashset)?;
pipe::process_block_header(bh, &mut ctx)?;
ctx.batch.commit()?;
Ok(())
@ -367,19 +388,26 @@ impl Chain {
/// This is only ever used during sync and is based on sync_head.
/// We update header_head here if our total work increases.
pub fn sync_block_headers(&self, headers: &[BlockHeader], opts: Options) -> Result<(), Error> {
let mut sync_pmmr = self.sync_pmmr.write();
let mut header_pmmr = self.header_pmmr.write();
let mut txhashset = self.txhashset.write();
let batch = self.store.batch()?;
let mut ctx = self.new_ctx(opts, batch, &mut txhashset)?;
// Sync the chunk of block headers, updating sync_head as necessary.
pipe::sync_block_headers(headers, &mut ctx)?;
{
let batch = self.store.batch()?;
let mut ctx = self.new_ctx(opts, batch, &mut sync_pmmr, &mut txhashset)?;
pipe::sync_block_headers(headers, &mut ctx)?;
ctx.batch.commit()?;
}
// Now "process" the last block header, updating header_head to match sync_head.
if let Some(header) = headers.last() {
let batch = self.store.batch()?;
let mut ctx = self.new_ctx(opts, batch, &mut header_pmmr, &mut txhashset)?;
pipe::process_block_header(header, &mut ctx)?;
ctx.batch.commit()?;
}
ctx.batch.commit()?;
Ok(())
}
@ -387,12 +415,14 @@ impl Chain {
&self,
opts: Options,
batch: store::Batch<'a>,
header_pmmr: &'a mut txhashset::PMMRHandle<BlockHeader>,
txhashset: &'a mut txhashset::TxHashSet,
) -> Result<pipe::BlockContext<'a>, Error> {
Ok(pipe::BlockContext {
opts,
pow_verifier: self.pow_verifier,
verifier_cache: self.verifier_cache.clone(),
header_pmmr,
txhashset,
batch,
})
@ -476,8 +506,9 @@ impl Chain {
/// Validate the tx against the current UTXO set.
pub fn validate_tx(&self, tx: &Transaction) -> Result<(), Error> {
let header_pmmr = self.header_pmmr.read();
let txhashset = self.txhashset.read();
txhashset::utxo_view(&txhashset, |utxo| {
txhashset::utxo_view(&header_pmmr, &txhashset, |utxo| {
utxo.validate_tx(tx)?;
Ok(())
})
@ -492,8 +523,9 @@ impl Chain {
/// that has not yet sufficiently matured.
pub fn verify_coinbase_maturity(&self, tx: &Transaction) -> Result<(), Error> {
let height = self.next_block_height()?;
let header_pmmr = self.header_pmmr.read();
let txhashset = self.txhashset.read();
txhashset::utxo_view(&txhashset, |utxo| {
txhashset::utxo_view(&header_pmmr, &txhashset, |utxo| {
utxo.verify_coinbase_maturity(&tx.inputs(), height)?;
Ok(())
})
@ -519,15 +551,16 @@ impl Chain {
return Ok(());
}
let mut header_pmmr = self.header_pmmr.write();
let mut txhashset = self.txhashset.write();
// Now create an extension from the txhashset and validate against the
// latest block header. Rewind the extension to the specified header to
// ensure the view is consistent.
txhashset::extending_readonly(&mut txhashset, |extension| {
let head = extension.batch.head()?;
pipe::rewind_and_apply_fork(&header, &head, extension)?;
extension.validate(fast_validation, &NoStatus)?;
txhashset::extending_readonly(&mut header_pmmr, &mut txhashset, |mut ext| {
pipe::rewind_and_apply_fork(&header, &mut ext)?;
let ref mut extension = ext.extension;
extension.validate(&self.genesis, fast_validation, &NoStatus)?;
Ok(())
})
}
@ -535,15 +568,19 @@ impl Chain {
/// Sets the txhashset roots on a brand new block by applying the block on
/// the current txhashset state.
pub fn set_txhashset_roots(&self, b: &mut Block) -> Result<(), Error> {
let mut header_pmmr = self.header_pmmr.write();
let mut txhashset = self.txhashset.write();
let (prev_root, roots, sizes) =
txhashset::extending_readonly(&mut txhashset, |extension| {
let previous_header = extension.batch.get_previous_header(&b.header)?;
let head = extension.batch.head()?;
pipe::rewind_and_apply_fork(&previous_header, &head, extension)?;
txhashset::extending_readonly(&mut header_pmmr, &mut txhashset, |ext| {
let previous_header = ext.batch().get_previous_header(&b.header)?;
pipe::rewind_and_apply_fork(&previous_header, ext)?;
let ref mut extension = ext.extension;
let ref mut header_extension = ext.header_extension;
// Retrieve the header root before we apply the new block
let prev_root = extension.header_root()?;
let prev_root = header_extension.root()?;
// Apply the latest block to the chain state via the extension.
extension.apply_block(b)?;
@ -562,7 +599,7 @@ impl Chain {
// Set the output and kernel MMR sizes.
{
// Carefully destructure these correctly...
let (_, output_mmr_size, _, kernel_mmr_size) = sizes;
let (output_mmr_size, _, kernel_mmr_size) = sizes;
b.header.output_mmr_size = output_mmr_size;
b.header.kernel_mmr_size = kernel_mmr_size;
}
@ -576,12 +613,14 @@ impl Chain {
output: &OutputIdentifier,
header: &BlockHeader,
) -> Result<MerkleProof, Error> {
let mut header_pmmr = self.header_pmmr.write();
let mut txhashset = self.txhashset.write();
let merkle_proof = txhashset::extending_readonly(&mut txhashset, |extension| {
let head = extension.batch.head()?;
pipe::rewind_and_apply_fork(&header, &head, extension)?;
extension.merkle_proof(output)
})?;
let merkle_proof =
txhashset::extending_readonly(&mut header_pmmr, &mut txhashset, |ext| {
pipe::rewind_and_apply_fork(&header, ext)?;
let ref mut extension = ext.extension;
extension.merkle_proof(output)
})?;
Ok(merkle_proof)
}
@ -631,11 +670,11 @@ impl Chain {
// to rewind after receiving the txhashset zip.
let header = self.get_block_header(&h)?;
{
let mut header_pmmr = self.header_pmmr.write();
let mut txhashset = self.txhashset.write();
txhashset::extending_readonly(&mut txhashset, |extension| {
let head = extension.batch.head()?;
pipe::rewind_and_apply_fork(&header, &head, extension)?;
txhashset::extending_readonly(&mut header_pmmr, &mut txhashset, |ext| {
pipe::rewind_and_apply_fork(&header, ext)?;
let ref mut extension = ext.extension;
extension.snapshot()?;
Ok(())
})?;
@ -706,31 +745,13 @@ impl Chain {
/// Rebuild the sync MMR based on current header_head.
/// We rebuild the sync MMR when first entering sync mode so ensure we
/// have an MMR we can safely rewind based on the headers received from a peer.
/// TODO - think about how to optimize this.
pub fn rebuild_sync_mmr(&self, head: &Tip) -> Result<(), Error> {
let mut txhashset = self.txhashset.write();
let mut sync_pmmr = self.sync_pmmr.write();
let mut batch = self.store.batch()?;
txhashset::sync_extending(&mut txhashset, &mut batch, |extension| {
extension.rebuild(head, &self.genesis)?;
Ok(())
})?;
batch.commit()?;
Ok(())
}
/// Rebuild the header MMR based on current header_head.
/// We rebuild the header MMR after receiving a txhashset from a peer.
/// The txhashset contains output, rangeproof and kernel MMRs but we construct
/// the header MMR locally based on headers from our db.
/// TODO - think about how to optimize this.
fn rebuild_header_mmr(
&self,
head: &Tip,
txhashset: &mut txhashset::TxHashSet,
) -> Result<(), Error> {
let mut batch = self.store.batch()?;
txhashset::header_extending(txhashset, &mut batch, |extension| {
extension.rebuild(head, &self.genesis)?;
let sync_head = batch.get_sync_head()?;
let header = batch.get_block_header(&head.hash())?;
txhashset::header_extending(&mut sync_pmmr, &sync_head, &mut batch, |extension| {
pipe::rewind_and_apply_header_fork(&header, extension)?;
Ok(())
})?;
batch.commit()?;
@ -824,7 +845,6 @@ impl Chain {
/// Clean the temporary sandbox folder
pub fn clean_txhashset_sandbox(&self) {
txhashset::clean_txhashset_folder(&self.get_tmp_dir());
txhashset::clean_header_folder(&self.get_tmp_dir());
}
/// Specific tmp dir.
@ -889,7 +909,6 @@ impl Chain {
// Write txhashset to sandbox (in the Grin specific tmp dir)
let sandbox_dir = self.get_tmp_dir();
txhashset::clean_txhashset_folder(&sandbox_dir);
txhashset::clean_header_folder(&sandbox_dir);
txhashset::zip_write(sandbox_dir.clone(), txhashset_data.try_clone()?, &header)?;
let mut txhashset = txhashset::TxHashSet::open(
@ -901,24 +920,21 @@ impl Chain {
Some(&header),
)?;
// The txhashset.zip contains the output, rangeproof and kernel MMRs.
// We must rebuild the header MMR ourselves based on the headers in our db.
self.rebuild_header_mmr(&Tip::from_header(&header), &mut txhashset)?;
// Validate the full kernel history (kernel MMR root for every block header).
self.validate_kernel_history(&header, &txhashset)?;
// all good, prepare a new batch and update all the required records
debug!("txhashset_write: rewinding a 2nd time (writeable)");
let mut header_pmmr = self.header_pmmr.write();
let mut batch = self.store.batch()?;
txhashset::extending(&mut txhashset, &mut batch, |extension| {
txhashset::extending(&mut header_pmmr, &mut txhashset, &mut batch, |ext| {
let extension = &mut ext.extension;
extension.rewind(&header)?;
// Validate the extension, generating the utxo_sum and kernel_sum.
// Full validation, including rangeproofs and kernel signature verification.
let (utxo_sum, kernel_sum) = extension.validate(false, status)?;
let (utxo_sum, kernel_sum) = extension.validate(&self.genesis, false, status)?;
// Save the block_sums (utxo_sum, kernel_sum) to the db for use later.
extension.batch.save_block_sums(
@ -946,6 +962,9 @@ impl Chain {
batch.save_body_tail(&tip)?;
}
// Rebuild our output_pos index in the db based on current UTXO set.
txhashset.rebuild_height_pos_index(&header_pmmr, &mut batch)?;
// Commit all the changes to the db.
batch.commit()?;
@ -969,21 +988,17 @@ impl Chain {
Some(&header),
)?;
self.rebuild_header_mmr(&Tip::from_header(&header), &mut txhashset)?;
txhashset::clean_header_folder(&sandbox_dir);
// Replace the chain txhashset with the newly built one.
*txhashset_ref = txhashset;
}
debug!("txhashset_write: replaced our txhashset with the new one");
self.rebuild_height_for_pos()?;
// Check for any orphan blocks and process them based on the new chain state.
self.check_orphans(header.height + 1);
status.on_done();
Ok(false)
}
@ -992,7 +1007,7 @@ impl Chain {
/// *Only* runs if we are not in archive mode.
fn remove_historical_blocks(
&self,
txhashset: &txhashset::TxHashSet,
header_pmmr: &txhashset::PMMRHandle<BlockHeader>,
batch: &mut store::Batch<'_>,
) -> Result<(), Error> {
if self.archive_mode {
@ -1019,7 +1034,7 @@ impl Chain {
}
let mut count = 0;
let tail_hash = txhashset.get_header_hash_by_height(head.height - horizon)?;
let tail_hash = header_pmmr.get_header_hash_by_height(head.height - horizon)?;
let tail = batch.get_block_header(&tail_hash)?;
// Remove old blocks (including short lived fork blocks) which height < tail.height
@ -1067,29 +1082,34 @@ impl Chain {
}
}
// Take a write lock on the txhashet and start a new writeable db batch.
let header_pmmr = self.header_pmmr.read();
let mut txhashset = self.txhashset.write();
let mut batch = self.store.batch()?;
// Compact the txhashset itself (rewriting the pruned backend files).
{
// Take a write lock on the txhashet and start a new writeable db batch.
let mut txhashset = self.txhashset.write();
let mut batch = self.store.batch()?;
let head_header = batch.head_header()?;
let current_height = head_header.height;
let horizon_height =
current_height.saturating_sub(global::cut_through_horizon().into());
let horizon_hash = header_pmmr.get_header_hash_by_height(horizon_height)?;
let horizon_header = batch.get_block_header(&horizon_hash)?;
// Compact the txhashset itself (rewriting the pruned backend files).
txhashset.compact(&mut batch)?;
// Rebuild our output_pos index in the db based on current UTXO set.
txhashset::extending(&mut txhashset, &mut batch, |extension| {
extension.rebuild_height_pos_index()?;
Ok(())
})?;
// If we are not in archival mode remove historical blocks from the db.
if !self.archive_mode {
self.remove_historical_blocks(&txhashset, &mut batch)?;
}
// Commit all the above db changes.
batch.commit()?;
txhashset.compact(&horizon_header, &mut batch)?;
}
// Rebuild our output_pos index in the db based on current UTXO set.
txhashset.rebuild_height_pos_index(&header_pmmr, &mut batch)?;
// If we are not in archival mode remove historical blocks from the db.
if !self.archive_mode {
self.remove_historical_blocks(&header_pmmr, &mut batch)?;
}
// Commit all the above db changes.
batch.commit()?;
Ok(())
}
@ -1202,20 +1222,16 @@ impl Chain {
}
/// Gets the block header at the provided height.
/// Note: Takes a read lock on the txhashset.
/// Take care not to call this repeatedly in a tight loop.
/// Note: Takes a read lock on the header_pmmr.
pub fn get_header_by_height(&self, height: u64) -> Result<BlockHeader, Error> {
let hash = self.get_header_hash_by_height(height)?;
self.get_block_header(&hash)
}
/// Gets the header hash at the provided height.
/// Note: Takes a read lock on the txhashset.
/// Take care not to call this repeatedly in a tight loop.
/// Note: Takes a read lock on the header_pmmr.
fn get_header_hash_by_height(&self, height: u64) -> Result<Hash, Error> {
let txhashset = self.txhashset.read();
let hash = txhashset.get_header_hash_by_height(height)?;
Ok(hash)
self.header_pmmr.read().get_header_hash_by_height(height)
}
/// Migrate the index 'commitment -> output_pos' to index 'commitment -> (output_pos, block_height)'
@ -1223,6 +1239,7 @@ impl Chain {
/// - Node start-up. For database migration from the old version.
/// - After the txhashset 'rebuild_index' when state syncing.
pub fn rebuild_height_for_pos(&self) -> Result<(), Error> {
let header_pmmr = self.header_pmmr.read();
let txhashset = self.txhashset.read();
let mut outputs_pos = txhashset.get_all_output_pos()?;
let total_outputs = outputs_pos.len();
@ -1248,7 +1265,8 @@ impl Chain {
let mut i = 0;
for search_height in 0..max_height {
let h = txhashset.get_header_by_height(search_height + 1)?;
let hash = header_pmmr.get_header_hash_by_height(search_height + 1)?;
let h = batch.get_block_header(&hash)?;
while i < total_outputs {
let (commit, pos) = outputs_pos[i];
if pos > h.output_mmr_size {
@ -1274,10 +1292,11 @@ impl Chain {
&self,
output_ref: &OutputIdentifier,
) -> Result<BlockHeader, Error> {
let header_pmmr = self.header_pmmr.read();
let txhashset = self.txhashset.read();
let output_pos = txhashset.is_unspent(output_ref)?;
Ok(txhashset.get_header_by_height(output_pos.height)?)
let hash = header_pmmr.get_header_hash_by_height(output_pos.height)?;
Ok(self.get_block_header(&hash)?)
}
/// Gets the kernel with a given excess and the block height it is included in.
@ -1318,7 +1337,7 @@ impl Chain {
min_height: Option<u64>,
max_height: Option<u64>,
) -> Result<BlockHeader, Error> {
let txhashset = self.txhashset.read();
let header_pmmr = self.header_pmmr.read();
let mut min = min_height.unwrap_or(0).saturating_sub(1);
let mut max = match max_height {
@ -1328,11 +1347,13 @@ impl Chain {
loop {
let search_height = max - (max - min) / 2;
let h = txhashset.get_header_by_height(search_height)?;
let hash = header_pmmr.get_header_hash_by_height(search_height)?;
let h = self.get_block_header(&hash)?;
if search_height == 0 {
return Ok(h);
}
let h_prev = txhashset.get_header_by_height(search_height - 1)?;
let hash_prev = header_pmmr.get_header_hash_by_height(search_height - 1)?;
let h_prev = self.get_block_header(&hash_prev)?;
if kernel_mmr_index > h.kernel_mmr_size {
min = search_height;
} else if kernel_mmr_index < h_prev.kernel_mmr_size {
@ -1386,6 +1407,8 @@ impl Chain {
fn setup_head(
genesis: &Block,
store: &store::ChainStore,
header_pmmr: &mut txhashset::PMMRHandle<BlockHeader>,
sync_pmmr: &mut txhashset::PMMRHandle<BlockHeader>,
txhashset: &mut txhashset::TxHashSet,
) -> Result<(), Error> {
let mut batch = store.batch()?;
@ -1403,24 +1426,11 @@ fn setup_head(
// to match the provided block header.
let header = batch.get_block_header(&head.last_block_h)?;
// If we have no header MMR then rebuild as necessary.
// Supports old nodes with no header MMR.
txhashset::header_extending(txhashset, &mut batch, |extension| {
let needs_rebuild = match extension.get_header_by_height(head.height) {
Ok(header) => header.hash() != head.last_block_h,
Err(_) => true,
};
let res = txhashset::extending(header_pmmr, txhashset, &mut batch, |ext| {
pipe::rewind_and_apply_fork(&header, ext)?;
if needs_rebuild {
extension.rebuild(&head, &genesis.header)?;
}
let ref mut extension = ext.extension;
Ok(())
})?;
let res = txhashset::extending(txhashset, &mut batch, |extension| {
let head = extension.batch.head()?;
pipe::rewind_and_apply_fork(&header, &head, extension)?;
extension.validate_roots()?;
// now check we have the "block sums" for the block in question
@ -1436,7 +1446,8 @@ fn setup_head(
// Do a full (and slow) validation of the txhashset extension
// to calculate the utxo_sum and kernel_sum at this block height.
let (utxo_sum, kernel_sum) = extension.validate_kernel_sums()?;
let (utxo_sum, kernel_sum) =
extension.validate_kernel_sums(&genesis.header)?;
// Save the block_sums to the db for use later.
extension.batch.save_block_sums(
@ -1494,11 +1505,16 @@ fn setup_head(
kernel_sum,
};
}
txhashset::header_extending(txhashset, &mut batch, |extension| {
txhashset::header_extending(header_pmmr, &tip, &mut batch, |extension| {
extension.apply_header(&genesis.header)?;
Ok(())
})?;
txhashset::extending(txhashset, &mut batch, |extension| {
txhashset::header_extending(sync_pmmr, &tip, &mut batch, |extension| {
extension.apply_header(&genesis.header)?;
Ok(())
})?;
txhashset::extending(header_pmmr, txhashset, &mut batch, |ext| {
let ref mut extension = ext.extension;
extension.apply_block(&genesis)?;
extension.validate_roots()?;
extension.validate_sizes()?;

View file

@ -39,6 +39,8 @@ pub struct BlockContext<'a> {
pub pow_verifier: fn(&BlockHeader) -> Result<(), pow::Error>,
/// The active txhashset (rewindable MMRs) to use for block processing.
pub txhashset: &'a mut txhashset::TxHashSet,
/// The active header MMR handle.
pub header_pmmr: &'a mut txhashset::PMMRHandle<BlockHeader>,
/// The active batch to use for block processing.
pub batch: store::Batch<'a>,
/// The verifier cache (caching verifier for rangeproofs and kernel signatures)
@ -113,35 +115,38 @@ pub fn process_block(b: &Block, ctx: &mut BlockContext<'_>) -> Result<Option<Tip
// Start a chain extension unit of work dependent on the success of the
// internal validation and saving operations
let block_sums = txhashset::extending(&mut ctx.txhashset, &mut ctx.batch, |mut extension| {
rewind_and_apply_fork(&prev, &head, extension)?;
let ref mut header_pmmr = &mut ctx.header_pmmr;
let ref mut txhashset = &mut ctx.txhashset;
let ref mut batch = &mut ctx.batch;
let block_sums = txhashset::extending(header_pmmr, txhashset, batch, |ext| {
rewind_and_apply_fork(&prev, ext)?;
// Check any coinbase being spent have matured sufficiently.
// This needs to be done within the context of a potentially
// rewound txhashset extension to reflect chain state prior
// to applying the new block.
verify_coinbase_maturity(b, &mut extension)?;
verify_coinbase_maturity(b, ext)?;
// Validate the block against the UTXO set.
validate_utxo(b, &mut extension)?;
validate_utxo(b, ext)?;
// Using block_sums (utxo_sum, kernel_sum) for the previous block from the db
// we can verify_kernel_sums across the full UTXO sum and full kernel sum
// accounting for inputs/outputs/kernels in this new block.
// We know there are no double-spends etc. if this verifies successfully.
// Remember to save these to the db later on (regardless of extension rollback)
let block_sums = verify_block_sums(b, &extension.batch)?;
let block_sums = verify_block_sums(b, ext.batch())?;
// Apply the block to the txhashset state.
// Validate the txhashset roots and sizes against the block header.
// Block is invalid if there are any discrepencies.
apply_block_to_txhashset(b, &mut extension)?;
apply_block_to_txhashset(b, ext)?;
// If applying this block does not increase the work on the chain then
// we know we have not yet updated the chain to produce a new chain head.
let head = extension.batch.head()?;
let head = ext.batch().head()?;
if !has_more_work(&b.header, &head) {
extension.force_rollback();
ext.extension.force_rollback();
}
Ok(block_sums)
@ -175,10 +180,7 @@ pub fn sync_block_headers(
if headers.is_empty() {
return Ok(());
}
let first_header = headers.first().expect("first header");
let last_header = headers.last().expect("last header");
let prev_header = ctx.batch.get_previous_header(&first_header)?;
// Check if we know about all these headers. If so we can accept them quickly.
// If they *do not* increase total work on the sync chain we are done.
@ -190,22 +192,19 @@ pub fn sync_block_headers(
}
}
txhashset::sync_extending(&mut ctx.txhashset, &mut ctx.batch, |extension| {
rewind_and_apply_header_fork(&prev_header, extension)?;
for header in headers {
extension.validate_root(header)?;
extension.apply_header(header)?;
add_block_header(header, &extension.batch)?;
}
Ok(())
})?;
// Validate all our headers now that we have added each "previous"
// header to the db in this batch above.
// Validate each header in the chunk and add to our db.
// Note: This batch may be rolled back later if the MMR does not validate successfully.
for header in headers {
validate_header(header, ctx)?;
add_block_header(header, &ctx.batch)?;
}
// Now apply this entire chunk of headers to the sync MMR.
txhashset::header_extending(&mut ctx.header_pmmr, &sync_head, &mut ctx.batch, |ext| {
rewind_and_apply_header_fork(&last_header, ext)?;
Ok(())
})?;
if has_more_work(&last_header, &sync_head) {
update_sync_head(&Tip::from_header(&last_header), &mut ctx.batch)?;
}
@ -221,8 +220,7 @@ pub fn process_block_header(header: &BlockHeader, ctx: &mut BlockContext<'_>) ->
// Check this header is not an orphan, we must know about the previous header to continue.
let prev_header = ctx.batch.get_previous_header(&header)?;
// If this header is "known" then stop processing the header.
// Do not stop processing with an error though.
// Check if we know about the full block for this header.
if check_known(header, ctx).is_err() {
return Ok(());
}
@ -238,12 +236,12 @@ pub fn process_block_header(header: &BlockHeader, ctx: &mut BlockContext<'_>) ->
}
}
txhashset::header_extending(&mut ctx.txhashset, &mut ctx.batch, |extension| {
rewind_and_apply_header_fork(&prev_header, extension)?;
extension.validate_root(header)?;
extension.apply_header(header)?;
txhashset::header_extending(&mut ctx.header_pmmr, &header_head, &mut ctx.batch, |ext| {
rewind_and_apply_header_fork(&prev_header, ext)?;
ext.validate_root(header)?;
ext.apply_header(header)?;
if !has_more_work(&header, &header_head) {
extension.force_rollback();
ext.force_rollback();
}
Ok(())
})?;
@ -413,8 +411,14 @@ fn validate_block(block: &Block, ctx: &mut BlockContext<'_>) -> Result<(), Error
}
/// Verify the block is not spending coinbase outputs before they have sufficiently matured.
fn verify_coinbase_maturity(block: &Block, ext: &txhashset::Extension<'_>) -> Result<(), Error> {
ext.utxo_view()
fn verify_coinbase_maturity(
block: &Block,
ext: &txhashset::ExtensionPair<'_>,
) -> Result<(), Error> {
let ref extension = ext.extension;
let ref header_extension = ext.header_extension;
extension
.utxo_view(header_extension)
.verify_coinbase_maturity(&block.inputs(), block.header.height)
}
@ -445,12 +449,12 @@ fn verify_block_sums(b: &Block, batch: &store::Batch<'_>) -> Result<BlockSums, E
/// Check both the txhashset roots and sizes are correct after applying the block.
fn apply_block_to_txhashset(
block: &Block,
ext: &mut txhashset::Extension<'_>,
ext: &mut txhashset::ExtensionPair<'_>,
) -> Result<(), Error> {
ext.validate_header_root(&block.header)?;
ext.apply_block(block)?;
ext.validate_roots()?;
ext.validate_sizes()?;
let ref mut extension = ext.extension;
extension.apply_block(block)?;
extension.validate_roots()?;
extension.validate_sizes()?;
Ok(())
}
@ -545,6 +549,7 @@ pub fn rewind_and_apply_header_fork(
.batch
.get_block_header(&h)
.map_err(|e| ErrorKind::StoreErr(e, format!("getting forked headers")))?;
ext.validate_root(&header)?;
ext.apply_header(&header)?;
}
@ -557,48 +562,35 @@ pub fn rewind_and_apply_header_fork(
/// the expected state.
pub fn rewind_and_apply_fork(
header: &BlockHeader,
header_head: &Tip,
ext: &mut txhashset::Extension<'_>,
ext: &mut txhashset::ExtensionPair<'_>,
) -> Result<(), Error> {
// TODO - Skip the "rewind and reapply" if everything is aligned and this is the "next" block.
// This will be significantly easier once we break out the header extension.
let ref mut batch = ext.batch();
let ref mut extension = ext.extension;
let ref mut header_extension = ext.header_extension;
// Find the fork point where head and header_head diverge.
// We may need to rewind back to this fork point if they diverged
// prior to the fork point for the provided header.
let header_forked_header = {
let mut current = ext.batch.get_block_header(&header_head.last_block_h)?;
while current.height > 0 && !ext.is_on_current_chain(&current).is_ok() {
current = ext.batch.get_previous_header(&current)?;
}
current
};
// Prepare the header MMR.
rewind_and_apply_header_fork(header, header_extension)?;
// Find the fork point where the provided header diverges from our main chain.
// Account for the header fork point. Use the earliest fork point to determine
// where we need to rewind to. We need to do this
let (forked_header, fork_hashes) = {
let mut fork_hashes = vec![];
let mut current = header.clone();
while current.height > 0
&& (!ext.is_on_current_chain(&current).is_ok()
|| current.height > header_forked_header.height)
{
fork_hashes.push(current.hash());
current = ext.batch.get_previous_header(&current)?;
}
fork_hashes.reverse();
// Rewind the txhashset extension back to common ancestor based on header MMR.
let mut current = batch.head_header()?;
while current.height > 0 && !header_extension.is_on_current_chain(&current).is_ok() {
current = batch.get_previous_header(&current)?;
}
let fork_point = current;
extension.rewind(&fork_point)?;
(current, fork_hashes)
};
// Then apply all full blocks since this common ancestor
// to put txhashet extension in a state to accept the new block.
let mut fork_hashes = vec![];
let mut current = header.clone();
while current.height > fork_point.height {
fork_hashes.push(current.hash());
current = batch.get_previous_header(&current)?;
}
fork_hashes.reverse();
// Rewind the txhashset state back to the block where we forked from the most work chain.
ext.rewind(&forked_header)?;
// Now re-apply all blocks on this fork.
for h in fork_hashes {
let fb = ext
.batch
let fb = batch
.get_block(&h)
.map_err(|e| ErrorKind::StoreErr(e, format!("getting forked blocks")))?;
@ -607,7 +599,7 @@ pub fn rewind_and_apply_fork(
// Validate the block against the UTXO set.
validate_utxo(&fb, ext)?;
// Re-verify block_sums to set the block_sums up on this fork correctly.
verify_block_sums(&fb, &ext.batch)?;
verify_block_sums(&fb, batch)?;
// Re-apply the blocks.
apply_block_to_txhashset(&fb, ext)?;
}
@ -615,6 +607,8 @@ pub fn rewind_and_apply_fork(
Ok(())
}
fn validate_utxo(block: &Block, ext: &txhashset::Extension<'_>) -> Result<(), Error> {
ext.utxo_view().validate_block(block)
fn validate_utxo(block: &Block, ext: &mut txhashset::ExtensionPair<'_>) -> Result<(), Error> {
let ref mut extension = ext.extension;
let ref mut header_extension = ext.header_extension;
extension.utxo_view(header_extension).validate_block(block)
}

File diff suppressed because it is too large Load diff

View file

@ -111,7 +111,7 @@ impl<'a> UTXOView<'a> {
.unwrap_or(0);
if pos > 0 {
// If we have not yet reached 1,000 / 1,440 blocks then
// If we have not yet reached 1440 blocks then
// we can fail immediately as coinbase cannot be mature.
if height < global::coinbase_maturity() {
return Err(ErrorKind::ImmatureCoinbase.into());

View file

@ -205,10 +205,8 @@ impl TxHashsetWriteStatus for SyncState {
/// A helper to hold the roots of the txhashset in order to keep them
/// readable.
#[derive(Debug)]
#[derive(Debug, PartialEq)]
pub struct TxHashSetRoots {
/// Header root
pub header_root: Hash,
/// Output root
pub output_root: Hash,
/// Range Proof root

View file

@ -13,7 +13,6 @@
// limitations under the License.
use self::core::core::hash::Hashed;
use grin_chain as chain;
use grin_core as core;
use grin_util as util;

View file

@ -84,8 +84,6 @@ fn test_unexpected_zip() {
);
assert!(txhashset::zip_read(db_root.clone(), &head).is_ok());
let txhashset_zip_path =
Path::new(&db_root).join(format!("txhashset_zip_{}", head.hash().to_string()));
let _ = fs::remove_dir_all(
Path::new(&db_root).join(format!("txhashset_zip_{}", head.hash().to_string())),
);

View file

@ -216,13 +216,6 @@ where
Ok(())
}
/// Truncate the MMR by rewinding back to empty state.
pub fn truncate(&mut self) -> Result<(), String> {
self.backend.rewind(0, &Bitmap::create())?;
self.last_pos = 0;
Ok(())
}
/// Rewind the PMMR to a previous position, as if all push operations after
/// that had been canceled. Expects a position in the PMMR to rewind and
/// bitmaps representing the positions added and removed that we want to

View file

@ -86,6 +86,11 @@ where
}
}
/// Iterator over current (unpruned, unremoved) leaf positions.
pub fn leaf_pos_iter(&self) -> impl Iterator<Item = u64> + '_ {
self.backend.leaf_pos_iter()
}
/// Is the MMR empty?
pub fn is_empty(&self) -> bool {
self.last_pos == 0

View file

@ -310,8 +310,8 @@ impl p2p::ChainAdapter for NetToChainAdapter {
let max_height = self.chain().header_head()?.height;
let txhashset = self.chain().txhashset();
let txhashset = txhashset.read();
let header_pmmr = self.chain().header_pmmr();
let header_pmmr = header_pmmr.read();
// looks like we know one, getting as many following headers as allowed
let hh = header.height;
@ -321,7 +321,8 @@ impl p2p::ChainAdapter for NetToChainAdapter {
break;
}
if let Ok(header) = txhashset.get_header_by_height(h) {
if let Ok(hash) = header_pmmr.get_header_hash_by_height(h) {
let header = self.chain().get_block_header(&hash)?;
headers.push(header);
} else {
error!("Failed to locate headers successfully.");
@ -498,14 +499,16 @@ impl NetToChainAdapter {
// Find the first locator hash that refers to a known header on our main chain.
fn find_common_header(&self, locator: &[Hash]) -> Option<BlockHeader> {
let txhashset = self.chain().txhashset();
let txhashset = txhashset.read();
let header_pmmr = self.chain().header_pmmr();
let header_pmmr = header_pmmr.read();
for hash in locator {
if let Ok(header) = self.chain().get_block_header(&hash) {
if let Ok(header_at_height) = txhashset.get_header_by_height(header.height) {
if header.hash() == header_at_height.hash() {
return Some(header);
if let Ok(hash_at_height) = header_pmmr.get_header_hash_by_height(header.height) {
if let Ok(header_at_height) = self.chain().get_block_header(&hash_at_height) {
if header.hash() == header_at_height.hash() {
return Some(header);
}
}
}
}

View file

@ -435,8 +435,8 @@ impl Server {
let tip_height = self.head()?.height as i64;
let mut height = tip_height as i64 - last_blocks.len() as i64 + 1;
let txhashset = self.chain.txhashset();
let txhashset = txhashset.read();
let header_pmmr = self.chain.header_pmmr();
let header_pmmr = header_pmmr.read();
let diff_entries: Vec<DiffBlock> = last_blocks
.windows(2)
@ -449,8 +449,8 @@ impl Server {
// Use header hash if real header.
// Default to "zero" hash if synthetic header_info.
let hash = if height >= 0 {
if let Ok(header) = txhashset.get_header_by_height(height as u64) {
header.hash()
if let Ok(hash) = header_pmmr.get_header_hash_by_height(height as u64) {
hash
} else {
ZERO_HASH
}

View file

@ -182,9 +182,10 @@ impl Controller {
}
if Utc::now().timestamp() > next_stat_update {
let stats = server.get_server_stats().unwrap();
self.ui.ui_tx.send(UIMessage::UpdateStatus(stats)).unwrap();
next_stat_update = Utc::now().timestamp() + stat_update_interval;
if let Ok(stats) = server.get_server_stats() {
self.ui.ui_tx.send(UIMessage::UpdateStatus(stats)).unwrap();
}
}
}
server.stop();