diff --git a/chain/src/chain.rs b/chain/src/chain.rs index e715ce6b5..79afebf0b 100644 --- a/chain/src/chain.rs +++ b/chain/src/chain.rs @@ -607,12 +607,29 @@ impl Chain { }) } - fn validate_tx_against_utxo(&self, tx: &Transaction) -> Result<(), Error> { + fn validate_tx_against_utxo( + &self, + tx: &Transaction, + ) -> Result, Error> { let header_pmmr = self.header_pmmr.read(); let txhashset = self.txhashset.read(); txhashset::utxo_view(&header_pmmr, &txhashset, |utxo, batch| { - utxo.validate_tx(tx, batch)?; - Ok(()) + utxo.validate_tx(tx, batch) + }) + } + + /// Validates inputs against the current utxo. + /// Each input must spend an unspent output. + /// Returns the vec of output identifiers and their pos of the outputs + /// that would be spent by the inputs. + pub fn validate_inputs( + &self, + inputs: Inputs, + ) -> Result, Error> { + let header_pmmr = self.header_pmmr.read(); + let txhashset = self.txhashset.read(); + txhashset::utxo_view(&header_pmmr, &txhashset, |utxo, batch| { + utxo.validate_inputs(inputs, batch) }) } diff --git a/core/src/core/transaction.rs b/core/src/core/transaction.rs index 38f198f54..b15d6af0a 100644 --- a/core/src/core/transaction.rs +++ b/core/src/core/transaction.rs @@ -1023,7 +1023,6 @@ impl TransactionBody { self.verify_weight(weighting)?; self.verify_no_nrd_duplicates()?; self.verify_sorted()?; - self.verify_cut_through()?; Ok(()) } @@ -1036,6 +1035,7 @@ impl TransactionBody { verifier: Arc>, ) -> Result<(), Error> { self.validate_read(weighting)?; + self.verify_cut_through()?; // Find all the outputs that have not had their rangeproofs verified. let outputs = { @@ -1483,8 +1483,7 @@ pub fn deaggregate(mk_tx: Transaction, txs: &[Transaction]) -> Result for Input { } } +impl From<&OutputIdentifier> for Input { + fn from(out: &OutputIdentifier) -> Self { + Input { + features: out.features, + commit: out.commit, + } + } +} + impl ::std::hash::Hash for Input { fn hash(&self, state: &mut H) { let mut vec = Vec::new(); @@ -1830,7 +1838,7 @@ impl Output { /// An output_identifier can be build from either an input _or_ an output and /// contains everything we need to uniquely identify an output being spent. /// Needed because it is not sufficient to pass a commitment around. -#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] +#[derive(Serialize, Deserialize, Debug, Clone, Copy)] pub struct OutputIdentifier { /// Output features (coinbase vs. regular transaction output) /// We need to include this when hashing to ensure coinbase maturity can be @@ -1841,6 +1849,13 @@ pub struct OutputIdentifier { } impl DefaultHashable for OutputIdentifier {} +hashable_ord!(OutputIdentifier); + +impl AsRef for OutputIdentifier { + fn as_ref(&self) -> &Commitment { + &self.commit + } +} impl OutputIdentifier { /// Build a new output_identifier. diff --git a/pool/src/pool.rs b/pool/src/pool.rs index 8d72436b7..56198cc3f 100644 --- a/pool/src/pool.rs +++ b/pool/src/pool.rs @@ -20,7 +20,7 @@ use self::core::core::id::{ShortId, ShortIdentifiable}; use self::core::core::transaction; use self::core::core::verifier_cache::VerifierCache; use self::core::core::{ - Block, BlockHeader, BlockSums, Committed, Transaction, TxKernel, Weighting, + Block, BlockHeader, BlockSums, Committed, OutputIdentifier, Transaction, TxKernel, Weighting, }; use self::util::RwLock; use crate::types::{BlockChain, PoolEntry, PoolError}; @@ -163,14 +163,21 @@ where self.entries.iter().map(|x| x.tx.clone()).collect() } - /// Return a single aggregate tx representing all txs in the txpool. - /// Returns None if the txpool is empty. - pub fn all_transactions_aggregate(&self) -> Result, PoolError> { - let txs = self.all_transactions(); + /// Return a single aggregate tx representing all txs in the pool. + /// Takes an optional "extra tx" to include in the aggregation. + /// Returns None if there is nothing to aggregate. + /// Returns the extra tx if provided and pool is empty. + pub fn all_transactions_aggregate( + &self, + extra_tx: Option, + ) -> Result, PoolError> { + let mut txs = self.all_transactions(); if txs.is_empty() { - return Ok(None); + return Ok(extra_tx); } + txs.extend(extra_tx); + let tx = transaction::aggregate(&txs)?; // Validate the single aggregate transaction "as pool", not subject to tx weight limits. @@ -185,7 +192,7 @@ where pub fn add_to_pool( &mut self, entry: PoolEntry, - extra_txs: Vec, + extra_tx: Option, header: &BlockHeader, ) -> Result<(), PoolError> { // Combine all the txs from the pool with any extra txs provided. @@ -196,7 +203,9 @@ where return Err(PoolError::DuplicateTx); } - txs.extend(extra_txs); + // Make sure we take extra_tx into consideration here. + // When adding to stempool we need to account for current txpool. + txs.extend(extra_tx); let agg_tx = if txs.is_empty() { // If we have nothing to aggregate then simply return the tx itself. @@ -280,6 +289,53 @@ where Ok(valid_txs) } + /// Convert a transaction for v2 compatibility. + /// We may receive a transaction with "commit only" inputs. + /// We convert it to "features and commit" so we can safely relay it to v2 peers. + /// Converson is done by looking up outputs to be spent in both the pool and the current utxo. + pub fn convert_tx_v2( + &self, + tx: Transaction, + extra_tx: Option, + ) -> Result { + let mut inputs: Vec<_> = tx.inputs().into(); + + let agg_tx = self + .all_transactions_aggregate(extra_tx)? + .unwrap_or(Transaction::empty()); + let mut outputs: Vec = + agg_tx.outputs().iter().map(|out| out.into()).collect(); + + // By applying cut_through to tx inputs and agg_tx outputs we can + // determine the outputs being spent from the pool and those still unspent + // that need to be looked up via the current utxo. + let (inputs, _, _, spent_pool) = + transaction::cut_through(&mut inputs[..], &mut outputs[..])?; + + // Lookup remaining outputs to be spent from the current utxo. + let spent_utxo = self.blockchain.validate_inputs(inputs.into())?; + + // Combine outputs spent in utxo with outputs spent in pool to give us the + // full set of outputs being spent by this transaction. + // This is our source of truth for input features. + let mut spent = spent_pool.to_vec(); + spent.extend(spent_utxo); + spent.sort(); + + // Now build the resulting transaction based on our inputs and outputs from the original transaction. + // Remember to use the original kernels and kernel offset. + let mut outputs = tx.outputs().to_vec(); + let (inputs, outputs, _, _) = transaction::cut_through(&mut spent[..], &mut outputs[..])?; + let inputs: Vec<_> = inputs.iter().map(|out| out.into()).collect(); + let tx = Transaction::new(inputs.as_slice(), outputs, tx.kernels()).with_offset(tx.offset); + + // Validate the tx to ensure our converted inputs are correct. + tx.validate(Weighting::AsTransaction, self.verifier_cache.clone()) + .map_err(PoolError::InvalidTx)?; + + Ok(tx) + } + fn apply_tx_to_block_sums( &self, tx: &Transaction, @@ -313,16 +369,9 @@ where ) -> Result<(), PoolError> { let existing_entries = self.entries.clone(); self.entries.clear(); - - let mut extra_txs = vec![]; - if let Some(extra_tx) = extra_tx { - extra_txs.push(extra_tx); - } - for x in existing_entries { - let _ = self.add_to_pool(x, extra_txs.clone(), header); + let _ = self.add_to_pool(x, extra_tx.clone(), header); } - Ok(()) } diff --git a/pool/src/transaction_pool.rs b/pool/src/transaction_pool.rs index 2ce71effa..8ddaef5b2 100644 --- a/pool/src/transaction_pool.rs +++ b/pool/src/transaction_pool.rs @@ -86,15 +86,29 @@ where } // Add tx to stempool (passing in all txs from txpool to validate against). - fn add_to_stempool(&mut self, entry: PoolEntry, header: &BlockHeader) -> Result<(), PoolError> { + fn add_to_stempool( + &mut self, + entry: PoolEntry, + header: &BlockHeader, + ) -> Result { + let txpool_agg = self.txpool.all_transactions_aggregate(None)?; + + // Convert the tx to v2 looking for unspent outputs in both stempool and txpool, and utxo. + let src = entry.src; + let tx = entry.tx; + let tx_v2 = self.stempool.convert_tx_v2(tx, txpool_agg.clone())?; + let entry = PoolEntry::new(tx_v2, src); + self.stempool - .add_to_pool(entry, self.txpool.all_transactions(), header)?; - Ok(()) + .add_to_pool(entry.clone(), txpool_agg, header)?; + + // If all is good return our pool entry with the converted tx. + Ok(entry) } - fn add_to_reorg_cache(&mut self, entry: PoolEntry) { + fn add_to_reorg_cache(&mut self, entry: &PoolEntry) { let mut cache = self.reorg_cache.write(); - cache.push_back(entry); + cache.push_back(entry.clone()); // We cache 30 mins of txs but we have a hard limit to avoid catastrophic failure. // For simplicity use the same value as the actual tx pool limit. @@ -106,31 +120,41 @@ where fn add_to_txpool( &mut self, - mut entry: PoolEntry, + entry: PoolEntry, header: &BlockHeader, - ) -> Result<(), PoolError> { + ) -> Result { // First deaggregate the tx based on current txpool txs. - if entry.tx.kernels().len() > 1 { - let txs = self.txpool.find_matching_transactions(entry.tx.kernels()); + let entry = if entry.tx.kernels().len() == 1 { + entry + } else { + let tx = entry.tx.clone(); + let txs = self.txpool.find_matching_transactions(tx.kernels()); if !txs.is_empty() { - let tx = transaction::deaggregate(entry.tx, &txs)?; + let tx = transaction::deaggregate(tx, &txs)?; // Validate this deaggregated tx "as tx", subject to regular tx weight limits. tx.validate(Weighting::AsTransaction, self.verifier_cache.clone())?; - entry.tx = tx; - entry.src = TxSource::Deaggregate; + PoolEntry::new(tx, TxSource::Deaggregate) + } else { + entry } - } - self.txpool.add_to_pool(entry.clone(), vec![], header)?; + }; + + // Convert the deaggregated tx to v2 looking for unspent outputs in the txpool, and utxo. + let src = entry.src; + let tx_v2 = self.txpool.convert_tx_v2(entry.tx, None)?; + let entry = PoolEntry::new(tx_v2, src); + + self.txpool.add_to_pool(entry.clone(), None, header)?; // We now need to reconcile the stempool based on the new state of the txpool. // Some stempool txs may no longer be valid and we need to evict them. - { - let txpool_tx = self.txpool.all_transactions_aggregate()?; - self.stempool.reconcile(txpool_tx, header)?; - } - Ok(()) + let txpool_agg = self.txpool.all_transactions_aggregate(None)?; + self.stempool.reconcile(txpool_agg, header)?; + + // If all is good return our pool entry with the deaggregated and converted tx. + Ok(entry) } /// Verify the tx kernel variants and ensure they can all be accepted to the txpool/stempool @@ -194,23 +218,17 @@ where // Check coinbase maturity before we go any further. self.blockchain.verify_coinbase_maturity(&tx)?; - let entry = PoolEntry { - src, - tx_at: Utc::now(), - tx, - }; - // If this is a stem tx then attempt to add it to stempool. // If the adapter fails to accept the new stem tx then fallback to fluff via txpool. if stem { - self.add_to_stempool(entry.clone(), header)?; + let entry = self.add_to_stempool(PoolEntry::new(tx.clone(), src), header)?; if self.adapter.stem_tx_accepted(&entry).is_ok() { return Ok(()); } } - self.add_to_txpool(entry.clone(), header)?; - self.add_to_reorg_cache(entry.clone()); + let entry = self.add_to_txpool(PoolEntry::new(tx, src), header)?; + self.add_to_reorg_cache(&entry); self.adapter.tx_accepted(&entry); // Transaction passed all the checks but we have to make space for it @@ -247,7 +265,7 @@ where header.hash(), ); for entry in entries { - let _ = &self.add_to_txpool(entry.clone(), header); + let _ = self.add_to_txpool(entry, header); } debug!( "reconcile_reorg_cache: block: {:?} ... done.", @@ -266,7 +284,7 @@ where // Now reconcile our stempool, accounting for the updated txpool txs. self.stempool.reconcile_block(block); { - let txpool_tx = self.txpool.all_transactions_aggregate()?; + let txpool_tx = self.txpool.all_transactions_aggregate(None)?; self.stempool.reconcile(txpool_tx, &block.header)?; } diff --git a/pool/src/types.rs b/pool/src/types.rs index 75f782594..67395a2ef 100644 --- a/pool/src/types.rs +++ b/pool/src/types.rs @@ -15,14 +15,13 @@ //! The primary module containing the implementations of the transaction pool //! and its top-level members. -use chrono::prelude::{DateTime, Utc}; - use self::core::consensus; use self::core::core::block; use self::core::core::committed; use self::core::core::hash::Hash; use self::core::core::transaction::{self, Transaction}; -use self::core::core::{BlockHeader, BlockSums}; +use self::core::core::{BlockHeader, BlockSums, Inputs, OutputIdentifier}; +use chrono::prelude::*; use failure::Fail; use grin_core as core; use grin_keychain as keychain; @@ -159,13 +158,23 @@ pub struct PoolEntry { pub tx: Transaction, } +impl PoolEntry { + pub fn new(tx: Transaction, src: TxSource) -> PoolEntry { + PoolEntry { + src, + tx_at: Utc::now(), + tx, + } + } +} + /// Used to make decisions based on transaction acceptance priority from /// various sources. For example, a node may want to bypass pool size /// restrictions when accepting a transaction from a local wallet. /// /// Most likely this will evolve to contain some sort of network identifier, /// once we get a better sense of what transaction building might look like. -#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] +#[derive(Clone, Copy, Debug, PartialEq, Serialize, Deserialize)] pub enum TxSource { PushApi, Broadcast, @@ -272,8 +281,14 @@ pub trait BlockChain: Sync + Send { /// have matured sufficiently. fn verify_tx_lock_height(&self, tx: &transaction::Transaction) -> Result<(), PoolError>; + /// Validate a transaction against the current utxo. fn validate_tx(&self, tx: &Transaction) -> Result<(), PoolError>; + /// Validate inputs against the current utxo. + /// Returns the vec of output identifiers that would be spent + /// by these inputs if they can all be successfully spent. + fn validate_inputs(&self, inputs: Inputs) -> Result, PoolError>; + fn chain_head(&self) -> Result; fn get_block_header(&self, hash: &Hash) -> Result; diff --git a/pool/tests/common.rs b/pool/tests/common.rs index e9515b141..877831caa 100644 --- a/pool/tests/common.rs +++ b/pool/tests/common.rs @@ -19,7 +19,9 @@ use self::chain::Chain; use self::core::consensus; use self::core::core::hash::Hash; use self::core::core::verifier_cache::{LruVerifierCache, VerifierCache}; -use self::core::core::{Block, BlockHeader, BlockSums, KernelFeatures, Transaction, TxKernel}; +use self::core::core::{ + Block, BlockHeader, BlockSums, Inputs, KernelFeatures, OutputIdentifier, Transaction, TxKernel, +}; use self::core::genesis; use self::core::global; use self::core::libtx::{build, reward, ProofBuilder}; @@ -134,6 +136,13 @@ impl BlockChain for ChainAdapter { }) } + fn validate_inputs(&self, inputs: Inputs) -> Result, PoolError> { + self.chain + .validate_inputs(inputs) + .map(|outputs| outputs.into_iter().map(|(out, _)| out).collect::>()) + .map_err(|_| PoolError::Other("failed to validate inputs".into())) + } + fn verify_coinbase_maturity(&self, tx: &Transaction) -> Result<(), PoolError> { self.chain .verify_coinbase_maturity(tx) diff --git a/pool/tests/transaction_pool.rs b/pool/tests/transaction_pool.rs index 9cfe0b3d0..5c4b5016b 100644 --- a/pool/tests/transaction_pool.rs +++ b/pool/tests/transaction_pool.rs @@ -150,7 +150,11 @@ fn test_the_transaction_pool() { // Check we can take some entries from the stempool and "fluff" them into the // txpool. This also exercises multi-kernel txs. { - let agg_tx = pool.stempool.all_transactions_aggregate().unwrap().unwrap(); + let agg_tx = pool + .stempool + .all_transactions_aggregate(None) + .unwrap() + .unwrap(); assert_eq!(agg_tx.kernels().len(), 2); pool.add_to_pool(test_source(), agg_tx, false, &header) .unwrap(); @@ -182,6 +186,7 @@ fn test_the_transaction_pool() { // that is a superset of a tx already in the pool. { let tx4 = test_transaction(&keychain, vec![800], vec![799]); + // tx1 and tx2 are already in the txpool (in aggregated form) // tx4 is the "new" part of this aggregated tx that we care about let agg_tx = transaction::aggregate(&[tx1.clone(), tx2.clone(), tx4]).unwrap(); diff --git a/servers/src/common/adapters.rs b/servers/src/common/adapters.rs index 4c1806fd2..83fb9ca83 100644 --- a/servers/src/common/adapters.rs +++ b/servers/src/common/adapters.rs @@ -30,7 +30,7 @@ use crate::common::types::{ChainValidationMode, DandelionEpoch, ServerConfig}; use crate::core::core::hash::{Hash, Hashed}; use crate::core::core::transaction::Transaction; use crate::core::core::verifier_cache::VerifierCache; -use crate::core::core::{BlockHeader, BlockSums, CompactBlock}; +use crate::core::core::{BlockHeader, BlockSums, CompactBlock, Inputs, OutputIdentifier}; use crate::core::pow::Difficulty; use crate::core::{core, global}; use crate::p2p; @@ -938,6 +938,13 @@ impl pool::BlockChain for PoolToChainAdapter { .map_err(|_| pool::PoolError::Other("failed to validate tx".to_string())) } + fn validate_inputs(&self, inputs: Inputs) -> Result, pool::PoolError> { + self.chain() + .validate_inputs(inputs) + .map(|outputs| outputs.into_iter().map(|(out, _)| out).collect::>()) + .map_err(|_| pool::PoolError::Other("failed to validate tx".to_string())) + } + fn verify_coinbase_maturity(&self, tx: &Transaction) -> Result<(), pool::PoolError> { self.chain() .verify_coinbase_maturity(tx) diff --git a/servers/src/grin/dandelion_monitor.rs b/servers/src/grin/dandelion_monitor.rs index 286bfff72..37c84004e 100644 --- a/servers/src/grin/dandelion_monitor.rs +++ b/servers/src/grin/dandelion_monitor.rs @@ -131,7 +131,7 @@ fn process_fluff_phase( let header = tx_pool.chain_head()?; let fluffable_txs = { - let txpool_tx = tx_pool.txpool.all_transactions_aggregate()?; + let txpool_tx = tx_pool.txpool.all_transactions_aggregate(None)?; let txs: Vec<_> = all_entries.into_iter().map(|x| x.tx).collect(); tx_pool.stempool.validate_raw_txs( &txs,