handle re-orgs in transaction_pool (#1829)

* use reorg_cache in transaction_pool to safely handle txs during a re-org

* rustfmt

* comments
This commit is contained in:
Antioch Peverell 2018-10-24 17:57:31 +01:00 committed by GitHub
parent 3d74d7fe85
commit 4050f7fccb
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 132 additions and 35 deletions

View file

@ -418,6 +418,35 @@ impl Block {
Ok(block)
}
/// Extract tx data from this block as a single aggregate tx.
///
/// Strips the coinbase output and coinbase kernel, leaving only the
/// user transaction data. Returns `Ok(None)` if the block contains no
/// non-coinbase kernels (i.e. it carries no tx data at all).
/// The resulting tx offset is the per-block kernel offset, derived from
/// the header's total offset and the provided previous total offset.
pub fn aggregate_transaction(
    &self,
    prev_kernel_offset: BlindingFactor,
) -> Result<Option<Transaction>, Error> {
    // Collect the non-coinbase kernels first: if there are none we can
    // return early without cloning any inputs or outputs.
    let kernels = self
        .kernels()
        .iter()
        .filter(|x| !x.features.contains(KernelFeatures::COINBASE_KERNEL))
        .cloned()
        .collect::<Vec<_>>();
    if kernels.is_empty() {
        return Ok(None);
    }
    let inputs = self.inputs().iter().cloned().collect();
    let outputs = self
        .outputs()
        .iter()
        .filter(|x| !x.features.contains(OutputFeatures::COINBASE_OUTPUT))
        .cloned()
        .collect();
    let tx = Transaction::new(inputs, outputs, kernels)
        .with_offset(self.block_kernel_offset(prev_kernel_offset)?);
    Ok(Some(tx))
}
/// Hydrate a block from a compact block.
/// Note: caller must validate the block themselves, we do not validate it
/// here.
@ -589,6 +618,23 @@ impl Block {
Ok(())
}
/// Derive the kernel offset for this block alone, given the total
/// kernel offset as of the previous block.
fn block_kernel_offset(
    &self,
    prev_kernel_offset: BlindingFactor,
) -> Result<BlindingFactor, Error> {
    // Special case when the sum hasn't changed (typically an empty block):
    // zero isn't a valid private key but it's a valid blinding factor.
    if self.header.total_kernel_offset() == prev_kernel_offset {
        return Ok(BlindingFactor::zero());
    }
    let offset = committed::sum_kernel_offsets(
        vec![self.header.total_kernel_offset()],
        vec![prev_kernel_offset],
    )?;
    Ok(offset)
}
/// Validates all the elements in a block that can be checked without
/// additional data. Includes commitment sums and kernels, Merkle
/// trees, reward, etc.
@ -596,7 +642,7 @@ impl Block {
&self,
prev_kernel_offset: &BlindingFactor,
verifier: Arc<RwLock<VerifierCache>>,
) -> Result<(Commitment), Error> {
) -> Result<Commitment, Error> {
self.body.validate(true, verifier)?;
self.verify_kernel_lock_heights()?;
@ -604,18 +650,10 @@ impl Block {
// take the kernel offset for this block (block offset minus previous) and
// verify body outputs and kernel sums
let block_kernel_offset = if self.header.total_kernel_offset() == *prev_kernel_offset {
// special case when the sum hasn't changed (typically an empty block),
// zero isn't a valid private key but it's a valid blinding factor
BlindingFactor::zero()
} else {
committed::sum_kernel_offsets(
vec![self.header.total_kernel_offset()],
vec![*prev_kernel_offset],
)?
};
let (_utxo_sum, kernel_sum) =
self.verify_kernel_sums(self.header.overage(), block_kernel_offset)?;
let (_utxo_sum, kernel_sum) = self.verify_kernel_sums(
self.header.overage(),
self.block_kernel_offset(*prev_kernel_offset)?,
)?;
Ok(kernel_sum)
}

View file

@ -67,7 +67,7 @@ impl VerifierCache for LruVerifierCache {
.unwrap_or(&mut false)
}).cloned()
.collect::<Vec<_>>();
debug!(
trace!(
"lru_verifier_cache: kernel sigs: {}, not cached (must verify): {}",
kernels.len(),
res.len()
@ -85,7 +85,7 @@ impl VerifierCache for LruVerifierCache {
.unwrap_or(&mut false)
}).cloned()
.collect::<Vec<_>>();
debug!(
trace!(
"lru_verifier_cache: rangeproofs: {}, not cached (must verify): {}",
outputs.len(),
res.len()

View file

@ -188,17 +188,6 @@ impl Pool {
extra_txs: Vec<Transaction>,
header: &BlockHeader,
) -> Result<(), PoolError> {
debug!(
"pool [{}]: add_to_pool: {}, {:?}, inputs: {}, outputs: {}, kernels: {} (at block {})",
self.name,
entry.tx.hash(),
entry.src,
entry.tx.inputs().len(),
entry.tx.outputs().len(),
entry.tx.kernels().len(),
header.hash(),
);
// Combine all the txs from the pool with any extra txs provided.
let mut txs = self.all_transactions();
@ -226,7 +215,19 @@ impl Pool {
self.validate_raw_tx(&agg_tx, header)?;
// If we get here successfully then we can safely add the entry to the pool.
self.entries.push(entry);
self.entries.push(entry.clone());
debug!(
"add_to_pool [{}]: {} ({}), in/out/kern: {}/{}/{}, pool: {} (at block {})",
self.name,
entry.tx.hash(),
entry.src.debug_name,
entry.tx.inputs().len(),
entry.tx.outputs().len(),
entry.tx.kernels().len(),
self.size(),
header.hash(),
);
Ok(())
}

View file

@ -17,6 +17,7 @@
//! resulting tx pool can be added to the current chain state to produce a
//! valid chain state.
use std::collections::VecDeque;
use std::sync::Arc;
use util::RwLock;
@ -29,6 +30,9 @@ use core::core::{transaction, Block, BlockHeader, Transaction};
use pool::Pool;
use types::{BlockChain, PoolAdapter, PoolConfig, PoolEntry, PoolEntryState, PoolError, TxSource};
// Cache this many txs to handle a potential fork and re-org.
const REORG_CACHE_SIZE: usize = 100;
/// Transaction pool implementation.
pub struct TransactionPool {
/// Pool Config
@ -37,6 +41,8 @@ pub struct TransactionPool {
pub txpool: Pool,
/// Our Dandelion "stempool".
pub stempool: Pool,
/// Cache of previous txs in case of a re-org.
pub reorg_cache: Arc<RwLock<VecDeque<PoolEntry>>>,
/// The blockchain
pub blockchain: Arc<BlockChain>,
pub verifier_cache: Arc<RwLock<VerifierCache>>,
@ -56,6 +62,7 @@ impl TransactionPool {
config,
txpool: Pool::new(chain.clone(), verifier_cache.clone(), format!("txpool")),
stempool: Pool::new(chain.clone(), verifier_cache.clone(), format!("stempool")),
reorg_cache: Arc::new(RwLock::new(VecDeque::new())),
blockchain: chain,
verifier_cache,
adapter,
@ -76,6 +83,16 @@ impl TransactionPool {
Ok(())
}
/// Record this entry in the reorg cache so it can be replayed into the
/// txpool if the chain later forks and re-orgs.
fn add_to_reorg_cache(&mut self, entry: PoolEntry) -> Result<(), PoolError> {
    let mut cache = self.reorg_cache.write();
    cache.push_back(entry);
    // Keep the cache bounded, discarding the oldest entries once full.
    while cache.len() > REORG_CACHE_SIZE {
        cache.pop_front();
    }
    debug!("added tx to reorg_cache: size now {}", cache.len());
    Ok(())
}
fn add_to_txpool(
&mut self,
mut entry: PoolEntry,
@ -97,8 +114,10 @@ impl TransactionPool {
// We now need to reconcile the stempool based on the new state of the txpool.
// Some stempool txs may no longer be valid and we need to evict them.
let txpool_tx = self.txpool.aggregate_transaction()?;
self.stempool.reconcile(txpool_tx, header)?;
{
let txpool_tx = self.txpool.aggregate_transaction()?;
self.stempool.reconcile(txpool_tx, header)?;
}
self.adapter.tx_accepted(&entry.tx);
Ok(())
@ -140,13 +159,25 @@ impl TransactionPool {
};
if stem {
// TODO - what happens to txs in the stempool in a re-org scenario?
self.add_to_stempool(entry, header)?;
} else {
self.add_to_txpool(entry, header)?;
self.add_to_txpool(entry.clone(), header)?;
self.add_to_reorg_cache(entry)?;
}
Ok(())
}
/// Replay the cached txs against the txpool on top of the given header.
/// Used after accepting a block to re-add any txs that were displaced by
/// a fork/re-org but are still valid against the new chain state.
fn reconcile_reorg_cache(&mut self, header: &BlockHeader) -> Result<(), PoolError> {
    // Snapshot the cache so we do not hold the read lock while mutating the txpool.
    let entries = self.reorg_cache.read().iter().cloned().collect::<Vec<_>>();
    debug!("reconcile_reorg_cache: size: {} ...", entries.len());
    for entry in entries {
        // Best effort: a cached tx may no longer be valid against the new
        // chain state (e.g. already confirmed), so errors are deliberately
        // ignored and the tx is simply skipped.
        let _ = self.add_to_txpool(entry, header);
    }
    debug!("reconcile_reorg_cache: ... done.");
    Ok(())
}
/// Reconcile the transaction pool (both txpool and stempool) against the
/// provided block.
pub fn reconcile_block(&mut self, block: &Block) -> Result<(), PoolError> {
@ -154,10 +185,16 @@ impl TransactionPool {
self.txpool.reconcile_block(block)?;
self.txpool.reconcile(None, &block.header)?;
// Then reconcile the stempool, accounting for the txpool txs.
let txpool_tx = self.txpool.aggregate_transaction()?;
// Take our "reorg_cache" and see if this block means
// we need to (re)add old txs due to a fork and re-org.
self.reconcile_reorg_cache(&block.header)?;
// Now reconcile our stempool, accounting for the updated txpool txs.
self.stempool.reconcile_block(block)?;
self.stempool.reconcile(txpool_tx, &block.header)?;
{
let txpool_tx = self.txpool.aggregate_transaction()?;
self.stempool.reconcile(txpool_tx, &block.header)?;
}
Ok(())
}

View file

@ -18,6 +18,7 @@
use chrono::prelude::{DateTime, Utc};
use core::consensus;
use core::core::block;
use core::core::committed;
use core::core::hash::Hash;
use core::core::transaction::{self, Transaction};
@ -163,6 +164,8 @@ pub struct TxSource {
pub enum PoolError {
/// An invalid pool entry caused by underlying tx validation error
InvalidTx(transaction::Error),
/// An invalid pool entry caused by underlying block validation error
InvalidBlock(block::Error),
/// Underlying keychain error.
Keychain(keychain::Error),
/// Underlying "committed" error.
@ -192,6 +195,12 @@ impl From<transaction::Error> for PoolError {
}
}
impl From<block::Error> for PoolError {
fn from(e: block::Error) -> PoolError {
PoolError::InvalidBlock(e)
}
}
impl From<keychain::Error> for PoolError {
fn from(e: keychain::Error) -> PoolError {
PoolError::Keychain(e)

View file

@ -90,10 +90,16 @@ fn process_stem_phase(
let header = tx_pool.chain_head()?;
let txpool_tx = tx_pool.txpool.aggregate_transaction()?;
let stem_txs = tx_pool
.stempool
.get_transactions_in_state(PoolEntryState::ToStem);
if stem_txs.is_empty() {
return Ok(());
}
let txpool_tx = tx_pool.txpool.aggregate_transaction()?;
let stem_txs = tx_pool
.stempool
.select_valid_transactions(stem_txs, txpool_tx, &header)?;
@ -130,10 +136,16 @@ fn process_fluff_phase(
let header = tx_pool.chain_head()?;
let txpool_tx = tx_pool.txpool.aggregate_transaction()?;
let stem_txs = tx_pool
.stempool
.get_transactions_in_state(PoolEntryState::ToFluff);
if stem_txs.is_empty() {
return Ok(());
}
let txpool_tx = tx_pool.txpool.aggregate_transaction()?;
let stem_txs = tx_pool
.stempool
.select_valid_transactions(stem_txs, txpool_tx, &header)?;