Fix txpool race condition (#1385)

* fix txpool race condition

* rustfmt

* fixup pool tests

* rustfmt

* rebase against master, pool tests passing
This commit is contained in:
Antioch Peverell 2018-08-20 14:48:05 +01:00 committed by GitHub
parent ef4f426474
commit 82a467ac3c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
17 changed files with 275 additions and 103 deletions

View file

@ -116,8 +116,6 @@ impl OutputHandler {
} }
} }
debug!(LOGGER, "outputs_by_ids: {:?}", commitments);
let mut outputs: Vec<Output> = vec![]; let mut outputs: Vec<Output> = vec![];
for x in commitments { for x in commitments {
if let Ok(output) = self.get_output(&x) { if let Ok(output) = self.get_output(&x) {
@ -738,7 +736,8 @@ where
}; };
info!( info!(
LOGGER, LOGGER,
"Pushing transaction, inputs: {}, outputs: {}, kernels: {}, to pool.", "Pushing transaction {} to pool (inputs: {}, outputs: {}, kernels: {})",
tx.hash(),
tx.inputs().len(), tx.inputs().len(),
tx.outputs().len(), tx.outputs().len(),
tx.kernels().len(), tx.kernels().len(),
@ -746,9 +745,13 @@ where
// Push to tx pool. // Push to tx pool.
let mut tx_pool = pool_arc.write().unwrap(); let mut tx_pool = pool_arc.write().unwrap();
let header = tx_pool.blockchain.chain_head().unwrap();
tx_pool tx_pool
.add_to_pool(source, tx, !fluff) .add_to_pool(source, tx, !fluff, &header.hash())
.map_err(|_| ErrorKind::RequestError("Bad request".to_owned()).into()) .map_err(|e| {
error!(LOGGER, "update_pool: failed with error: {:?}", e);
ErrorKind::RequestError("Bad request".to_owned()).into()
})
}), }),
) )
} }

View file

