diff --git a/pool/src/transaction_pool.rs b/pool/src/transaction_pool.rs index 63c922268..911af6b56 100644 --- a/pool/src/transaction_pool.rs +++ b/pool/src/transaction_pool.rs @@ -21,7 +21,8 @@ use std::collections::VecDeque; use std::sync::Arc; use util::RwLock; -use chrono::prelude::Utc; +use chrono::prelude::*; +use chrono::Duration; use core::core::hash::{Hash, Hashed}; use core::core::id::ShortId; @@ -30,9 +31,6 @@ use core::core::{transaction, Block, BlockHeader, Transaction}; use pool::Pool; use types::{BlockChain, PoolAdapter, PoolConfig, PoolEntry, PoolEntryState, PoolError, TxSource}; -// Cache this many txs to handle a potential fork and re-org. -const REORG_CACHE_SIZE: usize = 100; - /// Transaction pool implementation. pub struct TransactionPool { /// Pool Config @@ -87,14 +85,25 @@ impl TransactionPool { Ok(()) } - fn add_to_reorg_cache(&mut self, entry: PoolEntry) -> Result<(), PoolError> { + fn add_to_reorg_cache(&mut self, entry: PoolEntry) { let mut cache = self.reorg_cache.write(); cache.push_back(entry); - if cache.len() > REORG_CACHE_SIZE { - cache.pop_front(); + + // We cache 30 mins of txs but we have a hard limit to avoid catastrophic failure. + // For simplicity use the same value as the actual tx pool limit. + if cache.len() > self.config.max_pool_size { + let _ = cache.pop_front(); } debug!("added tx to reorg_cache: size now {}", cache.len()); - Ok(()) + } + + // Old txs will "age out" after 30 mins. + fn truncate_reorg_cache(&mut self, cutoff: DateTime) { + let mut cache = self.reorg_cache.write(); + + while cache.front().map(|x| x.tx_at < cutoff).unwrap_or(false) { + let _ = cache.pop_front(); + } } fn add_to_txpool( @@ -165,12 +174,16 @@ impl TransactionPool { self.add_to_stempool(entry, header)?; } else { self.add_to_txpool(entry.clone(), header)?; - self.add_to_reorg_cache(entry)?; + self.add_to_reorg_cache(entry); } Ok(()) } fn reconcile_reorg_cache(&mut self, header: &BlockHeader) -> Result<(), PoolError> { + // First "age out" any old txs in the reorg cache. + let cutoff = Utc::now() - Duration::minutes(30); + self.truncate_reorg_cache(cutoff); + let entries = self.reorg_cache.read().iter().cloned().collect::>(); debug!("reconcile_reorg_cache: size: {} ...", entries.len()); for entry in entries { @@ -255,6 +268,7 @@ impl TransactionPool { /// Returns a vector of transactions from the txpool so we can build a /// block from them. pub fn prepare_mineable_transactions(&self) -> Result, PoolError> { - self.txpool.prepare_mineable_transactions(self.config.mineable_max_weight) + self.txpool + .prepare_mineable_transactions(self.config.mineable_max_weight) } }