diff --git a/p2p/src/conn.rs b/p2p/src/conn.rs
index ad84d8507..fa36aec44 100644
--- a/p2p/src/conn.rs
+++ b/p2p/src/conn.rs
@@ -1,4 +1,4 @@
-// Copyright 2018-2018 The Grin Developers
+// Copyright 2018 The Grin Developers
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -26,11 +26,11 @@ use std::mem::size_of;
 use std::net::TcpStream;
 use std::sync::{mpsc, Arc};
 use std::{cmp, thread, time};
-use util::RwLock;
 
 use core::ser;
 use msg::{read_body, read_exact, read_header, write_all, write_to_buf, MsgHeader, Type};
 use types::Error;
+use util::{RateCounter, RwLock};
 
 /// A trait to be implemented in order to receive messages from the
 /// connection. Allows providing an optional response.
@@ -146,9 +146,9 @@ pub const SEND_CHANNEL_CAP: usize = 10;
 
 pub struct Tracker {
 	/// Bytes we've sent.
-	pub sent_bytes: Arc<RwLock<u64>>,
+	pub sent_bytes: Arc<RwLock<RateCounter>>,
 	/// Bytes we've received.
-	pub received_bytes: Arc<RwLock<u64>>,
+	pub received_bytes: Arc<RwLock<RateCounter>>,
 	/// Channel to allow sending data through the connection
 	pub send_channel: mpsc::SyncSender<Vec<u8>>,
 	/// Channel to close the connection
@@ -168,7 +168,7 @@ impl Tracker {
 
 		// Increase sent bytes counter
 		let mut sent_bytes = self.sent_bytes.write();
-		*sent_bytes += buf_len as u64;
+		sent_bytes.inc(buf_len as u64);
 
 		Ok(())
 	}
@@ -186,7 +186,7 @@ where
 	let (error_tx, error_rx) = mpsc::channel();
 
 	// Counter of number of bytes received
-	let received_bytes = Arc::new(RwLock::new(0));
+	let received_bytes = Arc::new(RwLock::new(RateCounter::new()));
 
 	stream
 		.set_nonblocking(true)
@@ -201,7 +201,7 @@ where
 	);
 
 	Tracker {
-		sent_bytes: Arc::new(RwLock::new(0)),
+		sent_bytes: Arc::new(RwLock::new(RateCounter::new())),
 		received_bytes: received_bytes.clone(),
 		send_channel: send_tx,
 		close_channel: close_tx,
@@ -215,7 +215,7 @@ fn poll<H>(
 	send_rx: mpsc::Receiver<Vec<u8>>,
 	error_tx: mpsc::Sender<Error>,
 	close_rx: mpsc::Receiver<()>,
-	received_bytes: Arc<RwLock<u64>>,
+	received_bytes: Arc<RwLock<RateCounter>>,
 ) where
 	H: MessageHandler,
 {
@@ -240,8 +240,7 @@ fn poll<H>(
 				// Increase received bytes counter
 				{
 					let mut received_bytes = received_bytes.write();
-					let header_size = size_of::<MsgHeader>() as u64;
-					*received_bytes += header_size + msg.header.msg_len;
+					received_bytes.inc(size_of::<MsgHeader>() as u64 + msg.header.msg_len);
 				}
 
 				if let Some(Some(resp)) = try_break!(error_tx, handler.consume(msg)) {
@@ -249,25 +248,15 @@ fn poll<H>(
 				}
 			}
 
-			// check the write end
-			if let Ok::<Vec<u8>, ()>(data) = retry_send {
-				if let None =
-					try_break!(error_tx, conn.write_all(&data[..]).map_err(&From::from))
-				{
+			// check the write end, use or_else so try_recv is lazily eval'd
+			let maybe_data = retry_send.or_else(|_| send_rx.try_recv());
+			retry_send = Err(());
+			if let Ok(data) = maybe_data {
+				let written =
+					try_break!(error_tx, conn.write_all(&data[..]).map_err(&From::from));
+				if written.is_none() {
 					retry_send = Ok(data);
-				} else {
-					retry_send = Err(());
 				}
-			} else if let Ok(data) = send_rx.try_recv() {
-				if let None =
-					try_break!(error_tx, conn.write_all(&data[..]).map_err(&From::from))
-				{
-					retry_send = Ok(data);
-				} else {
-					retry_send = Err(());
-				}
-			} else {
-				retry_send = Err(());
 			}
 
 			// check the close channel
diff --git a/p2p/src/peer.rs b/p2p/src/peer.rs
index 78507defa..2aaae3a72 100644
--- a/p2p/src/peer.rs
+++ b/p2p/src/peer.rs
@@ -30,6 +30,7 @@ use types::{
 };
 
 const MAX_TRACK_SIZE: usize = 30;
+const MAX_PEER_MSG_PER_MIN: u64 = 300;
 
 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
 /// Remind: don't mix up this 'State' with that 'State' in p2p/src/store.rs,
@@ -149,22 +150,35 @@ impl Peer {
 		}
 	}
 
+	/// Whether the peer is considered abusive, mostly for spammy nodes
+	pub fn is_abusive(&self) -> bool {
+		if let Some(ref conn) = self.connection {
+			let conn = conn.lock();
+			let rec = conn.received_bytes.read();
+			let sent = conn.sent_bytes.read();
+			rec.count_per_min() > MAX_PEER_MSG_PER_MIN
+				|| sent.count_per_min() > MAX_PEER_MSG_PER_MIN
+		} else {
+			false
+		}
+	}
+
 	/// Number of bytes sent to the peer
-	pub fn sent_bytes(&self) -> Option<u64> {
+	pub fn last_min_sent_bytes(&self) -> Option<u64> {
 		if let Some(ref tracker) = self.connection {
 			let conn = tracker.lock();
 			let sent_bytes = conn.sent_bytes.read();
-			return Some(*sent_bytes);
+			return Some(sent_bytes.bytes_per_min());
 		}
 		None
 	}
 
 	/// Number of bytes received from the peer
-	pub fn received_bytes(&self) -> Option<u64> {
+	pub fn last_min_received_bytes(&self) -> Option<u64> {
 		if let Some(ref tracker) = self.connection {
 			let conn = tracker.lock();
 			let received_bytes = conn.received_bytes.read();
-			return Some(*received_bytes);
+			return Some(received_bytes.bytes_per_min());
 		}
 		None
 	}
diff --git a/p2p/src/peers.rs b/p2p/src/peers.rs
index 96f936a15..49c35dd3a 100644
--- a/p2p/src/peers.rs
+++ b/p2p/src/peers.rs
@@ -385,52 +385,45 @@ impl Peers {
 		for peer in self.peers.read().values() {
 			if peer.is_banned() {
 				debug!("clean_peers {:?}, peer banned", peer.info.addr);
-				rm.push(peer.clone());
+				rm.push(peer.info.addr.clone());
 			} else if !peer.is_connected() {
 				debug!("clean_peers {:?}, not connected", peer.info.addr);
-				rm.push(peer.clone());
+				rm.push(peer.info.addr.clone());
+			} else if peer.is_abusive() {
+				debug!("clean_peers {:?}, abusive", peer.info.addr);
+				let _ = self.update_state(peer.info.addr, State::Banned);
+				rm.push(peer.info.addr.clone());
 			} else {
 				let (stuck, diff) = peer.is_stuck();
 				if stuck && diff < self.adapter.total_difficulty() {
 					debug!("clean_peers {:?}, stuck peer", peer.info.addr);
-					peer.stop();
 					let _ = self.update_state(peer.info.addr, State::Defunct);
-					rm.push(peer.clone());
+					rm.push(peer.info.addr.clone());
 				}
 			}
 		}
 
+		// ensure we do not still have too many connected peers
+		let excess_count = (self.peer_count() as usize - rm.len()).saturating_sub(max_count);
+		if excess_count > 0 {
+			// map peers to addrs in a block to bound how long we keep the read lock for
+			let mut addrs = self
+				.connected_peers()
+				.iter()
+				.take(excess_count)
+				.map(|x| x.info.addr.clone())
+				.collect::<Vec<_>>();
+			rm.append(&mut addrs);
+		}
+
 		// now clean up peer map based on the list to remove
 		{
 			let mut peers = self.peers.write();
 			for p in rm {
-				peers.remove(&p.info.addr);
+				let _ = peers.get(&p).map(|p| p.stop());
+				peers.remove(&p);
 			}
 		}
-
-		// ensure we do not have too many connected peers
-		let excess_count = {
-			let peer_count = self.peer_count() as usize;
-			if peer_count > max_count {
-				peer_count - max_count
-			} else {
-				0
-			}
-		};
-
-		// map peers to addrs in a block to bound how long we keep the read lock for
-		let addrs = self
-			.connected_peers()
-			.iter()
-			.map(|x| x.info.addr.clone())
-			.collect::<Vec<_>>();
-
-		// now remove them taking a short-lived write lock each time
-		// maybe better to take write lock once and remove them all?
-		for x in addrs.iter().take(excess_count) {
-			let mut peers = self.peers.write();
-			peers.remove(x);
-		}
 	}
 
 	pub fn stop(&self) {
diff --git a/servers/src/common/stats.rs b/servers/src/common/stats.rs
index b5e0a4c26..181596cb5 100644
--- a/servers/src/common/stats.rs
+++ b/servers/src/common/stats.rs
@@ -150,9 +150,9 @@ pub struct PeerStats {
 	/// Last time we saw a ping/pong from this peer.
 	pub last_seen: DateTime<Utc>,
 	/// Number of bytes we've sent to the peer.
-	pub sent_bytes: Option<u64>,
+	pub sent_bytes_per_sec: u64,
 	/// Number of bytes we've received from the peer.
-	pub received_bytes: Option<u64>,
+	pub received_bytes_per_sec: u64,
 }
 
 impl StratumStats {
@@ -186,8 +186,8 @@ impl PeerStats {
 			height: peer.info.height(),
 			direction: direction.to_string(),
 			last_seen: peer.info.last_seen(),
-			sent_bytes: peer.sent_bytes(),
-			received_bytes: peer.received_bytes(),
+			sent_bytes_per_sec: peer.last_min_sent_bytes().unwrap_or(0) / 60,
+			received_bytes_per_sec: peer.last_min_received_bytes().unwrap_or(0) / 60,
 		}
 	}
 }
diff --git a/src/bin/tui/peers.rs b/src/bin/tui/peers.rs
index d4d212f32..dcb3e84ac 100644
--- a/src/bin/tui/peers.rs
+++ b/src/bin/tui/peers.rs
@@ -57,16 +57,8 @@ impl PeerColumn {
 impl TableViewItem<PeerColumn> for PeerStats {
 	fn to_column(&self, column: PeerColumn) -> String {
 		// Converts optional size to human readable size
-		fn size_to_string(size: Option<u64>) -> String {
-			if let Some(n) = size {
-				let size = n.file_size(CONVENTIONAL);
-				match size {
-					Ok(size) => size,
-					Err(_) => "-".to_string(),
-				}
-			} else {
-				"-".to_string()
-			}
+		fn size_to_string(size: u64) -> String {
+			size.file_size(CONVENTIONAL).unwrap_or("-".to_string())
 		}
 
 		match column {
@@ -74,8 +66,8 @@ impl TableViewItem<PeerColumn> for PeerStats {
 			PeerColumn::State => self.state.clone(),
 			PeerColumn::UsedBandwidth => format!(
 				"S: {}, R: {}",
-				size_to_string(self.sent_bytes),
-				size_to_string(self.received_bytes),
+				size_to_string(self.sent_bytes_per_sec),
+				size_to_string(self.received_bytes_per_sec),
 			).to_string(),
 			PeerColumn::TotalDifficulty => format!(
 				"{} D @ {} H ({}s)",
@@ -94,10 +86,10 @@ impl TableViewItem<PeerColumn> for PeerStats {
 	{
 		// Compares used bandwidth of two peers
 		fn cmp_used_bandwidth(curr: &PeerStats, other: &PeerStats) -> Ordering {
-			let curr_recv_bytes = curr.received_bytes.unwrap_or(0);
-			let curr_sent_bytes = curr.sent_bytes.unwrap_or(0);
-			let other_recv_bytes = other.received_bytes.unwrap_or(0);
-			let other_sent_bytes = other.sent_bytes.unwrap_or(0);
+			let curr_recv_bytes = curr.received_bytes_per_sec;
+			let curr_sent_bytes = curr.sent_bytes_per_sec;
+			let other_recv_bytes = other.received_bytes_per_sec;
+			let other_sent_bytes = other.sent_bytes_per_sec;
 			let curr_sum = curr_recv_bytes + curr_sent_bytes;
 			let other_sum = other_recv_bytes + other_sent_bytes;
 
diff --git a/util/src/lib.rs b/util/src/lib.rs
index c1714cd80..fe5aba3b6 100644
--- a/util/src/lib.rs
+++ b/util/src/lib.rs
@@ -71,6 +71,9 @@ pub mod file;
 /// Compress and decompress zip bz2 archives
 pub mod zip;
 
+mod rate_counter;
+pub use rate_counter::RateCounter;
+
 /// Encapsulation of a RwLock<Option<T>> for one-time initialization.
 /// This implementation will purposefully fail hard if not used
 /// properly, for example if not initialized before being first used
diff --git a/util/src/rate_counter.rs b/util/src/rate_counter.rs
new file mode 100644
index 000000000..f4e929130
--- /dev/null
+++ b/util/src/rate_counter.rs
@@ -0,0 +1,70 @@
+// Copyright 2018 The Grin Developers
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+/// Utility to track the rate of data transfers
+use std::time::{Duration, SystemTime};
+
+/// A rate counter tracks the number of transfers, the amount of data
+/// exchanged and the rate of transfer (via a few timers) over the last
+/// minute. The counter does not try to be accurate and update times
+/// proactively; instead it only does so lazily. As a result, produced
+/// rates are worst-case estimates.
+pub struct RateCounter {
+	last_min_bytes: Vec<u64>,
+	last_min_count: u64,
+	last_min_times: Vec<u64>,
+}
+
+impl RateCounter {
+	/// Instantiate a new rate counter
+	pub fn new() -> RateCounter {
+		RateCounter {
+			last_min_bytes: vec![],
+			last_min_count: 0,
+			last_min_times: vec![],
+		}
+	}
+
+	/// Increments number of bytes transferred, updating counts and rates.
+	pub fn inc(&mut self, bytes: u64) {
+		let now_millis = millis_since_epoch();
+		self.last_min_times.push(now_millis);
+		self.last_min_bytes.push(bytes);
+		self.last_min_count += 1;
+		while self.last_min_times.len() > 0 && self.last_min_times[0] + 60_000 < now_millis {
+			self.last_min_times.remove(0);
+			self.last_min_bytes.remove(0);
+			self.last_min_count -= 1;
+		}
+	}
+
+	/// Number of bytes counted in the last minute
+	pub fn bytes_per_min(&self) -> u64 {
+		self.last_min_bytes.iter().sum()
+	}
+
+	/// Count of increases in the last minute
+	pub fn count_per_min(&self) -> u64 {
+		self.last_min_count
+	}
+}
+
+// turns out getting the millisecs since epoch in Rust isn't as easy as it
+// could be
+fn millis_since_epoch() -> u64 {
+	let since_epoch = SystemTime::now()
+		.duration_since(SystemTime::UNIX_EPOCH)
+		.unwrap_or(Duration::new(0, 0));
+	since_epoch.as_secs() * 1000 + since_epoch.subsec_millis() as u64
+}
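For reference, a minimal sketch of how the new RateCounter is meant to be driven. This is illustrative only and not part of the diff; it assumes the counter is in scope as `util::RateCounter` (the re-export added in util/src/lib.rs above) and uses made-up byte counts.

use util::RateCounter;

fn main() {
	// Record a few transfers, as conn.rs now does for every message
	// sent or received on the connection.
	let mut counter = RateCounter::new();
	counter.inc(250);
	counter.inc(1_024);
	counter.inc(42);

	// Both figures cover a sliding one-minute window; entries older
	// than 60 seconds are pruned on the next call to inc().
	assert_eq!(counter.count_per_min(), 3);
	assert_eq!(counter.bytes_per_min(), 250 + 1_024 + 42);

	// PeerStats divides the per-minute total by 60 to report an
	// approximate bytes-per-second figure in the TUI.
	println!("~{} B/s over the last minute", counter.bytes_per_min() / 60);
}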