Reorg cache time based (cache 30mins of txs) (#2002)

* cache 30 mins of txs for reorg handling
  hard limit set to the same value as the txpool size limit

* rustfmt
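
To make the approach concrete, here is a minimal standalone sketch of the eviction policy, not the pool code itself: a hypothetical ReorgCache holding simplified CachedTx entries (keeping only the tx_at timestamp that PoolEntry carries), with a hard size cap standing in for the txpool limit and a 30 minute age-out applied to the front of the deque.

use std::collections::VecDeque;

use chrono::{DateTime, Duration, Utc};

// Hypothetical simplified entry: only the tx_at timestamp matters here.
struct CachedTx {
	tx_at: DateTime<Utc>,
}

// Hypothetical stand-in for the reorg cache inside TransactionPool.
struct ReorgCache {
	entries: VecDeque<CachedTx>,
	// Hard cap, analogous to config.max_pool_size in the real pool.
	max_size: usize,
}

impl ReorgCache {
	fn add(&mut self, entry: CachedTx) {
		self.entries.push_back(entry);
		// Hard limit to avoid unbounded growth: evict the oldest entry.
		if self.entries.len() > self.max_size {
			let _ = self.entries.pop_front();
		}
	}

	// Entries are in insertion order, so only the front needs checking.
	fn truncate(&mut self, cutoff: DateTime<Utc>) {
		while self.entries.front().map(|x| x.tx_at < cutoff).unwrap_or(false) {
			let _ = self.entries.pop_front();
		}
	}
}

fn main() {
	let mut cache = ReorgCache {
		entries: VecDeque::new(),
		max_size: 4,
	};

	// One stale entry (45 mins old) and one fresh entry.
	cache.add(CachedTx {
		tx_at: Utc::now() - Duration::minutes(45),
	});
	cache.add(CachedTx { tx_at: Utc::now() });

	// Age out anything older than 30 mins, as reconcile_reorg_cache does.
	let cutoff = Utc::now() - Duration::minutes(30);
	cache.truncate(cutoff);
	assert_eq!(cache.entries.len(), 1);
}

In the actual change below, the VecDeque sits behind an RwLock inside TransactionPool and the cap is config.max_pool_size.
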
Antioch Peverell 2018-11-20 21:47:59 +00:00 committed by GitHub
parent e86eb641f1
commit 61f1f6103b

@@ -21,7 +21,8 @@ use std::collections::VecDeque;
 use std::sync::Arc;
 use util::RwLock;
 
-use chrono::prelude::Utc;
+use chrono::prelude::*;
+use chrono::Duration;
 
 use core::core::hash::{Hash, Hashed};
 use core::core::id::ShortId;
@@ -30,9 +31,6 @@ use core::core::{transaction, Block, BlockHeader, Transaction};
 use pool::Pool;
 use types::{BlockChain, PoolAdapter, PoolConfig, PoolEntry, PoolEntryState, PoolError, TxSource};
 
-// Cache this many txs to handle a potential fork and re-org.
-const REORG_CACHE_SIZE: usize = 100;
-
 /// Transaction pool implementation.
 pub struct TransactionPool {
 	/// Pool Config
@@ -87,14 +85,25 @@ impl TransactionPool {
 		Ok(())
 	}
 
-	fn add_to_reorg_cache(&mut self, entry: PoolEntry) -> Result<(), PoolError> {
+	fn add_to_reorg_cache(&mut self, entry: PoolEntry) {
 		let mut cache = self.reorg_cache.write();
 		cache.push_back(entry);
-		if cache.len() > REORG_CACHE_SIZE {
-			cache.pop_front();
+
+		// We cache 30 mins of txs but we have a hard limit to avoid catastrophic failure.
+		// For simplicity use the same value as the actual tx pool limit.
+		if cache.len() > self.config.max_pool_size {
+			let _ = cache.pop_front();
 		}
+
 		debug!("added tx to reorg_cache: size now {}", cache.len());
-		Ok(())
+	}
+
+	// Old txs will "age out" after 30 mins.
+	fn truncate_reorg_cache(&mut self, cutoff: DateTime<Utc>) {
+		let mut cache = self.reorg_cache.write();
+		while cache.front().map(|x| x.tx_at < cutoff).unwrap_or(false) {
+			let _ = cache.pop_front();
+		}
 	}
 
 	fn add_to_txpool(
@ -165,12 +174,16 @@ impl TransactionPool {
self.add_to_stempool(entry, header)?; self.add_to_stempool(entry, header)?;
} else { } else {
self.add_to_txpool(entry.clone(), header)?; self.add_to_txpool(entry.clone(), header)?;
self.add_to_reorg_cache(entry)?; self.add_to_reorg_cache(entry);
} }
Ok(()) Ok(())
} }
fn reconcile_reorg_cache(&mut self, header: &BlockHeader) -> Result<(), PoolError> { fn reconcile_reorg_cache(&mut self, header: &BlockHeader) -> Result<(), PoolError> {
// First "age out" any old txs in the reorg cache.
let cutoff = Utc::now() - Duration::minutes(30);
self.truncate_reorg_cache(cutoff);
let entries = self.reorg_cache.read().iter().cloned().collect::<Vec<_>>(); let entries = self.reorg_cache.read().iter().cloned().collect::<Vec<_>>();
debug!("reconcile_reorg_cache: size: {} ...", entries.len()); debug!("reconcile_reorg_cache: size: {} ...", entries.len());
for entry in entries { for entry in entries {
@@ -255,6 +268,7 @@ impl TransactionPool {
 	/// Returns a vector of transactions from the txpool so we can build a
 	/// block from them.
 	pub fn prepare_mineable_transactions(&self) -> Result<Vec<Transaction>, PoolError> {
-		self.txpool.prepare_mineable_transactions(self.config.mineable_max_weight)
+		self.txpool
+			.prepare_mineable_transactions(self.config.mineable_max_weight)
 	}
 }