From 6a15100c960c178aa693528bc761af4f745a5168 Mon Sep 17 00:00:00 2001 From: Jacob Payne Date: Sat, 17 Jun 2017 16:15:46 -0700 Subject: [PATCH] [WIP] Rate Limiting (#57) * Setup tests for rate limiting * Add send and receive rate configs to P2P Config * Setup ThrottledConnection and ThrottledWrite * Add Tokio io * Fix ThrottledReader + Add Bytes * Attach Rate Limiting to Connection --- p2p/Cargo.toml | 2 + p2p/src/conn.rs | 32 +++--- p2p/src/lib.rs | 3 + p2p/src/peer.rs | 16 +-- p2p/src/rate_limit.rs | 190 ++++++++++++++++++++++++++++++++++++ p2p/src/server.rs | 18 ++-- p2p/tests/peer_handshake.rs | 88 +++++++++-------- 7 files changed, 280 insertions(+), 69 deletions(-) create mode 100644 p2p/src/rate_limit.rs diff --git a/p2p/Cargo.toml b/p2p/Cargo.toml index c2e1afc8a..f076db60b 100644 --- a/p2p/Cargo.toml +++ b/p2p/Cargo.toml @@ -13,8 +13,10 @@ net2 = "0.2.0" rand = "^0.3" serde = "~1.0.8" serde_derive = "~1.0.8" +bytes = "0.4.3" tokio-core="^0.1.1" tokio-timer="^0.1.0" +tokio-io="^0.1" time = "^0.1" enum_primitive = "^0.1.0" num = "^0.1.36" diff --git a/p2p/src/conn.rs b/p2p/src/conn.rs index 5e873bae2..e7523d4f2 100644 --- a/p2p/src/conn.rs +++ b/p2p/src/conn.rs @@ -24,14 +24,16 @@ use futures; use futures::{Stream, Future}; use futures::stream; use futures::sync::mpsc::{Sender, UnboundedSender, UnboundedReceiver}; -use tokio_core::io::{Io, WriteHalf, ReadHalf, write_all, read_exact}; +use tokio_core::io::{WriteHalf, ReadHalf, write_all, read_exact}; use tokio_core::net::TcpStream; use tokio_timer::{Timer, TimerError}; +use tokio_io::*; use core::core::hash::{Hash, ZERO_HASH}; use core::ser; use msg::*; use types::Error; +use rate_limit::*; /// Handler to provide to the connection, will be called back anytime a message /// is received. The provided sender can be use to immediately send back @@ -92,6 +94,11 @@ impl Connection { let (reader, writer) = conn.split(); + // Set Max Read to 12 Mb/s + let reader = ThrottledReader::new(reader, 12_000_000); + // Set Max Write to 12 Mb/s + let writer = ThrottledWriter::new(writer, 12_000_000); + // prepare the channel that will transmit data to the connection writer let (tx, rx) = futures::sync::mpsc::unbounded(); @@ -125,10 +132,12 @@ impl Connection { /// Prepares the future that gets message data produced by our system and /// sends it to the peer connection - fn write_msg(&self, - rx: UnboundedReceiver>, - writer: WriteHalf) - -> Box, Error = Error>> { + fn write_msg(&self, + rx: UnboundedReceiver>, + writer: W) + -> Box> + where W: AsyncWrite + 'static + { let sent_bytes = self.sent_bytes.clone(); let send_data = rx @@ -148,12 +157,13 @@ impl Connection { /// Prepares the future reading from the peer connection, parsing each /// message and forwarding them appropriately based on their type - fn read_msg(&self, - sender: UnboundedSender>, - reader: ReadHalf, - handler: F) - -> Box, Error = Error>> - where F: Handler + 'static + fn read_msg(&self, + sender: UnboundedSender>, + reader: R, + handler: F) + -> Box> + where F: Handler + 'static, + R: AsyncRead + 'static { // infinite iterator stream so we repeat the message reading logic until the diff --git a/p2p/src/lib.rs b/p2p/src/lib.rs index f68d4e968..e42bf2f91 100644 --- a/p2p/src/lib.rs +++ b/p2p/src/lib.rs @@ -33,6 +33,8 @@ extern crate log; extern crate futures; #[macro_use] extern crate tokio_core; +extern crate tokio_io; +extern crate bytes; extern crate tokio_timer; extern crate rand; extern crate serde; @@ -43,6 +45,7 @@ extern crate num; mod conn; pub mod handshake; +mod rate_limit; mod msg; mod peer; mod protocol; diff --git a/p2p/src/peer.rs b/p2p/src/peer.rs index dce0a0ff5..562835373 100644 --- a/p2p/src/peer.rs +++ b/p2p/src/peer.rs @@ -52,10 +52,10 @@ impl Peer { .and_then(|(conn, proto, info)| { Ok((conn, Peer { - info: info, - proto: Box::new(proto), - state: Arc::new(RwLock::new(State::Connected)), - })) + info: info, + proto: Box::new(proto), + state: Arc::new(RwLock::new(State::Connected)), + })) }); Box::new(connect_peer) } @@ -70,10 +70,10 @@ impl Peer { .and_then(|(conn, proto, info)| { Ok((conn, Peer { - info: info, - proto: Box::new(proto), - state: Arc::new(RwLock::new(State::Connected)), - })) + info: info, + proto: Box::new(proto), + state: Arc::new(RwLock::new(State::Connected)), + })) }); Box::new(hs_peer) } diff --git a/p2p/src/rate_limit.rs b/p2p/src/rate_limit.rs new file mode 100644 index 000000000..0cf6ca7a3 --- /dev/null +++ b/p2p/src/rate_limit.rs @@ -0,0 +1,190 @@ +// Copyright 2016 The Grin Developers +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Provides wrappers for throttling readers and writers + +use std::time::{Instant, Duration}; +use std::io; + +use futures::*; +use tokio_io::*; +use bytes::{Buf, BytesMut, BufMut}; + +/// A Rate Limited Reader +#[derive(Debug)] +pub struct ThrottledReader { + reader: R, + /// Max Bytes per second + max: u32, + /// Stores a count of last request and last request time + allowed: isize, + last_check: Instant, +} + +impl ThrottledReader { + /// Adds throttling to a reader. + /// The resulting reader will read at most `max` amount of bytes per second + pub fn new(reader: R, max: u32) -> Self { + ThrottledReader { + reader: reader, + max: max, + allowed: max as isize, + last_check: Instant::now(), + } + } + + /// Get a shared reference to the inner sink. + pub fn get_ref(&self) -> &R { + &self.reader + } + + /// Get a mutable reference to the inner sink. + pub fn get_mut(&mut self) -> &mut R { + &mut self.reader + } + + /// Consumes this combinator, returning the underlying sink. + /// + /// Note that this may discard intermediate state of this combinator, so + /// care should be taken to avoid losing resources when this is called. + pub fn into_inner(self) -> R { + self.reader + } +} + +impl io::Read for ThrottledReader { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + self.reader.read(buf) + } +} + +impl AsyncRead for ThrottledReader { + unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { + self.reader.prepare_uninitialized_buffer(buf) + } + + fn read_buf(&mut self, buf: &mut B) -> Poll { + // Check passed Time + let time_passed = self.last_check.elapsed(); + self.last_check = Instant::now(); + self.allowed += time_passed.as_secs() as isize * self.max as isize; + + // Throttle + if self.allowed > self.max as isize { + self.allowed = self.max as isize; + } + + // Check if Allowed + if self.allowed < 1 { + return Ok(Async::NotReady); + } + + // Since we can't limit the scope that is read, + // we use a signed `allowed` counter in case n > allowed + let res = self.reader.read_buf(buf); + + // Decrement Allowed amount written + if let Ok(Async::Ready(n)) = res { + self.allowed = self.allowed.saturating_sub(n as isize); + } + res + } +} + +/// A Rate Limited Writer +#[derive(Debug)] +pub struct ThrottledWriter { + writer: W, + /// Max Bytes per second + max: u32, + /// Stores a count of last request and last request time + allowed: usize, + last_check: Instant, +} + +impl ThrottledWriter { + /// Adds throttling to a writer. + /// The resulting writer will write at most `max` amount of bytes per second + pub fn new(writer: W, max: u32) -> Self { + ThrottledWriter { + writer: writer, + max: max, + allowed: max as usize, + last_check: Instant::now(), + } + } + + /// Get a shared reference to the inner sink. + pub fn get_ref(&self) -> &W { + &self.writer + } + + /// Get a mutable reference to the inner sink. + pub fn get_mut(&mut self) -> &mut W { + &mut self.writer + } + + /// Consumes this combinator, returning the underlying sink. + /// + /// Note that this may discard intermediate state of this combinator, so + /// care should be taken to avoid losing resources when this is called. + pub fn into_inner(self) -> W { + self.writer + } +} + +impl io::Write for ThrottledWriter { + fn write(&mut self, buf: &[u8]) -> io::Result { + self.writer.write(buf) + } + + fn flush(&mut self) -> io::Result<()> { + self.writer.flush() + } +} + +impl AsyncWrite for ThrottledWriter { + fn shutdown(&mut self) -> Poll<(), io::Error> { + self.writer.shutdown() + } + + fn write_buf(&mut self, buf: &mut B) -> Poll + where Self: Sized + { + // Check passed Time + let time_passed = self.last_check.elapsed(); + self.last_check = Instant::now(); + self.allowed += time_passed.as_secs() as usize * self.max as usize; + + // Throttle + if self.allowed > self.max as usize { + self.allowed = self.max as usize; + } + + // Check if Allowed + if self.allowed < 1 { + return Ok(Async::NotReady); + } + + // Write max allowed + let ref mut lbuf = buf.by_ref().take(self.allowed); + let res = self.writer.write_buf(lbuf); + + // Decrement Allowed amount written + if let Ok(Async::Ready(n)) = res { + self.allowed -= n; + } + res + } +} diff --git a/p2p/src/server.rs b/p2p/src/server.rs index 71d020bbb..7ebca4d96 100644 --- a/p2p/src/server.rs +++ b/p2p/src/server.rs @@ -132,11 +132,9 @@ impl Server { let mut stop_mut = self.stop.borrow_mut(); *stop_mut = Some(stop); } - Box::new(server.select(stop_rx.map_err(|_| Error::ConnectionClose)).then(|res| { - match res { - Ok((_, _)) => Ok(()), - Err((e, _)) => Err(e), - } + Box::new(server.select(stop_rx.map_err(|_| Error::ConnectionClose)).then(|res| match res { + Ok((_, _)) => Ok(()), + Err((e, _)) => Err(e), })) } @@ -304,12 +302,10 @@ fn with_timeout(fut: Box, Error = Error> -> Box> { let timeout = reactor::Timeout::new(Duration::new(5, 0), h).unwrap(); let timed = fut.select(timeout.map(Err).from_err()) - .then(|res| { - match res { - Ok((Ok(inner), _timeout)) => Ok(inner), - Ok((_, _accept)) => Err(Error::Timeout), - Err((e, _other)) => Err(e), - } + .then(|res| match res { + Ok((Ok(inner), _timeout)) => Ok(inner), + Ok((_, _accept)) => Err(Error::Timeout), + Err((e, _other)) => Err(e), }); Box::new(timed) } diff --git a/p2p/tests/peer_handshake.rs b/p2p/tests/peer_handshake.rs index 28cad305f..2d945481e 100644 --- a/p2p/tests/peer_handshake.rs +++ b/p2p/tests/peer_handshake.rs @@ -30,49 +30,59 @@ use core::ser; use core::core::target::Difficulty; use p2p::Peer; -// Starts a server and connects a client peer to it to check handshake, followed by a ping/pong exchange to make sure the connection is live. +// Starts a server and connects a client peer to it to check handshake, +// followed by a ping/pong exchange to make sure the connection is live. #[test] fn peer_handshake() { - env_logger::init().unwrap(); + env_logger::init().unwrap(); - let mut evtlp = Core::new().unwrap(); - let handle = evtlp.handle(); - let p2p_conf = p2p::P2PConfig::default(); - let net_adapter = Arc::new(p2p::DummyAdapter{}); - let server = p2p::Server::new(p2p::UNKNOWN, p2p_conf, net_adapter.clone()); - let run_server = server.start(handle.clone()); - let my_addr = "127.0.0.1:5000".parse().unwrap(); + let mut evtlp = Core::new().unwrap(); + let handle = evtlp.handle(); + let p2p_conf = p2p::P2PConfig::default(); + let net_adapter = Arc::new(p2p::DummyAdapter {}); + let server = p2p::Server::new(p2p::UNKNOWN, p2p_conf, net_adapter.clone()); + let run_server = server.start(handle.clone()); + let my_addr = "127.0.0.1:5000".parse().unwrap(); - let phandle = handle.clone(); - let rhandle = handle.clone(); - let timeout = reactor::Timeout::new(time::Duration::new(1, 0), &handle).unwrap(); - let timeout_send = reactor::Timeout::new(time::Duration::new(2, 0), &handle).unwrap(); - handle.spawn(timeout.from_err().and_then(move |_| { - let p2p_conf = p2p::P2PConfig::default(); - let addr = SocketAddr::new(p2p_conf.host, p2p_conf.port); - let socket = TcpStream::connect(&addr, &phandle).map_err(|e| p2p::Error::Connection(e)); - socket.and_then(move |socket| { - Peer::connect(socket, p2p::UNKNOWN, Difficulty::one(), my_addr, &p2p::handshake::Handshake::new()) - }).and_then(move |(socket, peer)| { - rhandle.spawn(peer.run(socket, net_adapter.clone()).map_err(|e| { - panic!("Client run failed: {:?}", e); - })); - peer.send_ping().unwrap(); - timeout_send.from_err().map(|_| peer) - }).and_then(|peer| { - let (sent, recv) = peer.transmitted_bytes(); - assert!(sent > 0); - assert!(recv > 0); - Ok(()) - }).and_then(|_| { - assert!(server.peer_count() > 0); - server.stop(); - Ok(()) - }) - }).map_err(|e| { - panic!("Client connection failed: {:?}", e); - })); + let phandle = handle.clone(); + let rhandle = handle.clone(); + let timeout = reactor::Timeout::new(time::Duration::new(1, 0), &handle).unwrap(); + let timeout_send = reactor::Timeout::new(time::Duration::new(2, 0), &handle).unwrap(); + handle.spawn(timeout.from_err() + .and_then(move |_| { + let p2p_conf = p2p::P2PConfig::default(); + let addr = SocketAddr::new(p2p_conf.host, p2p_conf.port); + let socket = TcpStream::connect(&addr, &phandle).map_err(|e| p2p::Error::Connection(e)); + socket.and_then(move |socket| { + Peer::connect(socket, + p2p::UNKNOWN, + Difficulty::one(), + my_addr, + &p2p::handshake::Handshake::new()) + }) + .and_then(move |(socket, peer)| { + rhandle.spawn(peer.run(socket, net_adapter.clone()).map_err(|e| { + panic!("Client run failed: {:?}", e); + })); + peer.send_ping().unwrap(); + timeout_send.from_err().map(|_| peer) + }) + .and_then(|peer| { + let (sent, recv) = peer.transmitted_bytes(); + assert!(sent > 0); + assert!(recv > 0); + Ok(()) + }) + .and_then(|_| { + assert!(server.peer_count() > 0); + server.stop(); + Ok(()) + }) + }) + .map_err(|e| { + panic!("Client connection failed: {:?}", e); + })); - evtlp.run(run_server).unwrap(); + evtlp.run(run_server).unwrap(); }