Pool crate cleanup (#1835)

* Refactor retrieve_transaction
* Cleanup transaction_pool
This commit is contained in:
hashmap 2018-10-25 14:21:36 +02:00 committed by GitHub
parent 4050f7fccb
commit ab7a59b1c2
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 53 additions and 60 deletions

View file

@ -48,8 +48,8 @@ impl Pool {
) -> Pool { ) -> Pool {
Pool { Pool {
entries: vec![], entries: vec![],
blockchain: chain.clone(), blockchain: chain,
verifier_cache: verifier_cache.clone(), verifier_cache,
name, name,
} }
} }
@ -74,34 +74,34 @@ impl Pool {
&self, &self,
hash: Hash, hash: Hash,
nonce: u64, nonce: u64,
kern_ids: &Vec<ShortId>, kern_ids: &[ShortId],
) -> (Vec<Transaction>, Vec<ShortId>) { ) -> (Vec<Transaction>, Vec<ShortId>) {
let mut rehashed = HashMap::new(); let mut txs = vec![];
let mut found_ids = vec![];
// Rehash all entries in the pool using short_ids based on provided hash and nonce. // Rehash all entries in the pool using short_ids based on provided hash and nonce.
for x in &self.entries { 'outer: for x in &self.entries {
for k in x.tx.kernels() { for k in x.tx.kernels() {
// rehash each kernel to calculate the block specific short_id // rehash each kernel to calculate the block specific short_id
let short_id = k.short_id(&hash, nonce); let short_id = k.short_id(&hash, nonce);
rehashed.insert(short_id, x.tx.hash()); if kern_ids.contains(&short_id) {
txs.push(x.tx.clone());
found_ids.push(short_id);
}
if found_ids.len() == kern_ids.len() {
break 'outer;
}
} }
} }
txs.dedup();
// Retrive the txs from the pool by the set of unique hashes. (
let hashes: HashSet<_> = rehashed.values().collect(); txs,
let txs = hashes.into_iter().filter_map(|x| self.get_tx(*x)).collect(); kern_ids
.into_iter()
// Calculate the missing ids based on the ids passed in .filter(|id| !found_ids.contains(id))
// and the ids that successfully matched txs. .cloned()
let matched_ids: HashSet<_> = rehashed.keys().collect(); .collect(),
let all_ids: HashSet<_> = kern_ids.iter().collect(); )
let missing_ids = all_ids
.difference(&matched_ids)
.map(|x| *x)
.cloned()
.collect();
(txs, missing_ids)
} }
/// Take pool transactions, filtering and ordering them in a way that's /// Take pool transactions, filtering and ordering them in a way that's
@ -171,10 +171,10 @@ impl Pool {
} }
// Transition the specified pool entries to the new state. // Transition the specified pool entries to the new state.
pub fn transition_to_state(&mut self, txs: &Vec<Transaction>, state: PoolEntryState) { pub fn transition_to_state(&mut self, txs: &[Transaction], state: PoolEntryState) {
for x in self.entries.iter_mut() { for x in &mut self.entries {
if txs.contains(&x.tx) { if txs.contains(&x.tx) {
x.state = state.clone(); x.state = state;
} }
} }
} }
@ -214,9 +214,6 @@ impl Pool {
// Validate aggregated tx against a known chain state. // Validate aggregated tx against a known chain state.
self.validate_raw_tx(&agg_tx, header)?; self.validate_raw_tx(&agg_tx, header)?;
// If we get here successfully then we can safely add the entry to the pool.
self.entries.push(entry.clone());
debug!( debug!(
"add_to_pool [{}]: {} ({}), in/out/kern: {}/{}/{}, pool: {} (at block {})", "add_to_pool [{}]: {} ({}), in/out/kern: {}/{}/{}, pool: {} (at block {})",
self.name, self.name,
@ -228,6 +225,8 @@ impl Pool {
self.size(), self.size(),
header.hash(), header.hash(),
); );
// If we get here successfully then we can safely add the entry to the pool.
self.entries.push(entry);
Ok(()) Ok(())
} }
@ -311,7 +310,7 @@ impl Pool {
} }
for x in existing_entries { for x in existing_entries {
let _ = self.add_to_pool(x.clone(), extra_txs.clone(), header); let _ = self.add_to_pool(x, extra_txs.clone(), header);
} }
Ok(()) Ok(())
@ -352,20 +351,7 @@ impl Pool {
tx_buckets tx_buckets
} }
// Filter txs in the pool based on the latest block. pub fn find_matching_transactions(&self, kernels: &[TxKernel]) -> Vec<Transaction> {
// Reject any txs where we see a matching tx kernel in the block.
// Also reject any txs where we see a conflicting tx,
// where an input is spent in a different tx.
fn remaining_transactions(&self, block: &Block) -> Vec<Transaction> {
self.entries
.iter()
.filter(|x| !x.tx.kernels().iter().any(|y| block.kernels().contains(y)))
.filter(|x| !x.tx.inputs().iter().any(|y| block.inputs().contains(y)))
.map(|x| x.tx.clone())
.collect()
}
pub fn find_matching_transactions(&self, kernels: Vec<TxKernel>) -> Vec<Transaction> {
// While the inputs outputs can be cut-through the kernel will stay intact // While the inputs outputs can be cut-through the kernel will stay intact
// In order to deaggregate tx we look for tx with the same kernel // In order to deaggregate tx we look for tx with the same kernel
let mut found_txs = vec![]; let mut found_txs = vec![];
@ -375,7 +361,7 @@ impl Pool {
// Check each transaction in the pool // Check each transaction in the pool
for entry in &self.entries { for entry in &self.entries {
let entry_kernel_set = entry.tx.kernels().iter().cloned().collect::<HashSet<_>>(); let entry_kernel_set = entry.tx.kernels().iter().collect::<HashSet<_>>();
if entry_kernel_set.is_subset(&kernel_set) { if entry_kernel_set.is_subset(&kernel_set) {
found_txs.push(entry.tx.clone()); found_txs.push(entry.tx.clone());
} }
@ -385,10 +371,15 @@ impl Pool {
/// Quick reconciliation step - we can evict any txs in the pool where /// Quick reconciliation step - we can evict any txs in the pool where
/// inputs or kernels intersect with the block. /// inputs or kernels intersect with the block.
pub fn reconcile_block(&mut self, block: &Block) -> Result<(), PoolError> { pub fn reconcile_block(&mut self, block: &Block) {
let candidate_txs = self.remaining_transactions(block); // Filter txs in the pool based on the latest block.
self.entries.retain(|x| candidate_txs.contains(&x.tx)); // Reject any txs where we see a matching tx kernel in the block.
Ok(()) // Also reject any txs where we see a conflicting tx,
// where an input is spent in a different tx.
self.entries.retain(|x| {
!x.tx.kernels().iter().any(|y| block.kernels().contains(y))
&& !x.tx.inputs().iter().any(|y| block.inputs().contains(y))
});
} }
pub fn size(&self) -> usize { pub fn size(&self) -> usize {

View file

@ -60,8 +60,12 @@ impl TransactionPool {
) -> TransactionPool { ) -> TransactionPool {
TransactionPool { TransactionPool {
config, config,
txpool: Pool::new(chain.clone(), verifier_cache.clone(), format!("txpool")), txpool: Pool::new(chain.clone(), verifier_cache.clone(), "txpool".to_string()),
stempool: Pool::new(chain.clone(), verifier_cache.clone(), format!("stempool")), stempool: Pool::new(
chain.clone(),
verifier_cache.clone(),
"stempool".to_string(),
),
reorg_cache: Arc::new(RwLock::new(VecDeque::new())), reorg_cache: Arc::new(RwLock::new(VecDeque::new())),
blockchain: chain, blockchain: chain,
verifier_cache, verifier_cache,
@ -76,7 +80,7 @@ impl TransactionPool {
fn add_to_stempool(&mut self, entry: PoolEntry, header: &BlockHeader) -> Result<(), PoolError> { fn add_to_stempool(&mut self, entry: PoolEntry, header: &BlockHeader) -> Result<(), PoolError> {
// Add tx to stempool (passing in all txs from txpool to validate against). // Add tx to stempool (passing in all txs from txpool to validate against).
self.stempool self.stempool
.add_to_pool(entry.clone(), self.txpool.all_transactions(), header)?; .add_to_pool(entry, self.txpool.all_transactions(), header)?;
// Note: we do not notify the adapter here, // Note: we do not notify the adapter here,
// we let the dandelion monitor handle this. // we let the dandelion monitor handle this.
@ -100,9 +104,7 @@ impl TransactionPool {
) -> Result<(), PoolError> { ) -> Result<(), PoolError> {
// First deaggregate the tx based on current txpool txs. // First deaggregate the tx based on current txpool txs.
if entry.tx.kernels().len() > 1 { if entry.tx.kernels().len() > 1 {
let txs = self let txs = self.txpool.find_matching_transactions(entry.tx.kernels());
.txpool
.find_matching_transactions(entry.tx.kernels().clone());
if !txs.is_empty() { if !txs.is_empty() {
let tx = transaction::deaggregate(entry.tx, txs)?; let tx = transaction::deaggregate(entry.tx, txs)?;
tx.validate(self.verifier_cache.clone())?; tx.validate(self.verifier_cache.clone())?;
@ -143,7 +145,7 @@ impl TransactionPool {
// Make sure the transaction is valid before anything else. // Make sure the transaction is valid before anything else.
tx.validate(self.verifier_cache.clone()) tx.validate(self.verifier_cache.clone())
.map_err(|e| PoolError::InvalidTx(e))?; .map_err(PoolError::InvalidTx)?;
// Check the tx lock_time is valid based on current chain state. // Check the tx lock_time is valid based on current chain state.
self.blockchain.verify_tx_lock_height(&tx)?; self.blockchain.verify_tx_lock_height(&tx)?;
@ -155,7 +157,7 @@ impl TransactionPool {
state: PoolEntryState::Fresh, state: PoolEntryState::Fresh,
src, src,
tx_at: Utc::now(), tx_at: Utc::now(),
tx: tx.clone(), tx,
}; };
if stem { if stem {
@ -182,7 +184,7 @@ impl TransactionPool {
/// provided block. /// provided block.
pub fn reconcile_block(&mut self, block: &Block) -> Result<(), PoolError> { pub fn reconcile_block(&mut self, block: &Block) -> Result<(), PoolError> {
// First reconcile the txpool. // First reconcile the txpool.
self.txpool.reconcile_block(block)?; self.txpool.reconcile_block(block);
self.txpool.reconcile(None, &block.header)?; self.txpool.reconcile(None, &block.header)?;
// Take our "reorg_cache" and see if this block means // Take our "reorg_cache" and see if this block means
@ -190,7 +192,7 @@ impl TransactionPool {
self.reconcile_reorg_cache(&block.header)?; self.reconcile_reorg_cache(&block.header)?;
// Now reconcile our stempool, accounting for the updated txpool txs. // Now reconcile our stempool, accounting for the updated txpool txs.
self.stempool.reconcile_block(block)?; self.stempool.reconcile_block(block);
{ {
let txpool_tx = self.txpool.aggregate_transaction()?; let txpool_tx = self.txpool.aggregate_transaction()?;
self.stempool.reconcile(txpool_tx, &block.header)?; self.stempool.reconcile(txpool_tx, &block.header)?;
@ -206,7 +208,7 @@ impl TransactionPool {
&self, &self,
hash: Hash, hash: Hash,
nonce: u64, nonce: u64,
kern_ids: &Vec<ShortId>, kern_ids: &[ShortId],
) -> (Vec<Transaction>, Vec<ShortId>) { ) -> (Vec<Transaction>, Vec<ShortId>) {
self.txpool.retrieve_transactions(hash, nonce, kern_ids) self.txpool.retrieve_transactions(hash, nonce, kern_ids)
} }

View file

@ -129,7 +129,7 @@ pub struct PoolEntry {
} }
/// The possible states a pool entry can be in. /// The possible states a pool entry can be in.
#[derive(Clone, Debug, PartialEq)] #[derive(Clone, Copy, Debug, PartialEq)]
pub enum PoolEntryState { pub enum PoolEntryState {
/// A new entry, not yet processed. /// A new entry, not yet processed.
Fresh, Fresh,