@ -399,7 +399,8 @@ impl Chain {
Ok(bh.height + 1) Ok(bh.height + 1)
} }
/// Validate a vec of "raw" transactions against the current chain state. /// Validate a vec of "raw" transactions against a known chain state
/// at the block with the specified block hash.
/// Specifying a "pre_tx" if we need to adjust the state, for example when /// Specifying a "pre_tx" if we need to adjust the state, for example when
/// validating the txs in the stempool we adjust the state based on the /// validating the txs in the stempool we adjust the state based on the
/// txpool. /// txpool.
@ -407,9 +408,15 @@ impl Chain {
&self, &self,
txs: Vec<Transaction>, txs: Vec<Transaction>,
pre_tx: Option<Transaction>, pre_tx: Option<Transaction>,
block_hash: &Hash,
) -> Result<Vec<Transaction>, Error> { ) -> Result<Vec<Transaction>, Error> {
// Get headers so we can rewind chain state correctly.
let header = self.store.get_block_header(block_hash)?;
let head_header = self.store.head_header()?;
let mut txhashset = self.txhashset.write().unwrap(); let mut txhashset = self.txhashset.write().unwrap();
txhashset::extending_readonly(&mut txhashset, |extension| { txhashset::extending_readonly(&mut txhashset, |extension| {
extension.rewind(&header, &head_header)?;
let valid_txs = extension.validate_raw_txs(txs, pre_tx)?; let valid_txs = extension.validate_raw_txs(txs, pre_tx)?;
Ok(valid_txs) Ok(valid_txs)
}) })

View file

@ -79,7 +79,7 @@ pub enum ErrorKind {
#[fail(display = "Already Spent: {:?}", _0)] #[fail(display = "Already Spent: {:?}", _0)]
AlreadySpent(Commitment), AlreadySpent(Commitment),
/// An output with that commitment already exists (should be unique) /// An output with that commitment already exists (should be unique)
#[fail(display = "Dupliate Commitment: {:?}", _0)] #[fail(display = "Duplicate Commitment: {:?}", _0)]
DuplicateCommitment(Commitment), DuplicateCommitment(Commitment),
/// Attempt to spend a coinbase output before it sufficiently matures. /// Attempt to spend a coinbase output before it sufficiently matures.
#[fail(display = "Attempt to spend immature coinbase")] #[fail(display = "Attempt to spend immature coinbase")]

View file

@ -19,7 +19,7 @@ use std::collections::{HashMap, HashSet};
use std::sync::Arc; use std::sync::Arc;
use core::consensus; use core::consensus;
use core::core::hash::Hashed; use core::core::hash::{Hash, Hashed};
use core::core::id::ShortIdentifiable; use core::core::id::ShortIdentifiable;
use core::core::transaction; use core::core::transaction;
use core::core::{Block, CompactBlock, Transaction, TxKernel}; use core::core::{Block, CompactBlock, Transaction, TxKernel};
@ -80,6 +80,8 @@ where
/// transactions, orders by fee over weight and ensures to total weight /// transactions, orders by fee over weight and ensures to total weight
/// doesn't exceed block limits. /// doesn't exceed block limits.
pub fn prepare_mineable_transactions(&self) -> Vec<Transaction> { pub fn prepare_mineable_transactions(&self) -> Vec<Transaction> {
let header = self.blockchain.chain_head().unwrap();
let tx_buckets = self.bucket_transactions(); let tx_buckets = self.bucket_transactions();
// flatten buckets using aggregate (with cut-through) // flatten buckets using aggregate (with cut-through)
@ -105,7 +107,7 @@ where
// make sure those txs are all valid together, no Error is expected // make sure those txs are all valid together, no Error is expected
// when passing None // when passing None
self.blockchain self.blockchain
.validate_raw_txs(flat_txs, None) .validate_raw_txs(flat_txs, None, &header.hash())
.expect("should never happen") .expect("should never happen")
} }
@ -128,6 +130,7 @@ where
from_state: PoolEntryState, from_state: PoolEntryState,
to_state: PoolEntryState, to_state: PoolEntryState,
extra_tx: Option<Transaction>, extra_tx: Option<Transaction>,
block_hash: &Hash,
) -> Result<Vec<Transaction>, PoolError> { ) -> Result<Vec<Transaction>, PoolError> {
let entries = &mut self let entries = &mut self
.entries .entries
@ -139,7 +142,9 @@ where
if candidate_txs.is_empty() { if candidate_txs.is_empty() {
return Ok(vec![]); return Ok(vec![]);
} }
let valid_txs = self.blockchain.validate_raw_txs(candidate_txs, extra_tx)?; let valid_txs = self
.blockchain
.validate_raw_txs(candidate_txs, extra_tx, block_hash)?;
// Update state on all entries included in final vec of valid txs. // Update state on all entries included in final vec of valid txs.
for x in &mut entries.iter_mut() { for x in &mut entries.iter_mut() {
@ -158,16 +163,18 @@ where
&mut self, &mut self,
entry: PoolEntry, entry: PoolEntry,
extra_txs: Vec<Transaction>, extra_txs: Vec<Transaction>,
block_hash: &Hash,
) -> Result<(), PoolError> { ) -> Result<(), PoolError> {
debug!( debug!(
LOGGER, LOGGER,
"pool [{}]: add_to_pool: {}, {:?}, inputs: {}, outputs: {}, kernels: {}", "pool [{}]: add_to_pool: {}, {:?}, inputs: {}, outputs: {}, kernels: {} (at block {})",
self.name, self.name,
entry.tx.hash(), entry.tx.hash(),
entry.src, entry.src,
entry.tx.inputs().len(), entry.tx.inputs().len(),
entry.tx.outputs().len(), entry.tx.outputs().len(),
entry.tx.kernels().len(), entry.tx.kernels().len(),
block_hash,
); );
// Combine all the txs from the pool with any extra txs provided. // Combine all the txs from the pool with any extra txs provided.
@ -184,9 +191,10 @@ where
transaction::aggregate(txs, None)? transaction::aggregate(txs, None)?
}; };
// Validate aggregated tx against the current chain state (via txhashset // Validate aggregated tx against a known chain state (via txhashset
// extension). // extension).
self.blockchain.validate_raw_txs(vec![], Some(agg_tx))?; self.blockchain
.validate_raw_txs(vec![], Some(agg_tx), block_hash)?;
// If we get here successfully then we can safely add the entry to the pool. // If we get here successfully then we can safely add the entry to the pool.
self.entries.push(entry); self.entries.push(entry);
@ -194,7 +202,11 @@ where
Ok(()) Ok(())
} }
pub fn reconcile(&mut self, extra_tx: Option<Transaction>) -> Result<(), PoolError> { pub fn reconcile(
&mut self,
extra_tx: Option<Transaction>,
block_hash: &Hash,
) -> Result<(), PoolError> {
let candidate_txs = self.all_transactions(); let candidate_txs = self.all_transactions();
let existing_len = candidate_txs.len(); let existing_len = candidate_txs.len();
@ -203,8 +215,10 @@ where
} }
// Go through the candidate txs and keep everything that validates incrementally // Go through the candidate txs and keep everything that validates incrementally
// against the current chain state, accounting for the "extra tx" as necessary. // against a known chain state, accounting for the "extra tx" as necessary.
let valid_txs = self.blockchain.validate_raw_txs(candidate_txs, extra_tx)?; let valid_txs = self
.blockchain
.validate_raw_txs(candidate_txs, extra_tx, block_hash)?;
self.entries.retain(|x| valid_txs.contains(&x.tx)); self.entries.retain(|x| valid_txs.contains(&x.tx));
debug!( debug!(

View file

@ -20,11 +20,10 @@
use chrono::prelude::Utc; use chrono::prelude::Utc;
use std::sync::Arc; use std::sync::Arc;
use core::core::hash::Hashed; use core::core::hash::Hash;
use core::core::{transaction, Block, CompactBlock, Transaction}; use core::core::{transaction, Block, CompactBlock, Transaction};
use pool::Pool; use pool::Pool;
use types::{BlockChain, PoolAdapter, PoolConfig, PoolEntry, PoolEntryState, PoolError, TxSource}; use types::{BlockChain, PoolAdapter, PoolConfig, PoolEntry, PoolEntryState, PoolError, TxSource};
use util::LOGGER;
/// Transaction pool implementation. /// Transaction pool implementation.
pub struct TransactionPool<T> { pub struct TransactionPool<T> {
@ -57,17 +56,17 @@ where
} }
} }
fn add_to_stempool(&mut self, entry: PoolEntry) -> Result<(), PoolError> { fn add_to_stempool(&mut self, entry: PoolEntry, block_hash: &Hash) -> Result<(), PoolError> {
// Add tx to stempool (passing in all txs from txpool to validate against). // Add tx to stempool (passing in all txs from txpool to validate against).
self.stempool self.stempool
.add_to_pool(entry.clone(), self.txpool.all_transactions())?; .add_to_pool(entry.clone(), self.txpool.all_transactions(), block_hash)?;
// Note: we do not notify the adapter here, // Note: we do not notify the adapter here,
// we let the dandelion monitor handle this. // we let the dandelion monitor handle this.
Ok(()) Ok(())
} }
fn add_to_txpool(&mut self, mut entry: PoolEntry) -> Result<(), PoolError> { fn add_to_txpool(&mut self, mut entry: PoolEntry, block_hash: &Hash) -> Result<(), PoolError> {
// First deaggregate the tx based on current txpool txs. // First deaggregate the tx based on current txpool txs.
if entry.tx.kernels().len() > 1 { if entry.tx.kernels().len() > 1 {
let txs = self let txs = self
@ -78,12 +77,12 @@ where
entry.src.debug_name = "deagg".to_string(); entry.src.debug_name = "deagg".to_string();
} }
} }
self.txpool.add_to_pool(entry.clone(), vec![])?; self.txpool.add_to_pool(entry.clone(), vec![], block_hash)?;
// We now need to reconcile the stempool based on the new state of the txpool. // We now need to reconcile the stempool based on the new state of the txpool.
// Some stempool txs may no longer be valid and we need to evict them. // Some stempool txs may no longer be valid and we need to evict them.
let txpool_tx = self.txpool.aggregate_transaction()?; let txpool_tx = self.txpool.aggregate_transaction()?;
self.stempool.reconcile(txpool_tx)?; self.stempool.reconcile(txpool_tx, block_hash)?;
self.adapter.tx_accepted(&entry.tx); self.adapter.tx_accepted(&entry.tx);
Ok(()) Ok(())
@ -96,6 +95,7 @@ where
src: TxSource, src: TxSource,
tx: Transaction, tx: Transaction,
stem: bool, stem: bool,
block_hash: &Hash,
) -> Result<(), PoolError> { ) -> Result<(), PoolError> {
// Do we have the capacity to accept this transaction? // Do we have the capacity to accept this transaction?
self.is_acceptable(&tx)?; self.is_acceptable(&tx)?;
@ -117,9 +117,9 @@ where
}; };
if stem { if stem {
self.add_to_stempool(entry)?; self.add_to_stempool(entry, block_hash)?;
} else { } else {
self.add_to_txpool(entry)?; self.add_to_txpool(entry, block_hash)?;
} }
Ok(()) Ok(())
} }
@ -129,12 +129,12 @@ where
pub fn reconcile_block(&mut self, block: &Block) -> Result<(), PoolError> { pub fn reconcile_block(&mut self, block: &Block) -> Result<(), PoolError> {
// First reconcile the txpool. // First reconcile the txpool.
self.txpool.reconcile_block(block)?; self.txpool.reconcile_block(block)?;
self.txpool.reconcile(None)?; self.txpool.reconcile(None, &block.hash())?;
// Then reconcile the stempool, accounting for the txpool txs. // Then reconcile the stempool, accounting for the txpool txs.
let txpool_tx = self.txpool.aggregate_transaction()?; let txpool_tx = self.txpool.aggregate_transaction()?;
self.stempool.reconcile_block(block)?; self.stempool.reconcile_block(block)?;
self.stempool.reconcile(txpool_tx)?; self.stempool.reconcile(txpool_tx, &block.hash())?;
Ok(()) Ok(())
} }

