refactor apply_inputs and support converting block for v2 compatibility (#3409)

* wip

* convert incoming block to v2; rework the orphan and duplicate checks to run earlier than the conversion

* cleanup process_block_single

* cleanup block conversion

* cleanup

* leverage utxo_view when applying block inputs
This commit is contained in:
Antioch Peverell 2020-07-31 14:36:20 +01:00 committed by GitHub
parent 70c637fe4f
commit 04c8713d83
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 239 additions and 123 deletions

View file

@ -19,7 +19,7 @@ use crate::core::core::hash::{Hash, Hashed, ZERO_HASH};
use crate::core::core::merkle_proof::MerkleProof;
use crate::core::core::verifier_cache::VerifierCache;
use crate::core::core::{
Block, BlockHeader, BlockSums, Committed, KernelFeatures, Output, OutputIdentifier,
Block, BlockHeader, BlockSums, Committed, Inputs, KernelFeatures, Output, OutputIdentifier,
Transaction, TxKernel,
};
use crate::core::global;
@ -271,6 +271,41 @@ impl Chain {
res
}
/// We plan to support receiving blocks with CommitOnly inputs.
/// We also need to support relaying blocks with FeaturesAndCommit inputs to peers.
/// This converts a block from CommitOnly to FeaturesAndCommit by looking the
/// spent outputs up in the utxo view (rewound to the block's previous header).
fn convert_block_v2(&self, block: Block) -> Result<Block, Error> {
	debug!(
		"convert_block_v2: {} at {}",
		block.header.hash(),
		block.header.height
	);

	// Nothing to look up for an empty input set; just normalize the variant.
	if block.inputs().is_empty() {
		let body = block.body.replace_inputs(Inputs::FeaturesAndCommit(vec![]));
		return Ok(Block {
			header: block.header,
			body,
		});
	}

	// NOTE: lock order (header_pmmr before txhashset) must match the rest of the chain code.
	let mut header_pmmr = self.header_pmmr.write();
	let mut txhashset = self.txhashset.write();

	// Rewind/apply the fork to the previous header, then validate the inputs
	// against the resulting utxo view to recover the full output identifiers.
	let spent =
		txhashset::extending_readonly(&mut header_pmmr, &mut txhashset, |ext, batch| {
			let previous_header = batch.get_previous_header(&block.header)?;
			pipe::rewind_and_apply_fork(&previous_header, ext, batch)?;
			ext.extension
				.utxo_view(ext.header_extension)
				.validate_inputs(block.inputs(), batch)
		})?;

	// Keep only the output identifiers; positions are not needed for the conversion.
	let full_inputs: Vec<_> = spent.into_iter().map(|(out, _)| out).collect();
	Ok(Block {
		header: block.header,
		body: block.body.replace_inputs(full_inputs.into()),
	})
}
fn determine_status(&self, head: Option<Tip>, prev_head: Tip, fork_point: Tip) -> BlockStatus {
// If head is updated then we are either "next" block or we just experienced a "reorg" to new head.
// Otherwise this is a "fork" off the main chain.
@ -291,15 +326,69 @@ impl Chain {
}
}
/// Quick check for "known" duplicate block up to and including current chain head.
/// Returns Err(Unfit) for a duplicate, Ok(()) otherwise.
fn is_known(&self, header: &BlockHeader) -> Result<(), Error> {
	let head = self.head()?;
	let hash = header.hash();
	// The head itself is trivially known. Otherwise a block at or below the
	// current total difficulty that already exists in the db is a duplicate.
	// Note: block_exists is only consulted when the difficulty check passes
	// (short-circuit), matching the original nested check.
	let duplicate = hash == head.hash()
		|| (header.total_difficulty() <= head.total_difficulty && self.block_exists(hash)?);
	if duplicate {
		Err(ErrorKind::Unfit("duplicate block".into()).into())
	} else {
		Ok(())
	}
}
// Check if the provided block is an orphan (previous full block not known).
// If so, add it to the orphan pool for deferred processing and return
// Err(Orphan); otherwise return Ok(()).
fn check_orphan(&self, block: &Block, opts: Options) -> Result<(), Error> {
	// Not an orphan if we have the previous full block.
	if self.block_exists(block.header.prev_hash)? {
		return Ok(());
	}

	let hash = block.hash();
	self.orphans.add(Orphan {
		block: block.clone(),
		opts,
		added: Instant::now(),
	});

	// Only mention evictions in the log message when any have occurred.
	let evicted = self.orphans.len_evicted();
	let evicted_msg = if evicted > 0 {
		format!(", # evicted {}", evicted)
	} else {
		String::new()
	};
	debug!(
		"is_orphan: {:?}, # orphans {}{}",
		hash,
		self.orphans.len(),
		evicted_msg,
	);
	Err(ErrorKind::Orphan.into())
}
/// Attempt to add a new block to the chain.
/// Returns true if it has been added to the longest chain
/// or false if it has added to a fork (or orphan?).
fn process_block_single(&self, b: Block, opts: Options) -> Result<Option<Tip>, Error> {
// Check if we already know about this block.
self.is_known(&b.header)?;
// Process the header first.
// If invalid then fail early.
// If valid then continue with block processing with header_head committed to db etc.
self.process_block_header(&b.header, opts)?;
// Check if this block is an orphan.
// Only do this once we know the header PoW is valid.
self.check_orphan(&b, opts)?;
// Convert block to FeaturesAndCommit inputs.
// We know this block is not an orphan and header is valid at this point.
let b = self.convert_block_v2(b)?;
let (maybe_new_head, prev_head) = {
let mut header_pmmr = self.header_pmmr.write();
let mut txhashset = self.txhashset.write();
@ -331,28 +420,6 @@ impl Chain {
Ok(head)
}
Err(e) => match e.kind() {
ErrorKind::Orphan => {
let block_hash = b.hash();
let orphan = Orphan {
block: b,
opts: opts,
added: Instant::now(),
};
self.orphans.add(orphan);
debug!(
"process_block: orphan: {:?}, # orphans {}{}",
block_hash,
self.orphans.len(),
if self.orphans.len_evicted() > 0 {
format!(", # evicted {}", self.orphans.len_evicted())
} else {
String::new()
},
);
Err(ErrorKind::Orphan.into())
}
ErrorKind::Unfit(ref msg) => {
debug!(
"Block {} at {} is unfit at this time: {}",
@ -544,7 +611,8 @@ impl Chain {
let header_pmmr = self.header_pmmr.read();
let txhashset = self.txhashset.read();
txhashset::utxo_view(&header_pmmr, &txhashset, |utxo, batch| {
utxo.validate_tx(tx, batch)
utxo.validate_tx(tx, batch)?;
Ok(())
})
}
@ -617,7 +685,7 @@ impl Chain {
let prev_root = header_extension.root()?;
// Apply the latest block to the chain state via the extension.
extension.apply_block(b, batch)?;
extension.apply_block(b, header_extension, batch)?;
Ok((prev_root, extension.roots()?, extension.sizes()))
})?;
@ -1576,7 +1644,8 @@ fn setup_head(
};
}
txhashset::extending(header_pmmr, txhashset, &mut batch, |ext, batch| {
ext.extension.apply_block(&genesis, batch)
ext.extension
.apply_block(&genesis, ext.header_extension, batch)
})?;
// Save the block_sums to the db for use later.

View file

@ -18,14 +18,13 @@ use crate::core::consensus;
use crate::core::core::hash::Hashed;
use crate::core::core::verifier_cache::VerifierCache;
use crate::core::core::Committed;
use crate::core::core::{Block, BlockHeader, BlockSums};
use crate::core::core::{Block, BlockHeader, BlockSums, OutputIdentifier};
use crate::core::pow;
use crate::error::{Error, ErrorKind};
use crate::store;
use crate::txhashset;
use crate::types::{Options, Tip};
use crate::types::{CommitPos, Options, Tip};
use crate::util::RwLock;
use grin_store;
use std::sync::Arc;
/// Contextual information required to process a new block and either reject or
@ -104,17 +103,9 @@ pub fn process_block(
// want to do this now and not later during header validation.
validate_pow_only(&b.header, ctx)?;
// Get previous header from the db and check we have the corresponding full block.
let prev = prev_header_store(&b.header, &mut ctx.batch)?;
// Block is an orphan if we do not know about the previous full block.
// Skip this check if we have just processed the previous block
// or the full txhashset state (fast sync) at the previous block height.
{
let is_next = b.header.prev_hash == head.last_block_h;
if !is_next && !ctx.batch.block_exists(&prev.hash())? {
return Err(ErrorKind::Orphan.into());
}
}
ctx.batch.block_exists(&prev.hash())?;
// Process the header for the block.
// Note: We still want to process the full block if we have seen this header before
@ -319,10 +310,7 @@ fn prev_header_store(
header: &BlockHeader,
batch: &mut store::Batch<'_>,
) -> Result<BlockHeader, Error> {
let prev = batch.get_previous_header(&header).map_err(|e| match e {
grin_store::Error::NotFoundErr(_) => ErrorKind::Orphan,
_ => ErrorKind::StoreErr(e, "check prev header".into()),
})?;
let prev = batch.get_previous_header(&header)?;
Ok(prev)
}
@ -454,7 +442,8 @@ fn apply_block_to_txhashset(
ext: &mut txhashset::ExtensionPair<'_>,
batch: &store::Batch<'_>,
) -> Result<(), Error> {
ext.extension.apply_block(block, batch)?;
ext.extension
.apply_block(block, ext.header_extension, batch)?;
ext.extension.validate_roots(&block.header)?;
ext.extension.validate_sizes(&block.header)?;
Ok(())
@ -600,11 +589,12 @@ pub fn rewind_and_apply_fork(
Ok(fork_point)
}
/// Validate block inputs against utxo.
fn validate_utxo(
block: &Block,
ext: &mut txhashset::ExtensionPair<'_>,
batch: &store::Batch<'_>,
) -> Result<(), Error> {
) -> Result<Vec<(OutputIdentifier, CommitPos)>, Error> {
let extension = &ext.extension;
let header_extension = &ext.header_extension;
extension

View file

@ -125,7 +125,7 @@ impl ChainStore {
/// Get PMMR pos for the given output commitment.
pub fn get_output_pos(&self, commit: &Commitment) -> Result<u64, Error> {
match self.get_output_pos_height(commit)? {
Some((pos, _)) => Ok(pos),
Some(pos) => Ok(pos.pos),
None => Err(Error::NotFoundErr(format!(
"Output position for: {:?}",
commit
@ -134,7 +134,7 @@ impl ChainStore {
}
/// Get PMMR pos and block height for the given output commitment.
pub fn get_output_pos_height(&self, commit: &Commitment) -> Result<Option<(u64, u64)>, Error> {
pub fn get_output_pos_height(&self, commit: &Commitment) -> Result<Option<CommitPos>, Error> {
self.db.get_ser(&to_key(OUTPUT_POS_PREFIX, commit))
}
@ -258,14 +258,9 @@ impl<'a> Batch<'a> {
}
/// Save output_pos and block height to index.
pub fn save_output_pos_height(
&self,
commit: &Commitment,
pos: u64,
height: u64,
) -> Result<(), Error> {
pub fn save_output_pos_height(&self, commit: &Commitment, pos: CommitPos) -> Result<(), Error> {
self.db
.put_ser(&to_key(OUTPUT_POS_PREFIX, commit)[..], &(pos, height))
.put_ser(&to_key(OUTPUT_POS_PREFIX, commit)[..], &pos)
}
/// Delete the output_pos index entry for a spent output.
@ -290,7 +285,7 @@ impl<'a> Batch<'a> {
/// Get output_pos from index.
pub fn get_output_pos(&self, commit: &Commitment) -> Result<u64, Error> {
match self.get_output_pos_height(commit)? {
Some((pos, _)) => Ok(pos),
Some(pos) => Ok(pos.pos),
None => Err(Error::NotFoundErr(format!(
"Output position for: {:?}",
commit
@ -299,7 +294,7 @@ impl<'a> Batch<'a> {
}
/// Get output_pos and block height from index.
pub fn get_output_pos_height(&self, commit: &Commitment) -> Result<Option<(u64, u64)>, Error> {
pub fn get_output_pos_height(&self, commit: &Commitment) -> Result<Option<CommitPos>, Error> {
self.db.get_ser(&to_key(OUTPUT_POS_PREFIX, commit))
}

View file

@ -20,9 +20,7 @@ use crate::core::core::committed::Committed;
use crate::core::core::hash::{Hash, Hashed};
use crate::core::core::merkle_proof::MerkleProof;
use crate::core::core::pmmr::{self, Backend, ReadonlyPMMR, RewindablePMMR, PMMR};
use crate::core::core::{
Block, BlockHeader, Input, KernelFeatures, Output, OutputIdentifier, TxKernel,
};
use crate::core::core::{Block, BlockHeader, KernelFeatures, Output, OutputIdentifier, TxKernel};
use crate::core::global;
use crate::core::ser::{PMMRable, ProtocolVersion};
use crate::error::{Error, ErrorKind};
@ -260,12 +258,12 @@ impl TxHashSet {
pub fn get_unspent(&self, output_id: &OutputIdentifier) -> Result<Option<CommitPos>, Error> {
let commit = output_id.commit;
match self.commit_index.get_output_pos_height(&commit) {
Ok(Some((pos, height))) => {
Ok(Some(pos)) => {
let output_pmmr: ReadonlyPMMR<'_, Output, _> =
ReadonlyPMMR::at(&self.output_pmmr_h.backend, self.output_pmmr_h.last_pos);
if let Some(out) = output_pmmr.get_data(pos) {
if let Some(out) = output_pmmr.get_data(pos.pos) {
if out == *output_id {
Ok(Some(CommitPos { pos, height }))
Ok(Some(pos))
} else {
Ok(None)
}
@ -568,7 +566,13 @@ impl TxHashSet {
// Note: MMR position is 1-based and not 0-based, so here must be '>' instead of '>='
break;
}
batch.save_output_pos_height(&commit, pos, h.height)?;
batch.save_output_pos_height(
&commit,
CommitPos {
pos,
height: h.height,
},
)?;
i += 1;
}
}
@ -1036,7 +1040,12 @@ impl<'a> Extension<'a> {
/// Apply a new block to the current txhashet extension (output, rangeproof, kernel MMRs).
/// Returns a vec of commit_pos representing the pos and height of the outputs spent
/// by this block.
pub fn apply_block(&mut self, b: &Block, batch: &Batch<'_>) -> Result<(), Error> {
pub fn apply_block(
&mut self,
b: &Block,
header_ext: &HeaderExtension<'_>,
batch: &Batch<'_>,
) -> Result<(), Error> {
let mut affected_pos = vec![];
// Apply the output to the output and rangeproof MMRs.
@ -1045,20 +1054,30 @@ impl<'a> Extension<'a> {
for out in b.outputs() {
let pos = self.apply_output(out, batch)?;
affected_pos.push(pos);
batch.save_output_pos_height(&out.commitment(), pos, b.header.height)?;
batch.save_output_pos_height(
&out.commitment(),
CommitPos {
pos,
height: b.header.height,
},
)?;
}
// Remove the output from the output and rangeproof MMRs.
// Use our utxo_view to identify outputs being spent by block inputs.
// Apply inputs to remove spent outputs from the output and rangeproof MMRs.
// Add spent_pos to affected_pos to update the accumulator later on.
// Remove the spent output from the output_pos index.
let mut spent = vec![];
let inputs: Vec<_> = b.inputs().into();
for input in &inputs {
let pos = self.apply_input(input, batch)?;
// Remove the spent outputs from the output_pos index.
let spent = self
.utxo_view(header_ext)
.validate_inputs(b.inputs(), batch)?;
for (out, pos) in &spent {
self.apply_input(out.commitment(), *pos)?;
affected_pos.push(pos.pos);
batch.delete_output_pos_height(&input.commitment())?;
spent.push(pos);
batch.delete_output_pos_height(&out.commitment())?;
}
// Update the spent index with spent pos.
let spent: Vec<_> = spent.into_iter().map(|(_, pos)| pos).collect();
batch.save_spent_index(&b.hash(), &spent)?;
// Apply the kernels to the kernel MMR.
@ -1090,31 +1109,18 @@ impl<'a> Extension<'a> {
)
}
fn apply_input(&mut self, input: &Input, batch: &Batch<'_>) -> Result<CommitPos, Error> {
let commit = input.commitment();
if let Some((pos, height)) = batch.get_output_pos_height(&commit)? {
// First check this input corresponds to an existing entry in the output MMR.
if let Some(out) = self.output_pmmr.get_data(pos) {
if OutputIdentifier::from(input) != out {
return Err(ErrorKind::TxHashSetErr("output pmmr mismatch".to_string()).into());
}
// Prune output and rangeproof PMMRs based on provided pos.
// Input is not valid if we cannot prune successfully.
fn apply_input(&mut self, commit: Commitment, pos: CommitPos) -> Result<(), Error> {
match self.output_pmmr.prune(pos.pos) {
Ok(true) => {
self.rproof_pmmr
.prune(pos.pos)
.map_err(ErrorKind::TxHashSetErr)?;
Ok(())
}
// Now prune the output_pmmr, rproof_pmmr and their storage.
// Input is not valid if we cannot prune successfully (to spend an unspent
// output).
match self.output_pmmr.prune(pos) {
Ok(true) => {
self.rproof_pmmr
.prune(pos)
.map_err(ErrorKind::TxHashSetErr)?;
Ok(CommitPos { pos, height })
}
Ok(false) => Err(ErrorKind::AlreadySpent(commit).into()),
Err(e) => Err(ErrorKind::TxHashSetErr(e).into()),
}
} else {
Err(ErrorKind::AlreadySpent(commit).into())
Ok(false) => Err(ErrorKind::AlreadySpent(commit).into()),
Err(e) => Err(ErrorKind::TxHashSetErr(e).into()),
}
}
@ -1333,8 +1339,8 @@ impl<'a> Extension<'a> {
// The output_pos index should be updated to reflect the old pos 1 when unspent.
if let Ok(spent) = spent {
let inputs: Vec<_> = block.inputs().into();
for (x, y) in inputs.iter().zip(spent) {
batch.save_output_pos_height(&x.commitment(), y.pos, y.height)?;
for (input, pos) in inputs.iter().zip(spent) {
batch.save_output_pos_height(&input.commitment(), pos)?;
}
}

View file

@ -16,11 +16,12 @@
use crate::core::core::hash::{Hash, Hashed};
use crate::core::core::pmmr::{self, ReadonlyPMMR};
use crate::core::core::{Block, BlockHeader, Input, Inputs, Output, OutputIdentifier, Transaction};
use crate::core::core::{Block, BlockHeader, Inputs, Output, OutputIdentifier, Transaction};
use crate::core::global;
use crate::error::{Error, ErrorKind};
use crate::store::Batch;
use crate::util::secp::pedersen::RangeProof;
use crate::types::CommitPos;
use crate::util::secp::pedersen::{Commitment, RangeProof};
use grin_store::pmmr::PMMRBackend;
/// Readonly view of the UTXO set (based on output MMR).
@ -47,45 +48,76 @@ impl<'a> UTXOView<'a> {
/// Validate a block against the current UTXO set.
/// Every input must spend an output that currently exists in the UTXO set.
/// No duplicate outputs.
pub fn validate_block(&self, block: &Block, batch: &Batch<'_>) -> Result<(), Error> {
pub fn validate_block(
&self,
block: &Block,
batch: &Batch<'_>,
) -> Result<Vec<(OutputIdentifier, CommitPos)>, Error> {
for output in block.outputs() {
self.validate_output(output, batch)?;
}
let inputs: Vec<_> = block.inputs().into();
for input in &inputs {
self.validate_input(input, batch)?;
}
Ok(())
self.validate_inputs(block.inputs(), batch)
}
/// Validate a transaction against the current UTXO set.
/// Every input must spend an output that currently exists in the UTXO set.
/// No duplicate outputs.
pub fn validate_tx(&self, tx: &Transaction, batch: &Batch<'_>) -> Result<(), Error> {
pub fn validate_tx(
&self,
tx: &Transaction,
batch: &Batch<'_>,
) -> Result<Vec<(OutputIdentifier, CommitPos)>, Error> {
for output in tx.outputs() {
self.validate_output(output, batch)?;
}
self.validate_inputs(tx.inputs(), batch)
}
let inputs: Vec<_> = tx.inputs().into();
for input in &inputs {
self.validate_input(input, batch)?;
/// Validate the provided inputs.
/// Returns a vec of output identifiers corresponding to outputs
/// that would be spent by the provided inputs.
pub fn validate_inputs(
&self,
inputs: Inputs,
batch: &Batch<'_>,
) -> Result<Vec<(OutputIdentifier, CommitPos)>, Error> {
match inputs {
Inputs::FeaturesAndCommit(inputs) => {
let outputs_spent: Result<Vec<_>, Error> = inputs
.iter()
.map(|input| {
self.validate_input(input.commitment(), batch)
.and_then(|(out, pos)| {
// Unspent output found.
// Check input matches full output identifier.
if out == input.into() {
Ok((out, pos))
} else {
Err(ErrorKind::Other("input mismatch".into()).into())
}
})
})
.collect();
outputs_spent
}
}
Ok(())
}
// Input is valid if it is spending an (unspent) output
// that currently exists in the output MMR.
// Compare against the entry in output MMR at the expected pos.
fn validate_input(&self, input: &Input, batch: &Batch<'_>) -> Result<(), Error> {
if let Ok(pos) = batch.get_output_pos(&input.commitment()) {
if let Some(out) = self.output_pmmr.get_data(pos) {
if OutputIdentifier::from(input) == out {
return Ok(());
}
// Note: We lookup by commitment. Caller must compare the full input as necessary.
fn validate_input(
&self,
input: Commitment,
batch: &Batch<'_>,
) -> Result<(OutputIdentifier, CommitPos), Error> {
let pos = batch.get_output_pos_height(&input)?;
if let Some(pos) = pos {
if let Some(out) = self.output_pmmr.get_data(pos.pos) {
return Ok((out, pos));
}
}
Err(ErrorKind::AlreadySpent(input.commitment()).into())
Err(ErrorKind::AlreadySpent(input).into())
}
// Output is valid if it would not result in a duplicate commitment in the output MMR.

View file

@ -43,7 +43,7 @@ fn check_known() {
let res = chain.process_block(latest.clone(), chain::Options::NONE);
assert_eq!(
res.unwrap_err().kind(),
ErrorKind::Unfit("already known in head".to_string()).into()
ErrorKind::Unfit("duplicate block".to_string()).into()
);
}
@ -53,7 +53,7 @@ fn check_known() {
let res = chain.process_block(genesis.clone(), chain::Options::NONE);
assert_eq!(
res.unwrap_err().kind(),
ErrorKind::Unfit("already known in store".to_string()).into()
ErrorKind::Unfit("duplicate block".to_string()).into()
);
}

View file

@ -804,6 +804,12 @@ impl TransactionBody {
self
}
/// Fully replace inputs.
pub fn replace_inputs(mut self, inputs: Inputs) -> TransactionBody {
self.inputs = inputs;
self
}
/// Builds a new TransactionBody with the provided output added. Existing
/// outputs, if any, are kept intact.
/// Sort order is maintained.
@ -1600,6 +1606,19 @@ impl From<Vec<Input>> for Inputs {
}
}
// Build FeaturesAndCommit inputs from full output identifiers,
// preserving each output's features and commitment.
impl From<Vec<OutputIdentifier>> for Inputs {
	fn from(outputs: Vec<OutputIdentifier>) -> Self {
		let mut inputs = Vec::with_capacity(outputs.len());
		for out in outputs {
			inputs.push(Input {
				features: out.features,
				commit: out.commit,
			});
		}
		Inputs::FeaturesAndCommit(inputs)
	}
}
impl Default for Inputs {
fn default() -> Self {
Inputs::FeaturesAndCommit(vec![])
@ -1623,6 +1642,11 @@ impl Inputs {
}
}
/// Returns true if there are no inputs (delegates to `len()`).
pub fn is_empty(&self) -> bool {
self.len() == 0
}
/// Verify inputs are sorted and unique.
fn verify_sorted_and_unique(&self) -> Result<(), ser::Error> {
match self {