Basic Dandelion transaction routing ()

* Initial Dandelion Commit
* Changed stem_tx_pool to tx_stempool
* Introduction of stem memory pool and stem pool config
* Pool push now send to stem memory pool
* Add stem transaction functions
* Add stem transaction pool
* Drastically simplified code structure
* Add monitor transactions
* Add Dandelion monitor and remove transactions from stempool
* Add peer relay monitor
* Reconcile block with stempool
* Fix total size bug
* Add fluff option for pool push
* Added details on dandelion monitor
* Fix issue with missing parent
* Child transaction with stempool parent are now forced stem
* Update Dandelion Relay from outgoing peers
* Fix missing pool reconciliation
* Add the ability to fluff a transaction directly
* Fix tests for Dandelion
* Missing send_stem_transaction method...
* Add fluff handler for wallet
* Add logger when successfully updated Dandelion relay
* Launch transaction monitor last
* Fix dandelion relay misplaced
* Add logging and updating for stempool
* Additionnal check for stem transaction
* Fix 2 Locks in a row
This commit is contained in:
Quentin Le Sceller 2018-03-19 22:18:54 -05:00 committed by Ignotus Peverell
parent 7816f35238
commit fcfe7bc6a4
24 changed files with 962 additions and 83 deletions

View file

@ -516,9 +516,10 @@ struct TxWrapper {
tx_hex: String,
}
// Push new transactions to our transaction pool, that should broadcast it
// Push new transactions to our stem transaction pool, that should broadcast it
// to the network if valid.
struct PoolPushHandler<T> {
peers: Weak<p2p::Peers>,
tx_pool: Weak<RwLock<pool::TransactionPool<T>>>,
}
@ -548,8 +549,28 @@ where
tx.outputs.len()
);
let mut fluff = false;
if let Ok(params) = req.get_ref::<UrlEncodedQuery>() {
if let Some(_) = params.get("fluff") {
fluff = true;
}
}
// Will not do a stem transaction if our dandelion peer relay is empty
if !fluff && w(&self.peers).get_dandelion_relay().is_empty() {
debug!(
LOGGER,
"Missing Dandelion relay: will push stem transaction normally"
);
fluff = true;
}
// Push into the pool or stempool
let pool_arc = w(&self.tx_pool);
let res = pool_arc.write().unwrap().add_to_memory_pool(source, tx);
let res = pool_arc
.write()
.unwrap()
.add_to_memory_pool(source, tx, !fluff);
match res {
Ok(()) => Ok(Response::with(status::Ok)),
@ -626,6 +647,7 @@ pub fn start_rest_apis<T>(
tx_pool: tx_pool.clone(),
};
let pool_push_handler = PoolPushHandler {
peers: peers.clone(),
tx_pool: tx_pool.clone(),
};
let peers_all_handler = PeersAllHandler {

View file

@ -62,7 +62,7 @@ impl p2p::ChainAdapter for NetToChainAdapter {
w(&self.chain).head().unwrap().height
}
fn transaction_received(&self, tx: core::Transaction) {
fn transaction_received(&self, tx: core::Transaction, stem: bool) {
let source = pool::TxSource {
debug_name: "p2p".to_string(),
identifier: "?.?.?.?".to_string(),
@ -75,7 +75,11 @@ impl p2p::ChainAdapter for NetToChainAdapter {
);
let h = tx.hash();
if let Err(e) = self.tx_pool.write().unwrap().add_to_memory_pool(source, tx) {
if let Err(e) = self.tx_pool
.write()
.unwrap()
.add_to_memory_pool(source, tx, stem)
{
debug!(LOGGER, "Transaction {} rejected: {:?}", h, e);
}
}
@ -543,6 +547,9 @@ pub struct PoolToNetAdapter {
}
impl pool::PoolAdapter for PoolToNetAdapter {
fn stem_tx_accepted(&self, tx: &core::Transaction) {
wo(&self.peers).broadcast_stem_transaction(tx);
}
fn tx_accepted(&self, tx: &core::Transaction) {
wo(&self.peers).broadcast_transaction(tx);
}

View file

@ -0,0 +1,85 @@
// Copyright 2018 The Grin Developers
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::thread;
use std::time::Duration;
use std::sync::{Arc, RwLock};
use std::sync::atomic::{AtomicBool, Ordering};
use time::now_utc;
use util::LOGGER;
use pool::TransactionPool;
use pool::PoolConfig;
use pool::TxSource;
use pool::BlockChain;
/// A process to monitor transactions in the stempool.
/// With Dandelion, transaction can be broadcasted in stem or fluff phase.
/// When sent in stem phase, the transaction is relayed to only node: the dandelion relay. In
/// order to maintain reliability a timer is started for each transaction sent in stem phase.
/// This function will monitor the stempool and test if the timer is expired for each transaction.
/// In that case the transaction will be sent in fluff phase (to multiple peers) instead of
/// sending only to the peer relay.
pub fn monitor_transactions<T>(
config: PoolConfig,
tx_pool: Arc<RwLock<TransactionPool<T>>>,
stop: Arc<AtomicBool>,
) where
T: BlockChain + Send + Sync + 'static,
{
debug!(LOGGER, "Started Dandelion transaction monitor");
let _ = thread::Builder::new()
.name("dandelion".to_string())
.spawn(move || {
loop {
let tx_pool = tx_pool.clone();
let stem_transactions = tx_pool.read().unwrap().stem_transactions.clone();
let time_stem_transactions = tx_pool.read().unwrap().time_stem_transactions.clone();
for tx_hash in stem_transactions.keys() {
let time_transaction = time_stem_transactions.get(tx_hash).unwrap();
let interval = now_utc().to_timespec().sec - time_transaction;
// TODO Randomize between 30 and 60 seconds
if interval >= config.dandelion_embargo {
let source = TxSource {
debug_name: "dandelion-monitor".to_string(),
identifier: "?.?.?.?".to_string(),
};
let stem_transaction = stem_transactions.get(tx_hash).unwrap();
let res = tx_pool.write().unwrap().add_to_memory_pool(
source,
*stem_transaction.clone(),
false,
);
match res {
Ok(()) => {
info!(LOGGER, "Fluffing transaction after embargo timer expired.")
}
Err(e) => debug!(LOGGER, "error - {:?}", e),
};
// Remove from tx pool
tx_pool.write().unwrap().remove_from_stempool(tx_hash);
}
}
thread::sleep(Duration::from_secs(1));
if stop.load(Ordering::Relaxed) {
break;
}
}
});
}

View file

@ -44,6 +44,7 @@ extern crate grin_util as util;
extern crate grin_wallet as wallet;
mod adapters;
mod dandelion_monitor;
mod server;
mod seed;
mod sync;

View file

@ -30,6 +30,7 @@ use hyper;
use p2p;
use util::LOGGER;
const DANDELION_RELAY_TIME: i64 = 600;
const BAN_WINDOW: i64 = 10800;
const PEER_MAX_COUNT: u32 = 25;
const PEER_PREFERRED_COUNT: u32 = 8;
@ -92,6 +93,21 @@ fn monitor_peers(
total_count,
);
// Dandelion Relay Updater
let dandelion_relay = peers.get_dandelion_relay();
if dandelion_relay.is_empty() {
debug!(LOGGER, "monitor_peers: no dandelion relay updating");
peers.update_dandelion_relay();
} else {
for last_added in dandelion_relay.keys() {
let dandelion_interval = now_utc().to_timespec().sec - last_added;
if dandelion_interval >= DANDELION_RELAY_TIME {
debug!(LOGGER, "monitor_peers: updating expired dandelion relay");
peers.update_dandelion_relay();
}
}
}
let mut healthy_count = 0;
let mut banned_count = 0;
let mut defunct_count = 0;

View file

@ -27,6 +27,7 @@ use api;
use chain;
use core::{consensus, genesis, global};
use core::core::target::Difficulty;
use dandelion_monitor;
use miner;
use p2p;
use pool;
@ -193,6 +194,16 @@ impl Server {
Arc::downgrade(&p2p_server.peers),
);
info!(
LOGGER,
"Starting dandelion monitor: {}", &config.api_http_addr
);
dandelion_monitor::monitor_transactions(
config.pool_config.clone(),
tx_pool.clone(),
stop.clone(),
);
warn!(LOGGER, "Grin server started.");
Ok(Server {
config: config,

169
grin/tests/dandelion.rs Normal file
View file

@ -0,0 +1,169 @@
// Copyright 2018 The Grin Developers
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#[macro_use]
extern crate router;
#[macro_use]
extern crate slog;
extern crate grin_api as api;
extern crate grin_chain as chain;
extern crate grin_config as config;
extern crate grin_core as core;
extern crate grin_grin as grin;
extern crate grin_p2p as p2p;
extern crate grin_pow as pow;
extern crate grin_util as util;
extern crate grin_wallet as wallet;
mod framework;
use std::{thread, time};
use std::sync::{Arc, Mutex};
use framework::{LocalServerContainer, LocalServerContainerConfig};
use util::LOGGER;
/// Start 1 node mining, 1 non mining node and two wallets.
/// Then send a transaction from one wallet to another and propagate it a stem transaction
/// but without stem relay and check if the transaction is still broadcasted.
#[test]
#[ignore]
fn test_dandelion_timeout() {
let test_name_dir = "test_dandelion_timeout";
core::global::set_mining_mode(core::global::ChainTypes::AutomatedTesting);
framework::clean_all_output(test_name_dir);
let mut log_config = util::LoggingConfig::default();
//log_config.stdout_log_level = util::LogLevel::Trace;
log_config.stdout_log_level = util::LogLevel::Info;
//init_logger(Some(log_config));
util::init_test_logger();
// Run a separate coinbase wallet for coinbase transactions
let mut coinbase_config = LocalServerContainerConfig::default();
coinbase_config.name = String::from("coinbase_wallet");
coinbase_config.wallet_validating_node_url = String::from("http://127.0.0.1:30001");
coinbase_config.wallet_port = 10002;
let coinbase_wallet = Arc::new(Mutex::new(
LocalServerContainer::new(coinbase_config).unwrap(),
));
let coinbase_wallet_config = { coinbase_wallet.lock().unwrap().wallet_config.clone() };
let coinbase_seed = LocalServerContainer::get_wallet_seed(&coinbase_wallet_config);
let _ = thread::spawn(move || {
let mut w = coinbase_wallet.lock().unwrap();
w.run_wallet(0);
});
let mut recp_config = LocalServerContainerConfig::default();
recp_config.name = String::from("target_wallet");
recp_config.wallet_validating_node_url = String::from("http://127.0.0.1:30001");
recp_config.wallet_port = 20002;
let target_wallet = Arc::new(Mutex::new(LocalServerContainer::new(recp_config).unwrap()));
let target_wallet_cloned = target_wallet.clone();
let recp_wallet_config = { target_wallet.lock().unwrap().wallet_config.clone() };
let recp_seed = LocalServerContainer::get_wallet_seed(&recp_wallet_config);
//Start up a second wallet, to receive
let _ = thread::spawn(move || {
let mut w = target_wallet_cloned.lock().unwrap();
w.run_wallet(0);
});
// Spawn server and let it run for a bit
let mut server_one_config = LocalServerContainerConfig::default();
server_one_config.name = String::from("server_one");
server_one_config.p2p_server_port = 30000;
server_one_config.api_server_port = 30001;
server_one_config.start_miner = true;
server_one_config.start_wallet = false;
server_one_config.is_seeding = false;
server_one_config.coinbase_wallet_address =
String::from(format!("http://{}:{}", server_one_config.base_addr, 10002));
let mut server_one = LocalServerContainer::new(server_one_config).unwrap();
let mut server_two_config = LocalServerContainerConfig::default();
server_two_config.name = String::from("server_two");
server_two_config.p2p_server_port = 40000;
server_two_config.api_server_port = 40001;
server_two_config.start_miner = false;
server_two_config.start_wallet = false;
server_two_config.is_seeding = true;
let mut server_two = LocalServerContainer::new(server_two_config.clone()).unwrap();
server_one.add_peer(format!(
"{}:{}",
server_two_config.base_addr, server_two_config.p2p_server_port
));
// Spawn servers and let them run for a bit
let _ = thread::spawn(move || {
server_two.run_server(120);
});
// Wait for the first server to start
thread::sleep(time::Duration::from_millis(5000));
let _ = thread::spawn(move || {
server_one.run_server(120);
});
// Let them do a handshake and properly update their peer relay
thread::sleep(time::Duration::from_millis(30000));
//Wait until we have some funds to send
let mut coinbase_info =
LocalServerContainer::get_wallet_info(&coinbase_wallet_config, &coinbase_seed);
let mut slept_time = 0;
while coinbase_info.amount_currently_spendable < 100000000000 {
thread::sleep(time::Duration::from_millis(500));
slept_time += 500;
if slept_time > 10000 {
panic!("Coinbase not confirming in time");
}
coinbase_info =
LocalServerContainer::get_wallet_info(&coinbase_wallet_config, &coinbase_seed);
}
warn!(LOGGER, "Sending 50 Grins to recipient wallet");
// Sending stem transaction
LocalServerContainer::send_amount_to(
&coinbase_wallet_config,
"50.00",
1,
"not_all",
"http://127.0.0.1:20002",
false,
);
let coinbase_info =
LocalServerContainer::get_wallet_info(&coinbase_wallet_config, &coinbase_seed);
println!("Coinbase wallet info: {:?}", coinbase_info);
let recipient_info = LocalServerContainer::get_wallet_info(&recp_wallet_config, &recp_seed);
// The transaction should be waiting in the node stempool thus cannot be mined.
println!("Recipient wallet info: {:?}", recipient_info);
assert!(
recipient_info.data_confirmed && recipient_info.amount_awaiting_confirmation == 50000000000
);
// Wait for stem timeout
thread::sleep(time::Duration::from_millis(35000));
println!("Recipient wallet info: {:?}", recipient_info);
let recipient_info = LocalServerContainer::get_wallet_info(&recp_wallet_config, &recp_seed);
assert!(recipient_info.amount_currently_spendable == 50000000000);
}

View file

@ -307,6 +307,7 @@ impl LocalServerContainer {
minimum_confirmations: u64,
selection_strategy: &str,
dest: &str,
fluff: bool,
) {
let amount = core::core::amount_from_hr_string(amount)
.expect("Could not parse amount as a number with optional decimal point.");
@ -326,6 +327,7 @@ impl LocalServerContainer {
dest.to_string(),
max_outputs,
selection_strategy == "all",
fluff,
);
match result {
Ok(_) => println!(

View file

@ -114,6 +114,7 @@ fn basic_wallet_transactions() {
1,
"not_all",
"http://127.0.0.1:20002",
true,
);
//Wait for a confirmation
@ -124,7 +125,6 @@ fn basic_wallet_transactions() {
let recipient_info = LocalServerContainer::get_wallet_info(&recp_wallet_config, &recp_seed);
println!("Recipient wallet info: {:?}", recipient_info);
assert!(
recipient_info.data_confirmed && recipient_info.amount_currently_spendable == 50000000000
);
@ -133,13 +133,14 @@ fn basic_wallet_transactions() {
LOGGER,
"Sending many small transactions to recipient wallet"
);
for _ in 0..10 {
for i in 0..10 {
LocalServerContainer::send_amount_to(
&coinbase_wallet_config,
"1.00",
1,
"not_all",
"http://127.0.0.1:20002",
true,
);
}
@ -160,6 +161,7 @@ fn basic_wallet_transactions() {
1,
"all",
"http://127.0.0.1:10002",
true,
);
thread::sleep(time::Duration::from_millis(5000));

View file

@ -64,6 +64,7 @@ enum_from_primitive! {
Block,
GetCompactBlock,
CompactBlock,
StemTransaction,
Transaction,
TxHashSetRequest,
TxHashSetArchive

View file

@ -235,6 +235,26 @@ impl Peer {
}
}
/// Sends the provided stem transaction to the remote peer. The request may be
/// dropped if the remote peer is known to already have the stem transaction.
pub fn send_stem_transaction(&self, tx: &core::Transaction) -> Result<(), Error> {
if !self.tracking_adapter.has(tx.hash()) {
debug!(LOGGER, "Send tx {} to {}", tx.hash(), self.info.addr);
self.connection
.as_ref()
.unwrap()
.send(tx, msg::Type::StemTransaction)
} else {
debug!(
LOGGER,
"Not sending tx {} to {} (already seen)",
tx.hash(),
self.info.addr
);
Ok(())
}
}
/// Sends a request for block headers from the provided block locator
pub fn send_header_request(&self, locator: Vec<Hash>) -> Result<(), Error> {
self.connection
@ -356,9 +376,9 @@ impl ChainAdapter for TrackingAdapter {
self.adapter.total_height()
}
fn transaction_received(&self, tx: core::Transaction) {
fn transaction_received(&self, tx: core::Transaction, stem: bool) {
self.push(tx.hash());
self.adapter.transaction_received(tx)
self.adapter.transaction_received(tx, stem)
}
fn block_received(&self, b: core::Block, addr: SocketAddr) -> bool {

View file

@ -33,6 +33,7 @@ pub struct Peers {
pub adapter: Arc<ChainAdapter>,
store: PeerStore,
peers: RwLock<HashMap<SocketAddr, Arc<RwLock<Peer>>>>,
dandelion_relay: RwLock<HashMap<i64, Arc<RwLock<Peer>>>>,
}
unsafe impl Send for Peers {}
@ -44,6 +45,7 @@ impl Peers {
adapter,
store,
peers: RwLock::new(HashMap::new()),
dandelion_relay: RwLock::new(HashMap::new()),
}
}
@ -71,6 +73,34 @@ impl Peers {
apeer.clone()
}
// Update the dandelion relay
pub fn update_dandelion_relay(&self) {
let peers = self.outgoing_connected_peers();
match thread_rng().choose(&peers) {
Some(peer) => {
// Clear the map and add new relay
let dandelion_relay = &self.dandelion_relay;
dandelion_relay.write().unwrap().clear();
dandelion_relay
.write()
.unwrap()
.insert(time::now_utc().to_timespec().sec, peer.clone());
debug!(
LOGGER,
"Successfully updated Dandelion relay to: {}",
peer.try_read().unwrap().info.addr
);
}
None => error!(LOGGER, "Could not update dandelion relay"),
};
}
// Get the dandelion relay
pub fn get_dandelion_relay(&self) -> HashMap<i64, Arc<RwLock<Peer>>> {
let res = self.dandelion_relay.read().unwrap().clone();
res
}
pub fn is_known(&self, addr: &SocketAddr) -> bool {
self.get_connected_peer(addr).is_some()
}
@ -87,6 +117,19 @@ impl Peers {
res
}
pub fn outgoing_connected_peers(&self) -> Vec<Arc<RwLock<Peer>>> {
let peers = self.connected_peers();
let res = peers
.iter()
.filter(|x| match x.try_read() {
Ok(peer) => peer.info.direction == Direction::Outbound,
Err(_) => false,
})
.cloned()
.collect::<Vec<_>>();
res
}
/// Get a peer we're connected to by address.
pub fn get_connected_peer(&self, addr: &SocketAddr) -> Option<Arc<RwLock<Peer>>> {
self.peers.read().unwrap().get(addr).map(|p| p.clone())
@ -298,6 +341,30 @@ impl Peers {
);
}
/// Broadcasts the provided stem transaction to our peer relay.
pub fn broadcast_stem_transaction(&self, tx: &core::Transaction) {
let dandelion_relay = self.get_dandelion_relay();
if dandelion_relay.is_empty() {
debug!(LOGGER, "No dandelion relay updating");
self.update_dandelion_relay();
}
// If still empty broadcast then broadcast transaction normally
if dandelion_relay.is_empty() {
self.broadcast_transaction(tx);
}
for relay in dandelion_relay.values() {
let relay = relay.read().unwrap();
if relay.is_connected() {
if let Err(e) = relay.send_stem_transaction(tx) {
debug!(
LOGGER,
"Error sending stem transaction to peer relay: {:?}", e
);
}
}
}
}
/// Broadcasts the provided transaction to PEER_PREFERRED_COUNT of our peers.
/// We may be connected to PEER_MAX_COUNT peers so we only
/// want to broadcast to a random subset of peers.
@ -438,8 +505,8 @@ impl ChainAdapter for Peers {
fn total_height(&self) -> u64 {
self.adapter.total_height()
}
fn transaction_received(&self, tx: core::Transaction) {
self.adapter.transaction_received(tx)
fn transaction_received(&self, tx: core::Transaction, stem: bool) {
self.adapter.transaction_received(tx, stem)
}
fn block_received(&self, b: core::Block, peer_addr: SocketAddr) -> bool {
let hash = b.hash();

View file

@ -63,7 +63,13 @@ impl MessageHandler for Protocol {
Type::Transaction => {
let tx: core::Transaction = msg.body()?;
adapter.transaction_received(tx);
adapter.transaction_received(tx, false);
Ok(None)
}
Type::StemTransaction => {
let tx: core::Transaction = msg.body()?;
adapter.transaction_received(tx, true);
Ok(None)
}

View file

@ -205,7 +205,7 @@ impl ChainAdapter for DummyAdapter {
fn total_height(&self) -> u64 {
0
}
fn transaction_received(&self, _: core::Transaction) {}
fn transaction_received(&self, _: core::Transaction, stem: bool) {}
fn compact_block_received(&self, _cb: core::CompactBlock, _addr: SocketAddr) -> bool {
true
}

View file

@ -167,7 +167,7 @@ pub trait ChainAdapter: Sync + Send {
fn total_height(&self) -> u64;
/// A valid transaction has been received from one of our peers
fn transaction_received(&self, tx: core::Transaction);
fn transaction_received(&self, tx: core::Transaction, stem: bool);
/// A block has been received from one of our peers. Returns true if the
/// block could be handled properly and is not deemed defective by the

View file

@ -61,6 +61,7 @@ fn estimate_transaction_size(_tx: &core::transaction::Transaction) -> u64 {
/// An edge connecting graph vertices.
/// For various use cases, one of either the source or destination may be
/// unpopulated
#[derive(Clone)]
pub struct Edge {
// Source and Destination are the vertex id's, the transaction (kernel)
// hash.

View file

@ -14,9 +14,13 @@
//! Top-level Pool type, methods, and tests
use std::sync::Arc;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use time;
use rand;
use rand::Rng;
use core::core::hash::Hash;
use core::core::hash::Hashed;
use core::core::id::ShortIdentifiable;
use core::core::transaction;
@ -33,8 +37,15 @@ pub use graph;
/// keyed by their transaction hash.
pub struct TransactionPool<T> {
config: PoolConfig,
/// All transactions hash in the stempool with a time attached to ensure
/// propagation
pub time_stem_transactions: HashMap<hash::Hash, i64>,
/// All transactions in the stempool
pub stem_transactions: HashMap<hash::Hash, Box<transaction::Transaction>>,
/// All transactions in the pool
pub transactions: HashMap<hash::Hash, Box<transaction::Transaction>>,
/// The stem pool
pub stempool: Pool,
/// The pool itself
pub pool: Pool,
/// Orphans in the pool
@ -54,7 +65,10 @@ where
pub fn new(config: PoolConfig, chain: Arc<T>, adapter: Arc<PoolAdapter>) -> TransactionPool<T> {
TransactionPool {
config: config,
time_stem_transactions: HashMap::new(),
stem_transactions: HashMap::new(),
transactions: HashMap::new(),
stempool: Pool::empty(),
pool: Pool::empty(),
orphans: Orphans::empty(),
blockchain: chain,
@ -110,12 +124,17 @@ where
// The current best unspent set is:
// Pool unspent + (blockchain unspent - pool->blockchain spent)
// Pool unspents are unconditional so we check those first
self.pool
.get_available_output(&output_ref.commit)
.map(|x| {
self.search_stempool_spents(&output_ref.commit)
.or(self.pool.get_available_output(&output_ref.commit).map(|x| {
let tx_ref = x.source_hash().unwrap();
Parent::PoolTransaction { tx_ref }
})
}))
.or(self.stempool
.get_available_output(&output_ref.commit)
.map(|x| {
let tx_ref = x.source_hash().unwrap();
Parent::StemPoolTransaction { tx_ref }
}))
.or(self.search_blockchain_unspents(output_ref))
.or(self.search_pool_spents(&output_ref.commit))
.unwrap_or(Parent::Unknown)
@ -131,7 +150,13 @@ where
let other_tx = x.destination_hash().unwrap();
Parent::AlreadySpent { other_tx }
}
None => Parent::BlockTransaction,
None => match self.stempool.get_blockchain_spent(&output_ref.commit) {
Some(x) => {
let other_tx = x.destination_hash().unwrap();
Parent::AlreadySpent { other_tx }
}
None => Parent::BlockTransaction,
},
}
})
}
@ -147,6 +172,22 @@ where
})
}
// search_pool_spents is the second half of pool input detection, after the
// available_outputs have been checked. This returns either a
// Parent::AlreadySpent or None.
fn search_stempool_spents(&self, output_commitment: &Commitment) -> Option<Parent> {
self.stempool
.get_internal_spent(output_commitment)
.map(|x| Parent::AlreadySpent {
other_tx: x.destination_hash().unwrap(),
})
}
/// Get the number of transactions in the stempool
pub fn stempool_size(&self) -> usize {
self.stempool.num_transactions()
}
/// Get the number of transactions in the pool
pub fn pool_size(&self) -> usize {
self.pool.num_transactions()
@ -157,14 +198,16 @@ where
self.orphans.num_transactions()
}
/// Get the total size (transactions + orphans) of the pool
/// Get the total size (stem transactions + transactions + orphans) of the
/// pool
pub fn total_size(&self) -> usize {
self.pool.num_transactions() + self.orphans.num_transactions()
self.stempool.num_transactions() + self.pool.num_transactions()
+ self.orphans.num_transactions()
}
/// Attempts to add a transaction to the pool.
/// Attempts to add a transaction to the stempool or the memory pool.
///
/// Adds a transaction to the memory pool, deferring to the orphans pool
/// Adds a transaction to the stem memory pool, deferring to the orphans pool
/// if necessary, and performing any connection-related validity checks.
/// Happens under an exclusive mutable reference gated by the write portion
/// of a RWLock.
@ -172,6 +215,7 @@ where
&mut self,
_: TxSource,
tx: transaction::Transaction,
stem: bool,
) -> Result<(), PoolError> {
// Do we have the capacity to accept this transaction?
if let Err(e) = self.is_acceptable(&tx) {
@ -181,8 +225,8 @@ where
// Making sure the transaction is valid before anything else.
tx.validate().map_err(|e| PoolError::InvalidTx(e))?;
// The first check involves ensuring that an identical transaction is
// not already in the pool's transaction set.
// The first check involves ensuring that an indentical transaction is not
// alreay in the stem transaction or regular transaction pool.
// A non-authoritative similar check should be performed under the
// pool's read lock before we get to this point, which would catch the
// majority of duplicate cases. The race condition is caught here.
@ -191,17 +235,23 @@ where
// The current tx.hash() method, for example, does not cover changes
// to fees or other elements of the signature preimage.
let tx_hash = graph::transaction_identifier(&tx);
if self.transactions.contains_key(&tx_hash) {
return Err(PoolError::AlreadyInPool);
if let Err(e) = self.check_pools(&tx_hash, stem) {
return Err(e);
}
// Check that the transaction is mature
let head_header = self.blockchain.head_header()?;
if head_header.height < tx.lock_height() {
return Err(PoolError::ImmatureTransaction {
lock_height: tx.lock_height(),
});
if let Err(e) = self.is_mature(&tx, &head_header) {
return Err(e);
}
// Here if we have a stem transaction, decide wether it will be broadcasted
// in stem or fluff phase
let mut rng = rand::thread_rng();
let random = rng.gen_range(0, 101);
let stem_propagation = random <= self.config.dandelion_probability;
let mut will_stem = stem && stem_propagation;
// The next issue is to identify all unspent outputs that
// this transaction will consume and make sure they exist in the set.
let mut pool_refs: Vec<graph::Edge> = Vec::new();
@ -218,6 +268,20 @@ where
// into the pool.
match self.search_for_best_output(&output) {
Parent::PoolTransaction { tx_ref: x } => pool_refs.push(base.with_source(Some(x))),
Parent::StemPoolTransaction { tx_ref: x } => {
if will_stem {
// Going to stem this transaction if parent is in stempool it's ok.
debug!(LOGGER, "Going is in stempool");
pool_refs.push(base.with_source(Some(x)));
} else {
will_stem = true;
debug!(
LOGGER,
"Parent is in stempool, force transaction to go in stempool"
);
pool_refs.push(base.with_source(Some(x)));
}
}
Parent::BlockTransaction => {
let height = head_header.height + 1;
self.blockchain.is_matured(&input, height)?;
@ -267,12 +331,33 @@ where
// In the non-orphan (pool) case, we've ensured that every input
// maps one-to-one with an unspent (available) output, and each
// output is unique. No further checks are necessary.
self.pool
.add_pool_transaction(pool_entry, blockchain_refs, pool_refs, new_unspents);
if will_stem {
// Stem phase: transaction is added to the stem memory pool and broadcasted to a
// randomly selected node.
self.stempool.add_stempool_transaction(
pool_entry,
blockchain_refs,
pool_refs,
new_unspents,
);
self.adapter.stem_tx_accepted(&tx);
self.stem_transactions.insert(tx_hash, Box::new(tx));
// Track this transaction
self.time_stem_transactions
.insert(tx_hash, time::now_utc().to_timespec().sec);
} else {
// Fluff phase: transaction is added to memory pool and broadcasted normally
self.pool.add_pool_transaction(
pool_entry,
blockchain_refs,
pool_refs,
new_unspents,
);
self.adapter.tx_accepted(&tx);
self.transactions.insert(tx_hash, Box::new(tx));
}
self.reconcile_orphans().unwrap();
self.adapter.tx_accepted(&tx);
self.transactions.insert(tx_hash, Box::new(tx));
Ok(())
} else {
// At this point, we're pretty sure the transaction is an orphan,
@ -306,6 +391,7 @@ where
// We have passed all failure modes.
pool_refs.append(&mut blockchain_refs);
error!(LOGGER, "Add to orphan");
self.orphans.add_orphan_transaction(
pool_entry,
pool_refs,
@ -352,6 +438,18 @@ where
None => {}
};
// Check for existence of this output in the stempool
match self.stempool.find_output(&output.commitment()) {
Some(x) => {
return Err(PoolError::DuplicateOutput {
other_tx: Some(x),
in_chain: false,
output: output.commit,
})
}
None => {}
};
// If the transaction might go into orphans, perform the same
// checks as above but against the orphan set instead.
if is_orphan {
@ -450,6 +548,7 @@ where
/// evicted transactions elsewhere so that we can make a best effort at
/// returning them to the pool in the event of a reorg that invalidates
/// this block.
/// TODO also consider stempool here
pub fn reconcile_block(
&mut self,
block: &block::Block,
@ -484,6 +583,7 @@ where
// After the pool has been successfully processed, an orphans
// reconciliation job is triggered.
let mut marked_transactions: HashSet<hash::Hash> = HashSet::new();
let mut marked_stem_transactions: HashSet<hash::Hash> = HashSet::new();
{
// find all conflicting txs based on inputs to the block
@ -494,6 +594,14 @@ where
.filter_map(|x| x.destination_hash())
.collect();
// find all conflicting stem txs based on inputs to the block
let conflicting_stem_txs: HashSet<hash::Hash> = block
.inputs
.iter()
.filter_map(|x| self.stempool.get_external_spent_output(&x.commitment()))
.filter_map(|x| x.destination_hash())
.collect();
// find all outputs that conflict - potential for duplicates so use a HashSet
// here
let conflicting_outputs: HashSet<hash::Hash> = block
@ -507,16 +615,37 @@ where
.filter_map(|x| x.source_hash())
.collect();
// Similarly find all outputs that conflict in the stempool- potential for
// duplicates so use a HashSet here
let conflicting_stem_outputs: HashSet<hash::Hash> = block
.outputs
.iter()
.filter_map(|x: &transaction::Output| {
self.stempool
.get_internal_spent_output(&x.commitment())
.or(self.stempool.get_available_output(&x.commitment()))
})
.filter_map(|x| x.source_hash())
.collect();
// now iterate over all conflicting hashes from both txs and outputs
// we can just use the union of the two sets here to remove duplicates
for &txh in conflicting_txs.union(&conflicting_outputs) {
self.mark_transaction(txh, &mut marked_transactions);
self.mark_transaction(txh, &mut marked_transactions, false);
}
// Do the same for the stempool
for &txh in conflicting_stem_txs.union(&conflicting_stem_outputs) {
self.mark_transaction(txh, &mut marked_stem_transactions, true);
}
}
let freed_txs = self.sweep_transactions(marked_transactions);
let freed_txs = self.sweep_transactions(marked_transactions, false);
let freed_stem_txs = self.sweep_transactions(marked_stem_transactions, true);
self.reconcile_orphans().unwrap();
// Return something else here ?
Ok(freed_txs)
}
@ -530,7 +659,12 @@ where
///
/// Marked transactions are added to the mutable marked_txs HashMap which
/// is supplied by the calling function.
fn mark_transaction(&self, conflicting_tx: hash::Hash, marked_txs: &mut HashSet<hash::Hash>) {
fn mark_transaction(
&self,
conflicting_tx: hash::Hash,
marked_txs: &mut HashSet<hash::Hash>,
stem: bool,
) {
// we can stop recursively visiting txs if we have already seen this one
if marked_txs.contains(&conflicting_tx) {
return;
@ -538,15 +672,30 @@ where
marked_txs.insert(conflicting_tx);
let tx_ref = self.transactions.get(&conflicting_tx);
if stem {
let tx_ref = self.stem_transactions.get(&conflicting_tx);
for output in &tx_ref.unwrap().outputs {
match self.pool.get_internal_spent_output(&output.commitment()) {
Some(x) => if self.blockchain.is_unspent(&x.output()).is_err() {
self.mark_transaction(x.destination_hash().unwrap(), marked_txs);
},
None => {}
};
for output in &tx_ref.unwrap().outputs {
match self.stempool
.get_internal_spent_output(&output.commitment())
{
Some(x) => if self.blockchain.is_unspent(&x.output()).is_err() {
self.mark_transaction(x.destination_hash().unwrap(), marked_txs, true);
},
None => {}
};
}
} else {
let tx_ref = self.transactions.get(&conflicting_tx);
for output in &tx_ref.unwrap().outputs {
match self.pool.get_internal_spent_output(&output.commitment()) {
Some(x) => if self.blockchain.is_unspent(&x.output()).is_err() {
self.mark_transaction(x.destination_hash().unwrap(), marked_txs, false);
},
None => {}
};
}
}
}
/// The sweep portion of mark-and-sweep pool cleanup.
@ -563,22 +712,37 @@ where
fn sweep_transactions(
&mut self,
marked_transactions: HashSet<hash::Hash>,
stem: bool,
) -> Vec<Box<transaction::Transaction>> {
let mut removed_txs = Vec::new();
for tx_hash in &marked_transactions {
let removed_tx = self.transactions.remove(&tx_hash).unwrap();
if stem {
for tx_hash in &marked_transactions {
let removed_tx = self.stem_transactions.remove(&tx_hash).unwrap();
self.pool
.remove_pool_transaction(&removed_tx, &marked_transactions);
self.stempool
.remove_pool_transaction(&removed_tx, &marked_transactions);
removed_txs.push(removed_tx);
removed_txs.push(removed_tx);
}
// final step is to update the pool to reflect the new set of roots
// a tx that was non-root may now be root based on the txs removed
self.stempool.update_roots();
} else {
for tx_hash in &marked_transactions {
let removed_tx = self.transactions.remove(&tx_hash).unwrap();
self.pool
.remove_pool_transaction(&removed_tx, &marked_transactions);
removed_txs.push(removed_tx);
}
// final step is to update the pool to reflect the new set of roots
// a tx that was non-root may now be root based on the txs removed
self.pool.update_roots();
}
// final step is to update the pool to reflect the new set of roots
// a tx that was non-root may now be root based on the txs removed
self.pool.update_roots();
removed_txs
}
@ -600,6 +764,12 @@ where
.collect()
}
/// Remove tx from stempool
pub fn remove_from_stempool(&mut self, tx_hash: &Hash) {
self.stem_transactions.remove(&tx_hash);
self.time_stem_transactions.remove(&tx_hash);
}
/// Whether the transaction is acceptable to the pool, given both how
/// full the pool is and the transaction weight.
fn is_acceptable(&self, tx: &transaction::Transaction) -> Result<(), PoolError> {
@ -623,6 +793,57 @@ where
}
Ok(())
}
// Check that the transaction is not in the stempool or in the pool
fn check_pools(&mut self, tx_hash: &Hash, stem: bool) -> Result<(), PoolError> {
// Check if the transaction is a stem transaction AND alreay in stempool.
// If this is the case, we reject the transaction.
if stem && self.stem_transactions.contains_key(&tx_hash) {
return Err(PoolError::AlreadyInStempool);
} else {
// Now it leaves us with two cases:
// 1. The transaction is not a stem transaction and is in stempool. (false &&
// true) => The transaction has been fluffed by another node.
// It is okay too but we have to remove this transaction from our stempool
// before adding it in our transaction pool
// 2. The transaction is a stem transaction and is not in stempool. (true &&
// false). => Ok
// 3. The transaction is not a stem transaction is not in stempool (false &&
// false) => We have to check if the transaction is in the transaction
// pool
// Case number 1, maybe uneeded check
if self.stem_transactions.contains_key(&tx_hash) {
let mut tx: HashSet<hash::Hash> = HashSet::new();
tx.insert(tx_hash.clone());
debug!(
LOGGER,
"pool: check_pools: transaction has been fluffed - {}", &tx_hash,
);
let transaction = self.stem_transactions.remove(&tx_hash).unwrap();
self.time_stem_transactions.remove(&tx_hash);
self.stempool.remove_pool_transaction(&transaction, &tx);
// Case 3
} else if self.transactions.contains_key(&tx_hash) {
return Err(PoolError::AlreadyInPool);
}
}
Ok(())
}
// Check that the transaction is mature
fn is_mature(
&self,
tx: &transaction::Transaction,
head_header: &block::BlockHeader,
) -> Result<(), PoolError> {
if head_header.height < tx.lock_height() {
return Err(PoolError::ImmatureTransaction {
lock_height: tx.lock_height(),
});
}
Ok(())
}
}
#[cfg(test)]
@ -645,7 +866,10 @@ mod tests {
macro_rules! expect_output_parent {
($pool:expr, $expected:pat, $( $output:expr ),+ ) => {
$(
match $pool.search_for_best_output(&OutputIdentifier::from_output(&test_output($output))) {
match $pool
.search_for_best_output(
&OutputIdentifier::from_output(&test_output($output))
) {
$expected => {},
x => panic!(
"Unexpected result from output search for {:?}, got {:?}",
@ -690,13 +914,14 @@ mod tests {
assert_eq!(write_pool.total_size(), 0);
// First, add the transaction rooted in the blockchain
let result = write_pool.add_to_memory_pool(test_source(), parent_transaction);
let result = write_pool.add_to_memory_pool(test_source(), parent_transaction, false);
if result.is_err() {
panic!("got an error adding parent tx: {:?}", result.err().unwrap());
}
// Now, add the transaction connected as a child to the first
let child_result = write_pool.add_to_memory_pool(test_source(), child_transaction);
let child_result =
write_pool.add_to_memory_pool(test_source(), child_transaction, false);
if child_result.is_err() {
panic!(
@ -717,6 +942,140 @@ mod tests {
}
}
#[test]
/// A basic test; add a transaction to the pool and add the child to the
/// stempool
fn test_pool_stempool_add() {
let mut dummy_chain = DummyChainImpl::new();
let head_header = block::BlockHeader {
height: 1,
..block::BlockHeader::default()
};
dummy_chain.store_head_header(&head_header);
let parent_transaction = test_transaction(vec![5, 6, 7], vec![11, 3]);
// We want this transaction to be rooted in the blockchain.
let new_output = DummyOutputSet::empty()
.with_output(test_output(5))
.with_output(test_output(6))
.with_output(test_output(7))
.with_output(test_output(8));
// Prepare a second transaction, connected to the first.
let child_transaction = test_transaction(vec![11, 3], vec![12]);
dummy_chain.update_output_set(new_output);
// To mirror how this construction is intended to be used, the pool
// is placed inside a RwLock.
let pool = RwLock::new(test_setup(&Arc::new(dummy_chain)));
// Take the write lock and add a pool entry
{
let mut write_pool = pool.write().unwrap();
assert_eq!(write_pool.total_size(), 0);
// First, add the transaction rooted in the blockchain
let result = write_pool.add_to_memory_pool(test_source(), parent_transaction, false);
if result.is_err() {
panic!("got an error adding parent tx: {:?}", result.err().unwrap());
}
// Now, add the transaction connected as a child to the first
let child_result =
write_pool.add_to_memory_pool(test_source(), child_transaction, true);
if child_result.is_err() {
panic!(
"got an error adding child tx: {:?}",
child_result.err().unwrap()
);
}
}
// Now take the read lock and use a few exposed methods to check consistency
{
let read_pool = pool.read().unwrap();
assert_eq!(read_pool.total_size(), 2);
if read_pool.stempool.num_transactions() == 0 {
expect_output_parent!(read_pool, Parent::PoolTransaction{tx_ref: _}, 12);
} else {
expect_output_parent!(read_pool, Parent::StemPoolTransaction{tx_ref: _}, 12);
}
expect_output_parent!(read_pool, Parent::AlreadySpent{other_tx: _}, 11, 5);
expect_output_parent!(read_pool, Parent::BlockTransaction, 8);
expect_output_parent!(read_pool, Parent::Unknown, 20);
}
}
#[test]
/// A basic test; add a transaction to the stempool and one the regular transaction pool
/// Child transaction should be added to the stempool.
fn test_stempool_pool_add() {
let mut dummy_chain = DummyChainImpl::new();
let head_header = block::BlockHeader {
height: 1,
..block::BlockHeader::default()
};
dummy_chain.store_head_header(&head_header);
let parent_transaction = test_transaction(vec![5, 6, 7], vec![11, 3]);
// We want this transaction to be rooted in the blockchain.
let new_output = DummyOutputSet::empty()
.with_output(test_output(5))
.with_output(test_output(6))
.with_output(test_output(7))
.with_output(test_output(8));
// Prepare a second transaction, connected to the first.
let child_transaction = test_transaction(vec![11, 3], vec![12]);
dummy_chain.update_output_set(new_output);
// To mirror how this construction is intended to be used, the pool
// is placed inside a RwLock.
let pool = RwLock::new(test_setup(&Arc::new(dummy_chain)));
// Take the write lock and add a pool entry
{
let mut write_pool = pool.write().unwrap();
assert_eq!(write_pool.total_size(), 0);
// First, add the transaction rooted in the blockchain
let result = write_pool.add_to_memory_pool(test_source(), parent_transaction, true);
if result.is_err() {
panic!("got an error adding parent tx: {:?}", result.err().unwrap());
}
// Now, add the transaction connected as a child to the first
let child_result =
write_pool.add_to_memory_pool(test_source(), child_transaction, false);
if child_result.is_err() {
panic!(
"got an error adding child tx: {:?}",
child_result.err().unwrap()
);
}
}
// Now take the read lock and use a few exposed methods to check consistency
{
let read_pool = pool.read().unwrap();
// First transaction is a stem transaction. In that case the child transaction
// should be force stem
assert_eq!(read_pool.total_size(), 2);
// Parent has been directly fluffed
if read_pool.stempool.num_transactions() == 0 {
expect_output_parent!(read_pool, Parent::PoolTransaction{tx_ref: _}, 12);
} else {
expect_output_parent!(read_pool, Parent::StemPoolTransaction{tx_ref: _}, 12);
}
expect_output_parent!(read_pool, Parent::AlreadySpent{other_tx: _}, 11, 5);
expect_output_parent!(read_pool, Parent::BlockTransaction, 8);
expect_output_parent!(read_pool, Parent::Unknown, 20);
}
}
#[test]
/// Testing various expected error conditions
pub fn test_pool_add_error() {
@ -742,7 +1101,7 @@ mod tests {
// First expected failure: duplicate output
let duplicate_tx = test_transaction(vec![5, 6], vec![7]);
match write_pool.add_to_memory_pool(test_source(), duplicate_tx) {
match write_pool.add_to_memory_pool(test_source(), duplicate_tx, false) {
Ok(_) => panic!("Got OK from add_to_memory_pool when dup was expected"),
Err(x) => {
match x {
@ -767,7 +1126,7 @@ mod tests {
// a valid transaction.
let valid_transaction = test_transaction(vec![5, 6], vec![9]);
match write_pool.add_to_memory_pool(test_source(), valid_transaction.clone()) {
match write_pool.add_to_memory_pool(test_source(), valid_transaction.clone(), false) {
Ok(_) => {}
Err(x) => panic!("Unexpected error while adding a valid transaction: {:?}", x),
};
@ -776,7 +1135,7 @@ mod tests {
// as valid_transaction:
let double_spend_transaction = test_transaction(vec![6], vec![2]);
match write_pool.add_to_memory_pool(test_source(), double_spend_transaction) {
match write_pool.add_to_memory_pool(test_source(), double_spend_transaction, false) {
Ok(_) => panic!("Expected error when adding double spend, got Ok"),
Err(x) => {
match x {
@ -802,7 +1161,7 @@ mod tests {
// added
//let already_in_pool = test_transaction(vec![5, 6], vec![9]);
match write_pool.add_to_memory_pool(test_source(), valid_transaction) {
match write_pool.add_to_memory_pool(test_source(), valid_transaction, false) {
Ok(_) => panic!("Expected error when adding already in pool, got Ok"),
Err(x) => {
match x {
@ -817,7 +1176,7 @@ mod tests {
// now attempt to add a timelocked tx to the pool
// should fail as invalid based on current height
let timelocked_tx_1 = timelocked_transaction(vec![9], vec![5], 10);
match write_pool.add_to_memory_pool(test_source(), timelocked_tx_1) {
match write_pool.add_to_memory_pool(test_source(), timelocked_tx_1, false) {
Err(PoolError::ImmatureTransaction {
lock_height: height,
}) => {
@ -862,7 +1221,7 @@ mod tests {
chain_ref.store_head_header(&head_header);
let txn = test_transaction_with_coinbase_input(15, coinbase_header.hash(), vec![10, 3]);
let result = write_pool.add_to_memory_pool(test_source(), txn);
let result = write_pool.add_to_memory_pool(test_source(), txn, false);
match result {
Err(InvalidTx(transaction::Error::ImmatureCoinbase)) => {}
_ => panic!("expected ImmatureCoinbase error here"),
@ -875,7 +1234,7 @@ mod tests {
chain_ref.store_head_header(&head_header);
let txn = test_transaction_with_coinbase_input(15, coinbase_header.hash(), vec![10, 3]);
let result = write_pool.add_to_memory_pool(test_source(), txn);
let result = write_pool.add_to_memory_pool(test_source(), txn, false);
match result {
Ok(_) => {}
Err(_) => panic!("this should not return an error here"),
@ -917,8 +1276,12 @@ mod tests {
// now add both txs to the pool (tx2 spends tx1 with zero confirmations)
// both should be accepted if tx1 added before tx2
write_pool.add_to_memory_pool(test_source(), tx1).unwrap();
write_pool.add_to_memory_pool(test_source(), tx2).unwrap();
write_pool
.add_to_memory_pool(test_source(), tx1, false)
.unwrap();
write_pool
.add_to_memory_pool(test_source(), tx2, false)
.unwrap();
assert_eq!(write_pool.pool_size(), 2);
}
@ -1049,7 +1412,9 @@ mod tests {
assert_eq!(write_pool.total_size(), 0);
for tx in txs_to_add.drain(..) {
write_pool.add_to_memory_pool(test_source(), tx).unwrap();
write_pool
.add_to_memory_pool(test_source(), tx, false)
.unwrap();
}
assert_eq!(write_pool.total_size(), expected_pool_size);
@ -1155,27 +1520,27 @@ mod tests {
assert!(
write_pool
.add_to_memory_pool(test_source(), root_tx_1)
.add_to_memory_pool(test_source(), root_tx_1, false)
.is_ok()
);
assert!(
write_pool
.add_to_memory_pool(test_source(), root_tx_2)
.add_to_memory_pool(test_source(), root_tx_2, false)
.is_ok()
);
assert!(
write_pool
.add_to_memory_pool(test_source(), root_tx_3)
.add_to_memory_pool(test_source(), root_tx_3, false)
.is_ok()
);
assert!(
write_pool
.add_to_memory_pool(test_source(), child_tx_1)
.add_to_memory_pool(test_source(), child_tx_1, false)
.is_ok()
);
assert!(
write_pool
.add_to_memory_pool(test_source(), child_tx_2)
.add_to_memory_pool(test_source(), child_tx_2, false)
.is_ok()
);
@ -1225,8 +1590,13 @@ mod tests {
config: PoolConfig {
accept_fee_base: 0,
max_pool_size: 10_000,
dandelion_probability: 90,
dandelion_embargo: 30,
},
time_stem_transactions: HashMap::new(),
stem_transactions: HashMap::new(),
transactions: HashMap::new(),
stempool: Pool::empty(),
pool: Pool::empty(),
orphans: Orphans::empty(),
blockchain: dummy_chain.clone(),

View file

@ -28,7 +28,7 @@ use core::consensus;
use core::core::{block, hash, transaction};
use core::core::transaction::{Input, OutputIdentifier};
/// Tranasction pool configuration
/// Transaction pool configuration
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PoolConfig {
/// Base fee for a transaction to be accepted by the pool. The transaction
@ -40,6 +40,14 @@ pub struct PoolConfig {
/// Maximum capacity of the pool in number of transactions
#[serde = "default_max_pool_size"]
pub max_pool_size: usize,
/// Maximum capacity of the pool in number of transactions
#[serde = "default_dandelion_probability"]
pub dandelion_probability: usize,
/// Default embargo for Dandelion transaction
#[serde = "default_dandelion_embargo"]
pub dandelion_embargo: i64,
}
impl Default for PoolConfig {
@ -47,6 +55,8 @@ impl Default for PoolConfig {
PoolConfig {
accept_fee_base: default_accept_fee_base(),
max_pool_size: default_max_pool_size(),
dandelion_probability: default_dandelion_probability(),
dandelion_embargo: default_dandelion_embargo(),
}
}
}
@ -57,6 +67,12 @@ fn default_accept_fee_base() -> u64 {
fn default_max_pool_size() -> usize {
50_000
}
fn default_dandelion_probability() -> usize {
90
}
fn default_dandelion_embargo() -> i64 {
30
}
/// Placeholder: the data representing where we heard about a tx from.
///
@ -79,6 +95,7 @@ pub enum Parent {
Unknown,
BlockTransaction,
PoolTransaction { tx_ref: hash::Hash },
StemPoolTransaction { tx_ref: hash::Hash },
AlreadySpent { other_tx: hash::Hash },
}
@ -90,6 +107,9 @@ impl fmt::Debug for Parent {
&Parent::PoolTransaction { tx_ref: x } => {
write!(f, "Parent: Pool Transaction ({:?})", x)
}
&Parent::StemPoolTransaction { tx_ref: x } => {
write!(f, "Parent: Stempool Transaction ({:?})", x)
}
&Parent::AlreadySpent { other_tx: x } => write!(f, "Parent: Already Spent By {:?}", x),
}
}
@ -103,6 +123,8 @@ pub enum PoolError {
InvalidTx(transaction::Error),
/// An entry already in the pool
AlreadyInPool,
/// An entry already in the stempool
AlreadyInStempool,
/// A duplicate output
DuplicateOutput {
/// The other transaction
@ -166,6 +188,9 @@ pub trait PoolAdapter: Send + Sync {
/// The transaction pool has accepted this transactions as valid and added
/// it to its internal cache.
fn tx_accepted(&self, tx: &transaction::Transaction);
/// The stem transaction pool has accepted this transactions as valid and added
/// it to its internal cache.
fn stem_tx_accepted(&self, tx: &transaction::Transaction);
}
/// Dummy adapter used as a placeholder for real implementations
@ -174,6 +199,7 @@ pub trait PoolAdapter: Send + Sync {
pub struct NoopAdapter {}
impl PoolAdapter for NoopAdapter {
fn tx_accepted(&self, _: &transaction::Transaction) {}
fn stem_tx_accepted(&self, _: &transaction::Transaction) {}
}
/// Pool contains the elements of the graph that are connected, in full, to
@ -265,6 +291,39 @@ impl Pool {
}
}
// More relax way for stempool transaction in order to accept scenario such as:
// Parent is in mempool, child is allowed in stempool
//
pub fn add_stempool_transaction(
&mut self,
pool_entry: graph::PoolEntry,
mut blockchain_refs: Vec<graph::Edge>,
pool_refs: Vec<graph::Edge>,
mut new_unspents: Vec<graph::Edge>,
) {
// Removing consumed available_outputs
for new_edge in &pool_refs {
// All of these *can* correspond to an existing unspent
self.available_outputs.remove(&new_edge.output_commitment());
}
// Accounting for consumed blockchain outputs
for new_blockchain_edge in blockchain_refs.drain(..) {
self.consumed_blockchain_outputs
.insert(new_blockchain_edge.output_commitment(), new_blockchain_edge);
}
// Adding the transaction to the vertices list along with internal
// pool edges
self.graph.add_entry(pool_entry, pool_refs);
// Adding the new unspents to the unspent map
for unspent_output in new_unspents.drain(..) {
self.available_outputs
.insert(unspent_output.output_commitment(), unspent_output);
}
}
pub fn update_roots(&mut self) {
self.graph.update_roots()
}

View file

@ -266,7 +266,11 @@ fn main() {
.help("Send the transaction to the provided server")
.short("d")
.long("dest")
.takes_value(true)))
.takes_value(true))
.arg(Arg::with_name("fluff")
.help("Fluff the transaction (ignore Dandelion relay protocol)")
.short("f")
.long("fluff")))
.subcommand(SubCommand::with_name("burn")
.about("** TESTING ONLY ** Burns the provided amount to a known \
@ -543,6 +547,10 @@ fn wallet_command(wallet_args: &ArgMatches, global_config: GlobalConfig) {
let dest = send_args
.value_of("dest")
.expect("Destination wallet address required");
let mut fluff = false;
if send_args.is_present("fluff") {
fluff = true;
}
let max_outputs = 500;
let result = wallet::issue_send_tx(
&wallet_config,
@ -552,6 +560,7 @@ fn wallet_command(wallet_args: &ArgMatches, global_config: GlobalConfig) {
dest.to_string(),
max_outputs,
selection_strategy == "all",
fluff,
);
match result {
Ok(_) => info!(

View file

@ -28,6 +28,7 @@ router = "~0.5.1"
prettytable-rs = "^0.6"
term = "~0.4.6"
uuid = { version = "~0.5.1", features = ["serde", "v4"] }
urlencoded = "~0.5.0"
grin_api = { path = "../api" }
grin_core = { path = "../core" }
grin_keychain = { path = "../keychain" }

View file

@ -64,17 +64,27 @@ where
Ok(res)
}
pub fn send_partial_tx(url: &str, partial_tx: &PartialTx) -> Result<PartialTx, Error> {
single_send_partial_tx(url, partial_tx)
pub fn send_partial_tx(url: &str, partial_tx: &PartialTx, fluff: bool) -> Result<PartialTx, Error> {
single_send_partial_tx(url, partial_tx, fluff)
}
fn single_send_partial_tx(url: &str, partial_tx: &PartialTx) -> Result<PartialTx, Error> {
fn single_send_partial_tx(
url: &str,
partial_tx: &PartialTx,
fluff: bool,
) -> Result<PartialTx, Error> {
let mut core = reactor::Core::new().context(ErrorKind::Hyper)?;
let client = hyper::Client::new(&core.handle());
// In case we want to do an express send
let mut url_pool = url.to_owned();
if fluff {
url_pool = format!("{}{}", url, "?fluff");
}
let mut req = Request::new(
Method::Post,
url.parse::<hyper::Uri>().context(ErrorKind::Hyper)?,
url_pool.parse::<hyper::Uri>().context(ErrorKind::Hyper)?,
);
req.headers_mut().set(ContentType::json());
let json = serde_json::to_string(&partial_tx).context(ErrorKind::Hyper)?;

View file

@ -26,6 +26,7 @@ extern crate serde_json;
#[macro_use]
extern crate slog;
extern crate term;
extern crate urlencoded;
extern crate uuid;
extern crate bodyparser;

View file

@ -30,6 +30,7 @@ use core::{global, ser};
use keychain::{BlindingFactor, Identifier, Keychain};
use types::*;
use util::{secp, to_hex, LOGGER};
use urlencoded::UrlEncodedQuery;
use failure::ResultExt;
/// Dummy wrapper for the hex-encoded serialized transaction.
@ -158,6 +159,7 @@ fn handle_sender_confirmation(
config: &WalletConfig,
keychain: &Keychain,
partial_tx: &PartialTx,
fluff: bool,
) -> Result<PartialTx, Error> {
let (amount, sender_pub_blinding, sender_pub_nonce, kernel_offset, sender_sig_part, tx) =
read_partial_tx(keychain, partial_tx)?;
@ -226,7 +228,15 @@ fn handle_sender_confirmation(
let tx_hex = to_hex(ser::ser_vec(&final_tx).unwrap());
let url = format!("{}/v1/pool/push", config.check_node_api_http_addr.as_str());
let url;
if fluff {
url = format!(
"{}/v1/pool/push?fluff",
config.check_node_api_http_addr.as_str()
);
} else {
url = format!("{}/v1/pool/push", config.check_node_api_http_addr.as_str());
}
api::client::post(url.as_str(), &TxWrapper { tx_hex: tx_hex }).context(ErrorKind::Node)?;
// Return what we've actually posted
@ -255,6 +265,13 @@ impl Handler for WalletReceiver {
fn handle(&self, req: &mut Request) -> IronResult<Response> {
let struct_body = req.get::<bodyparser::Struct<PartialTx>>();
let mut fluff = false;
if let Ok(params) = req.get_ref::<UrlEncodedQuery>() {
if let Some(_) = params.get("fluff") {
fluff = true;
}
}
if let Ok(Some(partial_tx)) = struct_body {
match partial_tx.phase {
PartialTxPhase::SenderInitiation => {
@ -278,6 +295,7 @@ impl Handler for WalletReceiver {
&self.config,
&self.keychain,
&partial_tx,
fluff,
).map_err(|e| {
error!(LOGGER, "Phase 3 Sender Confirmation -> Problematic partial tx, looks like this: {:?}", partial_tx);
api::Error::Internal(format!(

View file

@ -40,6 +40,7 @@ pub fn issue_send_tx(
dest: String,
max_outputs: usize,
selection_strategy_is_use_all: bool,
fluff: bool,
) -> Result<(), Error> {
checker::refresh_outputs(config, keychain)?;
@ -126,7 +127,7 @@ pub fn issue_send_tx(
let url = format!("{}/v1/receive/transaction", &dest);
debug!(LOGGER, "Posting partial transaction to {}", url);
let res = client::send_partial_tx(&url, &partial_tx);
let res = client::send_partial_tx(&url, &partial_tx, fluff);
if let Err(e) = res {
match e.kind() {
ErrorKind::FeeExceedsAmount {
@ -186,7 +187,7 @@ pub fn issue_send_tx(
partial_tx.phase = PartialTxPhase::SenderConfirmation;
// And send again
let res = client::send_partial_tx(&url, &partial_tx);
let res = client::send_partial_tx(&url, &partial_tx, fluff);
if let Err(e) = res {
match e.kind() {
ErrorKind::FeeExceedsAmount {sender_amount, recipient_fee} =>