From 4050f7fccb5bd88c5d418fe18c9bd36a03b38d77 Mon Sep 17 00:00:00 2001 From: Antioch Peverell Date: Wed, 24 Oct 2018 17:57:31 +0100 Subject: [PATCH] handle re-orgs in transaction_pool (#1829) * use reorg_cache in transaction_pool to safely handle txs during a re-org * rustfmt * comments --- core/src/core/block.rs | 64 +++++++++++++++++++++------ core/src/core/verifier_cache.rs | 4 +- pool/src/pool.rs | 25 ++++++----- pool/src/transaction_pool.rs | 49 +++++++++++++++++--- pool/src/types.rs | 9 ++++ servers/src/grin/dandelion_monitor.rs | 16 ++++++- 6 files changed, 132 insertions(+), 35 deletions(-) diff --git a/core/src/core/block.rs index 2e5f3563c..2336bda6b 100644 --- a/core/src/core/block.rs +++ b/core/src/core/block.rs @@ -418,6 +418,35 @@ impl Block { Ok(block) } + /// Extract tx data from this block as a single aggregate tx. + pub fn aggregate_transaction( + &self, + prev_kernel_offset: BlindingFactor, + ) -> Result<Option<Transaction>, Error> { + let inputs = self.inputs().iter().cloned().collect(); + let outputs = self + .outputs() + .iter() + .filter(|x| !x.features.contains(OutputFeatures::COINBASE_OUTPUT)) + .cloned() + .collect(); + let kernels = self + .kernels() + .iter() + .filter(|x| !x.features.contains(KernelFeatures::COINBASE_KERNEL)) + .cloned() + .collect::<Vec<_>>(); + + let tx = if kernels.is_empty() { + None + } else { + let tx = Transaction::new(inputs, outputs, kernels) + .with_offset(self.block_kernel_offset(prev_kernel_offset)?); + Some(tx) + }; + Ok(tx) + } + /// Hydrate a block from a compact block. /// Note: caller must validate the block themselves, we do not validate it /// here. 
@@ -589,6 +618,23 @@ impl Block { Ok(()) } + fn block_kernel_offset( + &self, + prev_kernel_offset: BlindingFactor, + ) -> Result<BlindingFactor, Error> { + let offset = if self.header.total_kernel_offset() == prev_kernel_offset { + // special case when the sum hasn't changed (typically an empty block), + // zero isn't a valid private key but it's a valid blinding factor + BlindingFactor::zero() + } else { + committed::sum_kernel_offsets( + vec![self.header.total_kernel_offset()], + vec![prev_kernel_offset], + )? + }; + Ok(offset) + } + /// Validates all the elements in a block that can be checked without /// additional data. Includes commitment sums and kernels, Merkle /// trees, reward, etc. @@ -596,7 +642,7 @@ impl Block { &self, prev_kernel_offset: &BlindingFactor, verifier: Arc<RwLock<VerifierCache>>, - ) -> Result<(Commitment), Error> { + ) -> Result<Commitment, Error> { self.body.validate(true, verifier)?; self.verify_kernel_lock_heights()?; @@ -604,18 +650,10 @@ // take the kernel offset for this block (block offset minus previous) and // verify.body.outputs and kernel sums - let block_kernel_offset = if self.header.total_kernel_offset() == *prev_kernel_offset { - // special case when the sum hasn't changed (typically an empty block), - // zero isn't a valid private key but it's a valid blinding factor - BlindingFactor::zero() - } else { - committed::sum_kernel_offsets( - vec![self.header.total_kernel_offset()], - vec![*prev_kernel_offset], - )? 
- }; - let (_utxo_sum, kernel_sum) = - self.verify_kernel_sums(self.header.overage(), block_kernel_offset)?; + let (_utxo_sum, kernel_sum) = self.verify_kernel_sums( + self.header.overage(), + self.block_kernel_offset(*prev_kernel_offset)?, + )?; Ok(kernel_sum) } diff --git a/core/src/core/verifier_cache.rs index aca2bc16c..ad88882da 100644 --- a/core/src/core/verifier_cache.rs +++ b/core/src/core/verifier_cache.rs @@ -67,7 +67,7 @@ impl VerifierCache for LruVerifierCache { .unwrap_or(&mut false) }).cloned() .collect::<Vec<_>>(); - debug!( + trace!( "lru_verifier_cache: kernel sigs: {}, not cached (must verify): {}", kernels.len(), res.len() @@ -85,7 +85,7 @@ .unwrap_or(&mut false) }).cloned() .collect::<Vec<_>>(); - debug!( + trace!( "lru_verifier_cache: rangeproofs: {}, not cached (must verify): {}", outputs.len(), res.len() diff --git a/pool/src/pool.rs index 636dc6494..8ade2a131 100644 --- a/pool/src/pool.rs +++ b/pool/src/pool.rs @@ -188,17 +188,6 @@ impl Pool { extra_txs: Vec<Transaction>, header: &BlockHeader, ) -> Result<(), PoolError> { - debug!( - "pool [{}]: add_to_pool: {}, {:?}, inputs: {}, outputs: {}, kernels: {} (at block {})", - self.name, - entry.tx.hash(), - entry.src, - entry.tx.inputs().len(), - entry.tx.outputs().len(), - entry.tx.kernels().len(), - header.hash(), - ); - // Combine all the txs from the pool with any extra txs provided. let mut txs = self.all_transactions(); @@ -226,7 +215,19 @@ self.validate_raw_tx(&agg_tx, header)?; // If we get here successfully then we can safely add the entry to the pool. 
- self.entries.push(entry); + self.entries.push(entry.clone()); + + debug!( + "add_to_pool [{}]: {} ({}), in/out/kern: {}/{}/{}, pool: {} (at block {})", + self.name, + entry.tx.hash(), + entry.src.debug_name, + entry.tx.inputs().len(), + entry.tx.outputs().len(), + entry.tx.kernels().len(), + self.size(), + header.hash(), + ); Ok(()) } diff --git a/pool/src/transaction_pool.rs b/pool/src/transaction_pool.rs index 899acab44..b05453b6c 100644 --- a/pool/src/transaction_pool.rs +++ b/pool/src/transaction_pool.rs @@ -17,6 +17,7 @@ //! resulting tx pool can be added to the current chain state to produce a //! valid chain state. +use std::collections::VecDeque; use std::sync::Arc; use util::RwLock; @@ -29,6 +30,9 @@ use core::core::{transaction, Block, BlockHeader, Transaction}; use pool::Pool; use types::{BlockChain, PoolAdapter, PoolConfig, PoolEntry, PoolEntryState, PoolError, TxSource}; +// Cache this many txs to handle a potential fork and re-org. +const REORG_CACHE_SIZE: usize = 100; + /// Transaction pool implementation. pub struct TransactionPool { /// Pool Config @@ -37,6 +41,8 @@ pub struct TransactionPool { pub txpool: Pool, /// Our Dandelion "stempool". pub stempool: Pool, + /// Cache of previous txs in case of a re-org. 
+ pub reorg_cache: Arc<RwLock<VecDeque<PoolEntry>>>, /// The blockchain pub blockchain: Arc<BlockChain>, pub verifier_cache: Arc<RwLock<VerifierCache>>, @@ -56,6 +62,7 @@ impl TransactionPool { config, txpool: Pool::new(chain.clone(), verifier_cache.clone(), format!("txpool")), stempool: Pool::new(chain.clone(), verifier_cache.clone(), format!("stempool")), + reorg_cache: Arc::new(RwLock::new(VecDeque::new())), blockchain: chain, verifier_cache, adapter, @@ -76,6 +83,16 @@ Ok(()) } + fn add_to_reorg_cache(&mut self, entry: PoolEntry) -> Result<(), PoolError> { + let mut cache = self.reorg_cache.write(); + cache.push_back(entry); + if cache.len() > REORG_CACHE_SIZE { + cache.pop_front(); + } + debug!("added tx to reorg_cache: size now {}", cache.len()); + Ok(()) + } + fn add_to_txpool( &mut self, mut entry: PoolEntry, @@ -97,8 +114,10 @@ // We now need to reconcile the stempool based on the new state of the txpool. // Some stempool txs may no longer be valid and we need to evict them. - let txpool_tx = self.txpool.aggregate_transaction()?; - self.stempool.reconcile(txpool_tx, header)?; + { + let txpool_tx = self.txpool.aggregate_transaction()?; + self.stempool.reconcile(txpool_tx, header)?; + } self.adapter.tx_accepted(&entry.tx); Ok(()) @@ -140,13 +159,25 @@ }; if stem { + // TODO - what happens to txs in the stempool in a re-org scenario? self.add_to_stempool(entry, header)?; } else { - self.add_to_txpool(entry, header)?; + self.add_to_txpool(entry.clone(), header)?; + self.add_to_reorg_cache(entry)?; } Ok(()) } + fn reconcile_reorg_cache(&mut self, header: &BlockHeader) -> Result<(), PoolError> { + let entries = self.reorg_cache.read().iter().cloned().collect::<Vec<_>>(); + debug!("reconcile_reorg_cache: size: {} ...", entries.len()); + for entry in entries { + let _ = &self.add_to_txpool(entry.clone(), header); + } + debug!("reconcile_reorg_cache: ... 
done."); + Ok(()) + } + /// Reconcile the transaction pool (both txpool and stempool) against the /// provided block. pub fn reconcile_block(&mut self, block: &Block) -> Result<(), PoolError> { @@ -154,10 +185,16 @@ impl TransactionPool { self.txpool.reconcile_block(block)?; self.txpool.reconcile(None, &block.header)?; - // Then reconcile the stempool, accounting for the txpool txs. - let txpool_tx = self.txpool.aggregate_transaction()?; + // Take our "reorg_cache" and see if this block means + // we need to (re)add old txs due to a fork and re-org. + self.reconcile_reorg_cache(&block.header)?; + + // Now reconcile our stempool, accounting for the updated txpool txs. self.stempool.reconcile_block(block)?; - self.stempool.reconcile(txpool_tx, &block.header)?; + { + let txpool_tx = self.txpool.aggregate_transaction()?; + self.stempool.reconcile(txpool_tx, &block.header)?; + } Ok(()) } diff --git a/pool/src/types.rs b/pool/src/types.rs index a268602cc..d3b1be73f 100644 --- a/pool/src/types.rs +++ b/pool/src/types.rs @@ -18,6 +18,7 @@ use chrono::prelude::{DateTime, Utc}; use core::consensus; +use core::core::block; use core::core::committed; use core::core::hash::Hash; use core::core::transaction::{self, Transaction}; @@ -163,6 +164,8 @@ pub struct TxSource { pub enum PoolError { /// An invalid pool entry caused by underlying tx validation error InvalidTx(transaction::Error), + /// An invalid pool entry caused by underlying block validation error + InvalidBlock(block::Error), /// Underlying keychain error. Keychain(keychain::Error), /// Underlying "committed" error. 
@@ -192,6 +195,12 @@ impl From<transaction::Error> for PoolError { } } +impl From<block::Error> for PoolError { + fn from(e: block::Error) -> PoolError { + PoolError::InvalidBlock(e) + } +} + impl From<keychain::Error> for PoolError { fn from(e: keychain::Error) -> PoolError { PoolError::Keychain(e) diff --git a/servers/src/grin/dandelion_monitor.rs index 3106b484f..4620a937b 100644 --- a/servers/src/grin/dandelion_monitor.rs +++ b/servers/src/grin/dandelion_monitor.rs @@ -90,10 +96,16 @@ fn process_stem_phase( let header = tx_pool.chain_head()?; - let txpool_tx = tx_pool.txpool.aggregate_transaction()?; let stem_txs = tx_pool .stempool .get_transactions_in_state(PoolEntryState::ToStem); + + if stem_txs.is_empty() { + return Ok(()); + } + + let txpool_tx = tx_pool.txpool.aggregate_transaction()?; + let stem_txs = tx_pool .stempool .select_valid_transactions(stem_txs, txpool_tx, &header)?; @@ -130,10 +136,16 @@ fn process_fluff_phase( let header = tx_pool.chain_head()?; - let txpool_tx = tx_pool.txpool.aggregate_transaction()?; let stem_txs = tx_pool .stempool .get_transactions_in_state(PoolEntryState::ToFluff); + + if stem_txs.is_empty() { + return Ok(()); + } + + let txpool_tx = tx_pool.txpool.aggregate_transaction()?; + let stem_txs = tx_pool .stempool .select_valid_transactions(stem_txs, txpool_tx, &header)?;