From ab7a59b1c22ea5cbdfe0748dcd75fdcb196fea87 Mon Sep 17 00:00:00 2001 From: hashmap Date: Thu, 25 Oct 2018 14:21:36 +0200 Subject: [PATCH] Pool crate cleanup (#1835) * Refactor retrieve_transaction * Cleanup transaction_pool --- pool/src/pool.rs | 87 ++++++++++++++++-------------------- pool/src/transaction_pool.rs | 24 +++++----- pool/src/types.rs | 2 +- 3 files changed, 53 insertions(+), 60 deletions(-) diff --git a/pool/src/pool.rs b/pool/src/pool.rs index 8ade2a131..9bc473ac4 100644 --- a/pool/src/pool.rs +++ b/pool/src/pool.rs @@ -48,8 +48,8 @@ impl Pool { ) -> Pool { Pool { entries: vec![], - blockchain: chain.clone(), - verifier_cache: verifier_cache.clone(), + blockchain: chain, + verifier_cache, name, } } @@ -74,34 +74,34 @@ impl Pool { &self, hash: Hash, nonce: u64, - kern_ids: &Vec, + kern_ids: &[ShortId], ) -> (Vec, Vec) { - let mut rehashed = HashMap::new(); + let mut txs = vec![]; + let mut found_ids = vec![]; // Rehash all entries in the pool using short_ids based on provided hash and nonce. - for x in &self.entries { + 'outer: for x in &self.entries { for k in x.tx.kernels() { // rehash each kernel to calculate the block specific short_id let short_id = k.short_id(&hash, nonce); - rehashed.insert(short_id, x.tx.hash()); + if kern_ids.contains(&short_id) { + txs.push(x.tx.clone()); + found_ids.push(short_id); + } + if found_ids.len() == kern_ids.len() { + break 'outer; + } } } - - // Retrive the txs from the pool by the set of unique hashes. - let hashes: HashSet<_> = rehashed.values().collect(); - let txs = hashes.into_iter().filter_map(|x| self.get_tx(*x)).collect(); - - // Calculate the missing ids based on the ids passed in - // and the ids that successfully matched txs. - let matched_ids: HashSet<_> = rehashed.keys().collect(); - let all_ids: HashSet<_> = kern_ids.iter().collect(); - let missing_ids = all_ids - .difference(&matched_ids) - .map(|x| *x) - .cloned() - .collect(); - - (txs, missing_ids) + txs.dedup(); + ( + txs, + kern_ids + .into_iter() + .filter(|id| !found_ids.contains(id)) + .cloned() + .collect(), + ) } /// Take pool transactions, filtering and ordering them in a way that's @@ -171,10 +171,10 @@ impl Pool { } // Transition the specified pool entries to the new state. - pub fn transition_to_state(&mut self, txs: &Vec, state: PoolEntryState) { - for x in self.entries.iter_mut() { + pub fn transition_to_state(&mut self, txs: &[Transaction], state: PoolEntryState) { + for x in &mut self.entries { if txs.contains(&x.tx) { - x.state = state.clone(); + x.state = state; } } } @@ -214,9 +214,6 @@ impl Pool { // Validate aggregated tx against a known chain state. self.validate_raw_tx(&agg_tx, header)?; - // If we get here successfully then we can safely add the entry to the pool. - self.entries.push(entry.clone()); - debug!( "add_to_pool [{}]: {} ({}), in/out/kern: {}/{}/{}, pool: {} (at block {})", self.name, @@ -228,6 +225,8 @@ impl Pool { self.size(), header.hash(), ); + // If we get here successfully then we can safely add the entry to the pool. + self.entries.push(entry); Ok(()) } @@ -311,7 +310,7 @@ impl Pool { } for x in existing_entries { - let _ = self.add_to_pool(x.clone(), extra_txs.clone(), header); + let _ = self.add_to_pool(x, extra_txs.clone(), header); } Ok(()) @@ -352,20 +351,7 @@ impl Pool { tx_buckets } - // Filter txs in the pool based on the latest block. - // Reject any txs where we see a matching tx kernel in the block. - // Also reject any txs where we see a conflicting tx, - // where an input is spent in a different tx. - fn remaining_transactions(&self, block: &Block) -> Vec { - self.entries - .iter() - .filter(|x| !x.tx.kernels().iter().any(|y| block.kernels().contains(y))) - .filter(|x| !x.tx.inputs().iter().any(|y| block.inputs().contains(y))) - .map(|x| x.tx.clone()) - .collect() - } - - pub fn find_matching_transactions(&self, kernels: Vec) -> Vec { + pub fn find_matching_transactions(&self, kernels: &[TxKernel]) -> Vec { // While the inputs outputs can be cut-through the kernel will stay intact // In order to deaggregate tx we look for tx with the same kernel let mut found_txs = vec![]; @@ -375,7 +361,7 @@ impl Pool { // Check each transaction in the pool for entry in &self.entries { - let entry_kernel_set = entry.tx.kernels().iter().cloned().collect::>(); + let entry_kernel_set = entry.tx.kernels().iter().collect::>(); if entry_kernel_set.is_subset(&kernel_set) { found_txs.push(entry.tx.clone()); } @@ -385,10 +371,15 @@ impl Pool { /// Quick reconciliation step - we can evict any txs in the pool where /// inputs or kernels intersect with the block. - pub fn reconcile_block(&mut self, block: &Block) -> Result<(), PoolError> { - let candidate_txs = self.remaining_transactions(block); - self.entries.retain(|x| candidate_txs.contains(&x.tx)); - Ok(()) + pub fn reconcile_block(&mut self, block: &Block) { + // Filter txs in the pool based on the latest block. + // Reject any txs where we see a matching tx kernel in the block. + // Also reject any txs where we see a conflicting tx, + // where an input is spent in a different tx. + self.entries.retain(|x| { + !x.tx.kernels().iter().any(|y| block.kernels().contains(y)) + && !x.tx.inputs().iter().any(|y| block.inputs().contains(y)) + }); } pub fn size(&self) -> usize { diff --git a/pool/src/transaction_pool.rs b/pool/src/transaction_pool.rs index b05453b6c..063566d95 100644 --- a/pool/src/transaction_pool.rs +++ b/pool/src/transaction_pool.rs @@ -60,8 +60,12 @@ impl TransactionPool { ) -> TransactionPool { TransactionPool { config, - txpool: Pool::new(chain.clone(), verifier_cache.clone(), format!("txpool")), - stempool: Pool::new(chain.clone(), verifier_cache.clone(), format!("stempool")), + txpool: Pool::new(chain.clone(), verifier_cache.clone(), "txpool".to_string()), + stempool: Pool::new( + chain.clone(), + verifier_cache.clone(), + "stempool".to_string(), + ), reorg_cache: Arc::new(RwLock::new(VecDeque::new())), blockchain: chain, verifier_cache, @@ -76,7 +80,7 @@ impl TransactionPool { fn add_to_stempool(&mut self, entry: PoolEntry, header: &BlockHeader) -> Result<(), PoolError> { // Add tx to stempool (passing in all txs from txpool to validate against). self.stempool - .add_to_pool(entry.clone(), self.txpool.all_transactions(), header)?; + .add_to_pool(entry, self.txpool.all_transactions(), header)?; // Note: we do not notify the adapter here, // we let the dandelion monitor handle this. @@ -100,9 +104,7 @@ impl TransactionPool { ) -> Result<(), PoolError> { // First deaggregate the tx based on current txpool txs. if entry.tx.kernels().len() > 1 { - let txs = self - .txpool - .find_matching_transactions(entry.tx.kernels().clone()); + let txs = self.txpool.find_matching_transactions(entry.tx.kernels()); if !txs.is_empty() { let tx = transaction::deaggregate(entry.tx, txs)?; tx.validate(self.verifier_cache.clone())?; @@ -143,7 +145,7 @@ impl TransactionPool { // Make sure the transaction is valid before anything else. tx.validate(self.verifier_cache.clone()) - .map_err(|e| PoolError::InvalidTx(e))?; + .map_err(PoolError::InvalidTx)?; // Check the tx lock_time is valid based on current chain state. self.blockchain.verify_tx_lock_height(&tx)?; @@ -155,7 +157,7 @@ impl TransactionPool { state: PoolEntryState::Fresh, src, tx_at: Utc::now(), - tx: tx.clone(), + tx, }; if stem { @@ -182,7 +184,7 @@ impl TransactionPool { /// provided block. pub fn reconcile_block(&mut self, block: &Block) -> Result<(), PoolError> { // First reconcile the txpool. - self.txpool.reconcile_block(block)?; + self.txpool.reconcile_block(block); self.txpool.reconcile(None, &block.header)?; // Take our "reorg_cache" and see if this block means @@ -190,7 +192,7 @@ impl TransactionPool { self.reconcile_reorg_cache(&block.header)?; // Now reconcile our stempool, accounting for the updated txpool txs. - self.stempool.reconcile_block(block)?; + self.stempool.reconcile_block(block); { let txpool_tx = self.txpool.aggregate_transaction()?; self.stempool.reconcile(txpool_tx, &block.header)?; @@ -206,7 +208,7 @@ impl TransactionPool { &self, hash: Hash, nonce: u64, - kern_ids: &Vec, + kern_ids: &[ShortId], ) -> (Vec, Vec) { self.txpool.retrieve_transactions(hash, nonce, kern_ids) } diff --git a/pool/src/types.rs b/pool/src/types.rs index d3b1be73f..d51319e01 100644 --- a/pool/src/types.rs +++ b/pool/src/types.rs @@ -129,7 +129,7 @@ pub struct PoolEntry { } /// The possible states a pool entry can be in. -#[derive(Clone, Debug, PartialEq)] +#[derive(Clone, Copy, Debug, PartialEq)] pub enum PoolEntryState { /// A new entry, not yet processed. Fresh,