View file

@ -15,10 +15,11 @@
//! The primary module containing the implementations of the transaction pool //! The primary module containing the implementations of the transaction pool
//! and its top-level members. //! and its top-level members.
use std::{error, fmt};
use chrono::prelude::{DateTime, Utc}; use chrono::prelude::{DateTime, Utc};
use core::consensus; use core::consensus;
use core::core::block::BlockHeader;
use core::core::hash::Hash;
use core::core::transaction::{self, Transaction}; use core::core::transaction::{self, Transaction};
/// Dandelion relay timer /// Dandelion relay timer
@ -183,30 +184,15 @@ impl From<transaction::Error> for PoolError {
} }
} }
impl error::Error for PoolError {
fn description(&self) -> &str {
match *self {
_ => "some kind of pool error",
}
}
}
impl fmt::Display for PoolError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
_ => write!(f, "some kind of pool error"),
}
}
}
/// Interface that the pool requires from a blockchain implementation. /// Interface that the pool requires from a blockchain implementation.
pub trait BlockChain { pub trait BlockChain {
/// Validate a vec of txs against the current chain state after applying the /// Validate a vec of txs against known chain state at specific block
/// pre_tx to the chain state. /// after applying the pre_tx to the chain state.
fn validate_raw_txs( fn validate_raw_txs(
&self, &self,
txs: Vec<transaction::Transaction>, txs: Vec<transaction::Transaction>,
pre_tx: Option<transaction::Transaction>, pre_tx: Option<transaction::Transaction>,
block_hash: &Hash,
) -> Result<Vec<transaction::Transaction>, PoolError>; ) -> Result<Vec<transaction::Transaction>, PoolError>;
/// Verify any coinbase outputs being spent /// Verify any coinbase outputs being spent
@ -216,6 +202,8 @@ pub trait BlockChain {
/// Verify any coinbase outputs being spent /// Verify any coinbase outputs being spent
/// have matured sufficiently. /// have matured sufficiently.
fn verify_tx_lock_height(&self, tx: &transaction::Transaction) -> Result<(), PoolError>; fn verify_tx_lock_height(&self, tx: &transaction::Transaction) -> Result<(), PoolError>;
fn chain_head(&self) -> Result<BlockHeader, PoolError>;
} }
/// Bridge between the transaction pool and the rest of the system. Handles /// Bridge between the transaction pool and the rest of the system. Handles

View file

@ -31,6 +31,7 @@ use core::core::{Block, BlockHeader};
use chain::txhashset; use chain::txhashset;
use chain::types::Tip; use chain::types::Tip;
use core::core::hash::Hashed;
use core::core::target::Difficulty; use core::core::target::Difficulty;
use keychain::{ExtKeychain, Keychain}; use keychain::{ExtKeychain, Keychain};
@ -52,12 +53,23 @@ fn test_transaction_pool_block_building() {
let add_block = |height, txs| { let add_block = |height, txs| {
let key_id = keychain.derive_key_id(height as u32).unwrap(); let key_id = keychain.derive_key_id(height as u32).unwrap();
let reward = libtx::reward::output(&keychain, &key_id, 0, height).unwrap(); let reward = libtx::reward::output(&keychain, &key_id, 0, height).unwrap();
let block = Block::new(&BlockHeader::default(), txs, Difficulty::one(), reward).unwrap(); let mut block = Block::new(&BlockHeader::default(), txs, Difficulty::one(), reward).unwrap();
let mut txhashset = chain.txhashset.write().unwrap(); let mut txhashset = chain.txhashset.write().unwrap();
let mut batch = chain.store.batch().unwrap(); let mut batch = chain.store.batch().unwrap();
txhashset::extending(&mut txhashset, &mut batch, |extension| { txhashset::extending(&mut txhashset, &mut batch, |extension| {
extension.apply_block(&block) extension.apply_block(&block)?;
// Now set the roots and sizes as necessary on the block header.
let roots = extension.roots();
block.header.output_root = roots.output_root;
block.header.range_proof_root = roots.rproof_root;
block.header.kernel_root = roots.kernel_root;
let sizes = extension.sizes();
block.header.output_mmr_size = sizes.0;
block.header.kernel_mmr_size = sizes.2;
Ok(())
}).unwrap(); }).unwrap();
let tip = Tip::from_block(&block.header); let tip = Tip::from_block(&block.header);
@ -91,21 +103,21 @@ fn test_transaction_pool_block_building() {
// Add the three root txs to the pool. // Add the three root txs to the pool.
write_pool write_pool
.add_to_pool(test_source(), root_tx_1, false) .add_to_pool(test_source(), root_tx_1, false, &header.hash())
.unwrap(); .unwrap();
write_pool write_pool
.add_to_pool(test_source(), root_tx_2, false) .add_to_pool(test_source(), root_tx_2, false, &header.hash())
.unwrap(); .unwrap();
write_pool write_pool
.add_to_pool(test_source(), root_tx_3, false) .add_to_pool(test_source(), root_tx_3, false, &header.hash())
.unwrap(); .unwrap();
// Now add the two child txs to the pool. // Now add the two child txs to the pool.
write_pool write_pool
.add_to_pool(test_source(), child_tx_1.clone(), false) .add_to_pool(test_source(), child_tx_1.clone(), false, &header.hash())
.unwrap(); .unwrap();
write_pool write_pool
.add_to_pool(test_source(), child_tx_2.clone(), false) .add_to_pool(test_source(), child_tx_2.clone(), false, &header.hash())
.unwrap(); .unwrap();
assert_eq!(write_pool.total_size(), 5); assert_eq!(write_pool.total_size(), 5);
@ -118,7 +130,7 @@ fn test_transaction_pool_block_building() {
// children should have been aggregated into parents // children should have been aggregated into parents
assert_eq!(txs.len(), 3); assert_eq!(txs.len(), 3);
let block = { let mut block = {
let key_id = keychain.derive_key_id(2).unwrap(); let key_id = keychain.derive_key_id(2).unwrap();
let fees = txs.iter().map(|tx| tx.fee()).sum(); let fees = txs.iter().map(|tx| tx.fee()).sum();
let reward = libtx::reward::output(&keychain, &key_id, fees, 0).unwrap(); let reward = libtx::reward::output(&keychain, &key_id, fees, 0).unwrap();
@ -130,8 +142,22 @@ fn test_transaction_pool_block_building() {
let mut txhashset = chain.txhashset.write().unwrap(); let mut txhashset = chain.txhashset.write().unwrap();
txhashset::extending(&mut txhashset, &mut batch, |extension| { txhashset::extending(&mut txhashset, &mut batch, |extension| {
extension.apply_block(&block)?; extension.apply_block(&block)?;
// Now set the roots and sizes as necessary on the block header.
let roots = extension.roots();
block.header.output_root = roots.output_root;
block.header.range_proof_root = roots.rproof_root;
block.header.kernel_root = roots.kernel_root;
let sizes = extension.sizes();
block.header.output_mmr_size = sizes.0;
block.header.kernel_mmr_size = sizes.2;
Ok(()) Ok(())
}).unwrap(); }).unwrap();
let tip = Tip::from_block(&block.header);
batch.save_block_header(&block.header).unwrap();
batch.save_head(&tip).unwrap();
batch.commit().unwrap(); batch.commit().unwrap();
} }

View file

@ -20,19 +20,22 @@ extern crate grin_pool as pool;
extern crate grin_util as util; extern crate grin_util as util;
extern crate grin_wallet as wallet; extern crate grin_wallet as wallet;
extern crate rand;
extern crate chrono; extern crate chrono;
extern crate rand;
pub mod common; pub mod common;
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
use core::core::hash::Hashed;
use core::core::{Block, BlockHeader}; use core::core::{Block, BlockHeader};
use chain::txhashset;
use chain::types::Tip; use chain::types::Tip;
use chain::{txhashset, ChainStore}; use common::{
use common::{clean_output_dir, test_setup, test_source, test_transaction, clean_output_dir, test_setup, test_source, test_transaction,
test_transaction_spending_coinbase, ChainAdapter}; test_transaction_spending_coinbase, ChainAdapter,
};
use core::core::target::Difficulty; use core::core::target::Difficulty;
use keychain::{ExtKeychain, Keychain}; use keychain::{ExtKeychain, Keychain};
use wallet::libtx; use wallet::libtx;
@ -51,12 +54,24 @@ fn test_transaction_pool_block_reconciliation() {
let height = 1; let height = 1;
let key_id = keychain.derive_key_id(height as u32).unwrap(); let key_id = keychain.derive_key_id(height as u32).unwrap();
let reward = libtx::reward::output(&keychain, &key_id, 0, height).unwrap(); let reward = libtx::reward::output(&keychain, &key_id, 0, height).unwrap();
let block = Block::new(&BlockHeader::default(), vec![], Difficulty::one(), reward).unwrap(); let mut block =
Block::new(&BlockHeader::default(), vec![], Difficulty::one(), reward).unwrap();
let mut batch = chain.store.batch().unwrap(); let mut batch = chain.store.batch().unwrap();
let mut txhashset = chain.txhashset.write().unwrap(); let mut txhashset = chain.txhashset.write().unwrap();
txhashset::extending(&mut txhashset, &mut batch, |extension| { txhashset::extending(&mut txhashset, &mut batch, |extension| {
extension.apply_block(&block) extension.apply_block(&block)?;
// Now set the roots and sizes as necessary on the block header.
let roots = extension.roots();
block.header.output_root = roots.output_root;
block.header.range_proof_root = roots.rproof_root;
block.header.kernel_root = roots.kernel_root;
let sizes = extension.sizes();
block.header.output_mmr_size = sizes.0;
block.header.kernel_mmr_size = sizes.2;
Ok(())
}).unwrap(); }).unwrap();
let tip = Tip::from_block(&block.header); let tip = Tip::from_block(&block.header);
@ -78,13 +93,23 @@ fn test_transaction_pool_block_reconciliation() {
let key_id = keychain.derive_key_id(2).unwrap(); let key_id = keychain.derive_key_id(2).unwrap();
let fees = initial_tx.fee(); let fees = initial_tx.fee();
let reward = libtx::reward::output(&keychain, &key_id, fees, 0).unwrap(); let reward = libtx::reward::output(&keychain, &key_id, fees, 0).unwrap();
let block = Block::new(&header, vec![initial_tx], Difficulty::one(), reward).unwrap(); let mut block = Block::new(&header, vec![initial_tx], Difficulty::one(), reward).unwrap();
let mut batch = chain.store.batch().unwrap(); let mut batch = chain.store.batch().unwrap();
{ {
let mut txhashset = chain.txhashset.write().unwrap(); let mut txhashset = chain.txhashset.write().unwrap();
txhashset::extending(&mut txhashset, &mut batch, |extension| { txhashset::extending(&mut txhashset, &mut batch, |extension| {
extension.apply_block(&block)?; extension.apply_block(&block)?;
// Now set the roots and sizes as necessary on the block header.
let roots = extension.roots();
block.header.output_root = roots.output_root;
block.header.range_proof_root = roots.rproof_root;
block.header.kernel_root = roots.kernel_root;
let sizes = extension.sizes();
block.header.output_mmr_size = sizes.0;
block.header.kernel_mmr_size = sizes.2;
Ok(()) Ok(())
}).unwrap(); }).unwrap();
} }
@ -153,7 +178,7 @@ fn test_transaction_pool_block_reconciliation() {
for tx in &txs_to_add { for tx in &txs_to_add {
write_pool write_pool
.add_to_pool(test_source(), tx.clone(), false) .add_to_pool(test_source(), tx.clone(), false, &header.hash())
.unwrap(); .unwrap();
} }
@ -177,13 +202,23 @@ fn test_transaction_pool_block_reconciliation() {
let key_id = keychain.derive_key_id(3).unwrap(); let key_id = keychain.derive_key_id(3).unwrap();
let fees = block_txs.iter().map(|tx| tx.fee()).sum(); let fees = block_txs.iter().map(|tx| tx.fee()).sum();
let reward = libtx::reward::output(&keychain, &key_id, fees, 0).unwrap(); let reward = libtx::reward::output(&keychain, &key_id, fees, 0).unwrap();
let block = Block::new(&header, block_txs, Difficulty::one(), reward).unwrap(); let mut block = Block::new(&header, block_txs, Difficulty::one(), reward).unwrap();
{ {
let mut batch = chain.store.batch().unwrap(); let mut batch = chain.store.batch().unwrap();
let mut txhashset = chain.txhashset.write().unwrap(); let mut txhashset = chain.txhashset.write().unwrap();
txhashset::extending(&mut txhashset, &mut batch, |extension| { txhashset::extending(&mut txhashset, &mut batch, |extension| {
extension.apply_block(&block)?; extension.apply_block(&block)?;
// Now set the roots and sizes as necessary on the block header.
let roots = extension.roots();
block.header.output_root = roots.output_root;
block.header.range_proof_root = roots.rproof_root;
block.header.kernel_root = roots.kernel_root;
let sizes = extension.sizes();
block.header.output_mmr_size = sizes.0;
block.header.kernel_mmr_size = sizes.2;
Ok(()) Ok(())
}).unwrap(); }).unwrap();
batch.commit().unwrap(); batch.commit().unwrap();

View file

@ -20,18 +20,19 @@ extern crate grin_pool as pool;
extern crate grin_util as util; extern crate grin_util as util;
extern crate grin_wallet as wallet; extern crate grin_wallet as wallet;
extern crate rand;
extern crate chrono; extern crate chrono;
extern crate rand;
pub mod common; pub mod common;
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
use common::{test_source, test_transaction}; use common::{test_source, test_transaction};
use core::core::Transaction; use core::core::hash::Hash;
use core::core::{BlockHeader, Transaction};
use keychain::{ExtKeychain, Keychain}; use keychain::{ExtKeychain, Keychain};
use pool::TransactionPool;
use pool::types::{BlockChain, NoopAdapter, PoolConfig, PoolError}; use pool::types::{BlockChain, NoopAdapter, PoolConfig, PoolError};
use pool::TransactionPool;
pub fn test_setup( pub fn test_setup(
chain: &Arc<CoinbaseMaturityErrorChainAdapter>, chain: &Arc<CoinbaseMaturityErrorChainAdapter>,
@ -56,14 +57,17 @@ impl CoinbaseMaturityErrorChainAdapter {
} }
impl BlockChain for CoinbaseMaturityErrorChainAdapter { impl BlockChain for CoinbaseMaturityErrorChainAdapter {
fn chain_head(&self) -> Result<BlockHeader, PoolError> {
unimplemented!();
}
fn validate_raw_txs( fn validate_raw_txs(
&self, &self,
_txs: Vec<Transaction>, _txs: Vec<Transaction>,
_pre_tx: Option<Transaction>, _pre_tx: Option<Transaction>,
_block_hash: &Hash,
) -> Result<Vec<Transaction>, PoolError> { ) -> Result<Vec<Transaction>, PoolError> {
Err(PoolError::Other( unimplemented!();
"not implemented, not a real chain adapter...".to_string(),
))
} }
// Returns an ImmatureCoinbase for every tx we pass in. // Returns an ImmatureCoinbase for every tx we pass in.
@ -90,7 +94,7 @@ fn test_coinbase_maturity() {
{ {
let mut write_pool = pool.write().unwrap(); let mut write_pool = pool.write().unwrap();
let tx = test_transaction(&keychain, vec![50], vec![49]); let tx = test_transaction(&keychain, vec![50], vec![49]);
match write_pool.add_to_pool(test_source(), tx.clone(), true) { match write_pool.add_to_pool(test_source(), tx.clone(), true, &Hash::default()) {
Err(PoolError::ImmatureCoinbase) => {} Err(PoolError::ImmatureCoinbase) => {}
_ => panic!("Expected an immature coinbase error here."), _ => panic!("Expected an immature coinbase error here."),
} }

View file

@ -29,13 +29,12 @@ extern crate rand;
use std::fs; use std::fs;
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
use core::core::hash::Hash;
use core::core::{BlockHeader, Transaction}; use core::core::{BlockHeader, Transaction};
use chain::store::ChainStore; use chain::store::ChainStore;
use chain::txhashset; use chain::txhashset;
use chain::txhashset::TxHashSet; use chain::txhashset::TxHashSet;
use core::core::hash::Hashed;
use core::core::merkle_proof::MerkleProof;
use pool::*; use pool::*;
use keychain::Keychain; use keychain::Keychain;
@ -68,13 +67,27 @@ impl ChainAdapter {
} }
impl BlockChain for ChainAdapter { impl BlockChain for ChainAdapter {
fn chain_head(&self) -> Result<BlockHeader, PoolError> {
self.store
.head_header()
.map_err(|_| PoolError::Other(format!("failed to get chain head")))
}
fn validate_raw_txs( fn validate_raw_txs(
&self, &self,
txs: Vec<Transaction>, txs: Vec<Transaction>,
pre_tx: Option<Transaction>, pre_tx: Option<Transaction>,
block_hash: &Hash,
) -> Result<Vec<Transaction>, PoolError> { ) -> Result<Vec<Transaction>, PoolError> {
let header = self
.store
.get_block_header(&block_hash)
.map_err(|_| PoolError::Other(format!("failed to get header")))?;
let head_header = self.chain_head()?;
let mut txhashset = self.txhashset.write().unwrap(); let mut txhashset = self.txhashset.write().unwrap();
let res = txhashset::extending_readonly(&mut txhashset, |extension| { let res = txhashset::extending_readonly(&mut txhashset, |extension| {
extension.rewind(&header, &head_header)?;
let valid_txs = extension.validate_raw_txs(txs, pre_tx)?; let valid_txs = extension.validate_raw_txs(txs, pre_tx)?;
Ok(valid_txs) Ok(valid_txs)
}).map_err(|e| PoolError::Other(format!("Error: test chain adapter: {:?}", e)))?; }).map_err(|e| PoolError::Other(format!("Error: test chain adapter: {:?}", e)))?;

View file

@ -27,12 +27,13 @@ pub mod common;
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
use chain::txhashset;
use chain::types::Tip; use chain::types::Tip;
use chain::{txhashset, ChainStore};
use common::{ use common::{
clean_output_dir, test_setup, test_source, test_transaction, clean_output_dir, test_setup, test_source, test_transaction,
test_transaction_spending_coinbase, ChainAdapter, test_transaction_spending_coinbase, ChainAdapter,
}; };
use core::core::hash::Hashed;
use core::core::target::Difficulty; use core::core::target::Difficulty;
use core::core::{transaction, Block, BlockHeader}; use core::core::{transaction, Block, BlockHeader};
use keychain::{ExtKeychain, Keychain}; use keychain::{ExtKeychain, Keychain};
@ -53,12 +54,24 @@ fn test_the_transaction_pool() {
let height = 1; let height = 1;
let key_id = keychain.derive_key_id(height as u32).unwrap(); let key_id = keychain.derive_key_id(height as u32).unwrap();
let reward = libtx::reward::output(&keychain, &key_id, 0, height).unwrap(); let reward = libtx::reward::output(&keychain, &key_id, 0, height).unwrap();
let block = Block::new(&BlockHeader::default(), vec![], Difficulty::one(), reward).unwrap(); let mut block =
Block::new(&BlockHeader::default(), vec![], Difficulty::one(), reward).unwrap();
let mut txhashset = chain.txhashset.write().unwrap(); let mut txhashset = chain.txhashset.write().unwrap();
let mut batch = chain.store.batch().unwrap(); let mut batch = chain.store.batch().unwrap();
txhashset::extending(&mut txhashset, &mut batch, |extension| { txhashset::extending(&mut txhashset, &mut batch, |extension| {
extension.apply_block(&block) extension.apply_block(&block)?;
// Now set the roots and sizes as necessary on the block header.
let roots = extension.roots();
block.header.output_root = roots.output_root;
block.header.range_proof_root = roots.rproof_root;
block.header.kernel_root = roots.kernel_root;
let sizes = extension.sizes();
block.header.output_mmr_size = sizes.0;
block.header.kernel_mmr_size = sizes.2;
Ok(())
}).unwrap(); }).unwrap();
let tip = Tip::from_block(&block.header); let tip = Tip::from_block(&block.header);
@ -86,7 +99,7 @@ fn test_the_transaction_pool() {
{ {
let mut write_pool = pool.write().unwrap(); let mut write_pool = pool.write().unwrap();
write_pool write_pool
.add_to_pool(test_source(), initial_tx, false) .add_to_pool(test_source(), initial_tx, false, &header.hash())
.unwrap(); .unwrap();
assert_eq!(write_pool.total_size(), 1); assert_eq!(write_pool.total_size(), 1);
} }
@ -105,14 +118,14 @@ fn test_the_transaction_pool() {
// First, add a simple tx to the pool in "stem" mode. // First, add a simple tx to the pool in "stem" mode.
write_pool write_pool
.add_to_pool(test_source(), tx1.clone(), true) .add_to_pool(test_source(), tx1.clone(), true, &header.hash())
.unwrap(); .unwrap();
assert_eq!(write_pool.total_size(), 1); assert_eq!(write_pool.total_size(), 1);
assert_eq!(write_pool.stempool.size(), 1); assert_eq!(write_pool.stempool.size(), 1);
// Add another tx spending outputs from the previous tx. // Add another tx spending outputs from the previous tx.
write_pool write_pool
.add_to_pool(test_source(), tx2.clone(), true) .add_to_pool(test_source(), tx2.clone(), true, &header.hash())
.unwrap(); .unwrap();
assert_eq!(write_pool.total_size(), 1); assert_eq!(write_pool.total_size(), 1);
assert_eq!(write_pool.stempool.size(), 2); assert_eq!(write_pool.stempool.size(), 2);
@ -125,7 +138,7 @@ fn test_the_transaction_pool() {
let mut write_pool = pool.write().unwrap(); let mut write_pool = pool.write().unwrap();
assert!( assert!(
write_pool write_pool
.add_to_pool(test_source(), tx1.clone(), true) .add_to_pool(test_source(), tx1.clone(), true, &header.hash())
.is_err() .is_err()
); );
} }
@ -135,14 +148,22 @@ fn test_the_transaction_pool() {
{ {
let tx1a = test_transaction(&keychain, vec![500, 600], vec![499, 599]); let tx1a = test_transaction(&keychain, vec![500, 600], vec![499, 599]);
let mut write_pool = pool.write().unwrap(); let mut write_pool = pool.write().unwrap();
assert!(write_pool.add_to_pool(test_source(), tx1a, true).is_err()); assert!(
write_pool
.add_to_pool(test_source(), tx1a, true, &header.hash())
.is_err()
);
} }
// Test adding a tx attempting to spend a non-existent output. // Test adding a tx attempting to spend a non-existent output.
{ {
let bad_tx = test_transaction(&keychain, vec![10_001], vec![10_000]); let bad_tx = test_transaction(&keychain, vec![10_001], vec![10_000]);
let mut write_pool = pool.write().unwrap(); let mut write_pool = pool.write().unwrap();
assert!(write_pool.add_to_pool(test_source(), bad_tx, true).is_err()); assert!(
write_pool
.add_to_pool(test_source(), bad_tx, true, &header.hash())
.is_err()
);
} }
// Test adding a tx that would result in a duplicate output (conflicts with // Test adding a tx that would result in a duplicate output (conflicts with
@ -152,14 +173,22 @@ fn test_the_transaction_pool() {
{ {
let tx = test_transaction(&keychain, vec![900], vec![498]); let tx = test_transaction(&keychain, vec![900], vec![498]);
let mut write_pool = pool.write().unwrap(); let mut write_pool = pool.write().unwrap();
assert!(write_pool.add_to_pool(test_source(), tx, true).is_err()); assert!(
write_pool
.add_to_pool(test_source(), tx, true, &header.hash())
.is_err()
);
} }
// Confirm the tx pool correctly identifies an invalid tx (already spent). // Confirm the tx pool correctly identifies an invalid tx (already spent).
{ {
let mut write_pool = pool.write().unwrap(); let mut write_pool = pool.write().unwrap();
let tx3 = test_transaction(&keychain, vec![500], vec![497]); let tx3 = test_transaction(&keychain, vec![500], vec![497]);
assert!(write_pool.add_to_pool(test_source(), tx3, true).is_err()); assert!(
write_pool
.add_to_pool(test_source(), tx3, true, &header.hash())
.is_err()
);
assert_eq!(write_pool.total_size(), 1); assert_eq!(write_pool.total_size(), 1);
assert_eq!(write_pool.stempool.size(), 2); assert_eq!(write_pool.stempool.size(), 2);
} }
@ -175,7 +204,7 @@ fn test_the_transaction_pool() {
.unwrap(); .unwrap();
assert_eq!(agg_tx.kernels().len(), 2); assert_eq!(agg_tx.kernels().len(), 2);
write_pool write_pool
.add_to_pool(test_source(), agg_tx, false) .add_to_pool(test_source(), agg_tx, false, &header.hash())
.unwrap(); .unwrap();
assert_eq!(write_pool.total_size(), 2); assert_eq!(write_pool.total_size(), 2);
} }
@ -192,7 +221,7 @@ fn test_the_transaction_pool() {
// tx4 is the "new" part of this aggregated tx that we care about // tx4 is the "new" part of this aggregated tx that we care about
let agg_tx = transaction::aggregate(vec![tx1.clone(), tx2.clone(), tx4], None).unwrap(); let agg_tx = transaction::aggregate(vec![tx1.clone(), tx2.clone(), tx4], None).unwrap();
write_pool write_pool
.add_to_pool(test_source(), agg_tx, false) .add_to_pool(test_source(), agg_tx, false, &header.hash())
.unwrap(); .unwrap();
assert_eq!(write_pool.total_size(), 3); assert_eq!(write_pool.total_size(), 3);
let entry = write_pool.txpool.entries.last().unwrap(); let entry = write_pool.txpool.entries.last().unwrap();
@ -211,14 +240,19 @@ fn test_the_transaction_pool() {
// check we cannot add a double spend to the stempool // check we cannot add a double spend to the stempool
assert!( assert!(
write_pool write_pool
.add_to_pool(test_source(), double_spend_tx.clone(), true) .add_to_pool(test_source(), double_spend_tx.clone(), true, &header.hash())
.is_err() .is_err()
); );
// check we cannot add a double spend to the txpool // check we cannot add a double spend to the txpool
assert!( assert!(
write_pool write_pool
.add_to_pool(test_source(), double_spend_tx.clone(), false) .add_to_pool(
test_source(),
double_spend_tx.clone(),
false,
&header.hash()
)
.is_err() .is_err()
); );
} }

View file

@ -78,12 +78,13 @@ impl p2p::ChainAdapter for NetToChainAdapter {
identifier: "?.?.?.?".to_string(), identifier: "?.?.?.?".to_string(),
}; };
let h = tx.hash(); let tx_hash = tx.hash();
let block_hash = w(&self.chain).head_header().unwrap().hash();
debug!( debug!(
LOGGER, LOGGER,
"Received tx {}, inputs: {}, outputs: {}, kernels: {}, going to process.", "Received tx {}, inputs: {}, outputs: {}, kernels: {}, going to process.",
h, tx_hash,
tx.inputs().len(), tx.inputs().len(),
tx.outputs().len(), tx.outputs().len(),
tx.kernels().len(), tx.kernels().len(),
@ -91,11 +92,11 @@ impl p2p::ChainAdapter for NetToChainAdapter {
let res = { let res = {
let mut tx_pool = self.tx_pool.write().unwrap(); let mut tx_pool = self.tx_pool.write().unwrap();
tx_pool.add_to_pool(source, tx, stem) tx_pool.add_to_pool(source, tx, stem, &block_hash)
}; };
if let Err(e) = res { if let Err(e) = res {
debug!(LOGGER, "Transaction {} rejected: {:?}", h, e); debug!(LOGGER, "Transaction {} rejected: {:?}", tx_hash, e);
} }
} }
@ -738,14 +739,26 @@ impl PoolToChainAdapter {
} }
impl pool::BlockChain for PoolToChainAdapter { impl pool::BlockChain for PoolToChainAdapter {
fn chain_head(&self) -> Result<BlockHeader, pool::PoolError> {
wo(&self.chain).head_header().map_err(|e| {
pool::PoolError::Other(format!(
"Chain adapter failed to retrieve chain head: {:?}",
e
))
})
}
fn validate_raw_txs( fn validate_raw_txs(
&self, &self,
txs: Vec<Transaction>, txs: Vec<Transaction>,
pre_tx: Option<Transaction>, pre_tx: Option<Transaction>,
block_hash: &Hash,
) -> Result<(Vec<Transaction>), pool::PoolError> { ) -> Result<(Vec<Transaction>), pool::PoolError> {
wo(&self.chain).validate_raw_txs(txs, pre_tx).map_err(|_| { wo(&self.chain)
pool::PoolError::Other("Chain adapter failed to validate_raw_txs.".to_string()) .validate_raw_txs(txs, pre_tx, block_hash)
}) .map_err(|e| {
pool::PoolError::Other(format!("Chain adapter failed to validate_raw_txs: {:?}", e))
})
} }
fn verify_coinbase_maturity(&self, tx: &Transaction) -> Result<(), pool::PoolError> { fn verify_coinbase_maturity(&self, tx: &Transaction) -> Result<(), pool::PoolError> {

View file

@ -12,12 +12,12 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
use chrono::prelude::Utc;
use rand::{self, Rng}; use rand::{self, Rng};
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
use std::thread; use std::thread;
use std::time::Duration; use std::time::Duration;
use chrono::prelude::{Utc};
use core::core::hash::Hashed; use core::core::hash::Hashed;
use core::core::transaction; use core::core::transaction;
@ -90,11 +90,14 @@ where
{ {
let mut tx_pool = tx_pool.write().unwrap(); let mut tx_pool = tx_pool.write().unwrap();
let header = tx_pool.blockchain.chain_head()?;
let txpool_tx = tx_pool.txpool.aggregate_transaction()?; let txpool_tx = tx_pool.txpool.aggregate_transaction()?;
let stem_txs = tx_pool.stempool.select_valid_transactions( let stem_txs = tx_pool.stempool.select_valid_transactions(
PoolEntryState::ToStem, PoolEntryState::ToStem,
PoolEntryState::Stemmed, PoolEntryState::Stemmed,
txpool_tx, txpool_tx,
&header.hash(),
)?; )?;
if stem_txs.len() > 0 { if stem_txs.len() > 0 {
@ -118,7 +121,7 @@ where
identifier: "?.?.?.?".to_string(), identifier: "?.?.?.?".to_string(),
}; };
tx_pool.add_to_pool(src, agg_tx, false)?; tx_pool.add_to_pool(src, agg_tx, false, &header.hash())?;
} }
} }
Ok(()) Ok(())
@ -130,11 +133,14 @@ where
{ {
let mut tx_pool = tx_pool.write().unwrap(); let mut tx_pool = tx_pool.write().unwrap();
let header = tx_pool.blockchain.chain_head()?;
let txpool_tx = tx_pool.txpool.aggregate_transaction()?; let txpool_tx = tx_pool.txpool.aggregate_transaction()?;
let stem_txs = tx_pool.stempool.select_valid_transactions( let stem_txs = tx_pool.stempool.select_valid_transactions(
PoolEntryState::ToFluff, PoolEntryState::ToFluff,
PoolEntryState::Fluffed, PoolEntryState::Fluffed,
txpool_tx, txpool_tx,
&header.hash(),
)?; )?;
if stem_txs.len() > 0 { if stem_txs.len() > 0 {
@ -151,7 +157,7 @@ where
identifier: "?.?.?.?".to_string(), identifier: "?.?.?.?".to_string(),
}; };
tx_pool.add_to_pool(src, agg_tx, false)?; tx_pool.add_to_pool(src, agg_tx, false, &header.hash())?;
} }
Ok(()) Ok(())
} }
@ -231,12 +237,14 @@ where
{ {
let mut tx_pool = tx_pool.write().unwrap(); let mut tx_pool = tx_pool.write().unwrap();
let header = tx_pool.blockchain.chain_head()?;
for entry in expired_entries { for entry in expired_entries {
let src = TxSource { let src = TxSource {
debug_name: "embargo_expired".to_string(), debug_name: "embargo_expired".to_string(),
identifier: "?.?.?.?".to_string(), identifier: "?.?.?.?".to_string(),
}; };
match tx_pool.add_to_pool(src, entry.tx, false) { match tx_pool.add_to_pool(src, entry.tx, false, &header.hash()) {
Ok(_) => debug!( Ok(_) => debug!(
LOGGER, LOGGER,
"dand_mon: embargo expired, fluffed tx successfully." "dand_mon: embargo expired, fluffed tx successfully."

View file

@ -167,6 +167,7 @@ fn build_block(
b.header.clone().total_difficulty.to_num(), b.header.clone().total_difficulty.to_num(),
); );
// Now set txhashset roots and sizes on the header of the block being built.
let roots_result = chain.set_txhashset_roots(&mut b, false); let roots_result = chain.set_txhashset_roots(&mut b, false);
match roots_result { match roots_result {

View file

@ -95,7 +95,7 @@ where
move |build, (tx, kern, sum)| -> (Transaction, TxKernel, BlindSum) { move |build, (tx, kern, sum)| -> (Transaction, TxKernel, BlindSum) {
let commit = build.keychain.commit(value, &key_id).unwrap(); let commit = build.keychain.commit(value, &key_id).unwrap();
debug!(LOGGER, "Building an output: {}, {:?}", value, commit); debug!(LOGGER, "Building output: {}, {:?}", value, commit);
let rproof = proof::create(build.keychain, value, &key_id, commit, None).unwrap(); let rproof = proof::create(build.keychain, value, &key_id, commit, None).unwrap();

View file

@ -24,6 +24,7 @@ use std::sync::{Arc, Mutex};
use serde_json as json; use serde_json as json;
use core::core::hash::Hashed;
use core::ser; use core::ser;
use keychain::Keychain; use keychain::Keychain;
use libtx::slate::Slate; use libtx::slate::Slate;
@ -321,8 +322,19 @@ where
let mut w = self.wallet.lock().unwrap(); let mut w = self.wallet.lock().unwrap();
w.client().clone() w.client().clone()
}; };
client.post_tx(&TxWrapper { tx_hex: tx_hex }, fluff)?; let res = client.post_tx(&TxWrapper { tx_hex: tx_hex }, fluff);
Ok(()) if let Err(e) = res {
error!(LOGGER, "api: post_tx: failed with error: {}", e);
Err(e)
} else {
debug!(
LOGGER,
"api: post_tx: successfully posted tx: {}, fluff? {}",
slate.tx.hash(),
fluff
);
Ok(())
}
} }
/// Attempt to restore contents of wallet /// Attempt to restore contents of wallet
@ -414,6 +426,17 @@ where
w.open_with_credentials()?; w.open_with_credentials()?;
let res = tx::receive_tx(&mut **w, slate); let res = tx::receive_tx(&mut **w, slate);
w.close()?; w.close()?;
res
if let Err(e) = res {
error!(LOGGER, "api: receive_tx: failed with error: {}", e);
Err(e)
} else {
debug!(
LOGGER,
"api: receive_tx: successfully received tx: {}",
slate.tx.hash()
);
Ok(())
}
} }
} }

View file

@ -395,7 +395,10 @@ where
Box::new( Box::new(
parse_body(req).and_then(move |mut slate| match api.receive_tx(&mut slate) { parse_body(req).and_then(move |mut slate| match api.receive_tx(&mut slate) {
Ok(_) => ok(slate.clone()), Ok(_) => ok(slate.clone()),
Err(e) => err(e), Err(e) => {
error!(LOGGER, "receive_tx: failed with error: {}", e);
err(e)
}
}), }),
) )
} }