[WIP] Rate Limiting (#57)

* Setup tests for rate limiting
* Add send and receive rate configs to P2P Config
* Setup ThrottledConnection and ThrottledWrite
* Add Tokio io
* Fix ThrottledReader + Add Bytes
* Attach Rate Limiting to Connection
This commit is contained in:
Jacob Payne 2017-06-17 16:15:46 -07:00 committed by Ignotus Peverell
parent a82f9ce415
commit 6a15100c96
7 changed files with 280 additions and 69 deletions

View file

@ -13,8 +13,10 @@ net2 = "0.2.0"
rand = "^0.3"
serde = "~1.0.8"
serde_derive = "~1.0.8"
bytes = "0.4.3"
tokio-core="^0.1.1"
tokio-timer="^0.1.0"
tokio-io="^0.1"
time = "^0.1"
enum_primitive = "^0.1.0"
num = "^0.1.36"

View file

@ -24,14 +24,16 @@ use futures;
use futures::{Stream, Future};
use futures::stream;
use futures::sync::mpsc::{Sender, UnboundedSender, UnboundedReceiver};
use tokio_core::io::{Io, WriteHalf, ReadHalf, write_all, read_exact};
use tokio_core::io::{WriteHalf, ReadHalf, write_all, read_exact};
use tokio_core::net::TcpStream;
use tokio_timer::{Timer, TimerError};
use tokio_io::*;
use core::core::hash::{Hash, ZERO_HASH};
use core::ser;
use msg::*;
use types::Error;
use rate_limit::*;
/// Handler to provide to the connection, will be called back anytime a message
/// is received. The provided sender can be use to immediately send back
@ -92,6 +94,11 @@ impl Connection {
let (reader, writer) = conn.split();
// Set Max Read to 12 Mb/s
let reader = ThrottledReader::new(reader, 12_000_000);
// Set Max Write to 12 Mb/s
let writer = ThrottledWriter::new(writer, 12_000_000);
// prepare the channel that will transmit data to the connection writer
let (tx, rx) = futures::sync::mpsc::unbounded();
@ -125,10 +132,12 @@ impl Connection {
/// Prepares the future that gets message data produced by our system and
/// sends it to the peer connection
fn write_msg(&self,
rx: UnboundedReceiver<Vec<u8>>,
writer: WriteHalf<TcpStream>)
-> Box<Future<Item = WriteHalf<TcpStream>, Error = Error>> {
fn write_msg<W>(&self,
rx: UnboundedReceiver<Vec<u8>>,
writer: W)
-> Box<Future<Item = W, Error = Error>>
where W: AsyncWrite + 'static
{
let sent_bytes = self.sent_bytes.clone();
let send_data = rx
@ -148,12 +157,13 @@ impl Connection {
/// Prepares the future reading from the peer connection, parsing each
/// message and forwarding them appropriately based on their type
fn read_msg<F>(&self,
sender: UnboundedSender<Vec<u8>>,
reader: ReadHalf<TcpStream>,
handler: F)
-> Box<Future<Item = ReadHalf<TcpStream>, Error = Error>>
where F: Handler + 'static
fn read_msg<F, R>(&self,
sender: UnboundedSender<Vec<u8>>,
reader: R,
handler: F)
-> Box<Future<Item = R, Error = Error>>
where F: Handler + 'static,
R: AsyncRead + 'static
{
// infinite iterator stream so we repeat the message reading logic until the

View file

@ -33,6 +33,8 @@ extern crate log;
extern crate futures;
#[macro_use]
extern crate tokio_core;
extern crate tokio_io;
extern crate bytes;
extern crate tokio_timer;
extern crate rand;
extern crate serde;
@ -43,6 +45,7 @@ extern crate num;
mod conn;
pub mod handshake;
mod rate_limit;
mod msg;
mod peer;
mod protocol;

View file

@ -52,10 +52,10 @@ impl Peer {
.and_then(|(conn, proto, info)| {
Ok((conn,
Peer {
info: info,
proto: Box::new(proto),
state: Arc::new(RwLock::new(State::Connected)),
}))
info: info,
proto: Box::new(proto),
state: Arc::new(RwLock::new(State::Connected)),
}))
});
Box::new(connect_peer)
}
@ -70,10 +70,10 @@ impl Peer {
.and_then(|(conn, proto, info)| {
Ok((conn,
Peer {
info: info,
proto: Box::new(proto),
state: Arc::new(RwLock::new(State::Connected)),
}))
info: info,
proto: Box::new(proto),
state: Arc::new(RwLock::new(State::Connected)),
}))
});
Box::new(hs_peer)
}

190
p2p/src/rate_limit.rs Normal file
View file

@ -0,0 +1,190 @@
// Copyright 2016 The Grin Developers
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Provides wrappers for throttling readers and writers
use std::time::{Instant, Duration};
use std::io;
use futures::*;
use tokio_io::*;
use bytes::{Buf, BytesMut, BufMut};
/// A Rate Limited Reader
#[derive(Debug)]
pub struct ThrottledReader<R: AsyncRead> {
reader: R,
/// Max Bytes per second
max: u32,
/// Stores a count of last request and last request time
allowed: isize,
last_check: Instant,
}
impl<R: AsyncRead> ThrottledReader<R> {
/// Adds throttling to a reader.
/// The resulting reader will read at most `max` amount of bytes per second
pub fn new(reader: R, max: u32) -> Self {
ThrottledReader {
reader: reader,
max: max,
allowed: max as isize,
last_check: Instant::now(),
}
}
/// Get a shared reference to the inner sink.
pub fn get_ref(&self) -> &R {
&self.reader
}
/// Get a mutable reference to the inner sink.
pub fn get_mut(&mut self) -> &mut R {
&mut self.reader
}
/// Consumes this combinator, returning the underlying sink.
///
/// Note that this may discard intermediate state of this combinator, so
/// care should be taken to avoid losing resources when this is called.
pub fn into_inner(self) -> R {
self.reader
}
}
impl<R: AsyncRead> io::Read for ThrottledReader<R> {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
self.reader.read(buf)
}
}
impl<R: AsyncRead> AsyncRead for ThrottledReader<R> {
unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
self.reader.prepare_uninitialized_buffer(buf)
}
fn read_buf<B: BufMut>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
// Check passed Time
let time_passed = self.last_check.elapsed();
self.last_check = Instant::now();
self.allowed += time_passed.as_secs() as isize * self.max as isize;
// Throttle
if self.allowed > self.max as isize {
self.allowed = self.max as isize;
}
// Check if Allowed
if self.allowed < 1 {
return Ok(Async::NotReady);
}
// Since we can't limit the scope that is read,
// we use a signed `allowed` counter in case n > allowed
let res = self.reader.read_buf(buf);
// Decrement Allowed amount written
if let Ok(Async::Ready(n)) = res {
self.allowed = self.allowed.saturating_sub(n as isize);
}
res
}
}
/// A Rate Limited Writer
#[derive(Debug)]
pub struct ThrottledWriter<W: AsyncWrite> {
writer: W,
/// Max Bytes per second
max: u32,
/// Stores a count of last request and last request time
allowed: usize,
last_check: Instant,
}
impl<W: AsyncWrite> ThrottledWriter<W> {
/// Adds throttling to a writer.
/// The resulting writer will write at most `max` amount of bytes per second
pub fn new(writer: W, max: u32) -> Self {
ThrottledWriter {
writer: writer,
max: max,
allowed: max as usize,
last_check: Instant::now(),
}
}
/// Get a shared reference to the inner sink.
pub fn get_ref(&self) -> &W {
&self.writer
}
/// Get a mutable reference to the inner sink.
pub fn get_mut(&mut self) -> &mut W {
&mut self.writer
}
/// Consumes this combinator, returning the underlying sink.
///
/// Note that this may discard intermediate state of this combinator, so
/// care should be taken to avoid losing resources when this is called.
pub fn into_inner(self) -> W {
self.writer
}
}
impl<W: AsyncWrite> io::Write for ThrottledWriter<W> {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.writer.write(buf)
}
fn flush(&mut self) -> io::Result<()> {
self.writer.flush()
}
}
impl<T: AsyncWrite> AsyncWrite for ThrottledWriter<T> {
fn shutdown(&mut self) -> Poll<(), io::Error> {
self.writer.shutdown()
}
fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, io::Error>
where Self: Sized
{
// Check passed Time
let time_passed = self.last_check.elapsed();
self.last_check = Instant::now();
self.allowed += time_passed.as_secs() as usize * self.max as usize;
// Throttle
if self.allowed > self.max as usize {
self.allowed = self.max as usize;
}
// Check if Allowed
if self.allowed < 1 {
return Ok(Async::NotReady);
}
// Write max allowed
let ref mut lbuf = buf.by_ref().take(self.allowed);
let res = self.writer.write_buf(lbuf);
// Decrement Allowed amount written
if let Ok(Async::Ready(n)) = res {
self.allowed -= n;
}
res
}
}

