From 04c8713d83901eb83288f341a3b10b61696b778f Mon Sep 17 00:00:00 2001 From: Antioch Peverell Date: Fri, 31 Jul 2020 14:36:20 +0100 Subject: [PATCH] refactor apply_inputs and support converting block for v2 compatibility (#3409) * wip * convert incoming block to v2, rework orphan and duplicate check earlier than conversion * cleanup process_block_single * cleanup block conversion * cleanup * leverage utxo_view when applying block inputs --- chain/src/chain.rs | 121 ++++++++++++++++++++++++------- chain/src/pipe.rs | 28 +++---- chain/src/store.rs | 17 ++--- chain/src/txhashset/txhashset.rs | 92 ++++++++++++----------- chain/src/txhashset/utxo_view.rs | 76 +++++++++++++------ chain/tests/test_block_known.rs | 4 +- core/src/core/transaction.rs | 24 ++++++ 7 files changed, 239 insertions(+), 123 deletions(-) diff --git a/chain/src/chain.rs b/chain/src/chain.rs index 285600bba..e715ce6b5 100644 --- a/chain/src/chain.rs +++ b/chain/src/chain.rs @@ -19,7 +19,7 @@ use crate::core::core::hash::{Hash, Hashed, ZERO_HASH}; use crate::core::core::merkle_proof::MerkleProof; use crate::core::core::verifier_cache::VerifierCache; use crate::core::core::{ - Block, BlockHeader, BlockSums, Committed, KernelFeatures, Output, OutputIdentifier, + Block, BlockHeader, BlockSums, Committed, Inputs, KernelFeatures, Output, OutputIdentifier, Transaction, TxKernel, }; use crate::core::global; @@ -271,6 +271,41 @@ impl Chain { res } + /// We plan to support receiving blocks with CommitOnly inputs. + /// We also need to support relaying blocks with FeaturesAndCommit inputs to peers. + /// So we need a way to convert blocks from CommitOnly to FeaturesAndCommit. + /// Validating the inputs against the utxo_view allows us to look the outputs up. + fn convert_block_v2(&self, block: Block) -> Result { + debug!( + "convert_block_v2: {} at {}", + block.header.hash(), + block.header.height + ); + + if block.inputs().is_empty() { + return Ok(Block { + header: block.header, + body: block.body.replace_inputs(Inputs::FeaturesAndCommit(vec![])), + }); + } + + let mut header_pmmr = self.header_pmmr.write(); + let mut txhashset = self.txhashset.write(); + let outputs = + txhashset::extending_readonly(&mut header_pmmr, &mut txhashset, |ext, batch| { + let previous_header = batch.get_previous_header(&block.header)?; + pipe::rewind_and_apply_fork(&previous_header, ext, batch)?; + ext.extension + .utxo_view(ext.header_extension) + .validate_inputs(block.inputs(), batch) + })?; + let outputs: Vec<_> = outputs.into_iter().map(|(out, _)| out).collect(); + Ok(Block { + header: block.header, + body: block.body.replace_inputs(outputs.into()), + }) + } + fn determine_status(&self, head: Option, prev_head: Tip, fork_point: Tip) -> BlockStatus { // If head is updated then we are either "next" block or we just experienced a "reorg" to new head. // Otherwise this is a "fork" off the main chain. @@ -291,15 +326,69 @@ impl Chain { } } + /// Quick check for "known" duplicate block up to and including current chain head. + fn is_known(&self, header: &BlockHeader) -> Result<(), Error> { + let head = self.head()?; + if head.hash() == header.hash() { + return Err(ErrorKind::Unfit("duplicate block".into()).into()); + } + if header.total_difficulty() <= head.total_difficulty { + if self.block_exists(header.hash())? { + return Err(ErrorKind::Unfit("duplicate block".into()).into()); + } + } + Ok(()) + } + + // Check if the provided block is an orphan. + // If block is an orphan add it to our orphan block pool for deferred processing. + fn check_orphan(&self, block: &Block, opts: Options) -> Result<(), Error> { + if self.block_exists(block.header.prev_hash)? { + return Ok(()); + } + + let block_hash = block.hash(); + let orphan = Orphan { + block: block.clone(), + opts, + added: Instant::now(), + }; + self.orphans.add(orphan); + + debug!( + "is_orphan: {:?}, # orphans {}{}", + block_hash, + self.orphans.len(), + if self.orphans.len_evicted() > 0 { + format!(", # evicted {}", self.orphans.len_evicted()) + } else { + String::new() + }, + ); + + Err(ErrorKind::Orphan.into()) + } + /// Attempt to add a new block to the chain. /// Returns true if it has been added to the longest chain /// or false if it has added to a fork (or orphan?). fn process_block_single(&self, b: Block, opts: Options) -> Result, Error> { + // Check if we already know about this block. + self.is_known(&b.header)?; + // Process the header first. // If invalid then fail early. // If valid then continue with block processing with header_head committed to db etc. self.process_block_header(&b.header, opts)?; + // Check if this block is an orphan. + // Only do this once we know the header PoW is valid. + self.check_orphan(&b, opts)?; + + // Convert block to FeaturesAndCommit inputs. + // We know this block is not an orphan and header is valid at this point. + let b = self.convert_block_v2(b)?; + let (maybe_new_head, prev_head) = { let mut header_pmmr = self.header_pmmr.write(); let mut txhashset = self.txhashset.write(); @@ -331,28 +420,6 @@ impl Chain { Ok(head) } Err(e) => match e.kind() { - ErrorKind::Orphan => { - let block_hash = b.hash(); - let orphan = Orphan { - block: b, - opts: opts, - added: Instant::now(), - }; - - self.orphans.add(orphan); - - debug!( - "process_block: orphan: {:?}, # orphans {}{}", - block_hash, - self.orphans.len(), - if self.orphans.len_evicted() > 0 { - format!(", # evicted {}", self.orphans.len_evicted()) - } else { - String::new() - }, - ); - Err(ErrorKind::Orphan.into()) - } ErrorKind::Unfit(ref msg) => { debug!( "Block {} at {} is unfit at this time: {}", @@ -544,7 +611,8 @@ impl Chain { let header_pmmr = self.header_pmmr.read(); let txhashset = self.txhashset.read(); txhashset::utxo_view(&header_pmmr, &txhashset, |utxo, batch| { - utxo.validate_tx(tx, batch) + utxo.validate_tx(tx, batch)?; + Ok(()) }) } @@ -617,7 +685,7 @@ impl Chain { let prev_root = header_extension.root()?; // Apply the latest block to the chain state via the extension. - extension.apply_block(b, batch)?; + extension.apply_block(b, header_extension, batch)?; Ok((prev_root, extension.roots()?, extension.sizes())) })?; @@ -1576,7 +1644,8 @@ fn setup_head( }; } txhashset::extending(header_pmmr, txhashset, &mut batch, |ext, batch| { - ext.extension.apply_block(&genesis, batch) + ext.extension + .apply_block(&genesis, ext.header_extension, batch) })?; // Save the block_sums to the db for use later. diff --git a/chain/src/pipe.rs b/chain/src/pipe.rs index 014c18372..9ac1214fa 100644 --- a/chain/src/pipe.rs +++ b/chain/src/pipe.rs @@ -18,14 +18,13 @@ use crate::core::consensus; use crate::core::core::hash::Hashed; use crate::core::core::verifier_cache::VerifierCache; use crate::core::core::Committed; -use crate::core::core::{Block, BlockHeader, BlockSums}; +use crate::core::core::{Block, BlockHeader, BlockSums, OutputIdentifier}; use crate::core::pow; use crate::error::{Error, ErrorKind}; use crate::store; use crate::txhashset; -use crate::types::{Options, Tip}; +use crate::types::{CommitPos, Options, Tip}; use crate::util::RwLock; -use grin_store; use std::sync::Arc; /// Contextual information required to process a new block and either reject or @@ -104,17 +103,9 @@ pub fn process_block( // want to do this now and not later during header validation. validate_pow_only(&b.header, ctx)?; + // Get previous header from the db and check we have the corresponding full block. let prev = prev_header_store(&b.header, &mut ctx.batch)?; - - // Block is an orphan if we do not know about the previous full block. - // Skip this check if we have just processed the previous block - // or the full txhashset state (fast sync) at the previous block height. - { - let is_next = b.header.prev_hash == head.last_block_h; - if !is_next && !ctx.batch.block_exists(&prev.hash())? { - return Err(ErrorKind::Orphan.into()); - } - } + ctx.batch.block_exists(&prev.hash())?; // Process the header for the block. // Note: We still want to process the full block if we have seen this header before @@ -319,10 +310,7 @@ fn prev_header_store( header: &BlockHeader, batch: &mut store::Batch<'_>, ) -> Result { - let prev = batch.get_previous_header(&header).map_err(|e| match e { - grin_store::Error::NotFoundErr(_) => ErrorKind::Orphan, - _ => ErrorKind::StoreErr(e, "check prev header".into()), - })?; + let prev = batch.get_previous_header(&header)?; Ok(prev) } @@ -454,7 +442,8 @@ fn apply_block_to_txhashset( ext: &mut txhashset::ExtensionPair<'_>, batch: &store::Batch<'_>, ) -> Result<(), Error> { - ext.extension.apply_block(block, batch)?; + ext.extension + .apply_block(block, ext.header_extension, batch)?; ext.extension.validate_roots(&block.header)?; ext.extension.validate_sizes(&block.header)?; Ok(()) @@ -600,11 +589,12 @@ pub fn rewind_and_apply_fork( Ok(fork_point) } +/// Validate block inputs against utxo. fn validate_utxo( block: &Block, ext: &mut txhashset::ExtensionPair<'_>, batch: &store::Batch<'_>, -) -> Result<(), Error> { +) -> Result, Error> { let extension = &ext.extension; let header_extension = &ext.header_extension; extension diff --git a/chain/src/store.rs b/chain/src/store.rs index dff8fb094..10c2006c4 100644 --- a/chain/src/store.rs +++ b/chain/src/store.rs @@ -125,7 +125,7 @@ impl ChainStore { /// Get PMMR pos for the given output commitment. pub fn get_output_pos(&self, commit: &Commitment) -> Result { match self.get_output_pos_height(commit)? { - Some((pos, _)) => Ok(pos), + Some(pos) => Ok(pos.pos), None => Err(Error::NotFoundErr(format!( "Output position for: {:?}", commit @@ -134,7 +134,7 @@ impl ChainStore { } /// Get PMMR pos and block height for the given output commitment. - pub fn get_output_pos_height(&self, commit: &Commitment) -> Result, Error> { + pub fn get_output_pos_height(&self, commit: &Commitment) -> Result, Error> { self.db.get_ser(&to_key(OUTPUT_POS_PREFIX, commit)) } @@ -258,14 +258,9 @@ impl<'a> Batch<'a> { } /// Save output_pos and block height to index. - pub fn save_output_pos_height( - &self, - commit: &Commitment, - pos: u64, - height: u64, - ) -> Result<(), Error> { + pub fn save_output_pos_height(&self, commit: &Commitment, pos: CommitPos) -> Result<(), Error> { self.db - .put_ser(&to_key(OUTPUT_POS_PREFIX, commit)[..], &(pos, height)) + .put_ser(&to_key(OUTPUT_POS_PREFIX, commit)[..], &pos) } /// Delete the output_pos index entry for a spent output. @@ -290,7 +285,7 @@ impl<'a> Batch<'a> { /// Get output_pos from index. pub fn get_output_pos(&self, commit: &Commitment) -> Result { match self.get_output_pos_height(commit)? { - Some((pos, _)) => Ok(pos), + Some(pos) => Ok(pos.pos), None => Err(Error::NotFoundErr(format!( "Output position for: {:?}", commit @@ -299,7 +294,7 @@ impl<'a> Batch<'a> { } /// Get output_pos and block height from index. - pub fn get_output_pos_height(&self, commit: &Commitment) -> Result, Error> { + pub fn get_output_pos_height(&self, commit: &Commitment) -> Result, Error> { self.db.get_ser(&to_key(OUTPUT_POS_PREFIX, commit)) } diff --git a/chain/src/txhashset/txhashset.rs b/chain/src/txhashset/txhashset.rs index 4140b50d4..f9222dd6c 100644 --- a/chain/src/txhashset/txhashset.rs +++ b/chain/src/txhashset/txhashset.rs @@ -20,9 +20,7 @@ use crate::core::core::committed::Committed; use crate::core::core::hash::{Hash, Hashed}; use crate::core::core::merkle_proof::MerkleProof; use crate::core::core::pmmr::{self, Backend, ReadonlyPMMR, RewindablePMMR, PMMR}; -use crate::core::core::{ - Block, BlockHeader, Input, KernelFeatures, Output, OutputIdentifier, TxKernel, -}; +use crate::core::core::{Block, BlockHeader, KernelFeatures, Output, OutputIdentifier, TxKernel}; use crate::core::global; use crate::core::ser::{PMMRable, ProtocolVersion}; use crate::error::{Error, ErrorKind}; @@ -260,12 +258,12 @@ impl TxHashSet { pub fn get_unspent(&self, output_id: &OutputIdentifier) -> Result, Error> { let commit = output_id.commit; match self.commit_index.get_output_pos_height(&commit) { - Ok(Some((pos, height))) => { + Ok(Some(pos)) => { let output_pmmr: ReadonlyPMMR<'_, Output, _> = ReadonlyPMMR::at(&self.output_pmmr_h.backend, self.output_pmmr_h.last_pos); - if let Some(out) = output_pmmr.get_data(pos) { + if let Some(out) = output_pmmr.get_data(pos.pos) { if out == *output_id { - Ok(Some(CommitPos { pos, height })) + Ok(Some(pos)) } else { Ok(None) } @@ -568,7 +566,13 @@ impl TxHashSet { // Note: MMR position is 1-based and not 0-based, so here must be '>' instead of '>=' break; } - batch.save_output_pos_height(&commit, pos, h.height)?; + batch.save_output_pos_height( + &commit, + CommitPos { + pos, + height: h.height, + }, + )?; i += 1; } } @@ -1036,7 +1040,12 @@ impl<'a> Extension<'a> { /// Apply a new block to the current txhashet extension (output, rangeproof, kernel MMRs). /// Returns a vec of commit_pos representing the pos and height of the outputs spent /// by this block. - pub fn apply_block(&mut self, b: &Block, batch: &Batch<'_>) -> Result<(), Error> { + pub fn apply_block( + &mut self, + b: &Block, + header_ext: &HeaderExtension<'_>, + batch: &Batch<'_>, + ) -> Result<(), Error> { let mut affected_pos = vec![]; // Apply the output to the output and rangeproof MMRs. @@ -1045,20 +1054,30 @@ impl<'a> Extension<'a> { for out in b.outputs() { let pos = self.apply_output(out, batch)?; affected_pos.push(pos); - batch.save_output_pos_height(&out.commitment(), pos, b.header.height)?; + batch.save_output_pos_height( + &out.commitment(), + CommitPos { + pos, + height: b.header.height, + }, + )?; } - // Remove the output from the output and rangeproof MMRs. + // Use our utxo_view to identify outputs being spent by block inputs. + // Apply inputs to remove spent outputs from the output and rangeproof MMRs. // Add spent_pos to affected_pos to update the accumulator later on. - // Remove the spent output from the output_pos index. - let mut spent = vec![]; - let inputs: Vec<_> = b.inputs().into(); - for input in &inputs { - let pos = self.apply_input(input, batch)?; + // Remove the spent outputs from the output_pos index. + let spent = self + .utxo_view(header_ext) + .validate_inputs(b.inputs(), batch)?; + for (out, pos) in &spent { + self.apply_input(out.commitment(), *pos)?; affected_pos.push(pos.pos); - batch.delete_output_pos_height(&input.commitment())?; - spent.push(pos); + batch.delete_output_pos_height(&out.commitment())?; } + + // Update the spent index with spent pos. + let spent: Vec<_> = spent.into_iter().map(|(_, pos)| pos).collect(); batch.save_spent_index(&b.hash(), &spent)?; // Apply the kernels to the kernel MMR. @@ -1090,31 +1109,18 @@ impl<'a> Extension<'a> { ) } - fn apply_input(&mut self, input: &Input, batch: &Batch<'_>) -> Result { - let commit = input.commitment(); - if let Some((pos, height)) = batch.get_output_pos_height(&commit)? { - // First check this input corresponds to an existing entry in the output MMR. - if let Some(out) = self.output_pmmr.get_data(pos) { - if OutputIdentifier::from(input) != out { - return Err(ErrorKind::TxHashSetErr("output pmmr mismatch".to_string()).into()); - } + // Prune output and rangeproof PMMRs based on provided pos. + // Input is not valid if we cannot prune successfully. + fn apply_input(&mut self, commit: Commitment, pos: CommitPos) -> Result<(), Error> { + match self.output_pmmr.prune(pos.pos) { + Ok(true) => { + self.rproof_pmmr + .prune(pos.pos) + .map_err(ErrorKind::TxHashSetErr)?; + Ok(()) } - - // Now prune the output_pmmr, rproof_pmmr and their storage. - // Input is not valid if we cannot prune successfully (to spend an unspent - // output). - match self.output_pmmr.prune(pos) { - Ok(true) => { - self.rproof_pmmr - .prune(pos) - .map_err(ErrorKind::TxHashSetErr)?; - Ok(CommitPos { pos, height }) - } - Ok(false) => Err(ErrorKind::AlreadySpent(commit).into()), - Err(e) => Err(ErrorKind::TxHashSetErr(e).into()), - } - } else { - Err(ErrorKind::AlreadySpent(commit).into()) + Ok(false) => Err(ErrorKind::AlreadySpent(commit).into()), + Err(e) => Err(ErrorKind::TxHashSetErr(e).into()), } } @@ -1333,8 +1339,8 @@ impl<'a> Extension<'a> { // The output_pos index should be updated to reflect the old pos 1 when unspent. if let Ok(spent) = spent { let inputs: Vec<_> = block.inputs().into(); - for (x, y) in inputs.iter().zip(spent) { - batch.save_output_pos_height(&x.commitment(), y.pos, y.height)?; + for (input, pos) in inputs.iter().zip(spent) { + batch.save_output_pos_height(&input.commitment(), pos)?; } } diff --git a/chain/src/txhashset/utxo_view.rs b/chain/src/txhashset/utxo_view.rs index 929e3a96c..52d85f3a0 100644 --- a/chain/src/txhashset/utxo_view.rs +++ b/chain/src/txhashset/utxo_view.rs @@ -16,11 +16,12 @@ use crate::core::core::hash::{Hash, Hashed}; use crate::core::core::pmmr::{self, ReadonlyPMMR}; -use crate::core::core::{Block, BlockHeader, Input, Inputs, Output, OutputIdentifier, Transaction}; +use crate::core::core::{Block, BlockHeader, Inputs, Output, OutputIdentifier, Transaction}; use crate::core::global; use crate::error::{Error, ErrorKind}; use crate::store::Batch; -use crate::util::secp::pedersen::RangeProof; +use crate::types::CommitPos; +use crate::util::secp::pedersen::{Commitment, RangeProof}; use grin_store::pmmr::PMMRBackend; /// Readonly view of the UTXO set (based on output MMR). @@ -47,45 +48,76 @@ impl<'a> UTXOView<'a> { /// Validate a block against the current UTXO set. /// Every input must spend an output that currently exists in the UTXO set. /// No duplicate outputs. - pub fn validate_block(&self, block: &Block, batch: &Batch<'_>) -> Result<(), Error> { + pub fn validate_block( + &self, + block: &Block, + batch: &Batch<'_>, + ) -> Result, Error> { for output in block.outputs() { self.validate_output(output, batch)?; } - - let inputs: Vec<_> = block.inputs().into(); - for input in &inputs { - self.validate_input(input, batch)?; - } - Ok(()) + self.validate_inputs(block.inputs(), batch) } /// Validate a transaction against the current UTXO set. /// Every input must spend an output that currently exists in the UTXO set. /// No duplicate outputs. - pub fn validate_tx(&self, tx: &Transaction, batch: &Batch<'_>) -> Result<(), Error> { + pub fn validate_tx( + &self, + tx: &Transaction, + batch: &Batch<'_>, + ) -> Result, Error> { for output in tx.outputs() { self.validate_output(output, batch)?; } + self.validate_inputs(tx.inputs(), batch) + } - let inputs: Vec<_> = tx.inputs().into(); - for input in &inputs { - self.validate_input(input, batch)?; + /// Validate the provided inputs. + /// Returns a vec of output identifiers corresponding to outputs + /// that would be spent by the provided inputs. + pub fn validate_inputs( + &self, + inputs: Inputs, + batch: &Batch<'_>, + ) -> Result, Error> { + match inputs { + Inputs::FeaturesAndCommit(inputs) => { + let outputs_spent: Result, Error> = inputs + .iter() + .map(|input| { + self.validate_input(input.commitment(), batch) + .and_then(|(out, pos)| { + // Unspent output found. + // Check input matches full output identifier. + if out == input.into() { + Ok((out, pos)) + } else { + Err(ErrorKind::Other("input mismatch".into()).into()) + } + }) + }) + .collect(); + outputs_spent + } } - Ok(()) } // Input is valid if it is spending an (unspent) output // that currently exists in the output MMR. - // Compare against the entry in output MMR at the expected pos. - fn validate_input(&self, input: &Input, batch: &Batch<'_>) -> Result<(), Error> { - if let Ok(pos) = batch.get_output_pos(&input.commitment()) { - if let Some(out) = self.output_pmmr.get_data(pos) { - if OutputIdentifier::from(input) == out { - return Ok(()); - } + // Note: We lookup by commitment. Caller must compare the full input as necessary. + fn validate_input( + &self, + input: Commitment, + batch: &Batch<'_>, + ) -> Result<(OutputIdentifier, CommitPos), Error> { + let pos = batch.get_output_pos_height(&input)?; + if let Some(pos) = pos { + if let Some(out) = self.output_pmmr.get_data(pos.pos) { + return Ok((out, pos)); } } - Err(ErrorKind::AlreadySpent(input.commitment()).into()) + Err(ErrorKind::AlreadySpent(input).into()) } // Output is valid if it would not result in a duplicate commitment in the output MMR. diff --git a/chain/tests/test_block_known.rs b/chain/tests/test_block_known.rs index 2ee0a852d..937756a6f 100644 --- a/chain/tests/test_block_known.rs +++ b/chain/tests/test_block_known.rs @@ -43,7 +43,7 @@ fn check_known() { let res = chain.process_block(latest.clone(), chain::Options::NONE); assert_eq!( res.unwrap_err().kind(), - ErrorKind::Unfit("already known in head".to_string()).into() + ErrorKind::Unfit("duplicate block".to_string()).into() ); } @@ -53,7 +53,7 @@ fn check_known() { let res = chain.process_block(genesis.clone(), chain::Options::NONE); assert_eq!( res.unwrap_err().kind(), - ErrorKind::Unfit("already known in store".to_string()).into() + ErrorKind::Unfit("duplicate block".to_string()).into() ); } diff --git a/core/src/core/transaction.rs b/core/src/core/transaction.rs index a59539674..0e51da452 100644 --- a/core/src/core/transaction.rs +++ b/core/src/core/transaction.rs @@ -804,6 +804,12 @@ impl TransactionBody { self } + /// Fully replace inputs. + pub fn replace_inputs(mut self, inputs: Inputs) -> TransactionBody { + self.inputs = inputs; + self + } + /// Builds a new TransactionBody with the provided output added. Existing /// outputs, if any, are kept intact. /// Sort order is maintained. @@ -1600,6 +1606,19 @@ impl From> for Inputs { } } +impl From> for Inputs { + fn from(outputs: Vec) -> Self { + let inputs = outputs + .into_iter() + .map(|out| Input { + features: out.features, + commit: out.commit, + }) + .collect(); + Inputs::FeaturesAndCommit(inputs) + } +} + impl Default for Inputs { fn default() -> Self { Inputs::FeaturesAndCommit(vec![]) @@ -1623,6 +1642,11 @@ impl Inputs { } } + /// Empty inputs? + pub fn is_empty(&self) -> bool { + self.len() == 0 + } + /// Verify inputs are sorted and unique. fn verify_sorted_and_unique(&self) -> Result<(), ser::Error> { match self {