diff --git a/config/src/comments.rs b/config/src/comments.rs index 1849b656f..b5e3f6d97 100644 --- a/config/src/comments.rs +++ b/config/src/comments.rs @@ -141,9 +141,17 @@ fn comments() -> HashMap { ); retval.insert( - "relay_secs".to_string(), + "epoch_secs".to_string(), " -#dandelion relay time (choose new relay peer every n secs) +#dandelion epoch duration +" + .to_string(), + ); + + retval.insert( + "aggregation_secs".to_string(), + " +#dandelion aggregation period in secs " .to_string(), ); @@ -156,13 +164,6 @@ fn comments() -> HashMap { .to_string(), ); - retval.insert( - "patience_secs".to_string(), - " -#run dandelion stem/fluff processing every n secs (stem tx aggregation in this window) -" - .to_string(), - ); retval.insert( "stem_probability".to_string(), " diff --git a/p2p/src/peer.rs b/p2p/src/peer.rs index 1b50bf59a..e57d25a84 100644 --- a/p2p/src/peer.rs +++ b/p2p/src/peer.rs @@ -13,6 +13,7 @@ // limitations under the License. use crate::util::{Mutex, RwLock}; +use std::fmt; use std::fs::File; use std::net::{Shutdown, TcpStream}; use std::sync::Arc; @@ -54,6 +55,12 @@ pub struct Peer { connection: Option>, } +impl fmt::Debug for Peer { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "Peer({:?})", &self.info) + } +} + impl Peer { // Only accept and connect can be externally used to build a peer fn new(info: PeerInfo, adapter: Arc) -> Peer { diff --git a/p2p/src/peers.rs b/p2p/src/peers.rs index 19713b7d8..da1a4067d 100644 --- a/p2p/src/peers.rs +++ b/p2p/src/peers.rs @@ -37,7 +37,6 @@ pub struct Peers { pub adapter: Arc, store: PeerStore, peers: RwLock>>, - dandelion_relay: RwLock)>>, config: P2PConfig, } @@ -48,7 +47,6 @@ impl Peers { store, config, peers: RwLock::new(HashMap::new()), - dandelion_relay: RwLock::new(None), } } @@ -87,39 +85,6 @@ impl Peers { self.save_peer(&peer_data) } - // Update the dandelion relay - pub fn update_dandelion_relay(&self) { - let peers = self.outgoing_connected_peers(); - - let peer = &self - .config - .dandelion_peer - .and_then(|ip| peers.iter().find(|x| x.info.addr == ip)) - .or(thread_rng().choose(&peers)); - - match peer { - Some(peer) => self.set_dandelion_relay(peer), - None => debug!("Could not update dandelion relay"), - } - } - - fn set_dandelion_relay(&self, peer: &Arc) { - // Clear the map and add new relay - let dandelion_relay = &self.dandelion_relay; - dandelion_relay - .write() - .replace((Utc::now().timestamp(), peer.clone())); - debug!( - "Successfully updated Dandelion relay to: {}", - peer.info.addr - ); - } - - // Get the dandelion relay - pub fn get_dandelion_relay(&self) -> Option<(i64, Arc)> { - self.dandelion_relay.read().clone() - } - pub fn is_known(&self, addr: PeerAddr) -> bool { self.peers.read().contains_key(&addr) } @@ -335,26 +300,6 @@ impl Peers { ); } - /// Relays the provided stem transaction to our single stem peer. - pub fn relay_stem_transaction(&self, tx: &core::Transaction) -> Result<(), Error> { - self.get_dandelion_relay() - .or_else(|| { - debug!("No dandelion relay, updating."); - self.update_dandelion_relay(); - self.get_dandelion_relay() - }) - // If still return an error, let the caller handle this as they see fit. - // The caller will "fluff" at this point as the stem phase is finished. - .ok_or(Error::NoDandelionRelay) - .map(|(_, relay)| { - if relay.is_connected() { - if let Err(e) = relay.send_stem_transaction(tx) { - debug!("Error sending stem transaction to peer relay: {:?}", e); - } - } - }) - } - /// Broadcasts the provided transaction to PEER_PREFERRED_COUNT of our /// peers. We may be connected to PEER_MAX_COUNT peers so we only /// want to broadcast to a random subset of peers. diff --git a/pool/src/lib.rs b/pool/src/lib.rs index b6ccc56c7..635e00f53 100644 --- a/pool/src/lib.rs +++ b/pool/src/lib.rs @@ -34,7 +34,8 @@ mod pool; pub mod transaction_pool; pub mod types; +pub use crate::pool::Pool; pub use crate::transaction_pool::TransactionPool; pub use crate::types::{ - BlockChain, DandelionConfig, PoolAdapter, PoolConfig, PoolEntryState, PoolError, TxSource, + BlockChain, DandelionConfig, PoolAdapter, PoolConfig, PoolEntry, PoolError, TxSource, }; diff --git a/pool/src/pool.rs b/pool/src/pool.rs index 5f2e94420..a7d355f92 100644 --- a/pool/src/pool.rs +++ b/pool/src/pool.rs @@ -23,7 +23,7 @@ use self::core::core::{ Block, BlockHeader, BlockSums, Committed, Transaction, TxKernel, Weighting, }; use self::util::RwLock; -use crate::types::{BlockChain, PoolEntry, PoolEntryState, PoolError}; +use crate::types::{BlockChain, PoolEntry, PoolError}; use grin_core as core; use grin_util as util; use std::collections::{HashMap, HashSet}; @@ -139,7 +139,7 @@ impl Pool { // Verify these txs produce an aggregated tx below max tx weight. // Return a vec of all the valid txs. let txs = self.validate_raw_txs( - tx_buckets, + &tx_buckets, None, &header, Weighting::AsLimitedTransaction { max_weight }, @@ -167,33 +167,6 @@ impl Pool { Ok(Some(tx)) } - pub fn select_valid_transactions( - &self, - txs: Vec, - extra_tx: Option, - header: &BlockHeader, - ) -> Result, PoolError> { - let valid_txs = self.validate_raw_txs(txs, extra_tx, header, Weighting::NoLimit)?; - Ok(valid_txs) - } - - pub fn get_transactions_in_state(&self, state: PoolEntryState) -> Vec { - self.entries - .iter() - .filter(|x| x.state == state) - .map(|x| x.tx.clone()) - .collect::>() - } - - // Transition the specified pool entries to the new state. - pub fn transition_to_state(&mut self, txs: &[Transaction], state: PoolEntryState) { - for x in &mut self.entries { - if txs.contains(&x.tx) { - x.state = state; - } - } - } - // Aggregate this new tx with all existing txs in the pool. // If we can validate the aggregated tx against the current chain state // then we can safely add the tx to the pool. @@ -267,9 +240,9 @@ impl Pool { Ok(new_sums) } - fn validate_raw_txs( + pub fn validate_raw_txs( &self, - txs: Vec, + txs: &[Transaction], extra_tx: Option, header: &BlockHeader, weighting: Weighting, @@ -289,7 +262,7 @@ impl Pool { // We know the tx is valid if the entire aggregate tx is valid. if self.validate_raw_tx(&agg_tx, header, weighting).is_ok() { - valid_txs.push(tx); + valid_txs.push(tx.clone()); } } diff --git a/pool/src/transaction_pool.rs b/pool/src/transaction_pool.rs index becd7a788..f0e0d81de 100644 --- a/pool/src/transaction_pool.rs +++ b/pool/src/transaction_pool.rs @@ -23,9 +23,7 @@ use self::core::core::verifier_cache::VerifierCache; use self::core::core::{transaction, Block, BlockHeader, Transaction, Weighting}; use self::util::RwLock; use crate::pool::Pool; -use crate::types::{ - BlockChain, PoolAdapter, PoolConfig, PoolEntry, PoolEntryState, PoolError, TxSource, -}; +use crate::types::{BlockChain, PoolAdapter, PoolConfig, PoolEntry, PoolError, TxSource}; use chrono::prelude::*; use grin_core as core; use grin_util as util; @@ -76,13 +74,10 @@ impl TransactionPool { self.blockchain.chain_head() } + // Add tx to stempool (passing in all txs from txpool to validate against). fn add_to_stempool(&mut self, entry: PoolEntry, header: &BlockHeader) -> Result<(), PoolError> { - // Add tx to stempool (passing in all txs from txpool to validate against). self.stempool .add_to_pool(entry, self.txpool.all_transactions(), header)?; - - // Note: we do not notify the adapter here, - // we let the dandelion monitor handle this. Ok(()) } @@ -124,8 +119,6 @@ impl TransactionPool { let txpool_tx = self.txpool.all_transactions_aggregate()?; self.stempool.reconcile(txpool_tx, header)?; } - - self.adapter.tx_accepted(&entry.tx); Ok(()) } @@ -159,28 +152,25 @@ impl TransactionPool { self.blockchain.verify_coinbase_maturity(&tx)?; let entry = PoolEntry { - state: PoolEntryState::Fresh, src, tx_at: Utc::now(), tx, }; - // If we are in "stem" mode then check if this is a new tx or if we have seen it before. - // If new tx - add it to our stempool. - // If we have seen any of the kernels before then fallback to fluff, - // adding directly to txpool. - if stem - && self - .stempool - .find_matching_transactions(entry.tx.kernels()) - .is_empty() + // If not stem then we are fluff. + // If this is a stem tx then attempt to stem. + // Any problems during stem, fallback to fluff. + if !stem + || self + .add_to_stempool(entry.clone(), header) + .and_then(|_| self.adapter.stem_tx_accepted(&entry.tx)) + .is_err() { - self.add_to_stempool(entry, header)?; - return Ok(()); + self.add_to_txpool(entry.clone(), header)?; + self.add_to_reorg_cache(entry.clone()); + self.adapter.tx_accepted(&entry.tx); } - self.add_to_txpool(entry.clone(), header)?; - self.add_to_reorg_cache(entry); Ok(()) } diff --git a/pool/src/types.rs b/pool/src/types.rs index 04f1a1ee2..20397a514 100644 --- a/pool/src/types.rs +++ b/pool/src/types.rs @@ -27,62 +27,61 @@ use failure::Fail; use grin_core as core; use grin_keychain as keychain; -/// Dandelion relay timer -const DANDELION_RELAY_SECS: u64 = 600; +/// Dandelion "epoch" length. +const DANDELION_EPOCH_SECS: u16 = 600; -/// Dandelion embargo timer -const DANDELION_EMBARGO_SECS: u64 = 180; +/// Dandelion embargo timer. +const DANDELION_EMBARGO_SECS: u16 = 180; -/// Dandelion patience timer -const DANDELION_PATIENCE_SECS: u64 = 10; +/// Dandelion aggregation timer. +const DANDELION_AGGREGATION_SECS: u16 = 30; /// Dandelion stem probability (stem 90% of the time, fluff 10%). -const DANDELION_STEM_PROBABILITY: usize = 90; +const DANDELION_STEM_PROBABILITY: u8 = 90; /// Configuration for "Dandelion". /// Note: shared between p2p and pool. #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] pub struct DandelionConfig { - /// Choose new Dandelion relay peer every n secs. - #[serde = "default_dandelion_relay_secs"] - pub relay_secs: Option, - /// Dandelion embargo, fluff and broadcast tx if not seen on network before - /// embargo expires. - #[serde = "default_dandelion_embargo_secs"] - pub embargo_secs: Option, - /// Dandelion patience timer, fluff/stem processing runs every n secs. - /// Tx aggregation happens on stem txs received within this window. - #[serde = "default_dandelion_patience_secs"] - pub patience_secs: Option, + /// Length of each "epoch". + #[serde(default = "default_dandelion_epoch_secs")] + pub epoch_secs: Option, + /// Dandelion embargo timer. Fluff and broadcast individual txs if not seen + /// on network before embargo expires. + #[serde(default = "default_dandelion_embargo_secs")] + pub embargo_secs: Option, + /// Dandelion aggregation timer. + #[serde(default = "default_dandelion_aggregation_secs")] + pub aggregation_secs: Option, /// Dandelion stem probability (stem 90% of the time, fluff 10% etc.) - #[serde = "default_dandelion_stem_probability"] - pub stem_probability: Option, + #[serde(default = "default_dandelion_stem_probability")] + pub stem_probability: Option, } impl Default for DandelionConfig { fn default() -> DandelionConfig { DandelionConfig { - relay_secs: default_dandelion_relay_secs(), + epoch_secs: default_dandelion_epoch_secs(), embargo_secs: default_dandelion_embargo_secs(), - patience_secs: default_dandelion_patience_secs(), + aggregation_secs: default_dandelion_aggregation_secs(), stem_probability: default_dandelion_stem_probability(), } } } -fn default_dandelion_relay_secs() -> Option { - Some(DANDELION_RELAY_SECS) +fn default_dandelion_epoch_secs() -> Option { + Some(DANDELION_EPOCH_SECS) } -fn default_dandelion_embargo_secs() -> Option { +fn default_dandelion_embargo_secs() -> Option { Some(DANDELION_EMBARGO_SECS) } -fn default_dandelion_patience_secs() -> Option { - Some(DANDELION_PATIENCE_SECS) +fn default_dandelion_aggregation_secs() -> Option { + Some(DANDELION_AGGREGATION_SECS) } -fn default_dandelion_stem_probability() -> Option { +fn default_dandelion_stem_probability() -> Option { Some(DANDELION_STEM_PROBABILITY) } @@ -138,8 +137,6 @@ fn default_mineable_max_weight() -> usize { /// A single (possibly aggregated) transaction. #[derive(Clone, Debug)] pub struct PoolEntry { - /// The state of the pool entry. - pub state: PoolEntryState, /// Info on where this tx originated from. pub src: TxSource, /// Timestamp of when this tx was originally added to the pool. @@ -148,21 +145,6 @@ pub struct PoolEntry { pub tx: Transaction, } -/// The possible states a pool entry can be in. -#[derive(Clone, Copy, Debug, PartialEq)] -pub enum PoolEntryState { - /// A new entry, not yet processed. - Fresh, - /// Tx to be included in the next "stem" run. - ToStem, - /// Tx previously "stemmed" and propagated. - Stemmed, - /// Tx to be included in the next "fluff" run. - ToFluff, - /// Tx previously "fluffed" and broadcast. - Fluffed, -} - /// Placeholder: the data representing where we heard about a tx from. /// /// Used to make decisions based on transaction acceptance priority from @@ -267,12 +249,10 @@ pub trait BlockChain: Sync + Send { /// downstream processing of valid transactions by the rest of the system, most /// importantly the broadcasting of transactions to our peers. pub trait PoolAdapter: Send + Sync { - /// The transaction pool has accepted this transactions as valid and added - /// it to its internal cache. + /// The transaction pool has accepted this transaction as valid. fn tx_accepted(&self, tx: &transaction::Transaction); - /// The stem transaction pool has accepted this transactions as valid and - /// added it to its internal cache, we have waited for the "patience" timer - /// to fire and we now want to propagate the tx to the next Dandelion relay. + + /// The stem transaction pool has accepted this transactions as valid. fn stem_tx_accepted(&self, tx: &transaction::Transaction) -> Result<(), PoolError>; } @@ -281,9 +261,8 @@ pub trait PoolAdapter: Send + Sync { pub struct NoopAdapter {} impl PoolAdapter for NoopAdapter { - fn tx_accepted(&self, _: &transaction::Transaction) {} - - fn stem_tx_accepted(&self, _: &transaction::Transaction) -> Result<(), PoolError> { + fn tx_accepted(&self, _tx: &transaction::Transaction) {} + fn stem_tx_accepted(&self, _tx: &transaction::Transaction) -> Result<(), PoolError> { Ok(()) } } diff --git a/servers/src/common/adapters.rs b/servers/src/common/adapters.rs index d684fb31f..f6e6f71c1 100644 --- a/servers/src/common/adapters.rs +++ b/servers/src/common/adapters.rs @@ -23,7 +23,9 @@ use std::time::Instant; use crate::chain::{self, BlockStatus, ChainAdapter, Options}; use crate::common::hooks::{ChainEvents, NetEvents}; -use crate::common::types::{self, ChainValidationMode, ServerConfig, SyncState, SyncStatus}; +use crate::common::types::{ + self, ChainValidationMode, DandelionEpoch, ServerConfig, SyncState, SyncStatus, +}; use crate::core::core::hash::{Hash, Hashed}; use crate::core::core::transaction::Transaction; use crate::core::core::verifier_cache::VerifierCache; @@ -33,6 +35,7 @@ use crate::core::{core, global}; use crate::p2p; use crate::p2p::types::PeerAddr; use crate::pool; +use crate::pool::types::DandelionConfig; use crate::util::OneTime; use chrono::prelude::*; use chrono::Duration; @@ -685,26 +688,77 @@ impl ChainToPoolAndNetAdapter { /// transactions that have been accepted. pub struct PoolToNetAdapter { peers: OneTime>, + dandelion_epoch: Arc>, +} + +/// Adapter between the Dandelion monitor and the current Dandelion "epoch". +pub trait DandelionAdapter: Send + Sync { + /// Is the node stemming (or fluffing) transactions in the current epoch? + fn is_stem(&self) -> bool; + + /// Is the current Dandelion epoch expired? + fn is_expired(&self) -> bool; + + /// Transition to the next Dandelion epoch (new stem/fluff state, select new relay peer). + fn next_epoch(&self); +} + +impl DandelionAdapter for PoolToNetAdapter { + fn is_stem(&self) -> bool { + self.dandelion_epoch.read().is_stem() + } + + fn is_expired(&self) -> bool { + self.dandelion_epoch.read().is_expired() + } + + fn next_epoch(&self) { + self.dandelion_epoch.write().next_epoch(&self.peers()); + } } impl pool::PoolAdapter for PoolToNetAdapter { - fn stem_tx_accepted(&self, tx: &core::Transaction) -> Result<(), pool::PoolError> { - self.peers() - .relay_stem_transaction(tx) - .map_err(|_| pool::PoolError::DandelionError)?; - Ok(()) - } - fn tx_accepted(&self, tx: &core::Transaction) { self.peers().broadcast_transaction(tx); } + + fn stem_tx_accepted(&self, tx: &core::Transaction) -> Result<(), pool::PoolError> { + // Take write lock on the current epoch. + // We need to be able to update the current relay peer if not currently connected. + let mut epoch = self.dandelion_epoch.write(); + + // If "stem" epoch attempt to relay the tx to the next Dandelion relay. + // Fallback to immediately fluffing the tx if we cannot stem for any reason. + // If "fluff" epoch then nothing to do right now (fluff via Dandelion monitor). + if epoch.is_stem() { + if let Some(peer) = epoch.relay_peer(&self.peers()) { + match peer.send_stem_transaction(tx) { + Ok(_) => { + info!("Stemming this epoch, relaying to next peer."); + Ok(()) + } + Err(e) => { + error!("Stemming tx failed. Fluffing. {:?}", e); + Err(pool::PoolError::DandelionError) + } + } + } else { + error!("No relay peer. Fluffing."); + Err(pool::PoolError::DandelionError) + } + } else { + info!("Fluff epoch. Aggregating stem tx(s). Will fluff via Dandelion monitor."); + Ok(()) + } + } } impl PoolToNetAdapter { /// Create a new pool to net adapter - pub fn new() -> PoolToNetAdapter { + pub fn new(config: DandelionConfig) -> PoolToNetAdapter { PoolToNetAdapter { peers: OneTime::new(), + dandelion_epoch: Arc::new(RwLock::new(DandelionEpoch::new(config))), } } diff --git a/servers/src/common/types.rs b/servers/src/common/types.rs index ee3d19971..374bddef2 100644 --- a/servers/src/common/types.rs +++ b/servers/src/common/types.rs @@ -13,18 +13,21 @@ // limitations under the License. //! Server types -use crate::util::RwLock; use std::convert::From; use std::sync::Arc; +use chrono::prelude::{DateTime, Utc}; +use rand::prelude::*; + use crate::api; use crate::chain; use crate::core::global::ChainTypes; use crate::core::{core, pow}; use crate::p2p; use crate::pool; +use crate::pool::types::DandelionConfig; use crate::store; -use chrono::prelude::{DateTime, Utc}; +use crate::util::RwLock; /// Error type wrapping underlying module errors. #[derive(Debug)] @@ -437,3 +440,94 @@ impl chain::TxHashsetWriteStatus for SyncState { self.update(SyncStatus::TxHashsetDone); } } + +/// A node is either "stem" of "fluff" for the duration of a single epoch. +/// A node also maintains an outbound relay peer for the epoch. +#[derive(Debug)] +pub struct DandelionEpoch { + config: DandelionConfig, + // When did this epoch start? + start_time: Option, + // Are we in "stem" mode or "fluff" mode for this epoch? + is_stem: bool, + // Our current Dandelion relay peer (effective for this epoch). + relay_peer: Option>, +} + +impl DandelionEpoch { + /// Create a new Dandelion epoch, defaulting to "stem" and no outbound relay peer. + pub fn new(config: DandelionConfig) -> DandelionEpoch { + DandelionEpoch { + config, + start_time: None, + is_stem: true, + relay_peer: None, + } + } + + /// Is the current Dandelion epoch expired? + /// It is expired if start_time is older than the configured epoch_secs. + pub fn is_expired(&self) -> bool { + match self.start_time { + None => true, + Some(start_time) => { + let epoch_secs = self.config.epoch_secs.expect("epoch_secs config missing") as i64; + Utc::now().timestamp().saturating_sub(start_time) > epoch_secs + } + } + } + + /// Transition to next Dandelion epoch. + /// Select stem/fluff based on configured stem_probability. + /// Choose a new outbound stem relay peer. + pub fn next_epoch(&mut self, peers: &Arc) { + self.start_time = Some(Utc::now().timestamp()); + self.relay_peer = peers.outgoing_connected_peers().first().cloned(); + + // If stem_probability == 90 then we stem 90% of the time. + let mut rng = rand::thread_rng(); + let stem_probability = self + .config + .stem_probability + .expect("stem_probability config missing"); + self.is_stem = rng.gen_range(0, 100) < stem_probability; + + let addr = self.relay_peer.clone().map(|p| p.info.addr); + info!( + "DandelionEpoch: next_epoch: is_stem: {} ({}%), relay: {:?}", + self.is_stem, stem_probability, addr + ); + } + + /// Are we stemming (or fluffing) transactions in this epoch? + pub fn is_stem(&self) -> bool { + self.is_stem + } + + /// What is our current relay peer? + /// If it is not connected then choose a new one. + pub fn relay_peer(&mut self, peers: &Arc) -> Option> { + let mut update_relay = false; + if let Some(peer) = &self.relay_peer { + if !peer.is_connected() { + info!( + "DandelionEpoch: relay_peer: {:?} not connected, choosing a new one.", + peer.info.addr + ); + update_relay = true; + } + } else { + update_relay = true; + } + + if update_relay { + self.relay_peer = peers.outgoing_connected_peers().first().cloned(); + info!( + "DandelionEpoch: relay_peer: new peer chosen: {:?}", + self.relay_peer.clone().map(|p| p.info.addr) + ); + } + + self.relay_peer.clone() + } +} diff --git a/servers/src/grin/dandelion_monitor.rs b/servers/src/grin/dandelion_monitor.rs index 57fcbb308..665874298 100644 --- a/servers/src/grin/dandelion_monitor.rs +++ b/servers/src/grin/dandelion_monitor.rs @@ -12,17 +12,18 @@ // See the License for the specific language governing permissions and // limitations under the License. -use crate::util::{Mutex, RwLock, StopState}; use chrono::prelude::Utc; use rand::{thread_rng, Rng}; use std::sync::Arc; use std::thread; use std::time::Duration; +use crate::common::adapters::DandelionAdapter; use crate::core::core::hash::Hashed; use crate::core::core::transaction; use crate::core::core::verifier_cache::VerifierCache; -use crate::pool::{DandelionConfig, PoolEntryState, PoolError, TransactionPool, TxSource}; +use crate::pool::{DandelionConfig, Pool, PoolEntry, PoolError, TransactionPool, TxSource}; +use crate::util::{Mutex, RwLock, StopState}; /// A process to monitor transactions in the stempool. /// With Dandelion, transaction can be broadcasted in stem or fluff phase. @@ -35,6 +36,7 @@ use crate::pool::{DandelionConfig, PoolEntryState, PoolError, TransactionPool, T pub fn monitor_transactions( dandelion_config: DandelionConfig, tx_pool: Arc>, + adapter: Arc, verifier_cache: Arc>, stop_state: Arc>, ) { @@ -44,211 +46,143 @@ pub fn monitor_transactions( .name("dandelion".to_string()) .spawn(move || { loop { + // Halt Dandelion monitor if we have been notified that we are stopping. if stop_state.lock().is_stopped() { break; } - // This is the patience timer, we loop every n secs. - let patience_secs = dandelion_config.patience_secs.unwrap(); - thread::sleep(Duration::from_secs(patience_secs)); - - // Step 1: find all "ToStem" entries in stempool from last run. - // Aggregate them up to give a single (valid) aggregated tx and propagate it - // to the next Dandelion relay along the stem. - if process_stem_phase(tx_pool.clone(), verifier_cache.clone()).is_err() { - error!("dand_mon: Problem with stem phase."); + if !adapter.is_stem() { + let _ = + process_fluff_phase(&dandelion_config, &tx_pool, &adapter, &verifier_cache) + .map_err(|e| { + error!("dand_mon: Problem processing fluff phase. {:?}", e); + }); } - // Step 2: find all "ToFluff" entries in stempool from last run. - // Aggregate them up to give a single (valid) aggregated tx and (re)add it - // to our pool with stem=false (which will then broadcast it). - if process_fluff_phase(tx_pool.clone(), verifier_cache.clone()).is_err() { - error!("dand_mon: Problem with fluff phase."); + // Now find all expired entries based on embargo timer. + let _ = process_expired_entries(&dandelion_config, &tx_pool).map_err(|e| { + error!("dand_mon: Problem processing expired entries. {:?}", e); + }); + + // Handle the tx above *before* we transition to next epoch. + // This gives us an opportunity to do the final "fluff" before we start + // stemming on the subsequent epoch. + if adapter.is_expired() { + adapter.next_epoch(); } - // Step 3: now find all "Fresh" entries in stempool since last run. - // Coin flip for each (90/10) and label them as either "ToStem" or "ToFluff". - // We will process these in the next run (waiting patience secs). - if process_fresh_entries(dandelion_config.clone(), tx_pool.clone()).is_err() { - error!("dand_mon: Problem processing fresh pool entries."); - } - - // Step 4: now find all expired entries based on embargo timer. - if process_expired_entries(dandelion_config.clone(), tx_pool.clone()).is_err() { - error!("dand_mon: Problem processing fresh pool entries."); - } + // Monitor loops every 10s. + thread::sleep(Duration::from_secs(10)); } }); } -fn process_stem_phase( - tx_pool: Arc>, - verifier_cache: Arc>, -) -> Result<(), PoolError> { - let mut tx_pool = tx_pool.write(); - - let header = tx_pool.chain_head()?; - - let stem_txs = tx_pool - .stempool - .get_transactions_in_state(PoolEntryState::ToStem); - - if stem_txs.is_empty() { - return Ok(()); - } - - // Get the aggregate tx representing the entire txpool. - let txpool_tx = tx_pool.txpool.all_transactions_aggregate()?; - - let stem_txs = tx_pool - .stempool - .select_valid_transactions(stem_txs, txpool_tx, &header)?; - tx_pool - .stempool - .transition_to_state(&stem_txs, PoolEntryState::Stemmed); - - if stem_txs.len() > 0 { - debug!("dand_mon: Found {} txs for stemming.", stem_txs.len()); - - let agg_tx = transaction::aggregate(stem_txs)?; - agg_tx.validate( - transaction::Weighting::AsTransaction, - verifier_cache.clone(), - )?; - - let res = tx_pool.adapter.stem_tx_accepted(&agg_tx); - if res.is_err() { - debug!("dand_mon: Unable to propagate stem tx. No relay, fluffing instead."); - - let src = TxSource { - debug_name: "no_relay".to_string(), - identifier: "?.?.?.?".to_string(), - }; - - tx_pool.add_to_pool(src, agg_tx, false, &header)?; - } - } - Ok(()) +// Query the pool for transactions older than the cutoff. +// Used for both periodic fluffing and handling expired embargo timer. +fn select_txs_cutoff(pool: &Pool, cutoff_secs: u16) -> Vec { + let cutoff = Utc::now().timestamp() - cutoff_secs as i64; + pool.entries + .iter() + .filter(|x| x.tx_at.timestamp() < cutoff) + .cloned() + .collect() } fn process_fluff_phase( - tx_pool: Arc>, - verifier_cache: Arc>, + dandelion_config: &DandelionConfig, + tx_pool: &Arc>, + adapter: &Arc, + verifier_cache: &Arc>, ) -> Result<(), PoolError> { + // Take a write lock on the txpool for the duration of this processing. let mut tx_pool = tx_pool.write(); - let header = tx_pool.chain_head()?; - - let stem_txs = tx_pool - .stempool - .get_transactions_in_state(PoolEntryState::ToFluff); - - if stem_txs.is_empty() { + let all_entries = tx_pool.stempool.entries.clone(); + if all_entries.is_empty() { return Ok(()); } - // Get the aggregate tx representing the entire txpool. - let txpool_tx = tx_pool.txpool.all_transactions_aggregate()?; + let cutoff_secs = dandelion_config + .aggregation_secs + .expect("aggregation secs config missing"); + let cutoff_entries = select_txs_cutoff(&tx_pool.stempool, cutoff_secs); - let stem_txs = tx_pool - .stempool - .select_valid_transactions(stem_txs, txpool_tx, &header)?; - tx_pool - .stempool - .transition_to_state(&stem_txs, PoolEntryState::Fluffed); - - if stem_txs.len() > 0 { - debug!("dand_mon: Found {} txs for fluffing.", stem_txs.len()); - - let agg_tx = transaction::aggregate(stem_txs)?; - agg_tx.validate( - transaction::Weighting::AsTransaction, - verifier_cache.clone(), - )?; - - let src = TxSource { - debug_name: "fluff".to_string(), - identifier: "?.?.?.?".to_string(), - }; - - tx_pool.add_to_pool(src, agg_tx, false, &header)?; + // If epoch is expired, fluff *all* outstanding entries in stempool. + // If *any* entry older than aggregation_secs (30s) then fluff *all* entries. + // Otherwise we are done for now and we can give txs more time to aggregate. + if !adapter.is_expired() && cutoff_entries.is_empty() { + return Ok(()); } - Ok(()) -} -fn process_fresh_entries( - dandelion_config: DandelionConfig, - tx_pool: Arc>, -) -> Result<(), PoolError> { - let mut tx_pool = tx_pool.write(); + let header = tx_pool.chain_head()?; - let mut rng = thread_rng(); + let fluffable_txs = { + let txpool_tx = tx_pool.txpool.all_transactions_aggregate()?; + let txs: Vec<_> = all_entries.into_iter().map(|x| x.tx).collect(); + tx_pool.stempool.validate_raw_txs( + &txs, + txpool_tx, + &header, + transaction::Weighting::NoLimit, + )? + }; - let fresh_entries = &mut tx_pool - .stempool - .entries - .iter_mut() - .filter(|x| x.state == PoolEntryState::Fresh) - .collect::>(); + debug!( + "dand_mon: Found {} txs in local stempool to fluff", + fluffable_txs.len() + ); - if fresh_entries.len() > 0 { - debug!( - "dand_mon: Found {} fresh entries in stempool.", - fresh_entries.len() - ); + let agg_tx = transaction::aggregate(fluffable_txs)?; + agg_tx.validate( + transaction::Weighting::AsTransaction, + verifier_cache.clone(), + )?; - for x in &mut fresh_entries.iter_mut() { - let random = rng.gen_range(0, 101); - if random <= dandelion_config.stem_probability.unwrap() { - x.state = PoolEntryState::ToStem; - } else { - x.state = PoolEntryState::ToFluff; - } - } - } + let src = TxSource { + debug_name: "fluff".to_string(), + identifier: "?.?.?.?".to_string(), + }; + + tx_pool.add_to_pool(src, agg_tx, false, &header)?; Ok(()) } fn process_expired_entries( - dandelion_config: DandelionConfig, - tx_pool: Arc>, + dandelion_config: &DandelionConfig, + tx_pool: &Arc>, ) -> Result<(), PoolError> { - let now = Utc::now().timestamp(); - let embargo_sec = dandelion_config.embargo_secs.unwrap() + thread_rng().gen_range(0, 31); - let cutoff = now - embargo_sec as i64; + // Take a write lock on the txpool for the duration of this processing. + let mut tx_pool = tx_pool.write(); - let mut expired_entries = vec![]; - { - let tx_pool = tx_pool.read(); - for entry in tx_pool - .stempool - .entries - .iter() - .filter(|x| x.tx_at.timestamp() < cutoff) - { - debug!("dand_mon: Embargo timer expired for {:?}", entry.tx.hash()); - expired_entries.push(entry.clone()); - } + let embargo_secs = dandelion_config + .embargo_secs + .expect("embargo_secs config missing") + + thread_rng().gen_range(0, 31); + let expired_entries = select_txs_cutoff(&tx_pool.stempool, embargo_secs); + + if expired_entries.is_empty() { + return Ok(()); } - if expired_entries.len() > 0 { - debug!("dand_mon: Found {} expired txs.", expired_entries.len()); + debug!("dand_mon: Found {} expired txs.", expired_entries.len()); - { - let mut tx_pool = tx_pool.write(); - let header = tx_pool.chain_head()?; + let header = tx_pool.chain_head()?; - for entry in expired_entries { - let src = TxSource { - debug_name: "embargo_expired".to_string(), - identifier: "?.?.?.?".to_string(), - }; - match tx_pool.add_to_pool(src, entry.tx, false, &header) { - Ok(_) => debug!("dand_mon: embargo expired, fluffed tx successfully."), - Err(e) => debug!("dand_mon: Failed to fluff expired tx - {:?}", e), - }; - } - } + let src = TxSource { + debug_name: "embargo_expired".to_string(), + identifier: "?.?.?.?".to_string(), + }; + + for entry in expired_entries { + let txhash = entry.tx.hash(); + match tx_pool.add_to_pool(src.clone(), entry.tx, false, &header) { + Ok(_) => info!( + "dand_mon: embargo expired for {}, fluffed successfully.", + txhash + ), + Err(e) => warn!("dand_mon: failed to fluff expired tx {}, {:?}", txhash, e), + }; } + Ok(()) } diff --git a/servers/src/grin/seed.rs b/servers/src/grin/seed.rs index dd6f2d134..68886586c 100644 --- a/servers/src/grin/seed.rs +++ b/servers/src/grin/seed.rs @@ -29,7 +29,6 @@ use crate::core::global; use crate::p2p; use crate::p2p::types::PeerAddr; use crate::p2p::ChainAdapter; -use crate::pool::DandelionConfig; use crate::util::{Mutex, StopState}; // DNS Seeds with contact email associated @@ -52,7 +51,6 @@ const FLOONET_DNS_SEEDS: &'static [&'static str] = &[ pub fn connect_and_monitor( p2p_server: Arc, capabilities: p2p::Capabilities, - dandelion_config: DandelionConfig, seed_list: Box Vec + Send>, preferred_peers: Option>, stop_state: Arc>, @@ -119,8 +117,6 @@ pub fn connect_and_monitor( preferred_peers.clone(), ); - update_dandelion_relay(peers.clone(), dandelion_config.clone()); - prev = Utc::now(); start_attempt = cmp::min(6, start_attempt + 1); } @@ -244,21 +240,6 @@ fn monitor_peers( } } -fn update_dandelion_relay(peers: Arc, dandelion_config: DandelionConfig) { - // Dandelion Relay Updater - let dandelion_relay = peers.get_dandelion_relay(); - if let Some((last_added, _)) = dandelion_relay { - let dandelion_interval = Utc::now().timestamp() - last_added; - if dandelion_interval >= dandelion_config.relay_secs.unwrap() as i64 { - debug!("monitor_peers: updating expired dandelion relay"); - peers.update_dandelion_relay(); - } - } else { - debug!("monitor_peers: no dandelion relay updating"); - peers.update_dandelion_relay(); - } -} - // Check if we have any pre-existing peer in db. If so, start with those, // otherwise use the seeds provided. fn connect_to_seeds_and_preferred_peers( diff --git a/servers/src/grin/server.rs b/servers/src/grin/server.rs index 5c6cb3a4a..167201c25 100644 --- a/servers/src/grin/server.rs +++ b/servers/src/grin/server.rs @@ -154,7 +154,7 @@ impl Server { let verifier_cache = Arc::new(RwLock::new(LruVerifierCache::new())); let pool_adapter = Arc::new(PoolToChainAdapter::new()); - let pool_net_adapter = Arc::new(PoolToNetAdapter::new()); + let pool_net_adapter = Arc::new(PoolToNetAdapter::new(config.dandelion_config.clone())); let tx_pool = Arc::new(RwLock::new(pool::TransactionPool::new( config.pool_config.clone(), pool_adapter.clone(), @@ -207,6 +207,8 @@ impl Server { genesis.hash(), stop_state.clone(), )?); + + // Initialize various adapters with our dynamic set of connected peers. chain_adapter.init(p2p_server.peers.clone()); pool_net_adapter.init(p2p_server.peers.clone()); net_adapter.init(p2p_server.peers.clone()); @@ -227,7 +229,6 @@ impl Server { seed::connect_and_monitor( p2p_server.clone(), config.p2p_config.capabilities, - config.dandelion_config.clone(), seeder, config.p2p_config.peers_preferred.clone(), stop_state.clone(), @@ -281,6 +282,7 @@ impl Server { dandelion_monitor::monitor_transactions( config.dandelion_config.clone(), tx_pool.clone(), + pool_net_adapter.clone(), verifier_cache.clone(), stop_state.clone(), );