Peer rate limiting (#1933)

* Rate counter in peer conn to monitor traffic 
* Ban peers that trigger is_abusive
Authored by Ignotus Peverell on 2018-11-06 17:51:22 -08:00, committed by GitHub
parent d2088ff48c
commit 8b546632fe
7 changed files with 141 additions and 80 deletions

p2p/src/conn.rs

@ -1,4 +1,4 @@
// Copyright 2018-2018 The Grin Developers
// Copyright 2018 The Grin Developers
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@ -26,11 +26,11 @@ use std::mem::size_of;
use std::net::TcpStream;
use std::sync::{mpsc, Arc};
use std::{cmp, thread, time};
use util::RwLock;
use core::ser;
use msg::{read_body, read_exact, read_header, write_all, write_to_buf, MsgHeader, Type};
use types::Error;
use util::{RateCounter, RwLock};
/// A trait to be implemented in order to receive messages from the
/// connection. Allows providing an optional response.
@ -146,9 +146,9 @@ pub const SEND_CHANNEL_CAP: usize = 10;
pub struct Tracker {
/// Bytes we've sent.
pub sent_bytes: Arc<RwLock<u64>>,
pub sent_bytes: Arc<RwLock<RateCounter>>,
/// Bytes we've received.
pub received_bytes: Arc<RwLock<u64>>,
pub received_bytes: Arc<RwLock<RateCounter>>,
/// Channel to allow sending data through the connection
pub send_channel: mpsc::SyncSender<Vec<u8>>,
/// Channel to close the connection
@ -168,7 +168,7 @@ impl Tracker {
// Increase sent bytes counter
let mut sent_bytes = self.sent_bytes.write();
*sent_bytes += buf_len as u64;
sent_bytes.inc(buf_len as u64);
Ok(())
}
@ -186,7 +186,7 @@ where
let (error_tx, error_rx) = mpsc::channel();
// Counter of number of bytes received
let received_bytes = Arc::new(RwLock::new(0));
let received_bytes = Arc::new(RwLock::new(RateCounter::new()));
stream
.set_nonblocking(true)
@ -201,7 +201,7 @@ where
);
Tracker {
sent_bytes: Arc::new(RwLock::new(0)),
sent_bytes: Arc::new(RwLock::new(RateCounter::new())),
received_bytes: received_bytes.clone(),
send_channel: send_tx,
close_channel: close_tx,
@ -215,7 +215,7 @@ fn poll<H>(
send_rx: mpsc::Receiver<Vec<u8>>,
error_tx: mpsc::Sender<Error>,
close_rx: mpsc::Receiver<()>,
received_bytes: Arc<RwLock<u64>>,
received_bytes: Arc<RwLock<RateCounter>>,
) where
H: MessageHandler,
{
@ -240,8 +240,7 @@ fn poll<H>(
// Increase received bytes counter
{
let mut received_bytes = received_bytes.write();
let header_size = size_of::<MsgHeader>() as u64;
*received_bytes += header_size + msg.header.msg_len;
received_bytes.inc(size_of::<MsgHeader>() as u64 + msg.header.msg_len);
}
if let Some(Some(resp)) = try_break!(error_tx, handler.consume(msg)) {
@ -249,25 +248,15 @@ fn poll<H>(
}
}
// check the write end
if let Ok::<Vec<u8>, ()>(data) = retry_send {
if let None =
try_break!(error_tx, conn.write_all(&data[..]).map_err(&From::from))
{
// check the write end, use or_else so try_recv is lazily eval'd
let maybe_data = retry_send.or_else(|_| send_rx.try_recv());
retry_send = Err(());
if let Ok(data) = maybe_data {
let written =
try_break!(error_tx, conn.write_all(&data[..]).map_err(&From::from));
if written.is_none() {
retry_send = Ok(data);
} else {
retry_send = Err(());
}
} else if let Ok(data) = send_rx.try_recv() {
if let None =
try_break!(error_tx, conn.write_all(&data[..]).map_err(&From::from))
{
retry_send = Ok(data);
} else {
retry_send = Err(());
}
} else {
retry_send = Err(());
}
// check the close channel
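
To make the reworked write path easier to read outside the interleaved diff, here is a minimal, self-contained sketch of the same pattern: any buffer whose write failed stays in retry_send, and or_else means try_recv is only consulted when there is nothing left to retry (the "lazily eval'd" comment above). flush_one and the Vec<u8> sink are illustrative stand-ins, not items from the p2p crate.

    use std::io::{self, Write};
    use std::sync::mpsc;

    /// One write step in the spirit of the reworked poll() loop: retry a
    /// previously failed buffer first, otherwise lazily pull from the channel.
    fn flush_one(
        conn: &mut impl Write,
        retry_send: &mut Result<Vec<u8>, ()>,
        send_rx: &mpsc::Receiver<Vec<u8>>,
    ) -> io::Result<()> {
        let maybe_data = std::mem::replace(retry_send, Err(()))
            .or_else(|_| send_rx.try_recv().map_err(|_| ()));
        if let Ok(data) = maybe_data {
            if let Err(e) = conn.write_all(&data) {
                // keep the buffer around so the next poll iteration retries it
                *retry_send = Ok(data);
                return Err(e);
            }
        }
        Ok(())
    }

    fn main() -> io::Result<()> {
        let (tx, rx) = mpsc::channel();
        tx.send(b"ping".to_vec()).unwrap();

        let mut sink: Vec<u8> = Vec::new(); // any Write impl stands in for the TcpStream
        let mut retry_send: Result<Vec<u8>, ()> = Err(());
        flush_one(&mut sink, &mut retry_send, &rx)?;
        assert_eq!(sink, b"ping".to_vec());
        Ok(())
    }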

p2p/src/peer.rs

@ -30,6 +30,7 @@ use types::{
};
const MAX_TRACK_SIZE: usize = 30;
const MAX_PEER_MSG_PER_MIN: u64 = 300;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
/// Remind: don't mix up this 'State' with that 'State' in p2p/src/store.rs,
@ -149,22 +150,35 @@ impl Peer {
}
}
/// Whether the peer is considered abusive, mostly for spammy nodes
pub fn is_abusive(&self) -> bool {
if let Some(ref conn) = self.connection {
let conn = conn.lock();
let rec = conn.received_bytes.read();
let sent = conn.sent_bytes.read();
rec.count_per_min() > MAX_PEER_MSG_PER_MIN
|| sent.count_per_min() > MAX_PEER_MSG_PER_MIN
} else {
false
}
}
/// Number of bytes sent to the peer
pub fn sent_bytes(&self) -> Option<u64> {
pub fn last_min_sent_bytes(&self) -> Option<u64> {
if let Some(ref tracker) = self.connection {
let conn = tracker.lock();
let sent_bytes = conn.sent_bytes.read();
return Some(*sent_bytes);
return Some(sent_bytes.bytes_per_min());
}
None
}
/// Number of bytes received from the peer
pub fn received_bytes(&self) -> Option<u64> {
pub fn last_min_received_bytes(&self) -> Option<u64> {
if let Some(ref tracker) = self.connection {
let conn = tracker.lock();
let received_bytes = conn.received_bytes.read();
return Some(*received_bytes);
return Some(received_bytes.bytes_per_min());
}
None
}
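
The ban condition added here boils down to a per-minute message-count check against MAX_PEER_MSG_PER_MIN. A small sketch of that rule in isolation; the real method reads both counts from the connection's Tracker under its locks, and the standalone is_abusive function below is illustrative only.

    /// Threshold mirrored from the diff above: more than 300 tracked messages
    /// in either direction within the last minute marks a peer as abusive.
    const MAX_PEER_MSG_PER_MIN: u64 = 300;

    /// The is_abusive rule reduced to a pure function over the two per-minute counts.
    fn is_abusive(received_count_per_min: u64, sent_count_per_min: u64) -> bool {
        received_count_per_min > MAX_PEER_MSG_PER_MIN
            || sent_count_per_min > MAX_PEER_MSG_PER_MIN
    }

    fn main() {
        assert!(!is_abusive(120, 80)); // normal chatter stays connected
        assert!(is_abusive(301, 0));   // a spammy inbound peer trips the ban in clean_peers
    }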

p2p/src/peers.rs

@ -385,52 +385,45 @@ impl Peers {
for peer in self.peers.read().values() {
if peer.is_banned() {
debug!("clean_peers {:?}, peer banned", peer.info.addr);
rm.push(peer.clone());
rm.push(peer.info.addr.clone());
} else if !peer.is_connected() {
debug!("clean_peers {:?}, not connected", peer.info.addr);
rm.push(peer.clone());
rm.push(peer.info.addr.clone());
} else if peer.is_abusive() {
debug!("clean_peers {:?}, abusive", peer.info.addr);
let _ = self.update_state(peer.info.addr, State::Banned);
rm.push(peer.info.addr.clone());
} else {
let (stuck, diff) = peer.is_stuck();
if stuck && diff < self.adapter.total_difficulty() {
debug!("clean_peers {:?}, stuck peer", peer.info.addr);
peer.stop();
let _ = self.update_state(peer.info.addr, State::Defunct);
rm.push(peer.clone());
rm.push(peer.info.addr.clone());
}
}
}
// ensure we do not still have too many connected peers
let excess_count = (self.peer_count() as usize - rm.len()).saturating_sub(max_count);
if excess_count > 0 {
// map peers to addrs in a block to bound how long we keep the read lock for
let mut addrs = self
.connected_peers()
.iter()
.take(excess_count)
.map(|x| x.info.addr.clone())
.collect::<Vec<_>>();
rm.append(&mut addrs);
}
// now clean up peer map based on the list to remove
{
let mut peers = self.peers.write();
for p in rm {
peers.remove(&p.info.addr);
let _ = peers.get(&p).map(|p| p.stop());
peers.remove(&p);
}
}
// ensure we do not have too many connected peers
let excess_count = {
let peer_count = self.peer_count() as usize;
if peer_count > max_count {
peer_count - max_count
} else {
0
}
};
// map peers to addrs in a block to bound how long we keep the read lock for
let addrs = self
.connected_peers()
.iter()
.map(|x| x.info.addr.clone())
.collect::<Vec<_>>();
// now remove them taking a short-lived write lock each time
// maybe better to take write lock once and remove them all?
for x in addrs.iter().take(excess_count) {
let mut peers = self.peers.write();
peers.remove(x);
}
}
pub fn stop(&self) {
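
The reshuffled clean_peers above is easier to follow as a whole, so here is a hedged, self-contained sketch of the new flow using simplified stand-in types (PeerEntry and this clean_peers signature are illustrative, not the p2p crate's API): collect the addresses of banned, disconnected and abusive peers, top the list up with any excess connections, then drop everything in one pass over the map.

    use std::collections::HashMap;
    use std::net::SocketAddr;

    /// Simplified stand-in for a tracked peer; the real map holds Arc<Peer>
    /// values behind an RwLock and calls peer.stop() before removal.
    struct PeerEntry {
        banned: bool,
        connected: bool,
        abusive: bool,
    }

    fn clean_peers(peers: &mut HashMap<SocketAddr, PeerEntry>, max_count: usize) {
        // build a list of peers to be cleaned up (banned, disconnected, abusive)
        let mut rm: Vec<SocketAddr> = peers
            .iter()
            .filter(|(_, p)| p.banned || !p.connected || p.abusive)
            .map(|(addr, _)| *addr)
            .collect();

        // ensure we do not still have too many connected peers
        let excess_count = peers.len().saturating_sub(rm.len()).saturating_sub(max_count);
        let mut excess: Vec<SocketAddr> = peers.keys().take(excess_count).copied().collect();
        rm.append(&mut excess);

        // now clean up the peer map based on the list to remove;
        // removing an address twice is harmless
        for addr in rm {
            peers.remove(&addr);
        }
    }

    fn main() {
        let mut peers: HashMap<SocketAddr, PeerEntry> = HashMap::new();
        peers.insert(
            "10.0.0.1:13414".parse().unwrap(),
            PeerEntry { banned: false, connected: true, abusive: true },
        );
        peers.insert(
            "10.0.0.2:13414".parse().unwrap(),
            PeerEntry { banned: false, connected: true, abusive: false },
        );
        clean_peers(&mut peers, 8);
        assert_eq!(peers.len(), 1); // only the well-behaved peer survives
    }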

servers/src/common/stats.rs

@ -150,9 +150,9 @@ pub struct PeerStats {
/// Last time we saw a ping/pong from this peer.
pub last_seen: DateTime<Utc>,
/// Number of bytes we've sent to the peer.
pub sent_bytes: Option<u64>,
pub sent_bytes_per_sec: u64,
/// Number of bytes we've received from the peer.
pub received_bytes: Option<u64>,
pub received_bytes_per_sec: u64,
}
impl StratumStats {
@ -186,8 +186,8 @@ impl PeerStats {
height: peer.info.height(),
direction: direction.to_string(),
last_seen: peer.info.last_seen(),
sent_bytes: peer.sent_bytes(),
received_bytes: peer.received_bytes(),
sent_bytes_per_sec: peer.last_min_sent_bytes().unwrap_or(0) / 60,
received_bytes_per_sec: peer.last_min_received_bytes().unwrap_or(0) / 60,
}
}
}
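
The stats change swaps the optional lifetime byte totals for plain per-second figures derived from the last minute of traffic. A tiny sketch of that conversion; bytes_per_sec is a hypothetical helper, not part of the servers crate.

    /// Last-minute byte count divided by 60, with a missing connection reported as 0.
    fn bytes_per_sec(last_min_bytes: Option<u64>) -> u64 {
        last_min_bytes.unwrap_or(0) / 60
    }

    fn main() {
        assert_eq!(bytes_per_sec(Some(6_000)), 100); // 6,000 bytes in the last minute -> 100 bytes/s
        assert_eq!(bytes_per_sec(None), 0);          // peer without a live connection
    }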

src/bin/tui/peers.rs

@ -57,16 +57,8 @@ impl PeerColumn {
impl TableViewItem<PeerColumn> for PeerStats {
fn to_column(&self, column: PeerColumn) -> String {
// Converts optional size to human readable size
fn size_to_string(size: Option<u64>) -> String {
if let Some(n) = size {
let size = n.file_size(CONVENTIONAL);
match size {
Ok(size) => size,
Err(_) => "-".to_string(),
}
} else {
"-".to_string()
}
fn size_to_string(size: u64) -> String {
size.file_size(CONVENTIONAL).unwrap_or("-".to_string())
}
match column {
@ -74,8 +66,8 @@ impl TableViewItem<PeerColumn> for PeerStats {
PeerColumn::State => self.state.clone(),
PeerColumn::UsedBandwidth => format!(
"S: {}, R: {}",
size_to_string(self.sent_bytes),
size_to_string(self.received_bytes),
size_to_string(self.sent_bytes_per_sec),
size_to_string(self.received_bytes_per_sec),
).to_string(),
PeerColumn::TotalDifficulty => format!(
"{} D @ {} H ({}s)",
@ -94,10 +86,10 @@ impl TableViewItem<PeerColumn> for PeerStats {
{
// Compares used bandwidth of two peers
fn cmp_used_bandwidth(curr: &PeerStats, other: &PeerStats) -> Ordering {
let curr_recv_bytes = curr.received_bytes.unwrap_or(0);
let curr_sent_bytes = curr.sent_bytes.unwrap_or(0);
let other_recv_bytes = other.received_bytes.unwrap_or(0);
let other_sent_bytes = other.sent_bytes.unwrap_or(0);
let curr_recv_bytes = curr.received_bytes_per_sec;
let curr_sent_bytes = curr.sent_bytes_per_sec;
let other_recv_bytes = other.received_bytes_per_sec;
let other_sent_bytes = other.sent_bytes_per_sec;
let curr_sum = curr_recv_bytes + curr_sent_bytes;
let other_sum = other_recv_bytes + other_sent_bytes;
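
With the Option wrappers gone, the UsedBandwidth sort compares the plain per-second sums directly. A self-contained sketch of that comparator; the Bandwidth struct stands in for the two PeerStats fields.

    use std::cmp::Ordering;

    /// Just the two fields the sort needs from PeerStats.
    struct Bandwidth {
        sent_bytes_per_sec: u64,
        received_bytes_per_sec: u64,
    }

    /// Orders peers by total bandwidth (sent + received per second).
    fn cmp_used_bandwidth(curr: &Bandwidth, other: &Bandwidth) -> Ordering {
        let curr_sum = curr.sent_bytes_per_sec + curr.received_bytes_per_sec;
        let other_sum = other.sent_bytes_per_sec + other.received_bytes_per_sec;
        curr_sum.cmp(&other_sum)
    }

    fn main() {
        let a = Bandwidth { sent_bytes_per_sec: 200, received_bytes_per_sec: 100 };
        let b = Bandwidth { sent_bytes_per_sec: 50, received_bytes_per_sec: 400 };
        assert_eq!(cmp_used_bandwidth(&a, &b), Ordering::Less); // 300 < 450
    }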

util/src/lib.rs

@ -71,6 +71,9 @@ pub mod file;
/// Compress and decompress zip bz2 archives
pub mod zip;
mod rate_counter;
pub use rate_counter::RateCounter;
/// Encapsulation of a RwLock<Option<T>> for one-time initialization.
/// This implementation will purposefully fail hard if not used
/// properly, for example if not initialized before being first used

util/src/rate_counter.rs (new file, +70 lines)

@ -0,0 +1,70 @@
// Copyright 2018 The Grin Developers
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
/// Utility to track the rate of data transfers
use std::time::{Duration, SystemTime};
/// A rate counter tracks the number of transfers, the amount of data
/// exchanged and the rate of transfer (via a few timers) over the last
/// minute. The counter does not try to be accurate and update times
/// proactively, instead it only does so lazily. As a result, produced
/// rates are worst-case estimates.
pub struct RateCounter {
last_min_bytes: Vec<u64>,
last_min_count: u64,
last_min_times: Vec<u64>,
}
impl RateCounter {
/// Instantiate a new rate counter
pub fn new() -> RateCounter {
RateCounter {
last_min_bytes: vec![],
last_min_count: 0,
last_min_times: vec![],
}
}
/// Increments number of bytes transferred, updating counts and rates.
pub fn inc(&mut self, bytes: u64) {
let now_millis = millis_since_epoch();
self.last_min_times.push(now_millis);
self.last_min_bytes.push(bytes);
self.last_min_count += 1;
// prune entries older than one minute from the front, so the
// vectors stay ordered oldest-first
while !self.last_min_times.is_empty() && self.last_min_times[0] + 60000 < now_millis {
self.last_min_times.remove(0);
self.last_min_bytes.remove(0);
self.last_min_count -= 1;
}
}
/// Number of bytes counted in the last minute
pub fn bytes_per_min(&self) -> u64 {
self.last_min_bytes.iter().sum()
}
/// Count of increases in the last minute
pub fn count_per_min(&self) -> u64 {
self.last_min_count
}
}
// turns out getting the millisecs since epoch in Rust isn't as easy as it
// could be
fn millis_since_epoch() -> u64 {
let since_epoch = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap_or(Duration::new(0, 0));
since_epoch.as_secs() * 1000 + since_epoch.subsec_millis() as u64
}