diff --git a/p2p/src/msg.rs b/p2p/src/msg.rs index 12c390149..93de6dca2 100644 --- a/p2p/src/msg.rs +++ b/p2p/src/msg.rs @@ -34,10 +34,10 @@ use crate::types::{ use crate::util::secp::pedersen::RangeProof; use bytes::Bytes; use num::FromPrimitive; -use std::fmt; use std::fs::File; use std::io::{Read, Write}; use std::sync::Arc; +use std::{fmt, thread, time::Duration}; /// Grin's user agent with current version pub const USER_AGENT: &str = concat!("MW/Grin ", env!("CARGO_PKG_VERSION")); @@ -238,6 +238,17 @@ pub fn write_message( msg: &Msg, tracker: Arc, ) -> Result<(), Error> { + // Introduce a delay so messages are spaced at least 150ms apart. + // This gives a max msg rate of 60000/150 = 400 messages per minute. + // Exceeding 500 messages per minute will result in being banned as abusive. + if let Some(elapsed) = tracker.sent_bytes.read().elapsed_since_last_msg() { + let min_interval: u64 = 150; + let sleep_ms = min_interval.saturating_sub(elapsed); + if sleep_ms > 0 { + thread::sleep(Duration::from_millis(sleep_ms)) + } + } + let mut buf = ser::ser_vec(&msg.header, msg.version)?; buf.extend(&msg.body[..]); stream.write_all(&buf[..])?; diff --git a/p2p/src/peer.rs b/p2p/src/peer.rs index 617a9172b..1b28baeba 100644 --- a/p2p/src/peer.rs +++ b/p2p/src/peer.rs @@ -205,27 +205,13 @@ impl Peer { /// Whether the peer is considered abusive, mostly for spammy nodes pub fn is_abusive(&self) -> bool { - let rec = self.tracker.received_bytes.read(); - let sent = self.tracker.sent_bytes.read(); - rec.count_per_min() > MAX_PEER_MSG_PER_MIN || sent.count_per_min() > MAX_PEER_MSG_PER_MIN + let rec = self.tracker().received_bytes.read(); + rec.count_per_min() > MAX_PEER_MSG_PER_MIN } - /// Number of bytes sent to the peer - pub fn last_min_sent_bytes(&self) -> Option { - let sent_bytes = self.tracker.sent_bytes.read(); - Some(sent_bytes.bytes_per_min()) - } - - /// Number of bytes received from the peer - pub fn last_min_received_bytes(&self) -> Option { - let received_bytes = self.tracker.received_bytes.read(); - Some(received_bytes.bytes_per_min()) - } - - pub fn last_min_message_counts(&self) -> Option<(u64, u64)> { - let received_bytes = self.tracker.received_bytes.read(); - let sent_bytes = self.tracker.sent_bytes.read(); - Some((sent_bytes.count_per_min(), received_bytes.count_per_min())) + /// Tracker tracks sent/received bytes and message counts per minute. + pub fn tracker(&self) -> &conn::Tracker { + &self.tracker } /// Set this peer status to banned diff --git a/p2p/src/peers.rs b/p2p/src/peers.rs index c1bbccfaa..e045baca0 100644 --- a/p2p/src/peers.rs +++ b/p2p/src/peers.rs @@ -324,12 +324,12 @@ impl Peers { debug!("clean_peers {:?}, not connected", peer.info.addr); rm.push(peer.info.addr.clone()); } else if peer.is_abusive() { - if let Some(counts) = peer.last_min_message_counts() { - debug!( - "clean_peers {:?}, abusive ({} sent, {} recv)", - peer.info.addr, counts.0, counts.1, - ); - } + let received = peer.tracker().received_bytes.read().count_per_min(); + let sent = peer.tracker().sent_bytes.read().count_per_min(); + debug!( + "clean_peers {:?}, abusive ({} sent, {} recv)", + peer.info.addr, sent, received, + ); let _ = self.update_state(peer.info.addr, State::Banned); rm.push(peer.info.addr.clone()); } else { diff --git a/servers/src/common/stats.rs b/servers/src/common/stats.rs index f986edce3..811a084c6 100644 --- a/servers/src/common/stats.rs +++ b/servers/src/common/stats.rs @@ -240,8 +240,8 @@ impl PeerStats { height: peer.info.height(), direction: direction.to_string(), last_seen: peer.info.last_seen(), - sent_bytes_per_sec: peer.last_min_sent_bytes().unwrap_or(0) / 60, - received_bytes_per_sec: peer.last_min_received_bytes().unwrap_or(0) / 60, + sent_bytes_per_sec: peer.tracker().sent_bytes.read().bytes_per_min() / 60, + received_bytes_per_sec: peer.tracker().received_bytes.read().bytes_per_min() / 60, capabilities: peer.info.capabilities, } } diff --git a/util/src/rate_counter.rs b/util/src/rate_counter.rs index aa84ced96..0a9c92e1e 100644 --- a/util/src/rate_counter.rs +++ b/util/src/rate_counter.rs @@ -99,6 +99,14 @@ impl RateCounter { .filter(|x| !x.is_quiet()) .count() as u64 } + + /// Elapsed time in ms since the last entry. + /// We use this to rate limit when sending. + pub fn elapsed_since_last_msg(&self) -> Option { + self.last_min_entries + .last() + .map(|x| millis_since_epoch().saturating_sub(x.timestamp)) + } } // turns out getting the millisecs since epoch in Rust isn't as easy as it