diff --git a/p2p/src/conn.rs b/p2p/src/conn.rs index 646102229..b3f0ca0fd 100644 --- a/p2p/src/conn.rs +++ b/p2p/src/conn.rs @@ -140,9 +140,10 @@ impl<'a> Response<'a> { Ok(0) => break, Ok(n) => { write_all(&mut self.conn, &buf[..n], time::Duration::from_secs(10))?; - // Increase sent bytes counter + // Increase sent bytes "quietly" without incrementing the counter. + // (In a loop here for the single attachment). let mut sent_bytes = sent_bytes.write(); - sent_bytes.inc(n as u64); + sent_bytes.inc_quiet(n as u64); } Err(e) => return Err(From::from(e)), } diff --git a/p2p/src/protocol.rs b/p2p/src/protocol.rs index b94e1f3f0..b3c15261f 100644 --- a/p2p/src/protocol.rs +++ b/p2p/src/protocol.rs @@ -300,10 +300,11 @@ impl MessageHandler for Protocol { total_size as u64, ); - // Increase received bytes counter + // Increase received bytes quietly (without affecting the counters). + // Otherwise we risk banning a peer as "abusive". { let mut received_bytes = received_bytes.write(); - received_bytes.inc(size as u64); + received_bytes.inc_quiet(size as u64); } } tmp_zip.into_inner().unwrap().sync_all()?; diff --git a/util/src/rate_counter.rs b/util/src/rate_counter.rs index 78b1d6ca3..dc94d645a 100644 --- a/util/src/rate_counter.rs +++ b/util/src/rate_counter.rs @@ -15,44 +15,88 @@ /// Utility to track the rate of data transfers use std::time::{Duration, SystemTime}; +struct Entry { + bytes: u64, + timestamp: u64, +} + +impl Entry { + fn new(bytes: u64) -> Entry { + Entry { + bytes, + timestamp: millis_since_epoch(), + } + } + + // Create new "quiet" entry with zero timestamp. + // This will count toward total bytes but will not affect the "msg rate". + fn new_quiet(bytes: u64) -> Entry { + Entry { + bytes, + timestamp: 0, + } + } + + // We want to filter out "quiet" entries when calculating the "msg rate". + fn is_quiet(&self) -> bool { + self.timestamp == 0 + } +} + /// A rate counter tracks the number of transfers, the amount of data /// exchanged and the rate of transfer (via a few timers) over the last /// minute. The counter does not try to be accurate and update times /// proactively, instead it only does so lazily. As a result, produced /// rates are worst-case estimates. pub struct RateCounter { - last_min_bytes: Vec, - last_min_times: Vec, + last_min_entries: Vec, } impl RateCounter { /// Instantiate a new rate counter pub fn new() -> RateCounter { RateCounter { - last_min_bytes: vec![], - last_min_times: vec![], + last_min_entries: vec![], } } /// Increments number of bytes transferred, updating counts and rates. pub fn inc(&mut self, bytes: u64) { + self.last_min_entries.push(Entry::new(bytes)); + self.truncate(); + } + + /// Increments number of bytes without updating the count or rate. + /// We filter out 0 last_min_times when calculating rate. + /// Used during txhashset.zip download to track bytes downloaded + /// without treating a peer as abusive (too high a rate of download). + pub fn inc_quiet(&mut self, bytes: u64) { + self.last_min_entries.push(Entry::new_quiet(bytes)); + self.truncate(); + } + + fn truncate(&mut self) { let now_millis = millis_since_epoch(); - self.last_min_times.push(now_millis); - self.last_min_bytes.push(bytes); - while self.last_min_times.len() > 0 && self.last_min_times[0] + 60000 < now_millis { - self.last_min_times.remove(0); - self.last_min_bytes.remove(0); + while self.last_min_entries.len() > 0 + && self.last_min_entries[0].timestamp + 60000 < now_millis + { + self.last_min_entries.remove(0); } } - /// Number of bytes counted in the last minute + /// Number of bytes counted in the last minute. + /// Includes "quiet" byte increments. pub fn bytes_per_min(&self) -> u64 { - self.last_min_bytes.iter().sum() + self.last_min_entries.iter().map(|x| x.bytes).sum() } - /// Count of increases in the last minute + /// Count of increases in the last minute. + /// Excludes "quiet" byte increments. pub fn count_per_min(&self) -> u64 { - self.last_min_bytes.len() as u64 + self.last_min_entries + .iter() + .filter(|x| !x.is_quiet()) + .count() as u64 } }