add rate limiting to outbound p2p msg sending (#3560)

treat peers as abusive based on incoming msgs, not outgoing msg rates
This commit is contained in:
Antioch Peverell 2021-02-15 13:49:31 +00:00 committed by GitHub
parent 7649d361e4
commit 4ab72902e0
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 33 additions and 28 deletions

View file

@ -34,10 +34,10 @@ use crate::types::{
use crate::util::secp::pedersen::RangeProof; use crate::util::secp::pedersen::RangeProof;
use bytes::Bytes; use bytes::Bytes;
use num::FromPrimitive; use num::FromPrimitive;
use std::fmt;
use std::fs::File; use std::fs::File;
use std::io::{Read, Write}; use std::io::{Read, Write};
use std::sync::Arc; use std::sync::Arc;
use std::{fmt, thread, time::Duration};
/// Grin's user agent with current version /// Grin's user agent with current version
pub const USER_AGENT: &str = concat!("MW/Grin ", env!("CARGO_PKG_VERSION")); pub const USER_AGENT: &str = concat!("MW/Grin ", env!("CARGO_PKG_VERSION"));
@ -238,6 +238,17 @@ pub fn write_message<W: Write>(
msg: &Msg, msg: &Msg,
tracker: Arc<Tracker>, tracker: Arc<Tracker>,
) -> Result<(), Error> { ) -> Result<(), Error> {
// Introduce a delay so messages are spaced at least 150ms apart.
// This gives a max msg rate of 60000/150 = 400 messages per minute.
// Exceeding 500 messages per minute will result in being banned as abusive.
if let Some(elapsed) = tracker.sent_bytes.read().elapsed_since_last_msg() {
let min_interval: u64 = 150;
let sleep_ms = min_interval.saturating_sub(elapsed);
if sleep_ms > 0 {
thread::sleep(Duration::from_millis(sleep_ms))
}
}
let mut buf = ser::ser_vec(&msg.header, msg.version)?; let mut buf = ser::ser_vec(&msg.header, msg.version)?;
buf.extend(&msg.body[..]); buf.extend(&msg.body[..]);
stream.write_all(&buf[..])?; stream.write_all(&buf[..])?;

View file

@ -205,27 +205,13 @@ impl Peer {
/// Whether the peer is considered abusive, mostly for spammy nodes /// Whether the peer is considered abusive, mostly for spammy nodes
pub fn is_abusive(&self) -> bool { pub fn is_abusive(&self) -> bool {
let rec = self.tracker.received_bytes.read(); let rec = self.tracker().received_bytes.read();
let sent = self.tracker.sent_bytes.read(); rec.count_per_min() > MAX_PEER_MSG_PER_MIN
rec.count_per_min() > MAX_PEER_MSG_PER_MIN || sent.count_per_min() > MAX_PEER_MSG_PER_MIN
} }
/// Number of bytes sent to the peer /// Tracker tracks sent/received bytes and message counts per minute.
pub fn last_min_sent_bytes(&self) -> Option<u64> { pub fn tracker(&self) -> &conn::Tracker {
let sent_bytes = self.tracker.sent_bytes.read(); &self.tracker
Some(sent_bytes.bytes_per_min())
}
/// Number of bytes received from the peer
pub fn last_min_received_bytes(&self) -> Option<u64> {
let received_bytes = self.tracker.received_bytes.read();
Some(received_bytes.bytes_per_min())
}
pub fn last_min_message_counts(&self) -> Option<(u64, u64)> {
let received_bytes = self.tracker.received_bytes.read();
let sent_bytes = self.tracker.sent_bytes.read();
Some((sent_bytes.count_per_min(), received_bytes.count_per_min()))
} }
/// Set this peer status to banned /// Set this peer status to banned

View file

@ -324,12 +324,12 @@ impl Peers {
debug!("clean_peers {:?}, not connected", peer.info.addr); debug!("clean_peers {:?}, not connected", peer.info.addr);
rm.push(peer.info.addr.clone()); rm.push(peer.info.addr.clone());
} else if peer.is_abusive() { } else if peer.is_abusive() {
if let Some(counts) = peer.last_min_message_counts() { let received = peer.tracker().received_bytes.read().count_per_min();
debug!( let sent = peer.tracker().sent_bytes.read().count_per_min();
"clean_peers {:?}, abusive ({} sent, {} recv)", debug!(
peer.info.addr, counts.0, counts.1, "clean_peers {:?}, abusive ({} sent, {} recv)",
); peer.info.addr, sent, received,
} );
let _ = self.update_state(peer.info.addr, State::Banned); let _ = self.update_state(peer.info.addr, State::Banned);
rm.push(peer.info.addr.clone()); rm.push(peer.info.addr.clone());
} else { } else {

View file

@ -240,8 +240,8 @@ impl PeerStats {
height: peer.info.height(), height: peer.info.height(),
direction: direction.to_string(), direction: direction.to_string(),
last_seen: peer.info.last_seen(), last_seen: peer.info.last_seen(),
sent_bytes_per_sec: peer.last_min_sent_bytes().unwrap_or(0) / 60, sent_bytes_per_sec: peer.tracker().sent_bytes.read().bytes_per_min() / 60,
received_bytes_per_sec: peer.last_min_received_bytes().unwrap_or(0) / 60, received_bytes_per_sec: peer.tracker().received_bytes.read().bytes_per_min() / 60,
capabilities: peer.info.capabilities, capabilities: peer.info.capabilities,
} }
} }

View file

@ -99,6 +99,14 @@ impl RateCounter {
.filter(|x| !x.is_quiet()) .filter(|x| !x.is_quiet())
.count() as u64 .count() as u64
} }
/// Elapsed time in ms since the last entry.
/// We use this to rate limit when sending.
pub fn elapsed_since_last_msg(&self) -> Option<u64> {
self.last_min_entries
.last()
.map(|x| millis_since_epoch().saturating_sub(x.timestamp))
}
} }
// turns out getting the millisecs since epoch in Rust isn't as easy as it // turns out getting the millisecs since epoch in Rust isn't as easy as it