Always stem local txs if configured that way (unless explicitly fluffed) (#2876)

* always stem local txs if configured that way (unless explicitly fluff from wallet)
this overrides current epoch behavior for txs coming in via "push-api"
rename "local" to "our" txs

* TxSource is now an enum for type safety.
This commit is contained in:
Antioch Peverell 2019-07-04 11:56:42 +01:00 committed by GitHub
parent 82775164e8
commit f4eb3e3d4b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 86 additions and 75 deletions

View file

@ -82,10 +82,7 @@ impl PoolPushHandler {
.map_err(|e| ErrorKind::RequestError(format!("Bad request: {}", e)).into()) .map_err(|e| ErrorKind::RequestError(format!("Bad request: {}", e)).into())
}) })
.and_then(move |tx: Transaction| { .and_then(move |tx: Transaction| {
let source = pool::TxSource { let source = pool::TxSource::PushApi;
debug_name: "push-api".to_string(),
identifier: "?.?.?.?".to_string(),
};
info!( info!(
"Pushing transaction {} to pool (inputs: {}, outputs: {}, kernels: {})", "Pushing transaction {} to pool (inputs: {}, outputs: {}, kernels: {})",
tx.hash(), tx.hash(),

View file

@ -210,6 +210,14 @@ fn comments() -> HashMap<String, String> {
.to_string(), .to_string(),
); );
retval.insert(
"always_stem_our_txs".to_string(),
"
#always stem our (pushed via api) txs regardless of stem/fluff epoch (as per Dandelion++ paper)
"
.to_string(),
);
retval.insert( retval.insert(
"[server.p2p_config]".to_string(), "[server.p2p_config]".to_string(),
"#test miner wallet URL (burns if this doesn't exist) "#test miner wallet URL (burns if this doesn't exist)

View file

@ -202,10 +202,10 @@ impl Pool {
fn log_pool_add(&self, entry: &PoolEntry, header: &BlockHeader) { fn log_pool_add(&self, entry: &PoolEntry, header: &BlockHeader) {
debug!( debug!(
"add_to_pool [{}]: {} ({}) [in/out/kern: {}/{}/{}] pool: {} (at block {})", "add_to_pool [{}]: {} ({:?}) [in/out/kern: {}/{}/{}] pool: {} (at block {})",
self.name, self.name,
entry.tx.hash(), entry.tx.hash(),
entry.src.debug_name, entry.src,
entry.tx.inputs().len(), entry.tx.inputs().len(),
entry.tx.outputs().len(), entry.tx.outputs().len(),
entry.tx.kernels().len(), entry.tx.kernels().len(),

View file

@ -108,7 +108,7 @@ impl TransactionPool {
tx.validate(Weighting::AsTransaction, self.verifier_cache.clone())?; tx.validate(Weighting::AsTransaction, self.verifier_cache.clone())?;
entry.tx = tx; entry.tx = tx;
entry.src.debug_name = "deagg".to_string(); entry.src = TxSource::Deaggregate;
} }
} }
self.txpool.add_to_pool(entry.clone(), vec![], header)?; self.txpool.add_to_pool(entry.clone(), vec![], header)?;
@ -169,12 +169,12 @@ impl TransactionPool {
if !stem if !stem
|| self || self
.add_to_stempool(entry.clone(), header) .add_to_stempool(entry.clone(), header)
.and_then(|_| self.adapter.stem_tx_accepted(&entry.tx)) .and_then(|_| self.adapter.stem_tx_accepted(&entry))
.is_err() .is_err()
{ {
self.add_to_txpool(entry.clone(), header)?; self.add_to_txpool(entry.clone(), header)?;
self.add_to_reorg_cache(entry.clone()); self.add_to_reorg_cache(entry.clone());
self.adapter.tx_accepted(&entry.tx); self.adapter.tx_accepted(&entry);
} }
// Transaction passed all the checks but we have to make space for it // Transaction passed all the checks but we have to make space for it

View file

@ -39,23 +39,32 @@ const DANDELION_AGGREGATION_SECS: u16 = 30;
/// Dandelion stem probability (stem 90% of the time, fluff 10%). /// Dandelion stem probability (stem 90% of the time, fluff 10%).
const DANDELION_STEM_PROBABILITY: u8 = 90; const DANDELION_STEM_PROBABILITY: u8 = 90;
/// Always stem our (pushed via api) txs?
/// Defaults to true to match the Dandelion++ paper.
/// But can be overridden to allow a node to fluff our txs if desired.
/// If set to false we will stem/fluff our txs as per current epoch.
const DANDELION_ALWAYS_STEM_OUR_TXS: bool = true;
/// Configuration for "Dandelion". /// Configuration for "Dandelion".
/// Note: shared between p2p and pool. /// Note: shared between p2p and pool.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct DandelionConfig { pub struct DandelionConfig {
/// Length of each "epoch". /// Length of each "epoch".
#[serde(default = "default_dandelion_epoch_secs")] #[serde(default = "default_dandelion_epoch_secs")]
pub epoch_secs: Option<u16>, pub epoch_secs: u16,
/// Dandelion embargo timer. Fluff and broadcast individual txs if not seen /// Dandelion embargo timer. Fluff and broadcast individual txs if not seen
/// on network before embargo expires. /// on network before embargo expires.
#[serde(default = "default_dandelion_embargo_secs")] #[serde(default = "default_dandelion_embargo_secs")]
pub embargo_secs: Option<u16>, pub embargo_secs: u16,
/// Dandelion aggregation timer. /// Dandelion aggregation timer.
#[serde(default = "default_dandelion_aggregation_secs")] #[serde(default = "default_dandelion_aggregation_secs")]
pub aggregation_secs: Option<u16>, pub aggregation_secs: u16,
/// Dandelion stem probability (stem 90% of the time, fluff 10% etc.) /// Dandelion stem probability (stem 90% of the time, fluff 10% etc.)
#[serde(default = "default_dandelion_stem_probability")] #[serde(default = "default_dandelion_stem_probability")]
pub stem_probability: Option<u8>, pub stem_probability: u8,
/// Default to always stem our txs as described in Dandelion++ paper.
#[serde(default = "default_dandelion_always_stem_our_txs")]
pub always_stem_our_txs: bool,
} }
impl Default for DandelionConfig { impl Default for DandelionConfig {
@ -65,24 +74,29 @@ impl Default for DandelionConfig {
embargo_secs: default_dandelion_embargo_secs(), embargo_secs: default_dandelion_embargo_secs(),
aggregation_secs: default_dandelion_aggregation_secs(), aggregation_secs: default_dandelion_aggregation_secs(),
stem_probability: default_dandelion_stem_probability(), stem_probability: default_dandelion_stem_probability(),
always_stem_our_txs: default_dandelion_always_stem_our_txs(),
} }
} }
} }
fn default_dandelion_epoch_secs() -> Option<u16> { fn default_dandelion_epoch_secs() -> u16 {
Some(DANDELION_EPOCH_SECS) DANDELION_EPOCH_SECS
} }
fn default_dandelion_embargo_secs() -> Option<u16> { fn default_dandelion_embargo_secs() -> u16 {
Some(DANDELION_EMBARGO_SECS) DANDELION_EMBARGO_SECS
} }
fn default_dandelion_aggregation_secs() -> Option<u16> { fn default_dandelion_aggregation_secs() -> u16 {
Some(DANDELION_AGGREGATION_SECS) DANDELION_AGGREGATION_SECS
} }
fn default_dandelion_stem_probability() -> Option<u8> { fn default_dandelion_stem_probability() -> u8 {
Some(DANDELION_STEM_PROBABILITY) DANDELION_STEM_PROBABILITY
}
fn default_dandelion_always_stem_our_txs() -> bool {
DANDELION_ALWAYS_STEM_OUR_TXS
} }
/// Transaction pool configuration /// Transaction pool configuration
@ -145,20 +159,29 @@ pub struct PoolEntry {
pub tx: Transaction, pub tx: Transaction,
} }
/// Placeholder: the data representing where we heard about a tx from.
///
/// Used to make decisions based on transaction acceptance priority from /// Used to make decisions based on transaction acceptance priority from
/// various sources. For example, a node may want to bypass pool size /// various sources. For example, a node may want to bypass pool size
/// restrictions when accepting a transaction from a local wallet. /// restrictions when accepting a transaction from a local wallet.
/// ///
/// Most likely this will evolve to contain some sort of network identifier, /// Most likely this will evolve to contain some sort of network identifier,
/// once we get a better sense of what transaction building might look like. /// once we get a better sense of what transaction building might look like.
#[derive(Clone, Debug)] #[derive(Clone, Debug, PartialEq)]
pub struct TxSource { pub enum TxSource {
/// Human-readable name used for logging and errors. PushApi,
pub debug_name: String, Broadcast,
/// Unique identifier used to distinguish this peer from others. Fluff,
pub identifier: String, EmbargoExpired,
Deaggregate,
}
impl TxSource {
/// Convenience fn for checking if this tx was sourced via the push api.
pub fn is_pushed(&self) -> bool {
match self {
TxSource::PushApi => true,
_ => false,
}
}
} }
/// Possible errors when interacting with the transaction pool. /// Possible errors when interacting with the transaction pool.
@ -250,10 +273,10 @@ pub trait BlockChain: Sync + Send {
/// importantly the broadcasting of transactions to our peers. /// importantly the broadcasting of transactions to our peers.
pub trait PoolAdapter: Send + Sync { pub trait PoolAdapter: Send + Sync {
/// The transaction pool has accepted this transaction as valid. /// The transaction pool has accepted this transaction as valid.
fn tx_accepted(&self, tx: &transaction::Transaction); fn tx_accepted(&self, entry: &PoolEntry);
/// The stem transaction pool has accepted this transactions as valid. /// The stem transaction pool has accepted this transactions as valid.
fn stem_tx_accepted(&self, tx: &transaction::Transaction) -> Result<(), PoolError>; fn stem_tx_accepted(&self, entry: &PoolEntry) -> Result<(), PoolError>;
} }
/// Dummy adapter used as a placeholder for real implementations /// Dummy adapter used as a placeholder for real implementations
@ -261,8 +284,8 @@ pub trait PoolAdapter: Send + Sync {
pub struct NoopAdapter {} pub struct NoopAdapter {}
impl PoolAdapter for NoopAdapter { impl PoolAdapter for NoopAdapter {
fn tx_accepted(&self, _tx: &transaction::Transaction) {} fn tx_accepted(&self, _entry: &PoolEntry) {}
fn stem_tx_accepted(&self, _tx: &transaction::Transaction) -> Result<(), PoolError> { fn stem_tx_accepted(&self, _entry: &PoolEntry) -> Result<(), PoolError> {
Ok(()) Ok(())
} }
} }

View file

@ -229,10 +229,7 @@ where
} }
pub fn test_source() -> TxSource { pub fn test_source() -> TxSource {
TxSource { TxSource::Broadcast
debug_name: format!("test"),
identifier: format!("127.0.0.1"),
}
} }
pub fn clean_output_dir(db_root: String) { pub fn clean_output_dir(db_root: String) {

View file

@ -19,10 +19,12 @@ use self::core::core::{transaction, Block, BlockHeader, Weighting};
use self::core::libtx; use self::core::libtx;
use self::core::pow::Difficulty; use self::core::pow::Difficulty;
use self::keychain::{ExtKeychain, Keychain}; use self::keychain::{ExtKeychain, Keychain};
use self::pool::TxSource;
use self::util::RwLock; use self::util::RwLock;
use crate::common::*; use crate::common::*;
use grin_core as core; use grin_core as core;
use grin_keychain as keychain; use grin_keychain as keychain;
use grin_pool as pool;
use grin_util as util; use grin_util as util;
use std::sync::Arc; use std::sync::Arc;
@ -237,7 +239,7 @@ fn test_the_transaction_pool() {
assert_eq!(write_pool.total_size(), 6); assert_eq!(write_pool.total_size(), 6);
let entry = write_pool.txpool.entries.last().unwrap(); let entry = write_pool.txpool.entries.last().unwrap();
assert_eq!(entry.tx.kernels().len(), 1); assert_eq!(entry.tx.kernels().len(), 1);
assert_eq!(entry.src.debug_name, "deagg"); assert_eq!(entry.src, TxSource::Deaggregate);
} }
// Check we cannot "double spend" an output spent in a previous block. // Check we cannot "double spend" an output spent in a previous block.
@ -447,7 +449,7 @@ fn test_the_transaction_pool() {
assert_eq!(write_pool.total_size(), 6); assert_eq!(write_pool.total_size(), 6);
let entry = write_pool.txpool.entries.last().unwrap(); let entry = write_pool.txpool.entries.last().unwrap();
assert_eq!(entry.tx.kernels().len(), 1); assert_eq!(entry.tx.kernels().len(), 1);
assert_eq!(entry.src.debug_name, "deagg"); assert_eq!(entry.src, TxSource::Deaggregate);
} }
// Check we cannot "double spend" an output spent in a previous block. // Check we cannot "double spend" an output spent in a previous block.

View file

@ -37,7 +37,6 @@ use crate::core::{core, global};
use crate::p2p; use crate::p2p;
use crate::p2p::types::PeerInfo; use crate::p2p::types::PeerInfo;
use crate::pool; use crate::pool;
use crate::pool::types::DandelionConfig;
use crate::util::OneTime; use crate::util::OneTime;
use chrono::prelude::*; use chrono::prelude::*;
use chrono::Duration; use chrono::Duration;
@ -97,10 +96,7 @@ impl p2p::ChainAdapter for NetToChainAdapter {
return Ok(true); return Ok(true);
} }
let source = pool::TxSource { let source = pool::TxSource::Broadcast;
debug_name: "p2p".to_string(),
identifier: "?.?.?.?".to_string(),
};
let header = self.chain().head_header()?; let header = self.chain().head_header()?;
@ -804,11 +800,11 @@ impl DandelionAdapter for PoolToNetAdapter {
} }
impl pool::PoolAdapter for PoolToNetAdapter { impl pool::PoolAdapter for PoolToNetAdapter {
fn tx_accepted(&self, tx: &core::Transaction) { fn tx_accepted(&self, entry: &pool::PoolEntry) {
self.peers().broadcast_transaction(tx); self.peers().broadcast_transaction(&entry.tx);
} }
fn stem_tx_accepted(&self, tx: &core::Transaction) -> Result<(), pool::PoolError> { fn stem_tx_accepted(&self, entry: &pool::PoolEntry) -> Result<(), pool::PoolError> {
// Take write lock on the current epoch. // Take write lock on the current epoch.
// We need to be able to update the current relay peer if not currently connected. // We need to be able to update the current relay peer if not currently connected.
let mut epoch = self.dandelion_epoch.write(); let mut epoch = self.dandelion_epoch.write();
@ -816,9 +812,10 @@ impl pool::PoolAdapter for PoolToNetAdapter {
// If "stem" epoch attempt to relay the tx to the next Dandelion relay. // If "stem" epoch attempt to relay the tx to the next Dandelion relay.
// Fallback to immediately fluffing the tx if we cannot stem for any reason. // Fallback to immediately fluffing the tx if we cannot stem for any reason.
// If "fluff" epoch then nothing to do right now (fluff via Dandelion monitor). // If "fluff" epoch then nothing to do right now (fluff via Dandelion monitor).
if epoch.is_stem() { // If node is configured to always stem our (pushed via api) txs then do so.
if epoch.is_stem() || (entry.src.is_pushed() && epoch.always_stem_our_txs()) {
if let Some(peer) = epoch.relay_peer(&self.peers()) { if let Some(peer) = epoch.relay_peer(&self.peers()) {
match peer.send_stem_transaction(tx) { match peer.send_stem_transaction(&entry.tx) {
Ok(_) => { Ok(_) => {
info!("Stemming this epoch, relaying to next peer."); info!("Stemming this epoch, relaying to next peer.");
Ok(()) Ok(())
@ -841,7 +838,7 @@ impl pool::PoolAdapter for PoolToNetAdapter {
impl PoolToNetAdapter { impl PoolToNetAdapter {
/// Create a new pool to net adapter /// Create a new pool to net adapter
pub fn new(config: DandelionConfig) -> PoolToNetAdapter { pub fn new(config: pool::DandelionConfig) -> PoolToNetAdapter {
PoolToNetAdapter { PoolToNetAdapter {
peers: OneTime::new(), peers: OneTime::new(),
dandelion_epoch: Arc::new(RwLock::new(DandelionEpoch::new(config))), dandelion_epoch: Arc::new(RwLock::new(DandelionEpoch::new(config))),

View file

@ -496,8 +496,8 @@ impl DandelionEpoch {
match self.start_time { match self.start_time {
None => true, None => true,
Some(start_time) => { Some(start_time) => {
let epoch_secs = self.config.epoch_secs.expect("epoch_secs config missing") as i64; let epoch_secs = self.config.epoch_secs;
Utc::now().timestamp().saturating_sub(start_time) > epoch_secs Utc::now().timestamp().saturating_sub(start_time) > epoch_secs as i64
} }
} }
} }
@ -511,10 +511,7 @@ impl DandelionEpoch {
// If stem_probability == 90 then we stem 90% of the time. // If stem_probability == 90 then we stem 90% of the time.
let mut rng = rand::thread_rng(); let mut rng = rand::thread_rng();
let stem_probability = self let stem_probability = self.config.stem_probability;
.config
.stem_probability
.expect("stem_probability config missing");
self.is_stem = rng.gen_range(0, 100) < stem_probability; self.is_stem = rng.gen_range(0, 100) < stem_probability;
let addr = self.relay_peer.clone().map(|p| p.info.addr); let addr = self.relay_peer.clone().map(|p| p.info.addr);
@ -529,6 +526,11 @@ impl DandelionEpoch {
self.is_stem self.is_stem
} }
/// Always stem our (pushed via api) txs regardless of stem/fluff epoch?
pub fn always_stem_our_txs(&self) -> bool {
self.config.always_stem_our_txs
}
/// What is our current relay peer? /// What is our current relay peer?
/// If it is not connected then choose a new one. /// If it is not connected then choose a new one.
pub fn relay_peer(&mut self, peers: &Arc<p2p::Peers>) -> Option<Arc<p2p::Peer>> { pub fn relay_peer(&mut self, peers: &Arc<p2p::Peers>) -> Option<Arc<p2p::Peer>> {

View file

@ -113,9 +113,7 @@ fn process_fluff_phase(
return Ok(()); return Ok(());
} }
let cutoff_secs = dandelion_config let cutoff_secs = dandelion_config.aggregation_secs;
.aggregation_secs
.expect("aggregation secs config missing");
let cutoff_entries = select_txs_cutoff(&tx_pool.stempool, cutoff_secs); let cutoff_entries = select_txs_cutoff(&tx_pool.stempool, cutoff_secs);
// If epoch is expired, fluff *all* outstanding entries in stempool. // If epoch is expired, fluff *all* outstanding entries in stempool.
@ -149,12 +147,7 @@ fn process_fluff_phase(
verifier_cache.clone(), verifier_cache.clone(),
)?; )?;
let src = TxSource { tx_pool.add_to_pool(TxSource::Fluff, agg_tx, false, &header)?;
debug_name: "fluff".to_string(),
identifier: "?.?.?.?".to_string(),
};
tx_pool.add_to_pool(src, agg_tx, false, &header)?;
Ok(()) Ok(())
} }
@ -165,10 +158,7 @@ fn process_expired_entries(
// Take a write lock on the txpool for the duration of this processing. // Take a write lock on the txpool for the duration of this processing.
let mut tx_pool = tx_pool.write(); let mut tx_pool = tx_pool.write();
let embargo_secs = dandelion_config let embargo_secs = dandelion_config.embargo_secs + thread_rng().gen_range(0, 31);
.embargo_secs
.expect("embargo_secs config missing")
+ thread_rng().gen_range(0, 31);
let expired_entries = select_txs_cutoff(&tx_pool.stempool, embargo_secs); let expired_entries = select_txs_cutoff(&tx_pool.stempool, embargo_secs);
if expired_entries.is_empty() { if expired_entries.is_empty() {
@ -179,14 +169,9 @@ fn process_expired_entries(
let header = tx_pool.chain_head()?; let header = tx_pool.chain_head()?;
let src = TxSource {
debug_name: "embargo_expired".to_string(),
identifier: "?.?.?.?".to_string(),
};
for entry in expired_entries { for entry in expired_entries {
let txhash = entry.tx.hash(); let txhash = entry.tx.hash();
match tx_pool.add_to_pool(src.clone(), entry.tx, false, &header) { match tx_pool.add_to_pool(TxSource::EmbargoExpired, entry.tx, false, &header) {
Ok(_) => info!( Ok(_) => info!(
"dand_mon: embargo expired for {}, fluffed successfully.", "dand_mon: embargo expired for {}, fluffed successfully.",
txhash txhash