Dandelion++ Rewrite (#2628)

* reworked the dandelion rewrite (dandelion++)

* fall back to fluff/broadcast if we cannot stem the tx for any reason

* rework stem vs fluff logic when accepting a tx

* cleanup docs

* add is_stem to logging

* cleanup

* rustfmt

* cleanup monitor and logging

* rework dandelion monitor to use simple cutoff for aggregation

* transition to next epoch *after* processing tx
so we fluff final outstanding txs

* fluff all txs in stempool if any are older than 30s;
aggressively aggregate when we can

* fix rebase onto 1.1.0

* default config comments for Dandelion

* fix code to reflect our tests - fall back to txpool on stempool error

* log fluff and expire errors in dandelion monitor

* cleanup

* fix off by one

* cleanup

* cleanup

* various fixes

* one less clone

* cleanup
Antioch Peverell, 2019-03-20 13:08:56 +00:00, committed by GitHub
parent 16487a3eb7
commit a2adf2dfe8
12 changed files with 335 additions and 374 deletions
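
In outline, the rewrite replaces the old per-tx state machine (Fresh/ToStem/ToFluff/Stemmed/Fluffed) with a single per-epoch decision: for each epoch the node either stems or fluffs, and any failure to stem falls back to an immediate fluff. Below is a minimal, illustrative sketch of how a tx submitted with stem=true is handled under that model; the names are hypothetical stand-ins, not the code in the diff that follows.

// Illustrative sketch only: stem-vs-fluff handling of a tx flagged as "stem"
// under the per-epoch model. Hypothetical simplified names, not grin's types.
fn handle_stem_tx(epoch_is_stem: bool, relay_send_ok: bool) -> &'static str {
    if epoch_is_stem {
        if relay_send_ok {
            // Stem epoch with a working relay peer: forward the tx along the stem.
            "stemmed to relay peer"
        } else {
            // No relay peer or the send failed: fall back to an immediate fluff
            // (add to txpool and broadcast).
            "fluffed immediately (fallback)"
        }
    } else {
        // Fluff epoch: hold the tx in the stempool so the Dandelion monitor can
        // aggregate it with other stem txs and fluff the aggregate later.
        "held in stempool for aggregated fluff"
    }
}

fn main() {
    assert_eq!(handle_stem_tx(true, true), "stemmed to relay peer");
    assert_eq!(handle_stem_tx(true, false), "fluffed immediately (fallback)");
    assert_eq!(handle_stem_tx(false, true), "held in stempool for aggregated fluff");
}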


@@ -141,9 +141,17 @@ fn comments() -> HashMap<String, String> {
);
retval.insert(
"relay_secs".to_string(),
"epoch_secs".to_string(),
"
#dandelion relay time (choose new relay peer every n secs)
#dandelion epoch duration
"
.to_string(),
);
retval.insert(
"aggregation_secs".to_string(),
"
#dandelion aggregation period in secs
"
.to_string(),
);
@@ -156,13 +164,6 @@ fn comments() -> HashMap<String, String> {
.to_string(),
);
retval.insert(
"patience_secs".to_string(),
"
#run dandelion stem/fluff processing every n secs (stem tx aggregation in this window)
"
.to_string(),
);
retval.insert(
"stem_probability".to_string(),
"


@@ -13,6 +13,7 @@
// limitations under the License.
use crate::util::{Mutex, RwLock};
use std::fmt;
use std::fs::File;
use std::net::{Shutdown, TcpStream};
use std::sync::Arc;
@@ -54,6 +55,12 @@ pub struct Peer {
connection: Option<Mutex<conn::Tracker>>,
}
impl fmt::Debug for Peer {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "Peer({:?})", &self.info)
}
}
impl Peer {
// Only accept and connect can be externally used to build a peer
fn new(info: PeerInfo, adapter: Arc<dyn NetAdapter>) -> Peer {


@@ -37,7 +37,6 @@ pub struct Peers {
pub adapter: Arc<dyn ChainAdapter>,
store: PeerStore,
peers: RwLock<HashMap<PeerAddr, Arc<Peer>>>,
dandelion_relay: RwLock<Option<(i64, Arc<Peer>)>>,
config: P2PConfig,
}
@@ -48,7 +47,6 @@ impl Peers {
store,
config,
peers: RwLock::new(HashMap::new()),
dandelion_relay: RwLock::new(None),
}
}
@@ -87,39 +85,6 @@ impl Peers {
self.save_peer(&peer_data)
}
// Update the dandelion relay
pub fn update_dandelion_relay(&self) {
let peers = self.outgoing_connected_peers();
let peer = &self
.config
.dandelion_peer
.and_then(|ip| peers.iter().find(|x| x.info.addr == ip))
.or(thread_rng().choose(&peers));
match peer {
Some(peer) => self.set_dandelion_relay(peer),
None => debug!("Could not update dandelion relay"),
}
}
fn set_dandelion_relay(&self, peer: &Arc<Peer>) {
// Clear the map and add new relay
let dandelion_relay = &self.dandelion_relay;
dandelion_relay
.write()
.replace((Utc::now().timestamp(), peer.clone()));
debug!(
"Successfully updated Dandelion relay to: {}",
peer.info.addr
);
}
// Get the dandelion relay
pub fn get_dandelion_relay(&self) -> Option<(i64, Arc<Peer>)> {
self.dandelion_relay.read().clone()
}
pub fn is_known(&self, addr: PeerAddr) -> bool {
self.peers.read().contains_key(&addr)
}
@@ -335,26 +300,6 @@ impl Peers {
);
}
/// Relays the provided stem transaction to our single stem peer.
pub fn relay_stem_transaction(&self, tx: &core::Transaction) -> Result<(), Error> {
self.get_dandelion_relay()
.or_else(|| {
debug!("No dandelion relay, updating.");
self.update_dandelion_relay();
self.get_dandelion_relay()
})
// If still return an error, let the caller handle this as they see fit.
// The caller will "fluff" at this point as the stem phase is finished.
.ok_or(Error::NoDandelionRelay)
.map(|(_, relay)| {
if relay.is_connected() {
if let Err(e) = relay.send_stem_transaction(tx) {
debug!("Error sending stem transaction to peer relay: {:?}", e);
}
}
})
}
/// Broadcasts the provided transaction to PEER_PREFERRED_COUNT of our
/// peers. We may be connected to PEER_MAX_COUNT peers so we only
/// want to broadcast to a random subset of peers.


@@ -34,7 +34,8 @@ mod pool;
pub mod transaction_pool;
pub mod types;
pub use crate::pool::Pool;
pub use crate::transaction_pool::TransactionPool;
pub use crate::types::{
BlockChain, DandelionConfig, PoolAdapter, PoolConfig, PoolEntryState, PoolError, TxSource,
BlockChain, DandelionConfig, PoolAdapter, PoolConfig, PoolEntry, PoolError, TxSource,
};


@@ -23,7 +23,7 @@ use self::core::core::{
Block, BlockHeader, BlockSums, Committed, Transaction, TxKernel, Weighting,
};
use self::util::RwLock;
use crate::types::{BlockChain, PoolEntry, PoolEntryState, PoolError};
use crate::types::{BlockChain, PoolEntry, PoolError};
use grin_core as core;
use grin_util as util;
use std::collections::{HashMap, HashSet};
@@ -139,7 +139,7 @@ impl Pool {
// Verify these txs produce an aggregated tx below max tx weight.
// Return a vec of all the valid txs.
let txs = self.validate_raw_txs(
tx_buckets,
&tx_buckets,
None,
&header,
Weighting::AsLimitedTransaction { max_weight },
@@ -167,33 +167,6 @@ impl Pool {
Ok(Some(tx))
}
pub fn select_valid_transactions(
&self,
txs: Vec<Transaction>,
extra_tx: Option<Transaction>,
header: &BlockHeader,
) -> Result<Vec<Transaction>, PoolError> {
let valid_txs = self.validate_raw_txs(txs, extra_tx, header, Weighting::NoLimit)?;
Ok(valid_txs)
}
pub fn get_transactions_in_state(&self, state: PoolEntryState) -> Vec<Transaction> {
self.entries
.iter()
.filter(|x| x.state == state)
.map(|x| x.tx.clone())
.collect::<Vec<_>>()
}
// Transition the specified pool entries to the new state.
pub fn transition_to_state(&mut self, txs: &[Transaction], state: PoolEntryState) {
for x in &mut self.entries {
if txs.contains(&x.tx) {
x.state = state;
}
}
}
// Aggregate this new tx with all existing txs in the pool.
// If we can validate the aggregated tx against the current chain state
// then we can safely add the tx to the pool.
@@ -267,9 +240,9 @@ impl Pool {
Ok(new_sums)
}
fn validate_raw_txs(
pub fn validate_raw_txs(
&self,
txs: Vec<Transaction>,
txs: &[Transaction],
extra_tx: Option<Transaction>,
header: &BlockHeader,
weighting: Weighting,
@@ -289,7 +262,7 @@ impl Pool {
// We know the tx is valid if the entire aggregate tx is valid.
if self.validate_raw_tx(&agg_tx, header, weighting).is_ok() {
valid_txs.push(tx);
valid_txs.push(tx.clone());
}
}


@@ -23,9 +23,7 @@ use self::core::core::verifier_cache::VerifierCache;
use self::core::core::{transaction, Block, BlockHeader, Transaction, Weighting};
use self::util::RwLock;
use crate::pool::Pool;
use crate::types::{
BlockChain, PoolAdapter, PoolConfig, PoolEntry, PoolEntryState, PoolError, TxSource,
};
use crate::types::{BlockChain, PoolAdapter, PoolConfig, PoolEntry, PoolError, TxSource};
use chrono::prelude::*;
use grin_core as core;
use grin_util as util;
@@ -76,13 +74,10 @@ impl TransactionPool {
self.blockchain.chain_head()
}
// Add tx to stempool (passing in all txs from txpool to validate against).
fn add_to_stempool(&mut self, entry: PoolEntry, header: &BlockHeader) -> Result<(), PoolError> {
// Add tx to stempool (passing in all txs from txpool to validate against).
self.stempool
.add_to_pool(entry, self.txpool.all_transactions(), header)?;
// Note: we do not notify the adapter here,
// we let the dandelion monitor handle this.
Ok(())
}
@@ -124,8 +119,6 @@ impl TransactionPool {
let txpool_tx = self.txpool.all_transactions_aggregate()?;
self.stempool.reconcile(txpool_tx, header)?;
}
self.adapter.tx_accepted(&entry.tx);
Ok(())
}
@@ -159,28 +152,25 @@ impl TransactionPool {
self.blockchain.verify_coinbase_maturity(&tx)?;
let entry = PoolEntry {
state: PoolEntryState::Fresh,
src,
tx_at: Utc::now(),
tx,
};
// If we are in "stem" mode then check if this is a new tx or if we have seen it before.
// If new tx - add it to our stempool.
// If we have seen any of the kernels before then fallback to fluff,
// adding directly to txpool.
if stem
&& self
.stempool
.find_matching_transactions(entry.tx.kernels())
.is_empty()
// If not stem then we are fluff.
// If this is a stem tx then attempt to stem.
// Any problems during stem, fallback to fluff.
if !stem
|| self
.add_to_stempool(entry.clone(), header)
.and_then(|_| self.adapter.stem_tx_accepted(&entry.tx))
.is_err()
{
self.add_to_stempool(entry, header)?;
return Ok(());
self.add_to_txpool(entry.clone(), header)?;
self.add_to_reorg_cache(entry.clone());
self.adapter.tx_accepted(&entry.tx);
}
self.add_to_txpool(entry.clone(), header)?;
self.add_to_reorg_cache(entry);
Ok(())
}


@@ -27,62 +27,61 @@ use failure::Fail;
use grin_core as core;
use grin_keychain as keychain;
/// Dandelion relay timer
const DANDELION_RELAY_SECS: u64 = 600;
/// Dandelion "epoch" length.
const DANDELION_EPOCH_SECS: u16 = 600;
/// Dandelion embargo timer
const DANDELION_EMBARGO_SECS: u64 = 180;
/// Dandelion embargo timer.
const DANDELION_EMBARGO_SECS: u16 = 180;
/// Dandelion patience timer
const DANDELION_PATIENCE_SECS: u64 = 10;
/// Dandelion aggregation timer.
const DANDELION_AGGREGATION_SECS: u16 = 30;
/// Dandelion stem probability (stem 90% of the time, fluff 10%).
const DANDELION_STEM_PROBABILITY: usize = 90;
const DANDELION_STEM_PROBABILITY: u8 = 90;
/// Configuration for "Dandelion".
/// Note: shared between p2p and pool.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct DandelionConfig {
/// Choose new Dandelion relay peer every n secs.
#[serde = "default_dandelion_relay_secs"]
pub relay_secs: Option<u64>,
/// Dandelion embargo, fluff and broadcast tx if not seen on network before
/// embargo expires.
#[serde = "default_dandelion_embargo_secs"]
pub embargo_secs: Option<u64>,
/// Dandelion patience timer, fluff/stem processing runs every n secs.
/// Tx aggregation happens on stem txs received within this window.
#[serde = "default_dandelion_patience_secs"]
pub patience_secs: Option<u64>,
/// Length of each "epoch".
#[serde(default = "default_dandelion_epoch_secs")]
pub epoch_secs: Option<u16>,
/// Dandelion embargo timer. Fluff and broadcast individual txs if not seen
/// on network before embargo expires.
#[serde(default = "default_dandelion_embargo_secs")]
pub embargo_secs: Option<u16>,
/// Dandelion aggregation timer.
#[serde(default = "default_dandelion_aggregation_secs")]
pub aggregation_secs: Option<u16>,
/// Dandelion stem probability (stem 90% of the time, fluff 10% etc.)
#[serde = "default_dandelion_stem_probability"]
pub stem_probability: Option<usize>,
#[serde(default = "default_dandelion_stem_probability")]
pub stem_probability: Option<u8>,
}
impl Default for DandelionConfig {
fn default() -> DandelionConfig {
DandelionConfig {
relay_secs: default_dandelion_relay_secs(),
epoch_secs: default_dandelion_epoch_secs(),
embargo_secs: default_dandelion_embargo_secs(),
patience_secs: default_dandelion_patience_secs(),
aggregation_secs: default_dandelion_aggregation_secs(),
stem_probability: default_dandelion_stem_probability(),
}
}
}
fn default_dandelion_relay_secs() -> Option<u64> {
Some(DANDELION_RELAY_SECS)
fn default_dandelion_epoch_secs() -> Option<u16> {
Some(DANDELION_EPOCH_SECS)
}
fn default_dandelion_embargo_secs() -> Option<u64> {
fn default_dandelion_embargo_secs() -> Option<u16> {
Some(DANDELION_EMBARGO_SECS)
}
fn default_dandelion_patience_secs() -> Option<u64> {
Some(DANDELION_PATIENCE_SECS)
fn default_dandelion_aggregation_secs() -> Option<u16> {
Some(DANDELION_AGGREGATION_SECS)
}
fn default_dandelion_stem_probability() -> Option<usize> {
fn default_dandelion_stem_probability() -> Option<u8> {
Some(DANDELION_STEM_PROBABILITY)
}
@@ -138,8 +137,6 @@ fn default_mineable_max_weight() -> usize {
/// A single (possibly aggregated) transaction.
#[derive(Clone, Debug)]
pub struct PoolEntry {
/// The state of the pool entry.
pub state: PoolEntryState,
/// Info on where this tx originated from.
pub src: TxSource,
/// Timestamp of when this tx was originally added to the pool.
@@ -148,21 +145,6 @@ pub struct PoolEntry {
pub tx: Transaction,
}
/// The possible states a pool entry can be in.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum PoolEntryState {
/// A new entry, not yet processed.
Fresh,
/// Tx to be included in the next "stem" run.
ToStem,
/// Tx previously "stemmed" and propagated.
Stemmed,
/// Tx to be included in the next "fluff" run.
ToFluff,
/// Tx previously "fluffed" and broadcast.
Fluffed,
}
/// Placeholder: the data representing where we heard about a tx from.
///
/// Used to make decisions based on transaction acceptance priority from
@@ -267,12 +249,10 @@ pub trait BlockChain: Sync + Send {
/// downstream processing of valid transactions by the rest of the system, most
/// importantly the broadcasting of transactions to our peers.
pub trait PoolAdapter: Send + Sync {
/// The transaction pool has accepted this transactions as valid and added
/// it to its internal cache.
/// The transaction pool has accepted this transaction as valid.
fn tx_accepted(&self, tx: &transaction::Transaction);
/// The stem transaction pool has accepted this transactions as valid and
/// added it to its internal cache, we have waited for the "patience" timer
/// to fire and we now want to propagate the tx to the next Dandelion relay.
/// The stem transaction pool has accepted this transaction as valid.
fn stem_tx_accepted(&self, tx: &transaction::Transaction) -> Result<(), PoolError>;
}
@@ -281,9 +261,8 @@ pub trait PoolAdapter: Send + Sync {
pub struct NoopAdapter {}
impl PoolAdapter for NoopAdapter {
fn tx_accepted(&self, _: &transaction::Transaction) {}
fn stem_tx_accepted(&self, _: &transaction::Transaction) -> Result<(), PoolError> {
fn tx_accepted(&self, _tx: &transaction::Transaction) {}
fn stem_tx_accepted(&self, _tx: &transaction::Transaction) -> Result<(), PoolError> {
Ok(())
}
}


@@ -23,7 +23,9 @@ use std::time::Instant;
use crate::chain::{self, BlockStatus, ChainAdapter, Options};
use crate::common::hooks::{ChainEvents, NetEvents};
use crate::common::types::{self, ChainValidationMode, ServerConfig, SyncState, SyncStatus};
use crate::common::types::{
self, ChainValidationMode, DandelionEpoch, ServerConfig, SyncState, SyncStatus,
};
use crate::core::core::hash::{Hash, Hashed};
use crate::core::core::transaction::Transaction;
use crate::core::core::verifier_cache::VerifierCache;
@@ -33,6 +35,7 @@ use crate::core::{core, global};
use crate::p2p;
use crate::p2p::types::PeerAddr;
use crate::pool;
use crate::pool::types::DandelionConfig;
use crate::util::OneTime;
use chrono::prelude::*;
use chrono::Duration;
@@ -685,26 +688,77 @@ impl ChainToPoolAndNetAdapter {
/// transactions that have been accepted.
pub struct PoolToNetAdapter {
peers: OneTime<Weak<p2p::Peers>>,
dandelion_epoch: Arc<RwLock<DandelionEpoch>>,
}
/// Adapter between the Dandelion monitor and the current Dandelion "epoch".
pub trait DandelionAdapter: Send + Sync {
/// Is the node stemming (or fluffing) transactions in the current epoch?
fn is_stem(&self) -> bool;
/// Is the current Dandelion epoch expired?
fn is_expired(&self) -> bool;
/// Transition to the next Dandelion epoch (new stem/fluff state, select new relay peer).
fn next_epoch(&self);
}
impl DandelionAdapter for PoolToNetAdapter {
fn is_stem(&self) -> bool {
self.dandelion_epoch.read().is_stem()
}
fn is_expired(&self) -> bool {
self.dandelion_epoch.read().is_expired()
}
fn next_epoch(&self) {
self.dandelion_epoch.write().next_epoch(&self.peers());
}
}
impl pool::PoolAdapter for PoolToNetAdapter {
fn stem_tx_accepted(&self, tx: &core::Transaction) -> Result<(), pool::PoolError> {
self.peers()
.relay_stem_transaction(tx)
.map_err(|_| pool::PoolError::DandelionError)?;
Ok(())
}
fn tx_accepted(&self, tx: &core::Transaction) {
self.peers().broadcast_transaction(tx);
}
fn stem_tx_accepted(&self, tx: &core::Transaction) -> Result<(), pool::PoolError> {
// Take write lock on the current epoch.
// We need to be able to update the current relay peer if not currently connected.
let mut epoch = self.dandelion_epoch.write();
// If "stem" epoch attempt to relay the tx to the next Dandelion relay.
// Fallback to immediately fluffing the tx if we cannot stem for any reason.
// If "fluff" epoch then nothing to do right now (fluff via Dandelion monitor).
if epoch.is_stem() {
if let Some(peer) = epoch.relay_peer(&self.peers()) {
match peer.send_stem_transaction(tx) {
Ok(_) => {
info!("Stemming this epoch, relaying to next peer.");
Ok(())
}
Err(e) => {
error!("Stemming tx failed. Fluffing. {:?}", e);
Err(pool::PoolError::DandelionError)
}
}
} else {
error!("No relay peer. Fluffing.");
Err(pool::PoolError::DandelionError)
}
} else {
info!("Fluff epoch. Aggregating stem tx(s). Will fluff via Dandelion monitor.");
Ok(())
}
}
}
impl PoolToNetAdapter {
/// Create a new pool to net adapter
pub fn new() -> PoolToNetAdapter {
pub fn new(config: DandelionConfig) -> PoolToNetAdapter {
PoolToNetAdapter {
peers: OneTime::new(),
dandelion_epoch: Arc::new(RwLock::new(DandelionEpoch::new(config))),
}
}


@@ -13,18 +13,21 @@
// limitations under the License.
//! Server types
use crate::util::RwLock;
use std::convert::From;
use std::sync::Arc;
use chrono::prelude::{DateTime, Utc};
use rand::prelude::*;
use crate::api;
use crate::chain;
use crate::core::global::ChainTypes;
use crate::core::{core, pow};
use crate::p2p;
use crate::pool;
use crate::pool::types::DandelionConfig;
use crate::store;
use chrono::prelude::{DateTime, Utc};
use crate::util::RwLock;
/// Error type wrapping underlying module errors.
#[derive(Debug)]
@@ -437,3 +440,94 @@ impl chain::TxHashsetWriteStatus for SyncState {
self.update(SyncStatus::TxHashsetDone);
}
}
/// A node is either "stem" or "fluff" for the duration of a single epoch.
/// A node also maintains an outbound relay peer for the epoch.
#[derive(Debug)]
pub struct DandelionEpoch {
config: DandelionConfig,
// When did this epoch start?
start_time: Option<i64>,
// Are we in "stem" mode or "fluff" mode for this epoch?
is_stem: bool,
// Our current Dandelion relay peer (effective for this epoch).
relay_peer: Option<Arc<p2p::Peer>>,
}
impl DandelionEpoch {
/// Create a new Dandelion epoch, defaulting to "stem" and no outbound relay peer.
pub fn new(config: DandelionConfig) -> DandelionEpoch {
DandelionEpoch {
config,
start_time: None,
is_stem: true,
relay_peer: None,
}
}
/// Is the current Dandelion epoch expired?
/// It is expired if start_time is older than the configured epoch_secs.
pub fn is_expired(&self) -> bool {
match self.start_time {
None => true,
Some(start_time) => {
let epoch_secs = self.config.epoch_secs.expect("epoch_secs config missing") as i64;
Utc::now().timestamp().saturating_sub(start_time) > epoch_secs
}
}
}
/// Transition to next Dandelion epoch.
/// Select stem/fluff based on configured stem_probability.
/// Choose a new outbound stem relay peer.
pub fn next_epoch(&mut self, peers: &Arc<p2p::Peers>) {
self.start_time = Some(Utc::now().timestamp());
self.relay_peer = peers.outgoing_connected_peers().first().cloned();
// If stem_probability == 90 then we stem 90% of the time.
let mut rng = rand::thread_rng();
let stem_probability = self
.config
.stem_probability
.expect("stem_probability config missing");
self.is_stem = rng.gen_range(0, 100) < stem_probability;
let addr = self.relay_peer.clone().map(|p| p.info.addr);
info!(
"DandelionEpoch: next_epoch: is_stem: {} ({}%), relay: {:?}",
self.is_stem, stem_probability, addr
);
}
/// Are we stemming (or fluffing) transactions in this epoch?
pub fn is_stem(&self) -> bool {
self.is_stem
}
/// What is our current relay peer?
/// If it is not connected then choose a new one.
pub fn relay_peer(&mut self, peers: &Arc<p2p::Peers>) -> Option<Arc<p2p::Peer>> {
let mut update_relay = false;
if let Some(peer) = &self.relay_peer {
if !peer.is_connected() {
info!(
"DandelionEpoch: relay_peer: {:?} not connected, choosing a new one.",
peer.info.addr
);
update_relay = true;
}
} else {
update_relay = true;
}
if update_relay {
self.relay_peer = peers.outgoing_connected_peers().first().cloned();
info!(
"DandelionEpoch: relay_peer: new peer chosen: {:?}",
self.relay_peer.clone().map(|p| p.info.addr)
);
}
self.relay_peer.clone()
}
}
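
For orientation, the epoch bookkeeping above amounts to a start timestamp plus one weighted coin flip per epoch. Below is a standalone, illustrative sketch of that logic, simplified to std::time and omitting relay-peer selection; it assumes the same rand crate API (two-argument gen_range) used elsewhere in this diff and is not part of the change itself.

// Standalone sketch of the epoch timing and stem/fluff probability shown above.
use rand::Rng;
use std::time::{Duration, Instant};

struct EpochSketch {
    epoch_secs: u64,      // e.g. DANDELION_EPOCH_SECS = 600
    stem_probability: u8, // e.g. DANDELION_STEM_PROBABILITY = 90
    start_time: Option<Instant>,
    is_stem: bool,
}

impl EpochSketch {
    // An epoch that has never started counts as expired so we transition immediately.
    fn is_expired(&self) -> bool {
        match self.start_time {
            None => true,
            Some(t) => t.elapsed() > Duration::from_secs(self.epoch_secs),
        }
    }

    // Start a new epoch: reset the clock and flip the stem/fluff coin.
    fn next_epoch(&mut self) {
        self.start_time = Some(Instant::now());
        // With stem_probability == 90 we expect to stem in roughly 90% of epochs.
        self.is_stem = rand::thread_rng().gen_range(0, 100) < self.stem_probability;
    }
}

fn main() {
    let mut epoch = EpochSketch {
        epoch_secs: 600,
        stem_probability: 90,
        start_time: None,
        is_stem: true,
    };
    assert!(epoch.is_expired());
    epoch.next_epoch();
    println!("stem epoch: {}, expired: {}", epoch.is_stem, epoch.is_expired());
}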


@@ -12,17 +12,18 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::util::{Mutex, RwLock, StopState};
use chrono::prelude::Utc;
use rand::{thread_rng, Rng};
use std::sync::Arc;
use std::thread;
use std::time::Duration;
use crate::common::adapters::DandelionAdapter;
use crate::core::core::hash::Hashed;
use crate::core::core::transaction;
use crate::core::core::verifier_cache::VerifierCache;
use crate::pool::{DandelionConfig, PoolEntryState, PoolError, TransactionPool, TxSource};
use crate::pool::{DandelionConfig, Pool, PoolEntry, PoolError, TransactionPool, TxSource};
use crate::util::{Mutex, RwLock, StopState};
/// A process to monitor transactions in the stempool.
/// With Dandelion, transactions can be broadcast in the stem or fluff phase.
@@ -35,6 +36,7 @@ use crate::pool::{DandelionConfig, PoolEntryState, PoolError, TransactionPool, T
pub fn monitor_transactions(
dandelion_config: DandelionConfig,
tx_pool: Arc<RwLock<TransactionPool>>,
adapter: Arc<DandelionAdapter>,
verifier_cache: Arc<RwLock<dyn VerifierCache>>,
stop_state: Arc<Mutex<StopState>>,
) {
@@ -44,211 +46,143 @@ pub fn monitor_transactions(
.name("dandelion".to_string())
.spawn(move || {
loop {
// Halt Dandelion monitor if we have been notified that we are stopping.
if stop_state.lock().is_stopped() {
break;
}
// This is the patience timer, we loop every n secs.
let patience_secs = dandelion_config.patience_secs.unwrap();
thread::sleep(Duration::from_secs(patience_secs));
// Step 1: find all "ToStem" entries in stempool from last run.
// Aggregate them up to give a single (valid) aggregated tx and propagate it
// to the next Dandelion relay along the stem.
if process_stem_phase(tx_pool.clone(), verifier_cache.clone()).is_err() {
error!("dand_mon: Problem with stem phase.");
if !adapter.is_stem() {
let _ =
process_fluff_phase(&dandelion_config, &tx_pool, &adapter, &verifier_cache)
.map_err(|e| {
error!("dand_mon: Problem processing fluff phase. {:?}", e);
});
}
// Step 2: find all "ToFluff" entries in stempool from last run.
// Aggregate them up to give a single (valid) aggregated tx and (re)add it
// to our pool with stem=false (which will then broadcast it).
if process_fluff_phase(tx_pool.clone(), verifier_cache.clone()).is_err() {
error!("dand_mon: Problem with fluff phase.");
// Now find all expired entries based on embargo timer.
let _ = process_expired_entries(&dandelion_config, &tx_pool).map_err(|e| {
error!("dand_mon: Problem processing expired entries. {:?}", e);
});
// Handle the tx above *before* we transition to next epoch.
// This gives us an opportunity to do the final "fluff" before we start
// stemming on the subsequent epoch.
if adapter.is_expired() {
adapter.next_epoch();
}
// Step 3: now find all "Fresh" entries in stempool since last run.
// Coin flip for each (90/10) and label them as either "ToStem" or "ToFluff".
// We will process these in the next run (waiting patience secs).
if process_fresh_entries(dandelion_config.clone(), tx_pool.clone()).is_err() {
error!("dand_mon: Problem processing fresh pool entries.");
}
// Step 4: now find all expired entries based on embargo timer.
if process_expired_entries(dandelion_config.clone(), tx_pool.clone()).is_err() {
error!("dand_mon: Problem processing fresh pool entries.");
}
// Monitor loops every 10s.
thread::sleep(Duration::from_secs(10));
}
});
}
fn process_stem_phase(
tx_pool: Arc<RwLock<TransactionPool>>,
verifier_cache: Arc<RwLock<dyn VerifierCache>>,
) -> Result<(), PoolError> {
let mut tx_pool = tx_pool.write();
let header = tx_pool.chain_head()?;
let stem_txs = tx_pool
.stempool
.get_transactions_in_state(PoolEntryState::ToStem);
if stem_txs.is_empty() {
return Ok(());
}
// Get the aggregate tx representing the entire txpool.
let txpool_tx = tx_pool.txpool.all_transactions_aggregate()?;
let stem_txs = tx_pool
.stempool
.select_valid_transactions(stem_txs, txpool_tx, &header)?;
tx_pool
.stempool
.transition_to_state(&stem_txs, PoolEntryState::Stemmed);
if stem_txs.len() > 0 {
debug!("dand_mon: Found {} txs for stemming.", stem_txs.len());
let agg_tx = transaction::aggregate(stem_txs)?;
agg_tx.validate(
transaction::Weighting::AsTransaction,
verifier_cache.clone(),
)?;
let res = tx_pool.adapter.stem_tx_accepted(&agg_tx);
if res.is_err() {
debug!("dand_mon: Unable to propagate stem tx. No relay, fluffing instead.");
let src = TxSource {
debug_name: "no_relay".to_string(),
identifier: "?.?.?.?".to_string(),
};
tx_pool.add_to_pool(src, agg_tx, false, &header)?;
}
}
Ok(())
// Query the pool for transactions older than the cutoff.
// Used for both periodic fluffing and handling expired embargo timer.
fn select_txs_cutoff(pool: &Pool, cutoff_secs: u16) -> Vec<PoolEntry> {
let cutoff = Utc::now().timestamp() - cutoff_secs as i64;
pool.entries
.iter()
.filter(|x| x.tx_at.timestamp() < cutoff)
.cloned()
.collect()
}
fn process_fluff_phase(
tx_pool: Arc<RwLock<TransactionPool>>,
verifier_cache: Arc<RwLock<dyn VerifierCache>>,
dandelion_config: &DandelionConfig,
tx_pool: &Arc<RwLock<TransactionPool>>,
adapter: &Arc<DandelionAdapter>,
verifier_cache: &Arc<RwLock<dyn VerifierCache>>,
) -> Result<(), PoolError> {
// Take a write lock on the txpool for the duration of this processing.
let mut tx_pool = tx_pool.write();
let header = tx_pool.chain_head()?;
let stem_txs = tx_pool
.stempool
.get_transactions_in_state(PoolEntryState::ToFluff);
if stem_txs.is_empty() {
let all_entries = tx_pool.stempool.entries.clone();
if all_entries.is_empty() {
return Ok(());
}
// Get the aggregate tx representing the entire txpool.
let txpool_tx = tx_pool.txpool.all_transactions_aggregate()?;
let cutoff_secs = dandelion_config
.aggregation_secs
.expect("aggregation secs config missing");
let cutoff_entries = select_txs_cutoff(&tx_pool.stempool, cutoff_secs);
let stem_txs = tx_pool
.stempool
.select_valid_transactions(stem_txs, txpool_tx, &header)?;
tx_pool
.stempool
.transition_to_state(&stem_txs, PoolEntryState::Fluffed);
if stem_txs.len() > 0 {
debug!("dand_mon: Found {} txs for fluffing.", stem_txs.len());
let agg_tx = transaction::aggregate(stem_txs)?;
agg_tx.validate(
transaction::Weighting::AsTransaction,
verifier_cache.clone(),
)?;
let src = TxSource {
debug_name: "fluff".to_string(),
identifier: "?.?.?.?".to_string(),
};
tx_pool.add_to_pool(src, agg_tx, false, &header)?;
// If epoch is expired, fluff *all* outstanding entries in stempool.
// If *any* entry is older than aggregation_secs (30s) then fluff *all* entries.
// Otherwise we are done for now and we can give txs more time to aggregate.
if !adapter.is_expired() && cutoff_entries.is_empty() {
return Ok(());
}
Ok(())
}
fn process_fresh_entries(
dandelion_config: DandelionConfig,
tx_pool: Arc<RwLock<TransactionPool>>,
) -> Result<(), PoolError> {
let mut tx_pool = tx_pool.write();
let header = tx_pool.chain_head()?;
let mut rng = thread_rng();
let fluffable_txs = {
let txpool_tx = tx_pool.txpool.all_transactions_aggregate()?;
let txs: Vec<_> = all_entries.into_iter().map(|x| x.tx).collect();
tx_pool.stempool.validate_raw_txs(
&txs,
txpool_tx,
&header,
transaction::Weighting::NoLimit,
)?
};
let fresh_entries = &mut tx_pool
.stempool
.entries
.iter_mut()
.filter(|x| x.state == PoolEntryState::Fresh)
.collect::<Vec<_>>();
debug!(
"dand_mon: Found {} txs in local stempool to fluff",
fluffable_txs.len()
);
if fresh_entries.len() > 0 {
debug!(
"dand_mon: Found {} fresh entries in stempool.",
fresh_entries.len()
);
let agg_tx = transaction::aggregate(fluffable_txs)?;
agg_tx.validate(
transaction::Weighting::AsTransaction,
verifier_cache.clone(),
)?;
for x in &mut fresh_entries.iter_mut() {
let random = rng.gen_range(0, 101);
if random <= dandelion_config.stem_probability.unwrap() {
x.state = PoolEntryState::ToStem;
} else {
x.state = PoolEntryState::ToFluff;
}
}
}
let src = TxSource {
debug_name: "fluff".to_string(),
identifier: "?.?.?.?".to_string(),
};
tx_pool.add_to_pool(src, agg_tx, false, &header)?;
Ok(())
}
fn process_expired_entries(
dandelion_config: DandelionConfig,
tx_pool: Arc<RwLock<TransactionPool>>,
dandelion_config: &DandelionConfig,
tx_pool: &Arc<RwLock<TransactionPool>>,
) -> Result<(), PoolError> {
let now = Utc::now().timestamp();
let embargo_sec = dandelion_config.embargo_secs.unwrap() + thread_rng().gen_range(0, 31);
let cutoff = now - embargo_sec as i64;
// Take a write lock on the txpool for the duration of this processing.
let mut tx_pool = tx_pool.write();
let mut expired_entries = vec![];
{
let tx_pool = tx_pool.read();
for entry in tx_pool
.stempool
.entries
.iter()
.filter(|x| x.tx_at.timestamp() < cutoff)
{
debug!("dand_mon: Embargo timer expired for {:?}", entry.tx.hash());
expired_entries.push(entry.clone());
}
let embargo_secs = dandelion_config
.embargo_secs
.expect("embargo_secs config missing")
+ thread_rng().gen_range(0, 31);
let expired_entries = select_txs_cutoff(&tx_pool.stempool, embargo_secs);
if expired_entries.is_empty() {
return Ok(());
}
if expired_entries.len() > 0 {
debug!("dand_mon: Found {} expired txs.", expired_entries.len());
debug!("dand_mon: Found {} expired txs.", expired_entries.len());
{
let mut tx_pool = tx_pool.write();
let header = tx_pool.chain_head()?;
let header = tx_pool.chain_head()?;
for entry in expired_entries {
let src = TxSource {
debug_name: "embargo_expired".to_string(),
identifier: "?.?.?.?".to_string(),
};
match tx_pool.add_to_pool(src, entry.tx, false, &header) {
Ok(_) => debug!("dand_mon: embargo expired, fluffed tx successfully."),
Err(e) => debug!("dand_mon: Failed to fluff expired tx - {:?}", e),
};
}
}
let src = TxSource {
debug_name: "embargo_expired".to_string(),
identifier: "?.?.?.?".to_string(),
};
for entry in expired_entries {
let txhash = entry.tx.hash();
match tx_pool.add_to_pool(src.clone(), entry.tx, false, &header) {
Ok(_) => info!(
"dand_mon: embargo expired for {}, fluffed successfully.",
txhash
),
Err(e) => warn!("dand_mon: failed to fluff expired tx {}, {:?}", txhash, e),
};
}
Ok(())
}
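
The monitor above boils down to two age cutoffs over the stempool entries: a 30s aggregation cutoff that triggers fluffing everything at once, and a ~180s embargo cutoff (plus jitter) that fluffs stragglers individually. A small illustrative calculation follows, assuming the default values and the chrono/rand crates already used in this diff; it is a sketch, not code from the change.

// Illustrative only: how the monitor's two cutoffs relate, with default config values.
use rand::Rng;

fn main() {
    let now: i64 = chrono::Utc::now().timestamp();

    // Aggregation cutoff: if *any* stempool entry is older than this, the monitor
    // aggregates and fluffs *all* stempool entries (relevant during a fluff epoch).
    let aggregation_secs: i64 = 30;
    let aggregation_cutoff = now - aggregation_secs;

    // Embargo cutoff: entries older than this are fluffed individually regardless of
    // epoch; the 0-30s random jitter stops all nodes fluffing at the same instant.
    let embargo_secs: i64 = 180 + rand::thread_rng().gen_range(0, 31);
    let embargo_cutoff = now - embargo_secs;

    println!(
        "fluff all if any entry older than ts {}, embargo-expire entries older than ts {}",
        aggregation_cutoff, embargo_cutoff
    );
}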


@@ -29,7 +29,6 @@ use crate::core::global;
use crate::p2p;
use crate::p2p::types::PeerAddr;
use crate::p2p::ChainAdapter;
use crate::pool::DandelionConfig;
use crate::util::{Mutex, StopState};
// DNS Seeds with contact email associated
@@ -52,7 +51,6 @@ const FLOONET_DNS_SEEDS: &'static [&'static str] = &[
pub fn connect_and_monitor(
p2p_server: Arc<p2p::Server>,
capabilities: p2p::Capabilities,
dandelion_config: DandelionConfig,
seed_list: Box<dyn Fn() -> Vec<PeerAddr> + Send>,
preferred_peers: Option<Vec<PeerAddr>>,
stop_state: Arc<Mutex<StopState>>,
@@ -119,8 +117,6 @@ pub fn connect_and_monitor(
preferred_peers.clone(),
);
update_dandelion_relay(peers.clone(), dandelion_config.clone());
prev = Utc::now();
start_attempt = cmp::min(6, start_attempt + 1);
}
@@ -244,21 +240,6 @@ fn monitor_peers(
}
}
fn update_dandelion_relay(peers: Arc<p2p::Peers>, dandelion_config: DandelionConfig) {
// Dandelion Relay Updater
let dandelion_relay = peers.get_dandelion_relay();
if let Some((last_added, _)) = dandelion_relay {
let dandelion_interval = Utc::now().timestamp() - last_added;
if dandelion_interval >= dandelion_config.relay_secs.unwrap() as i64 {
debug!("monitor_peers: updating expired dandelion relay");
peers.update_dandelion_relay();
}
} else {
debug!("monitor_peers: no dandelion relay updating");
peers.update_dandelion_relay();
}
}
// Check if we have any pre-existing peer in db. If so, start with those,
// otherwise use the seeds provided.
fn connect_to_seeds_and_preferred_peers(


@@ -154,7 +154,7 @@ impl Server {
let verifier_cache = Arc::new(RwLock::new(LruVerifierCache::new()));
let pool_adapter = Arc::new(PoolToChainAdapter::new());
let pool_net_adapter = Arc::new(PoolToNetAdapter::new());
let pool_net_adapter = Arc::new(PoolToNetAdapter::new(config.dandelion_config.clone()));
let tx_pool = Arc::new(RwLock::new(pool::TransactionPool::new(
config.pool_config.clone(),
pool_adapter.clone(),
@@ -207,6 +207,8 @@ impl Server {
genesis.hash(),
stop_state.clone(),
)?);
// Initialize various adapters with our dynamic set of connected peers.
chain_adapter.init(p2p_server.peers.clone());
pool_net_adapter.init(p2p_server.peers.clone());
net_adapter.init(p2p_server.peers.clone());
@@ -227,7 +229,6 @@ impl Server {
seed::connect_and_monitor(
p2p_server.clone(),
config.p2p_config.capabilities,
config.dandelion_config.clone(),
seeder,
config.p2p_config.peers_preferred.clone(),
stop_state.clone(),
@@ -281,6 +282,7 @@ impl Server {
dandelion_monitor::monitor_transactions(
config.dandelion_config.clone(),
tx_pool.clone(),
pool_net_adapter.clone(),
verifier_cache.clone(),
stop_state.clone(),
);