View file

@ -132,11 +132,9 @@ impl Server {
let mut stop_mut = self.stop.borrow_mut();
*stop_mut = Some(stop);
}
Box::new(server.select(stop_rx.map_err(|_| Error::ConnectionClose)).then(|res| {
match res {
Ok((_, _)) => Ok(()),
Err((e, _)) => Err(e),
}
Box::new(server.select(stop_rx.map_err(|_| Error::ConnectionClose)).then(|res| match res {
Ok((_, _)) => Ok(()),
Err((e, _)) => Err(e),
}))
}
@ -304,12 +302,10 @@ fn with_timeout<T: 'static>(fut: Box<Future<Item = Result<T, ()>, Error = Error>
-> Box<Future<Item = T, Error = Error>> {
let timeout = reactor::Timeout::new(Duration::new(5, 0), h).unwrap();
let timed = fut.select(timeout.map(Err).from_err())
.then(|res| {
match res {
Ok((Ok(inner), _timeout)) => Ok(inner),
Ok((_, _accept)) => Err(Error::Timeout),
Err((e, _other)) => Err(e),
}
.then(|res| match res {
Ok((Ok(inner), _timeout)) => Ok(inner),
Ok((_, _accept)) => Err(Error::Timeout),
Err((e, _other)) => Err(e),
});
Box::new(timed)
}

View file

@ -30,49 +30,59 @@ use core::ser;
use core::core::target::Difficulty;
use p2p::Peer;
// Starts a server and connects a client peer to it to check handshake, followed by a ping/pong exchange to make sure the connection is live.
// Starts a server and connects a client peer to it to check handshake,
// followed by a ping/pong exchange to make sure the connection is live.
#[test]
fn peer_handshake() {
env_logger::init().unwrap();
env_logger::init().unwrap();
let mut evtlp = Core::new().unwrap();
let handle = evtlp.handle();
let p2p_conf = p2p::P2PConfig::default();
let net_adapter = Arc::new(p2p::DummyAdapter{});
let server = p2p::Server::new(p2p::UNKNOWN, p2p_conf, net_adapter.clone());
let run_server = server.start(handle.clone());
let my_addr = "127.0.0.1:5000".parse().unwrap();
let mut evtlp = Core::new().unwrap();
let handle = evtlp.handle();
let p2p_conf = p2p::P2PConfig::default();
let net_adapter = Arc::new(p2p::DummyAdapter {});
let server = p2p::Server::new(p2p::UNKNOWN, p2p_conf, net_adapter.clone());
let run_server = server.start(handle.clone());
let my_addr = "127.0.0.1:5000".parse().unwrap();
let phandle = handle.clone();
let rhandle = handle.clone();
let timeout = reactor::Timeout::new(time::Duration::new(1, 0), &handle).unwrap();
let timeout_send = reactor::Timeout::new(time::Duration::new(2, 0), &handle).unwrap();
handle.spawn(timeout.from_err().and_then(move |_| {
let p2p_conf = p2p::P2PConfig::default();
let addr = SocketAddr::new(p2p_conf.host, p2p_conf.port);
let socket = TcpStream::connect(&addr, &phandle).map_err(|e| p2p::Error::Connection(e));
socket.and_then(move |socket| {
Peer::connect(socket, p2p::UNKNOWN, Difficulty::one(), my_addr, &p2p::handshake::Handshake::new())
}).and_then(move |(socket, peer)| {
rhandle.spawn(peer.run(socket, net_adapter.clone()).map_err(|e| {
panic!("Client run failed: {:?}", e);
}));
peer.send_ping().unwrap();
timeout_send.from_err().map(|_| peer)
}).and_then(|peer| {
let (sent, recv) = peer.transmitted_bytes();
assert!(sent > 0);
assert!(recv > 0);
Ok(())
}).and_then(|_| {
assert!(server.peer_count() > 0);
server.stop();
Ok(())
})
}).map_err(|e| {
panic!("Client connection failed: {:?}", e);
}));
let phandle = handle.clone();
let rhandle = handle.clone();
let timeout = reactor::Timeout::new(time::Duration::new(1, 0), &handle).unwrap();
let timeout_send = reactor::Timeout::new(time::Duration::new(2, 0), &handle).unwrap();
handle.spawn(timeout.from_err()
.and_then(move |_| {
let p2p_conf = p2p::P2PConfig::default();
let addr = SocketAddr::new(p2p_conf.host, p2p_conf.port);
let socket = TcpStream::connect(&addr, &phandle).map_err(|e| p2p::Error::Connection(e));
socket.and_then(move |socket| {
Peer::connect(socket,
p2p::UNKNOWN,
Difficulty::one(),
my_addr,
&p2p::handshake::Handshake::new())
})
.and_then(move |(socket, peer)| {
rhandle.spawn(peer.run(socket, net_adapter.clone()).map_err(|e| {
panic!("Client run failed: {:?}", e);
}));
peer.send_ping().unwrap();
timeout_send.from_err().map(|_| peer)
})
.and_then(|peer| {
let (sent, recv) = peer.transmitted_bytes();
assert!(sent > 0);
assert!(recv > 0);
Ok(())
})
.and_then(|_| {
assert!(server.peer_count() > 0);
server.stop();
Ok(())
})
})
.map_err(|e| {
panic!("Client connection failed: {:?}", e);
}));
evtlp.run(run_server).unwrap();
evtlp.run(run_server).unwrap();
}