convert tx for v2 compatibility on way into stempool/txpool (#3412)

cleanup passing extra_tx around

use output identifiers when converting tx to v2
Antioch Peverell authored on 2020-08-02 13:26:28 +01:00, committed via GitHub
parent 6a12155738 · commit 4732a0b62b
9 changed files with 196 additions and 61 deletions

chain/src/chain.rs

@@ -607,12 +607,29 @@ impl Chain {
 		})
 	}
 
-	fn validate_tx_against_utxo(&self, tx: &Transaction) -> Result<(), Error> {
+	fn validate_tx_against_utxo(
+		&self,
+		tx: &Transaction,
+	) -> Result<Vec<(OutputIdentifier, CommitPos)>, Error> {
 		let header_pmmr = self.header_pmmr.read();
 		let txhashset = self.txhashset.read();
 		txhashset::utxo_view(&header_pmmr, &txhashset, |utxo, batch| {
-			utxo.validate_tx(tx, batch)?;
-			Ok(())
+			utxo.validate_tx(tx, batch)
 		})
 	}
+
+	/// Validates inputs against the current utxo.
+	/// Each input must spend an unspent output.
+	/// Returns the vec of output identifiers and their pos of the outputs
+	/// that would be spent by the inputs.
+	pub fn validate_inputs(
+		&self,
+		inputs: Inputs,
+	) -> Result<Vec<(OutputIdentifier, CommitPos)>, Error> {
+		let header_pmmr = self.header_pmmr.read();
+		let txhashset = self.txhashset.read();
+		txhashset::utxo_view(&header_pmmr, &txhashset, |utxo, batch| {
+			utxo.validate_inputs(inputs, batch)
+		})
+	}
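The new `validate_inputs` is the chain-side half of the v2 conversion introduced later in this commit: it resolves each "commit only" input to the full output identifier (features included) of the unspent output it spends. A hedged usage sketch, not part of this commit; `resolve_spent_outputs` is a hypothetical helper, and it assumes a `chain::Chain` handle plus the signature added above:

```rust
// Sketch only: resolve "commit only" inputs to full output identifiers.
// `resolve_spent_outputs` is a hypothetical helper, not grin API.
fn resolve_spent_outputs(
	chain: &chain::Chain,
	tx: &Transaction,
) -> Result<Vec<OutputIdentifier>, chain::Error> {
	// Each (OutputIdentifier, CommitPos) pair carries the features and MMR
	// position of the unspent output that one of the tx inputs would spend.
	let spent = chain.validate_inputs(tx.inputs())?;
	Ok(spent.into_iter().map(|(out, _pos)| out).collect())
}
```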

core/src/core/transaction.rs

@@ -1023,7 +1023,6 @@ impl TransactionBody {
 		self.verify_weight(weighting)?;
 		self.verify_no_nrd_duplicates()?;
 		self.verify_sorted()?;
-		self.verify_cut_through()?;
 		Ok(())
 	}
@@ -1036,6 +1035,7 @@
 		verifier: Arc<RwLock<dyn VerifierCache>>,
 	) -> Result<(), Error> {
 		self.validate_read(weighting)?;
+		self.verify_cut_through()?;
 
 		// Find all the outputs that have not had their rangeproofs verified.
 		let outputs = {
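For context, the cut-through check that moved here rejects a transaction spending one of its own outputs. A self-contained sketch of the check, assuming (as grin does) that inputs and outputs are kept sorted by commitment; commitments are stand-in `u64`s rather than Pedersen commitments:

```rust
use std::cmp::Ordering;

// Hedged sketch of verify_cut_through: walk two sorted lists and fail on
// any commitment that appears as both an input and an output.
fn verify_cut_through(inputs: &[u64], outputs: &[u64]) -> Result<(), String> {
	let (mut i, mut o) = (0, 0);
	while i < inputs.len() && o < outputs.len() {
		match inputs[i].cmp(&outputs[o]) {
			Ordering::Less => i += 1,
			Ordering::Greater => o += 1,
			Ordering::Equal => return Err("tx spends its own output".into()),
		}
	}
	Ok(())
}

fn main() {
	assert!(verify_cut_through(&[1, 3], &[2, 4]).is_ok());
	assert!(verify_cut_through(&[1, 3], &[3, 4]).is_err());
}
```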
@@ -1483,8 +1483,7 @@ pub fn deaggregate(mk_tx: Transaction, txs: &[Transaction]) -> Result<Transactio
 	kernels.sort_unstable();
 
 	// Build a new tx from the above data.
-	let tx = Transaction::new(&inputs, &outputs, &kernels).with_offset(total_kernel_offset);
-	Ok(tx)
+	Ok(Transaction::new(&inputs, &outputs, &kernels).with_offset(total_kernel_offset))
 }
 
 /// A transaction input.
@@ -1512,6 +1511,15 @@ impl AsRef<Commitment> for Input {
 	}
 }
 
+impl From<&OutputIdentifier> for Input {
+	fn from(out: &OutputIdentifier) -> Self {
+		Input {
+			features: out.features,
+			commit: out.commit,
+		}
+	}
+}
+
 impl ::std::hash::Hash for Input {
 	fn hash<H: ::std::hash::Hasher>(&self, state: &mut H) {
 		let mut vec = Vec::new();
@@ -1830,7 +1838,7 @@ impl Output {
 /// An output_identifier can be built from either an input _or_ an output and
 /// contains everything we need to uniquely identify an output being spent.
 /// Needed because it is not sufficient to pass a commitment around.
-#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
+#[derive(Serialize, Deserialize, Debug, Clone, Copy)]
 pub struct OutputIdentifier {
 	/// Output features (coinbase vs. regular transaction output)
 	/// We need to include this when hashing to ensure coinbase maturity can be
@@ -1841,6 +1849,13 @@ pub struct OutputIdentifier {
 }
 
 impl DefaultHashable for OutputIdentifier {}
 hashable_ord!(OutputIdentifier);
 
+impl AsRef<Commitment> for OutputIdentifier {
+	fn as_ref(&self) -> &Commitment {
+		&self.commit
+	}
+}
+
 impl OutputIdentifier {
 	/// Build a new output_identifier.
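Taken together, `Copy` on `OutputIdentifier`, `From<&OutputIdentifier> for Input`, and `AsRef<Commitment>` let the pool pass identifiers around by value and cheaply rebuild full "features and commit" inputs. A self-contained sketch with simplified stand-in types, not grin's real definitions:

```rust
// Stand-in types: a real commitment is a 33-byte Pedersen commitment.
#[derive(Clone, Copy, Debug, PartialEq)]
struct Commitment([u8; 33]);

#[derive(Clone, Copy, Debug)]
enum OutputFeatures {
	Plain,
	Coinbase,
}

#[derive(Clone, Copy, Debug)]
struct OutputIdentifier {
	features: OutputFeatures,
	commit: Commitment,
}

#[derive(Debug)]
struct Input {
	features: OutputFeatures,
	commit: Commitment,
}

impl From<&OutputIdentifier> for Input {
	fn from(out: &OutputIdentifier) -> Self {
		Input {
			features: out.features,
			commit: out.commit,
		}
	}
}

impl AsRef<Commitment> for OutputIdentifier {
	fn as_ref(&self) -> &Commitment {
		&self.commit
	}
}

fn main() {
	let out = OutputIdentifier {
		features: OutputFeatures::Plain,
		commit: Commitment([0u8; 33]),
	};
	let copied = out; // Copy: no clone() needed when passing by value.
	let input: Input = (&copied).into(); // rebuild a full v2 input
	println!("{:?} / first commit byte {}", input, copied.as_ref().0[0]);
}
```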

pool/src/pool.rs

@@ -20,7 +20,7 @@ use self::core::core::id::{ShortId, ShortIdentifiable};
 use self::core::core::transaction;
 use self::core::core::verifier_cache::VerifierCache;
 use self::core::core::{
-	Block, BlockHeader, BlockSums, Committed, Transaction, TxKernel, Weighting,
+	Block, BlockHeader, BlockSums, Committed, OutputIdentifier, Transaction, TxKernel, Weighting,
 };
 use self::util::RwLock;
 use crate::types::{BlockChain, PoolEntry, PoolError};
@@ -163,14 +163,21 @@ where
 		self.entries.iter().map(|x| x.tx.clone()).collect()
 	}
 
-	/// Return a single aggregate tx representing all txs in the txpool.
-	/// Returns None if the txpool is empty.
-	pub fn all_transactions_aggregate(&self) -> Result<Option<Transaction>, PoolError> {
-		let txs = self.all_transactions();
+	/// Return a single aggregate tx representing all txs in the pool.
+	/// Takes an optional "extra tx" to include in the aggregation.
+	/// Returns None if there is nothing to aggregate.
+	/// Returns the extra tx if provided and pool is empty.
+	pub fn all_transactions_aggregate(
+		&self,
+		extra_tx: Option<Transaction>,
+	) -> Result<Option<Transaction>, PoolError> {
+		let mut txs = self.all_transactions();
 		if txs.is_empty() {
-			return Ok(None);
+			return Ok(extra_tx);
 		}
+
+		txs.extend(extra_tx);
+
 		let tx = transaction::aggregate(&txs)?;
 
 		// Validate the single aggregate transaction "as pool", not subject to tx weight limits.
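The `extra_tx` parameter exists so the stempool can be aggregated and validated on top of the txpool's contents. A hedged sketch of the calling pattern; `Pool`, `Transaction`, and `PoolError` stand in for the diff's actual types, ignoring the pool's generic blockchain/verifier parameters for brevity:

```rust
// Sketch: aggregate the stempool on top of the current txpool aggregate.
fn stempool_view(
	txpool: &Pool,
	stempool: &Pool,
) -> Result<Option<Transaction>, PoolError> {
	// Aggregate the txpool alone; None means "no extra tx".
	let txpool_agg = txpool.all_transactions_aggregate(None)?;
	// An empty stempool returns txpool_agg unchanged; otherwise stempool txs
	// are aggregated together with it and validated "as pool".
	stempool.all_transactions_aggregate(txpool_agg)
}
```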
@@ -185,7 +192,7 @@ where
 	pub fn add_to_pool(
 		&mut self,
 		entry: PoolEntry,
-		extra_txs: Vec<Transaction>,
+		extra_tx: Option<Transaction>,
 		header: &BlockHeader,
 	) -> Result<(), PoolError> {
 		// Combine all the txs from the pool with any extra txs provided.
@@ -196,7 +203,9 @@ where
 			return Err(PoolError::DuplicateTx);
 		}
 
-		txs.extend(extra_txs);
+		// Make sure we take extra_tx into consideration here.
+		// When adding to stempool we need to account for current txpool.
+		txs.extend(extra_tx);
 
 		let agg_tx = if txs.is_empty() {
 			// If we have nothing to aggregate then simply return the tx itself.
@@ -280,6 +289,53 @@ where
 		Ok(valid_txs)
 	}
 
+	/// Convert a transaction for v2 compatibility.
+	/// We may receive a transaction with "commit only" inputs.
+	/// We convert it to "features and commit" so we can safely relay it to v2 peers.
+	/// Conversion is done by looking up outputs to be spent in both the pool and the current utxo.
+	pub fn convert_tx_v2(
+		&self,
+		tx: Transaction,
+		extra_tx: Option<Transaction>,
+	) -> Result<Transaction, PoolError> {
+		let mut inputs: Vec<_> = tx.inputs().into();
+
+		let agg_tx = self
+			.all_transactions_aggregate(extra_tx)?
+			.unwrap_or(Transaction::empty());
+		let mut outputs: Vec<OutputIdentifier> =
+			agg_tx.outputs().iter().map(|out| out.into()).collect();
+
+		// By applying cut_through to tx inputs and agg_tx outputs we can
+		// determine the outputs being spent from the pool and those still unspent
+		// that need to be looked up via the current utxo.
+		let (inputs, _, _, spent_pool) =
+			transaction::cut_through(&mut inputs[..], &mut outputs[..])?;
+
+		// Lookup remaining outputs to be spent from the current utxo.
+		let spent_utxo = self.blockchain.validate_inputs(inputs.into())?;
+
+		// Combine outputs spent in utxo with outputs spent in pool to give us the
+		// full set of outputs being spent by this transaction.
+		// This is our source of truth for input features.
+		let mut spent = spent_pool.to_vec();
+		spent.extend(spent_utxo);
+		spent.sort();
+
+		// Now build the resulting transaction based on our inputs and outputs from the original transaction.
+		// Remember to use the original kernels and kernel offset.
+		let mut outputs = tx.outputs().to_vec();
+		let (inputs, outputs, _, _) = transaction::cut_through(&mut spent[..], &mut outputs[..])?;
+		let inputs: Vec<_> = inputs.iter().map(|out| out.into()).collect();
+		let tx = Transaction::new(inputs.as_slice(), outputs, tx.kernels()).with_offset(tx.offset);
+
+		// Validate the tx to ensure our converted inputs are correct.
+		tx.validate(Weighting::AsTransaction, self.verifier_cache.clone())
+			.map_err(PoolError::InvalidTx)?;
+
+		Ok(tx)
+	}
+
 	fn apply_tx_to_block_sums(
 		&self,
 		tx: &Transaction,
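The shape of the conversion is easier to see on toy data. A self-contained sketch, with `u32` commitments and a simplified `cut_through` standing in for grin's: the first pass splits the tx inputs into those spending pool outputs (features already known) and those that must be looked up in the utxo. The real code then runs one more `cut_through` against the tx's own outputs before rebuilding the transaction with its original kernels and offset:

```rust
// Stand-in for OutputIdentifier: commit plus recovered features.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
struct OutputId {
	commit: u32,
	coinbase: bool,
}

// Simplified cut_through: match "commit only" inputs against known outputs.
// Returns (unmatched inputs, matched outputs with features).
fn cut_through(inputs: &[u32], outputs: &[OutputId]) -> (Vec<u32>, Vec<OutputId>) {
	let (mut unmatched, mut matched) = (vec![], vec![]);
	for &c in inputs {
		match outputs.iter().find(|o| o.commit == c) {
			Some(&o) => matched.push(o),
			None => unmatched.push(c),
		}
	}
	(unmatched, matched)
}

fn main() {
	// The incoming tx spends commits 1 and 2; the pool holds output 2.
	let tx_inputs = vec![1u32, 2];
	let pool_outputs = vec![OutputId { commit: 2, coinbase: false }];

	// Pass 1: outputs spent from the pool vs. those needing a utxo lookup.
	let (to_lookup, spent_pool) = cut_through(&tx_inputs, &pool_outputs);

	// Pass 2 stands in for blockchain.validate_inputs: the utxo set also
	// knows each output's features.
	let spent_utxo: Vec<OutputId> = to_lookup
		.iter()
		.map(|&c| OutputId { commit: c, coinbase: c == 1 })
		.collect();

	let mut spent = spent_pool;
	spent.extend(spent_utxo);
	spent.sort();

	// `spent` is now the full "features and commit" input set for the tx.
	assert_eq!(
		spent,
		vec![
			OutputId { commit: 1, coinbase: true },
			OutputId { commit: 2, coinbase: false },
		]
	);
}
```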
@@ -313,16 +369,9 @@ where
 	) -> Result<(), PoolError> {
 		let existing_entries = self.entries.clone();
 		self.entries.clear();
 
-		let mut extra_txs = vec![];
-		if let Some(extra_tx) = extra_tx {
-			extra_txs.push(extra_tx);
-		}
-
 		for x in existing_entries {
-			let _ = self.add_to_pool(x, extra_txs.clone(), header);
+			let _ = self.add_to_pool(x, extra_tx.clone(), header);
 		}
 		Ok(())
 	}

pool/src/transaction_pool.rs

@@ -86,15 +86,29 @@ where
 	}
 
 	// Add tx to stempool (passing in all txs from txpool to validate against).
-	fn add_to_stempool(&mut self, entry: PoolEntry, header: &BlockHeader) -> Result<(), PoolError> {
+	fn add_to_stempool(
+		&mut self,
+		entry: PoolEntry,
+		header: &BlockHeader,
+	) -> Result<PoolEntry, PoolError> {
+		let txpool_agg = self.txpool.all_transactions_aggregate(None)?;
+
+		// Convert the tx to v2 looking for unspent outputs in both stempool and txpool, and utxo.
+		let src = entry.src;
+		let tx = entry.tx;
+		let tx_v2 = self.stempool.convert_tx_v2(tx, txpool_agg.clone())?;
+		let entry = PoolEntry::new(tx_v2, src);
+
 		self.stempool
-			.add_to_pool(entry, self.txpool.all_transactions(), header)?;
-		Ok(())
+			.add_to_pool(entry.clone(), txpool_agg, header)?;
+
+		// If all is good return our pool entry with the converted tx.
+		Ok(entry)
 	}
 
-	fn add_to_reorg_cache(&mut self, entry: PoolEntry) {
+	fn add_to_reorg_cache(&mut self, entry: &PoolEntry) {
 		let mut cache = self.reorg_cache.write();
-		cache.push_back(entry);
+		cache.push_back(entry.clone());
 
 		// We cache 30 mins of txs but we have a hard limit to avoid catastrophic failure.
 		// For simplicity use the same value as the actual tx pool limit.
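The "hard limit" comment above describes a bounded deque. A self-contained sketch of that pattern; `MAX_POOL_SIZE` and its value are assumed names here, not grin's constant:

```rust
use std::collections::VecDeque;

const MAX_POOL_SIZE: usize = 50_000; // assumed cap, mirroring the tx pool limit

// Newest entries go on the back; once over the cap, evict the oldest
// from the front instead of failing.
fn push_bounded<T>(cache: &mut VecDeque<T>, entry: T) {
	cache.push_back(entry);
	while cache.len() > MAX_POOL_SIZE {
		let _ = cache.pop_front();
	}
}

fn main() {
	let mut cache: VecDeque<u64> = VecDeque::new();
	for i in 0..10 {
		push_bounded(&mut cache, i);
	}
	assert_eq!(cache.len(), 10);
}
```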
@@ -106,31 +120,41 @@
 	fn add_to_txpool(
 		&mut self,
-		mut entry: PoolEntry,
+		entry: PoolEntry,
 		header: &BlockHeader,
-	) -> Result<(), PoolError> {
+	) -> Result<PoolEntry, PoolError> {
 		// First deaggregate the tx based on current txpool txs.
-		if entry.tx.kernels().len() > 1 {
-			let txs = self.txpool.find_matching_transactions(entry.tx.kernels());
+		let entry = if entry.tx.kernels().len() == 1 {
+			entry
+		} else {
+			let tx = entry.tx.clone();
+			let txs = self.txpool.find_matching_transactions(tx.kernels());
 			if !txs.is_empty() {
-				let tx = transaction::deaggregate(entry.tx, &txs)?;
+				let tx = transaction::deaggregate(tx, &txs)?;
+				// Validate this deaggregated tx "as tx", subject to regular tx weight limits.
 				tx.validate(Weighting::AsTransaction, self.verifier_cache.clone())?;
-				entry.tx = tx;
-				entry.src = TxSource::Deaggregate;
+				PoolEntry::new(tx, TxSource::Deaggregate)
+			} else {
+				entry
 			}
-		}
-		self.txpool.add_to_pool(entry.clone(), vec![], header)?;
+		};
+
+		// Convert the deaggregated tx to v2 looking for unspent outputs in the txpool, and utxo.
+		let src = entry.src;
+		let tx_v2 = self.txpool.convert_tx_v2(entry.tx, None)?;
+		let entry = PoolEntry::new(tx_v2, src);
+
+		self.txpool.add_to_pool(entry.clone(), None, header)?;
 
 		// We now need to reconcile the stempool based on the new state of the txpool.
 		// Some stempool txs may no longer be valid and we need to evict them.
-		{
-			let txpool_tx = self.txpool.all_transactions_aggregate()?;
-			self.stempool.reconcile(txpool_tx, header)?;
-		}
-		Ok(())
+		let txpool_agg = self.txpool.all_transactions_aggregate(None)?;
+		self.stempool.reconcile(txpool_agg, header)?;
+
+		// If all is good return our pool entry with the deaggregated and converted tx.
+		Ok(entry)
 	}
 
 	/// Verify the tx kernel variants and ensure they can all be accepted to the txpool/stempool
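Deaggregation, in the multi-kernel case handled above, amounts to subtracting the matched pool txs from the incoming aggregate. A toy sketch of the idea on kernel ids (`u64` stand-ins; the real `transaction::deaggregate` also subtracts inputs, outputs, and kernel offsets):

```rust
use std::collections::HashSet;

// Keep only the kernels not already present in the matched pool txs.
fn deaggregate_kernels(agg: &[u64], pool: &[u64]) -> Vec<u64> {
	let known: HashSet<u64> = pool.iter().copied().collect();
	agg.iter().copied().filter(|k| !known.contains(k)).collect()
}

fn main() {
	// Aggregate carries kernels {1, 2, 3}; the pool already holds {1, 2}.
	let remaining = deaggregate_kernels(&[1, 2, 3], &[1, 2]);
	assert_eq!(remaining, vec![3]); // the "new" single-kernel tx we keep
}
```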
@@ -194,23 +218,17 @@ where
 		// Check coinbase maturity before we go any further.
 		self.blockchain.verify_coinbase_maturity(&tx)?;
 
-		let entry = PoolEntry {
-			src,
-			tx_at: Utc::now(),
-			tx,
-		};
-
 		// If this is a stem tx then attempt to add it to stempool.
 		// If the adapter fails to accept the new stem tx then fallback to fluff via txpool.
 		if stem {
-			self.add_to_stempool(entry.clone(), header)?;
+			let entry = self.add_to_stempool(PoolEntry::new(tx.clone(), src), header)?;
 			if self.adapter.stem_tx_accepted(&entry).is_ok() {
 				return Ok(());
 			}
 		}
 
-		self.add_to_txpool(entry.clone(), header)?;
-		self.add_to_reorg_cache(entry.clone());
+		let entry = self.add_to_txpool(PoolEntry::new(tx, src), header)?;
+		self.add_to_reorg_cache(&entry);
 		self.adapter.tx_accepted(&entry);
 
 		// Transaction passed all the checks but we have to make space for it
@@ -247,7 +265,7 @@ where
 			header.hash(),
 		);
 		for entry in entries {
-			let _ = &self.add_to_txpool(entry.clone(), header);
+			let _ = self.add_to_txpool(entry, header);
 		}
 		debug!(
 			"reconcile_reorg_cache: block: {:?} ... done.",
@@ -266,7 +284,7 @@ where
 		// Now reconcile our stempool, accounting for the updated txpool txs.
 		self.stempool.reconcile_block(block);
 		{
-			let txpool_tx = self.txpool.all_transactions_aggregate()?;
+			let txpool_tx = self.txpool.all_transactions_aggregate(None)?;
 			self.stempool.reconcile(txpool_tx, &block.header)?;
 		}

pool/src/types.rs

@@ -15,14 +15,13 @@
 //! The primary module containing the implementations of the transaction pool
 //! and its top-level members.
 
-use chrono::prelude::{DateTime, Utc};
-
 use self::core::consensus;
 use self::core::core::block;
 use self::core::core::committed;
 use self::core::core::hash::Hash;
 use self::core::core::transaction::{self, Transaction};
-use self::core::core::{BlockHeader, BlockSums};
+use self::core::core::{BlockHeader, BlockSums, Inputs, OutputIdentifier};
+use chrono::prelude::*;
 use failure::Fail;
 use grin_core as core;
 use grin_keychain as keychain;
@@ -159,13 +158,23 @@ pub struct PoolEntry {
 	pub tx: Transaction,
 }
 
+impl PoolEntry {
+	pub fn new(tx: Transaction, src: TxSource) -> PoolEntry {
+		PoolEntry {
+			src,
+			tx_at: Utc::now(),
+			tx,
+		}
+	}
+}
+
 /// Used to make decisions based on transaction acceptance priority from
 /// various sources. For example, a node may want to bypass pool size
 /// restrictions when accepting a transaction from a local wallet.
 ///
 /// Most likely this will evolve to contain some sort of network identifier,
 /// once we get a better sense of what transaction building might look like.
-#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
+#[derive(Clone, Copy, Debug, PartialEq, Serialize, Deserialize)]
 pub enum TxSource {
 	PushApi,
 	Broadcast,
@@ -272,8 +281,14 @@ pub trait BlockChain: Sync + Send {
 	/// have matured sufficiently.
 	fn verify_tx_lock_height(&self, tx: &transaction::Transaction) -> Result<(), PoolError>;
 
 	/// Validate a transaction against the current utxo.
 	fn validate_tx(&self, tx: &Transaction) -> Result<(), PoolError>;
 
+	/// Validate inputs against the current utxo.
+	/// Returns the vec of output identifiers that would be spent
+	/// by these inputs if they can all be successfully spent.
+	fn validate_inputs(&self, inputs: Inputs) -> Result<Vec<OutputIdentifier>, PoolError>;
+
 	fn chain_head(&self) -> Result<BlockHeader, PoolError>;
 
 	fn get_block_header(&self, hash: &Hash) -> Result<BlockHeader, PoolError>;

pool/tests/common.rs

@@ -19,7 +19,9 @@ use self::chain::Chain;
 use self::core::consensus;
 use self::core::core::hash::Hash;
 use self::core::core::verifier_cache::{LruVerifierCache, VerifierCache};
-use self::core::core::{Block, BlockHeader, BlockSums, KernelFeatures, Transaction, TxKernel};
+use self::core::core::{
+	Block, BlockHeader, BlockSums, Inputs, KernelFeatures, OutputIdentifier, Transaction, TxKernel,
+};
 use self::core::genesis;
 use self::core::global;
 use self::core::libtx::{build, reward, ProofBuilder};
@@ -134,6 +136,13 @@ impl BlockChain for ChainAdapter {
 		})
 	}
 
+	fn validate_inputs(&self, inputs: Inputs) -> Result<Vec<OutputIdentifier>, PoolError> {
+		self.chain
+			.validate_inputs(inputs)
+			.map(|outputs| outputs.into_iter().map(|(out, _)| out).collect::<Vec<_>>())
+			.map_err(|_| PoolError::Other("failed to validate inputs".into()))
+	}
+
 	fn verify_coinbase_maturity(&self, tx: &Transaction) -> Result<(), PoolError> {
 		self.chain
 			.verify_coinbase_maturity(tx)

pool/tests/transaction_pool.rs

@@ -150,7 +150,11 @@ fn test_the_transaction_pool() {
 	// Check we can take some entries from the stempool and "fluff" them into the
 	// txpool. This also exercises multi-kernel txs.
 	{
-		let agg_tx = pool.stempool.all_transactions_aggregate().unwrap().unwrap();
+		let agg_tx = pool
+			.stempool
+			.all_transactions_aggregate(None)
+			.unwrap()
+			.unwrap();
 		assert_eq!(agg_tx.kernels().len(), 2);
 		pool.add_to_pool(test_source(), agg_tx, false, &header)
 			.unwrap();
@@ -182,6 +186,7 @@ fn test_the_transaction_pool() {
 	// that is a superset of a tx already in the pool.
 	{
 		let tx4 = test_transaction(&keychain, vec![800], vec![799]);
+		// tx1 and tx2 are already in the txpool (in aggregated form)
 		// tx4 is the "new" part of this aggregated tx that we care about
 		let agg_tx = transaction::aggregate(&[tx1.clone(), tx2.clone(), tx4]).unwrap();

servers/src/common/adapters.rs

@@ -30,7 +30,7 @@ use crate::common::types::{ChainValidationMode, DandelionEpoch, ServerConfig};
 use crate::core::core::hash::{Hash, Hashed};
 use crate::core::core::transaction::Transaction;
 use crate::core::core::verifier_cache::VerifierCache;
-use crate::core::core::{BlockHeader, BlockSums, CompactBlock};
+use crate::core::core::{BlockHeader, BlockSums, CompactBlock, Inputs, OutputIdentifier};
 use crate::core::pow::Difficulty;
 use crate::core::{core, global};
 use crate::p2p;
@@ -938,6 +938,13 @@ impl pool::BlockChain for PoolToChainAdapter {
 			.map_err(|_| pool::PoolError::Other("failed to validate tx".to_string()))
 	}
 
+	fn validate_inputs(&self, inputs: Inputs) -> Result<Vec<OutputIdentifier>, pool::PoolError> {
+		self.chain()
+			.validate_inputs(inputs)
+			.map(|outputs| outputs.into_iter().map(|(out, _)| out).collect::<Vec<_>>())
+			.map_err(|_| pool::PoolError::Other("failed to validate tx".to_string()))
+	}
+
 	fn verify_coinbase_maturity(&self, tx: &Transaction) -> Result<(), pool::PoolError> {
 		self.chain()
 			.verify_coinbase_maturity(tx)

servers/src/grin/dandelion_monitor.rs

@@ -131,7 +131,7 @@ fn process_fluff_phase(
 	let header = tx_pool.chain_head()?;
 
 	let fluffable_txs = {
-		let txpool_tx = tx_pool.txpool.all_transactions_aggregate()?;
+		let txpool_tx = tx_pool.txpool.all_transactions_aggregate(None)?;
 		let txs: Vec<_> = all_entries.into_iter().map(|x| x.tx).collect();
 		tx_pool.stempool.validate_raw_txs(
 			&txs,