From 82a467ac3c4f9e1caf17540c72f2a8b584817484 Mon Sep 17 00:00:00 2001 From: Antioch Peverell <30642645+antiochp@users.noreply.github.com> Date: Mon, 20 Aug 2018 14:48:05 +0100 Subject: [PATCH] Fix txpool race condition (#1385) * fix txpool race condition * rustfmt * fixup pool tests * rustfmt * rebase against master, pool tests passing --- api/src/handlers.rs | 13 +++--- chain/src/chain.rs | 9 +++- chain/src/error.rs | 2 +- pool/src/pool.rs | 32 ++++++++++---- pool/src/transaction_pool.rs | 22 ++++----- pool/src/types.rs | 26 +++-------- pool/tests/block_building.rs | 42 ++++++++++++++---- pool/tests/block_reconciliation.rs | 53 ++++++++++++++++++---- pool/tests/coinbase_maturity.rs | 18 +++++--- pool/tests/common/mod.rs | 17 ++++++- pool/tests/transaction_pool.rs | 64 ++++++++++++++++++++------- servers/src/common/adapters.rs | 27 ++++++++--- servers/src/grin/dandelion_monitor.rs | 16 +++++-- servers/src/mining/mine_block.rs | 1 + wallet/src/libtx/build.rs | 2 +- wallet/src/libwallet/api.rs | 29 ++++++++++-- wallet/src/libwallet/controller.rs | 5 ++- 17 files changed, 275 insertions(+), 103 deletions(-) diff --git a/api/src/handlers.rs b/api/src/handlers.rs index 7b2c2c249..8def8fa0f 100644 --- a/api/src/handlers.rs +++ b/api/src/handlers.rs @@ -116,8 +116,6 @@ impl OutputHandler { } } - debug!(LOGGER, "outputs_by_ids: {:?}", commitments); - let mut outputs: Vec = vec![]; for x in commitments { if let Ok(output) = self.get_output(&x) { @@ -738,7 +736,8 @@ where }; info!( LOGGER, - "Pushing transaction, inputs: {}, outputs: {}, kernels: {}, to pool.", + "Pushing transaction {} to pool (inputs: {}, outputs: {}, kernels: {})", + tx.hash(), tx.inputs().len(), tx.outputs().len(), tx.kernels().len(), @@ -746,9 +745,13 @@ where // Push to tx pool. let mut tx_pool = pool_arc.write().unwrap(); + let header = tx_pool.blockchain.chain_head().unwrap(); tx_pool - .add_to_pool(source, tx, !fluff) - .map_err(|_| ErrorKind::RequestError("Bad request".to_owned()).into()) + .add_to_pool(source, tx, !fluff, &header.hash()) + .map_err(|e| { + error!(LOGGER, "update_pool: failed with error: {:?}", e); + ErrorKind::RequestError("Bad request".to_owned()).into() + }) }), ) } diff --git a/chain/src/chain.rs b/chain/src/chain.rs index ce90523fc..58e3d0008 100644 --- a/chain/src/chain.rs +++ b/chain/src/chain.rs @@ -399,7 +399,8 @@ impl Chain { Ok(bh.height + 1) } - /// Validate a vec of "raw" transactions against the current chain state. + /// Validate a vec of "raw" transactions against a known chain state + /// at the block with the specified block hash. /// Specifying a "pre_tx" if we need to adjust the state, for example when /// validating the txs in the stempool we adjust the state based on the /// txpool. @@ -407,9 +408,15 @@ impl Chain { &self, txs: Vec, pre_tx: Option, + block_hash: &Hash, ) -> Result, Error> { + // Get headers so we can rewind chain state correctly. + let header = self.store.get_block_header(block_hash)?; + let head_header = self.store.head_header()?; + let mut txhashset = self.txhashset.write().unwrap(); txhashset::extending_readonly(&mut txhashset, |extension| { + extension.rewind(&header, &head_header)?; let valid_txs = extension.validate_raw_txs(txs, pre_tx)?; Ok(valid_txs) }) diff --git a/chain/src/error.rs b/chain/src/error.rs index 8d2c676cc..8f1cb3c85 100644 --- a/chain/src/error.rs +++ b/chain/src/error.rs @@ -79,7 +79,7 @@ pub enum ErrorKind { #[fail(display = "Already Spent: {:?}", _0)] AlreadySpent(Commitment), /// An output with that commitment already exists (should be unique) - #[fail(display = "Dupliate Commitment: {:?}", _0)] + #[fail(display = "Duplicate Commitment: {:?}", _0)] DuplicateCommitment(Commitment), /// Attempt to spend a coinbase output before it sufficiently matures. #[fail(display = "Attempt to spend immature coinbase")] diff --git a/pool/src/pool.rs b/pool/src/pool.rs index 185908a68..d8cac9c39 100644 --- a/pool/src/pool.rs +++ b/pool/src/pool.rs @@ -19,7 +19,7 @@ use std::collections::{HashMap, HashSet}; use std::sync::Arc; use core::consensus; -use core::core::hash::Hashed; +use core::core::hash::{Hash, Hashed}; use core::core::id::ShortIdentifiable; use core::core::transaction; use core::core::{Block, CompactBlock, Transaction, TxKernel}; @@ -80,6 +80,8 @@ where /// transactions, orders by fee over weight and ensures to total weight /// doesn't exceed block limits. pub fn prepare_mineable_transactions(&self) -> Vec { + let header = self.blockchain.chain_head().unwrap(); + let tx_buckets = self.bucket_transactions(); // flatten buckets using aggregate (with cut-through) @@ -105,7 +107,7 @@ where // make sure those txs are all valid together, no Error is expected // when passing None self.blockchain - .validate_raw_txs(flat_txs, None) + .validate_raw_txs(flat_txs, None, &header.hash()) .expect("should never happen") } @@ -128,6 +130,7 @@ where from_state: PoolEntryState, to_state: PoolEntryState, extra_tx: Option, + block_hash: &Hash, ) -> Result, PoolError> { let entries = &mut self .entries @@ -139,7 +142,9 @@ where if candidate_txs.is_empty() { return Ok(vec![]); } - let valid_txs = self.blockchain.validate_raw_txs(candidate_txs, extra_tx)?; + let valid_txs = self + .blockchain + .validate_raw_txs(candidate_txs, extra_tx, block_hash)?; // Update state on all entries included in final vec of valid txs. for x in &mut entries.iter_mut() { @@ -158,16 +163,18 @@ where &mut self, entry: PoolEntry, extra_txs: Vec, + block_hash: &Hash, ) -> Result<(), PoolError> { debug!( LOGGER, - "pool [{}]: add_to_pool: {}, {:?}, inputs: {}, outputs: {}, kernels: {}", + "pool [{}]: add_to_pool: {}, {:?}, inputs: {}, outputs: {}, kernels: {} (at block {})", self.name, entry.tx.hash(), entry.src, entry.tx.inputs().len(), entry.tx.outputs().len(), entry.tx.kernels().len(), + block_hash, ); // Combine all the txs from the pool with any extra txs provided. @@ -184,9 +191,10 @@ where transaction::aggregate(txs, None)? }; - // Validate aggregated tx against the current chain state (via txhashset + // Validate aggregated tx against a known chain state (via txhashset // extension). - self.blockchain.validate_raw_txs(vec![], Some(agg_tx))?; + self.blockchain + .validate_raw_txs(vec![], Some(agg_tx), block_hash)?; // If we get here successfully then we can safely add the entry to the pool. self.entries.push(entry); @@ -194,7 +202,11 @@ where Ok(()) } - pub fn reconcile(&mut self, extra_tx: Option) -> Result<(), PoolError> { + pub fn reconcile( + &mut self, + extra_tx: Option, + block_hash: &Hash, + ) -> Result<(), PoolError> { let candidate_txs = self.all_transactions(); let existing_len = candidate_txs.len(); @@ -203,8 +215,10 @@ where } // Go through the candidate txs and keep everything that validates incrementally - // against the current chain state, accounting for the "extra tx" as necessary. - let valid_txs = self.blockchain.validate_raw_txs(candidate_txs, extra_tx)?; + // against a known chain state, accounting for the "extra tx" as necessary. + let valid_txs = self + .blockchain + .validate_raw_txs(candidate_txs, extra_tx, block_hash)?; self.entries.retain(|x| valid_txs.contains(&x.tx)); debug!( diff --git a/pool/src/transaction_pool.rs b/pool/src/transaction_pool.rs index 4150ec71b..ea06ea0d3 100644 --- a/pool/src/transaction_pool.rs +++ b/pool/src/transaction_pool.rs @@ -20,11 +20,10 @@ use chrono::prelude::Utc; use std::sync::Arc; -use core::core::hash::Hashed; +use core::core::hash::Hash; use core::core::{transaction, Block, CompactBlock, Transaction}; use pool::Pool; use types::{BlockChain, PoolAdapter, PoolConfig, PoolEntry, PoolEntryState, PoolError, TxSource}; -use util::LOGGER; /// Transaction pool implementation. pub struct TransactionPool { @@ -57,17 +56,17 @@ where } } - fn add_to_stempool(&mut self, entry: PoolEntry) -> Result<(), PoolError> { + fn add_to_stempool(&mut self, entry: PoolEntry, block_hash: &Hash) -> Result<(), PoolError> { // Add tx to stempool (passing in all txs from txpool to validate against). self.stempool - .add_to_pool(entry.clone(), self.txpool.all_transactions())?; + .add_to_pool(entry.clone(), self.txpool.all_transactions(), block_hash)?; // Note: we do not notify the adapter here, // we let the dandelion monitor handle this. Ok(()) } - fn add_to_txpool(&mut self, mut entry: PoolEntry) -> Result<(), PoolError> { + fn add_to_txpool(&mut self, mut entry: PoolEntry, block_hash: &Hash) -> Result<(), PoolError> { // First deaggregate the tx based on current txpool txs. if entry.tx.kernels().len() > 1 { let txs = self @@ -78,12 +77,12 @@ where entry.src.debug_name = "deagg".to_string(); } } - self.txpool.add_to_pool(entry.clone(), vec![])?; + self.txpool.add_to_pool(entry.clone(), vec![], block_hash)?; // We now need to reconcile the stempool based on the new state of the txpool. // Some stempool txs may no longer be valid and we need to evict them. let txpool_tx = self.txpool.aggregate_transaction()?; - self.stempool.reconcile(txpool_tx)?; + self.stempool.reconcile(txpool_tx, block_hash)?; self.adapter.tx_accepted(&entry.tx); Ok(()) @@ -96,6 +95,7 @@ where src: TxSource, tx: Transaction, stem: bool, + block_hash: &Hash, ) -> Result<(), PoolError> { // Do we have the capacity to accept this transaction? self.is_acceptable(&tx)?; @@ -117,9 +117,9 @@ where }; if stem { - self.add_to_stempool(entry)?; + self.add_to_stempool(entry, block_hash)?; } else { - self.add_to_txpool(entry)?; + self.add_to_txpool(entry, block_hash)?; } Ok(()) } @@ -129,12 +129,12 @@ where pub fn reconcile_block(&mut self, block: &Block) -> Result<(), PoolError> { // First reconcile the txpool. self.txpool.reconcile_block(block)?; - self.txpool.reconcile(None)?; + self.txpool.reconcile(None, &block.hash())?; // Then reconcile the stempool, accounting for the txpool txs. let txpool_tx = self.txpool.aggregate_transaction()?; self.stempool.reconcile_block(block)?; - self.stempool.reconcile(txpool_tx)?; + self.stempool.reconcile(txpool_tx, &block.hash())?; Ok(()) } diff --git a/pool/src/types.rs b/pool/src/types.rs index 5e7122a5d..8dcdec9dd 100644 --- a/pool/src/types.rs +++ b/pool/src/types.rs @@ -15,10 +15,11 @@ //! The primary module containing the implementations of the transaction pool //! and its top-level members. -use std::{error, fmt}; use chrono::prelude::{DateTime, Utc}; use core::consensus; +use core::core::block::BlockHeader; +use core::core::hash::Hash; use core::core::transaction::{self, Transaction}; /// Dandelion relay timer @@ -183,30 +184,15 @@ impl From for PoolError { } } -impl error::Error for PoolError { - fn description(&self) -> &str { - match *self { - _ => "some kind of pool error", - } - } -} - -impl fmt::Display for PoolError { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - match *self { - _ => write!(f, "some kind of pool error"), - } - } -} - /// Interface that the pool requires from a blockchain implementation. pub trait BlockChain { - /// Validate a vec of txs against the current chain state after applying the - /// pre_tx to the chain state. + /// Validate a vec of txs against known chain state at specific block + /// after applying the pre_tx to the chain state. fn validate_raw_txs( &self, txs: Vec, pre_tx: Option, + block_hash: &Hash, ) -> Result, PoolError>; /// Verify any coinbase outputs being spent @@ -216,6 +202,8 @@ pub trait BlockChain { /// Verify any coinbase outputs being spent /// have matured sufficiently. fn verify_tx_lock_height(&self, tx: &transaction::Transaction) -> Result<(), PoolError>; + + fn chain_head(&self) -> Result; } /// Bridge between the transaction pool and the rest of the system. Handles diff --git a/pool/tests/block_building.rs b/pool/tests/block_building.rs index f3c40a803..24dac6158 100644 --- a/pool/tests/block_building.rs +++ b/pool/tests/block_building.rs @@ -31,6 +31,7 @@ use core::core::{Block, BlockHeader}; use chain::txhashset; use chain::types::Tip; +use core::core::hash::Hashed; use core::core::target::Difficulty; use keychain::{ExtKeychain, Keychain}; @@ -52,12 +53,23 @@ fn test_transaction_pool_block_building() { let add_block = |height, txs| { let key_id = keychain.derive_key_id(height as u32).unwrap(); let reward = libtx::reward::output(&keychain, &key_id, 0, height).unwrap(); - let block = Block::new(&BlockHeader::default(), txs, Difficulty::one(), reward).unwrap(); + let mut block = Block::new(&BlockHeader::default(), txs, Difficulty::one(), reward).unwrap(); let mut txhashset = chain.txhashset.write().unwrap(); let mut batch = chain.store.batch().unwrap(); txhashset::extending(&mut txhashset, &mut batch, |extension| { - extension.apply_block(&block) + extension.apply_block(&block)?; + + // Now set the roots and sizes as necessary on the block header. + let roots = extension.roots(); + block.header.output_root = roots.output_root; + block.header.range_proof_root = roots.rproof_root; + block.header.kernel_root = roots.kernel_root; + let sizes = extension.sizes(); + block.header.output_mmr_size = sizes.0; + block.header.kernel_mmr_size = sizes.2; + + Ok(()) }).unwrap(); let tip = Tip::from_block(&block.header); @@ -91,21 +103,21 @@ fn test_transaction_pool_block_building() { // Add the three root txs to the pool. write_pool - .add_to_pool(test_source(), root_tx_1, false) + .add_to_pool(test_source(), root_tx_1, false, &header.hash()) .unwrap(); write_pool - .add_to_pool(test_source(), root_tx_2, false) + .add_to_pool(test_source(), root_tx_2, false, &header.hash()) .unwrap(); write_pool - .add_to_pool(test_source(), root_tx_3, false) + .add_to_pool(test_source(), root_tx_3, false, &header.hash()) .unwrap(); // Now add the two child txs to the pool. write_pool - .add_to_pool(test_source(), child_tx_1.clone(), false) + .add_to_pool(test_source(), child_tx_1.clone(), false, &header.hash()) .unwrap(); write_pool - .add_to_pool(test_source(), child_tx_2.clone(), false) + .add_to_pool(test_source(), child_tx_2.clone(), false, &header.hash()) .unwrap(); assert_eq!(write_pool.total_size(), 5); @@ -118,7 +130,7 @@ fn test_transaction_pool_block_building() { // children should have been aggregated into parents assert_eq!(txs.len(), 3); - let block = { + let mut block = { let key_id = keychain.derive_key_id(2).unwrap(); let fees = txs.iter().map(|tx| tx.fee()).sum(); let reward = libtx::reward::output(&keychain, &key_id, fees, 0).unwrap(); @@ -130,8 +142,22 @@ fn test_transaction_pool_block_building() { let mut txhashset = chain.txhashset.write().unwrap(); txhashset::extending(&mut txhashset, &mut batch, |extension| { extension.apply_block(&block)?; + + // Now set the roots and sizes as necessary on the block header. + let roots = extension.roots(); + block.header.output_root = roots.output_root; + block.header.range_proof_root = roots.rproof_root; + block.header.kernel_root = roots.kernel_root; + let sizes = extension.sizes(); + block.header.output_mmr_size = sizes.0; + block.header.kernel_mmr_size = sizes.2; + Ok(()) }).unwrap(); + + let tip = Tip::from_block(&block.header); + batch.save_block_header(&block.header).unwrap(); + batch.save_head(&tip).unwrap(); batch.commit().unwrap(); } diff --git a/pool/tests/block_reconciliation.rs b/pool/tests/block_reconciliation.rs index 7fe4cca69..b06d3c562 100644 --- a/pool/tests/block_reconciliation.rs +++ b/pool/tests/block_reconciliation.rs @@ -20,19 +20,22 @@ extern crate grin_pool as pool; extern crate grin_util as util; extern crate grin_wallet as wallet; -extern crate rand; extern crate chrono; +extern crate rand; pub mod common; use std::sync::{Arc, RwLock}; +use core::core::hash::Hashed; use core::core::{Block, BlockHeader}; +use chain::txhashset; use chain::types::Tip; -use chain::{txhashset, ChainStore}; -use common::{clean_output_dir, test_setup, test_source, test_transaction, - test_transaction_spending_coinbase, ChainAdapter}; +use common::{ + clean_output_dir, test_setup, test_source, test_transaction, + test_transaction_spending_coinbase, ChainAdapter, +}; use core::core::target::Difficulty; use keychain::{ExtKeychain, Keychain}; use wallet::libtx; @@ -51,12 +54,24 @@ fn test_transaction_pool_block_reconciliation() { let height = 1; let key_id = keychain.derive_key_id(height as u32).unwrap(); let reward = libtx::reward::output(&keychain, &key_id, 0, height).unwrap(); - let block = Block::new(&BlockHeader::default(), vec![], Difficulty::one(), reward).unwrap(); + let mut block = + Block::new(&BlockHeader::default(), vec![], Difficulty::one(), reward).unwrap(); let mut batch = chain.store.batch().unwrap(); let mut txhashset = chain.txhashset.write().unwrap(); txhashset::extending(&mut txhashset, &mut batch, |extension| { - extension.apply_block(&block) + extension.apply_block(&block)?; + + // Now set the roots and sizes as necessary on the block header. + let roots = extension.roots(); + block.header.output_root = roots.output_root; + block.header.range_proof_root = roots.rproof_root; + block.header.kernel_root = roots.kernel_root; + let sizes = extension.sizes(); + block.header.output_mmr_size = sizes.0; + block.header.kernel_mmr_size = sizes.2; + + Ok(()) }).unwrap(); let tip = Tip::from_block(&block.header); @@ -78,13 +93,23 @@ fn test_transaction_pool_block_reconciliation() { let key_id = keychain.derive_key_id(2).unwrap(); let fees = initial_tx.fee(); let reward = libtx::reward::output(&keychain, &key_id, fees, 0).unwrap(); - let block = Block::new(&header, vec![initial_tx], Difficulty::one(), reward).unwrap(); + let mut block = Block::new(&header, vec![initial_tx], Difficulty::one(), reward).unwrap(); let mut batch = chain.store.batch().unwrap(); { let mut txhashset = chain.txhashset.write().unwrap(); txhashset::extending(&mut txhashset, &mut batch, |extension| { extension.apply_block(&block)?; + + // Now set the roots and sizes as necessary on the block header. + let roots = extension.roots(); + block.header.output_root = roots.output_root; + block.header.range_proof_root = roots.rproof_root; + block.header.kernel_root = roots.kernel_root; + let sizes = extension.sizes(); + block.header.output_mmr_size = sizes.0; + block.header.kernel_mmr_size = sizes.2; + Ok(()) }).unwrap(); } @@ -153,7 +178,7 @@ fn test_transaction_pool_block_reconciliation() { for tx in &txs_to_add { write_pool - .add_to_pool(test_source(), tx.clone(), false) + .add_to_pool(test_source(), tx.clone(), false, &header.hash()) .unwrap(); } @@ -177,13 +202,23 @@ fn test_transaction_pool_block_reconciliation() { let key_id = keychain.derive_key_id(3).unwrap(); let fees = block_txs.iter().map(|tx| tx.fee()).sum(); let reward = libtx::reward::output(&keychain, &key_id, fees, 0).unwrap(); - let block = Block::new(&header, block_txs, Difficulty::one(), reward).unwrap(); + let mut block = Block::new(&header, block_txs, Difficulty::one(), reward).unwrap(); { let mut batch = chain.store.batch().unwrap(); let mut txhashset = chain.txhashset.write().unwrap(); txhashset::extending(&mut txhashset, &mut batch, |extension| { extension.apply_block(&block)?; + + // Now set the roots and sizes as necessary on the block header. + let roots = extension.roots(); + block.header.output_root = roots.output_root; + block.header.range_proof_root = roots.rproof_root; + block.header.kernel_root = roots.kernel_root; + let sizes = extension.sizes(); + block.header.output_mmr_size = sizes.0; + block.header.kernel_mmr_size = sizes.2; + Ok(()) }).unwrap(); batch.commit().unwrap(); diff --git a/pool/tests/coinbase_maturity.rs b/pool/tests/coinbase_maturity.rs index 3bfc12855..3f9c00ca4 100644 --- a/pool/tests/coinbase_maturity.rs +++ b/pool/tests/coinbase_maturity.rs @@ -20,18 +20,19 @@ extern crate grin_pool as pool; extern crate grin_util as util; extern crate grin_wallet as wallet; -extern crate rand; extern crate chrono; +extern crate rand; pub mod common; use std::sync::{Arc, RwLock}; use common::{test_source, test_transaction}; -use core::core::Transaction; +use core::core::hash::Hash; +use core::core::{BlockHeader, Transaction}; use keychain::{ExtKeychain, Keychain}; -use pool::TransactionPool; use pool::types::{BlockChain, NoopAdapter, PoolConfig, PoolError}; +use pool::TransactionPool; pub fn test_setup( chain: &Arc, @@ -56,14 +57,17 @@ impl CoinbaseMaturityErrorChainAdapter { } impl BlockChain for CoinbaseMaturityErrorChainAdapter { + fn chain_head(&self) -> Result { + unimplemented!(); + } + fn validate_raw_txs( &self, _txs: Vec, _pre_tx: Option, + _block_hash: &Hash, ) -> Result, PoolError> { - Err(PoolError::Other( - "not implemented, not a real chain adapter...".to_string(), - )) + unimplemented!(); } // Returns an ImmatureCoinbase for every tx we pass in. @@ -90,7 +94,7 @@ fn test_coinbase_maturity() { { let mut write_pool = pool.write().unwrap(); let tx = test_transaction(&keychain, vec![50], vec![49]); - match write_pool.add_to_pool(test_source(), tx.clone(), true) { + match write_pool.add_to_pool(test_source(), tx.clone(), true, &Hash::default()) { Err(PoolError::ImmatureCoinbase) => {} _ => panic!("Expected an immature coinbase error here."), } diff --git a/pool/tests/common/mod.rs b/pool/tests/common/mod.rs index 159228dca..dd05b0135 100644 --- a/pool/tests/common/mod.rs +++ b/pool/tests/common/mod.rs @@ -29,13 +29,12 @@ extern crate rand; use std::fs; use std::sync::{Arc, RwLock}; +use core::core::hash::Hash; use core::core::{BlockHeader, Transaction}; use chain::store::ChainStore; use chain::txhashset; use chain::txhashset::TxHashSet; -use core::core::hash::Hashed; -use core::core::merkle_proof::MerkleProof; use pool::*; use keychain::Keychain; @@ -68,13 +67,27 @@ impl ChainAdapter { } impl BlockChain for ChainAdapter { + fn chain_head(&self) -> Result { + self.store + .head_header() + .map_err(|_| PoolError::Other(format!("failed to get chain head"))) + } + fn validate_raw_txs( &self, txs: Vec, pre_tx: Option, + block_hash: &Hash, ) -> Result, PoolError> { + let header = self + .store + .get_block_header(&block_hash) + .map_err(|_| PoolError::Other(format!("failed to get header")))?; + let head_header = self.chain_head()?; + let mut txhashset = self.txhashset.write().unwrap(); let res = txhashset::extending_readonly(&mut txhashset, |extension| { + extension.rewind(&header, &head_header)?; let valid_txs = extension.validate_raw_txs(txs, pre_tx)?; Ok(valid_txs) }).map_err(|e| PoolError::Other(format!("Error: test chain adapter: {:?}", e)))?; diff --git a/pool/tests/transaction_pool.rs b/pool/tests/transaction_pool.rs index aa52f7ac6..510ae2106 100644 --- a/pool/tests/transaction_pool.rs +++ b/pool/tests/transaction_pool.rs @@ -27,12 +27,13 @@ pub mod common; use std::sync::{Arc, RwLock}; +use chain::txhashset; use chain::types::Tip; -use chain::{txhashset, ChainStore}; use common::{ clean_output_dir, test_setup, test_source, test_transaction, test_transaction_spending_coinbase, ChainAdapter, }; +use core::core::hash::Hashed; use core::core::target::Difficulty; use core::core::{transaction, Block, BlockHeader}; use keychain::{ExtKeychain, Keychain}; @@ -53,12 +54,24 @@ fn test_the_transaction_pool() { let height = 1; let key_id = keychain.derive_key_id(height as u32).unwrap(); let reward = libtx::reward::output(&keychain, &key_id, 0, height).unwrap(); - let block = Block::new(&BlockHeader::default(), vec![], Difficulty::one(), reward).unwrap(); + let mut block = + Block::new(&BlockHeader::default(), vec![], Difficulty::one(), reward).unwrap(); let mut txhashset = chain.txhashset.write().unwrap(); let mut batch = chain.store.batch().unwrap(); txhashset::extending(&mut txhashset, &mut batch, |extension| { - extension.apply_block(&block) + extension.apply_block(&block)?; + + // Now set the roots and sizes as necessary on the block header. + let roots = extension.roots(); + block.header.output_root = roots.output_root; + block.header.range_proof_root = roots.rproof_root; + block.header.kernel_root = roots.kernel_root; + let sizes = extension.sizes(); + block.header.output_mmr_size = sizes.0; + block.header.kernel_mmr_size = sizes.2; + + Ok(()) }).unwrap(); let tip = Tip::from_block(&block.header); @@ -86,7 +99,7 @@ fn test_the_transaction_pool() { { let mut write_pool = pool.write().unwrap(); write_pool - .add_to_pool(test_source(), initial_tx, false) + .add_to_pool(test_source(), initial_tx, false, &header.hash()) .unwrap(); assert_eq!(write_pool.total_size(), 1); } @@ -105,14 +118,14 @@ fn test_the_transaction_pool() { // First, add a simple tx to the pool in "stem" mode. write_pool - .add_to_pool(test_source(), tx1.clone(), true) + .add_to_pool(test_source(), tx1.clone(), true, &header.hash()) .unwrap(); assert_eq!(write_pool.total_size(), 1); assert_eq!(write_pool.stempool.size(), 1); // Add another tx spending outputs from the previous tx. write_pool - .add_to_pool(test_source(), tx2.clone(), true) + .add_to_pool(test_source(), tx2.clone(), true, &header.hash()) .unwrap(); assert_eq!(write_pool.total_size(), 1); assert_eq!(write_pool.stempool.size(), 2); @@ -125,7 +138,7 @@ fn test_the_transaction_pool() { let mut write_pool = pool.write().unwrap(); assert!( write_pool - .add_to_pool(test_source(), tx1.clone(), true) + .add_to_pool(test_source(), tx1.clone(), true, &header.hash()) .is_err() ); } @@ -135,14 +148,22 @@ fn test_the_transaction_pool() { { let tx1a = test_transaction(&keychain, vec![500, 600], vec![499, 599]); let mut write_pool = pool.write().unwrap(); - assert!(write_pool.add_to_pool(test_source(), tx1a, true).is_err()); + assert!( + write_pool + .add_to_pool(test_source(), tx1a, true, &header.hash()) + .is_err() + ); } // Test adding a tx attempting to spend a non-existent output. { let bad_tx = test_transaction(&keychain, vec![10_001], vec![10_000]); let mut write_pool = pool.write().unwrap(); - assert!(write_pool.add_to_pool(test_source(), bad_tx, true).is_err()); + assert!( + write_pool + .add_to_pool(test_source(), bad_tx, true, &header.hash()) + .is_err() + ); } // Test adding a tx that would result in a duplicate output (conflicts with @@ -152,14 +173,22 @@ fn test_the_transaction_pool() { { let tx = test_transaction(&keychain, vec![900], vec![498]); let mut write_pool = pool.write().unwrap(); - assert!(write_pool.add_to_pool(test_source(), tx, true).is_err()); + assert!( + write_pool + .add_to_pool(test_source(), tx, true, &header.hash()) + .is_err() + ); } // Confirm the tx pool correctly identifies an invalid tx (already spent). { let mut write_pool = pool.write().unwrap(); let tx3 = test_transaction(&keychain, vec![500], vec![497]); - assert!(write_pool.add_to_pool(test_source(), tx3, true).is_err()); + assert!( + write_pool + .add_to_pool(test_source(), tx3, true, &header.hash()) + .is_err() + ); assert_eq!(write_pool.total_size(), 1); assert_eq!(write_pool.stempool.size(), 2); } @@ -175,7 +204,7 @@ fn test_the_transaction_pool() { .unwrap(); assert_eq!(agg_tx.kernels().len(), 2); write_pool - .add_to_pool(test_source(), agg_tx, false) + .add_to_pool(test_source(), agg_tx, false, &header.hash()) .unwrap(); assert_eq!(write_pool.total_size(), 2); } @@ -192,7 +221,7 @@ fn test_the_transaction_pool() { // tx4 is the "new" part of this aggregated tx that we care about let agg_tx = transaction::aggregate(vec![tx1.clone(), tx2.clone(), tx4], None).unwrap(); write_pool - .add_to_pool(test_source(), agg_tx, false) + .add_to_pool(test_source(), agg_tx, false, &header.hash()) .unwrap(); assert_eq!(write_pool.total_size(), 3); let entry = write_pool.txpool.entries.last().unwrap(); @@ -211,14 +240,19 @@ fn test_the_transaction_pool() { // check we cannot add a double spend to the stempool assert!( write_pool - .add_to_pool(test_source(), double_spend_tx.clone(), true) + .add_to_pool(test_source(), double_spend_tx.clone(), true, &header.hash()) .is_err() ); // check we cannot add a double spend to the txpool assert!( write_pool - .add_to_pool(test_source(), double_spend_tx.clone(), false) + .add_to_pool( + test_source(), + double_spend_tx.clone(), + false, + &header.hash() + ) .is_err() ); } diff --git a/servers/src/common/adapters.rs b/servers/src/common/adapters.rs index 22c741573..b9ac25412 100644 --- a/servers/src/common/adapters.rs +++ b/servers/src/common/adapters.rs @@ -78,12 +78,13 @@ impl p2p::ChainAdapter for NetToChainAdapter { identifier: "?.?.?.?".to_string(), }; - let h = tx.hash(); + let tx_hash = tx.hash(); + let block_hash = w(&self.chain).head_header().unwrap().hash(); debug!( LOGGER, "Received tx {}, inputs: {}, outputs: {}, kernels: {}, going to process.", - h, + tx_hash, tx.inputs().len(), tx.outputs().len(), tx.kernels().len(), @@ -91,11 +92,11 @@ impl p2p::ChainAdapter for NetToChainAdapter { let res = { let mut tx_pool = self.tx_pool.write().unwrap(); - tx_pool.add_to_pool(source, tx, stem) + tx_pool.add_to_pool(source, tx, stem, &block_hash) }; if let Err(e) = res { - debug!(LOGGER, "Transaction {} rejected: {:?}", h, e); + debug!(LOGGER, "Transaction {} rejected: {:?}", tx_hash, e); } } @@ -738,14 +739,26 @@ impl PoolToChainAdapter { } impl pool::BlockChain for PoolToChainAdapter { + fn chain_head(&self) -> Result { + wo(&self.chain).head_header().map_err(|e| { + pool::PoolError::Other(format!( + "Chain adapter failed to retrieve chain head: {:?}", + e + )) + }) + } + fn validate_raw_txs( &self, txs: Vec, pre_tx: Option, + block_hash: &Hash, ) -> Result<(Vec), pool::PoolError> { - wo(&self.chain).validate_raw_txs(txs, pre_tx).map_err(|_| { - pool::PoolError::Other("Chain adapter failed to validate_raw_txs.".to_string()) - }) + wo(&self.chain) + .validate_raw_txs(txs, pre_tx, block_hash) + .map_err(|e| { + pool::PoolError::Other(format!("Chain adapter failed to validate_raw_txs: {:?}", e)) + }) } fn verify_coinbase_maturity(&self, tx: &Transaction) -> Result<(), pool::PoolError> { diff --git a/servers/src/grin/dandelion_monitor.rs b/servers/src/grin/dandelion_monitor.rs index 8bd042c11..e1f400dde 100644 --- a/servers/src/grin/dandelion_monitor.rs +++ b/servers/src/grin/dandelion_monitor.rs @@ -12,12 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. +use chrono::prelude::Utc; use rand::{self, Rng}; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, RwLock}; use std::thread; use std::time::Duration; -use chrono::prelude::{Utc}; use core::core::hash::Hashed; use core::core::transaction; @@ -90,11 +90,14 @@ where { let mut tx_pool = tx_pool.write().unwrap(); + let header = tx_pool.blockchain.chain_head()?; + let txpool_tx = tx_pool.txpool.aggregate_transaction()?; let stem_txs = tx_pool.stempool.select_valid_transactions( PoolEntryState::ToStem, PoolEntryState::Stemmed, txpool_tx, + &header.hash(), )?; if stem_txs.len() > 0 { @@ -118,7 +121,7 @@ where identifier: "?.?.?.?".to_string(), }; - tx_pool.add_to_pool(src, agg_tx, false)?; + tx_pool.add_to_pool(src, agg_tx, false, &header.hash())?; } } Ok(()) @@ -130,11 +133,14 @@ where { let mut tx_pool = tx_pool.write().unwrap(); + let header = tx_pool.blockchain.chain_head()?; + let txpool_tx = tx_pool.txpool.aggregate_transaction()?; let stem_txs = tx_pool.stempool.select_valid_transactions( PoolEntryState::ToFluff, PoolEntryState::Fluffed, txpool_tx, + &header.hash(), )?; if stem_txs.len() > 0 { @@ -151,7 +157,7 @@ where identifier: "?.?.?.?".to_string(), }; - tx_pool.add_to_pool(src, agg_tx, false)?; + tx_pool.add_to_pool(src, agg_tx, false, &header.hash())?; } Ok(()) } @@ -231,12 +237,14 @@ where { let mut tx_pool = tx_pool.write().unwrap(); + let header = tx_pool.blockchain.chain_head()?; + for entry in expired_entries { let src = TxSource { debug_name: "embargo_expired".to_string(), identifier: "?.?.?.?".to_string(), }; - match tx_pool.add_to_pool(src, entry.tx, false) { + match tx_pool.add_to_pool(src, entry.tx, false, &header.hash()) { Ok(_) => debug!( LOGGER, "dand_mon: embargo expired, fluffed tx successfully." diff --git a/servers/src/mining/mine_block.rs b/servers/src/mining/mine_block.rs index 12ccdbefa..9c73cdbe3 100644 --- a/servers/src/mining/mine_block.rs +++ b/servers/src/mining/mine_block.rs @@ -167,6 +167,7 @@ fn build_block( b.header.clone().total_difficulty.to_num(), ); + // Now set txhashset roots and sizes on the header of the block being built. let roots_result = chain.set_txhashset_roots(&mut b, false); match roots_result { diff --git a/wallet/src/libtx/build.rs b/wallet/src/libtx/build.rs index d66c62297..7b50861c2 100644 --- a/wallet/src/libtx/build.rs +++ b/wallet/src/libtx/build.rs @@ -95,7 +95,7 @@ where move |build, (tx, kern, sum)| -> (Transaction, TxKernel, BlindSum) { let commit = build.keychain.commit(value, &key_id).unwrap(); - debug!(LOGGER, "Building an output: {}, {:?}", value, commit); + debug!(LOGGER, "Building output: {}, {:?}", value, commit); let rproof = proof::create(build.keychain, value, &key_id, commit, None).unwrap(); diff --git a/wallet/src/libwallet/api.rs b/wallet/src/libwallet/api.rs index f0874c2a6..889f9e99d 100644 --- a/wallet/src/libwallet/api.rs +++ b/wallet/src/libwallet/api.rs @@ -24,6 +24,7 @@ use std::sync::{Arc, Mutex}; use serde_json as json; +use core::core::hash::Hashed; use core::ser; use keychain::Keychain; use libtx::slate::Slate; @@ -321,8 +322,19 @@ where let mut w = self.wallet.lock().unwrap(); w.client().clone() }; - client.post_tx(&TxWrapper { tx_hex: tx_hex }, fluff)?; - Ok(()) + let res = client.post_tx(&TxWrapper { tx_hex: tx_hex }, fluff); + if let Err(e) = res { + error!(LOGGER, "api: post_tx: failed with error: {}", e); + Err(e) + } else { + debug!( + LOGGER, + "api: post_tx: successfully posted tx: {}, fluff? {}", + slate.tx.hash(), + fluff + ); + Ok(()) + } } /// Attempt to restore contents of wallet @@ -414,6 +426,17 @@ where w.open_with_credentials()?; let res = tx::receive_tx(&mut **w, slate); w.close()?; - res + + if let Err(e) = res { + error!(LOGGER, "api: receive_tx: failed with error: {}", e); + Err(e) + } else { + debug!( + LOGGER, + "api: receive_tx: successfully received tx: {}", + slate.tx.hash() + ); + Ok(()) + } } } diff --git a/wallet/src/libwallet/controller.rs b/wallet/src/libwallet/controller.rs index df7203e18..459fdcfac 100644 --- a/wallet/src/libwallet/controller.rs +++ b/wallet/src/libwallet/controller.rs @@ -395,7 +395,10 @@ where Box::new( parse_body(req).and_then(move |mut slate| match api.receive_tx(&mut slate) { Ok(_) => ok(slate.clone()), - Err(e) => err(e), + Err(e) => { + error!(LOGGER, "receive_tx: failed with error: {}", e); + err(e) + } }), ) }