mirror of
https://github.com/mimblewimble/grin.git
synced 2025-01-20 19:11:08 +03:00
Use io::Read/Write interface instead of AsyncRead/AsyncWrite + Finish Tests (#74)
This commit is contained in:
parent
15ea8da34a
commit
60705eff76
1 changed files with 60 additions and 41 deletions
|
@ -28,7 +28,7 @@ pub struct ThrottledReader<R: AsyncRead> {
|
|||
/// Max Bytes per second
|
||||
max: u32,
|
||||
/// Stores a count of last request and last request time
|
||||
allowed: isize,
|
||||
allowed: usize,
|
||||
last_check: Instant,
|
||||
}
|
||||
|
||||
|
@ -39,7 +39,7 @@ impl<R: AsyncRead> ThrottledReader<R> {
|
|||
ThrottledReader {
|
||||
reader: reader,
|
||||
max: max,
|
||||
allowed: max as isize,
|
||||
allowed: max as usize,
|
||||
last_check: Instant::now(),
|
||||
}
|
||||
}
|
||||
|
@ -65,43 +65,35 @@ impl<R: AsyncRead> ThrottledReader<R> {
|
|||
|
||||
impl<R: AsyncRead> io::Read for ThrottledReader<R> {
|
||||
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
|
||||
self.reader.read(buf)
|
||||
}
|
||||
}
|
||||
|
||||
impl<R: AsyncRead> AsyncRead for ThrottledReader<R> {
|
||||
unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
|
||||
self.reader.prepare_uninitialized_buffer(buf)
|
||||
}
|
||||
|
||||
fn read_buf<B: BufMut>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
|
||||
// Check passed Time
|
||||
let time_passed = self.last_check.elapsed();
|
||||
self.last_check = Instant::now();
|
||||
self.allowed += time_passed.as_secs() as isize * self.max as isize;
|
||||
self.allowed += time_passed.as_secs() as usize * self.max as usize;
|
||||
|
||||
// Throttle
|
||||
if self.allowed > self.max as isize {
|
||||
self.allowed = self.max as isize;
|
||||
if self.allowed > self.max as usize {
|
||||
self.allowed = self.max as usize;
|
||||
}
|
||||
|
||||
// Check if Allowed
|
||||
if self.allowed < 1 {
|
||||
return Ok(Async::NotReady);
|
||||
return Err(io::Error::new(io::ErrorKind::WouldBlock, "Reached Allowed Read Limit"))
|
||||
}
|
||||
|
||||
// Since we can't limit the scope that is read,
|
||||
// we use a signed `allowed` counter in case n > allowed
|
||||
let res = self.reader.read_buf(buf);
|
||||
// Read Max Allowed
|
||||
let buf = if buf.len() > self.allowed { &mut buf[0..self.allowed]} else { buf };
|
||||
let res = self.reader.read(buf);
|
||||
|
||||
// Decrement Allowed amount written
|
||||
if let Ok(Async::Ready(n)) = res {
|
||||
self.allowed = self.allowed.saturating_sub(n as isize);
|
||||
if let Ok(n) = res {
|
||||
self.allowed -= n;
|
||||
}
|
||||
res
|
||||
}
|
||||
}
|
||||
|
||||
impl<R: AsyncRead> AsyncRead for ThrottledReader<R> { }
|
||||
|
||||
/// A Rate Limited Writer
|
||||
#[derive(Debug)]
|
||||
pub struct ThrottledWriter<W: AsyncWrite> {
|
||||
|
@ -146,22 +138,6 @@ impl<W: AsyncWrite> ThrottledWriter<W> {
|
|||
|
||||
impl<W: AsyncWrite> io::Write for ThrottledWriter<W> {
|
||||
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
|
||||
self.writer.write(buf)
|
||||
}
|
||||
|
||||
fn flush(&mut self) -> io::Result<()> {
|
||||
self.writer.flush()
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: AsyncWrite> AsyncWrite for ThrottledWriter<T> {
|
||||
fn shutdown(&mut self) -> Poll<(), io::Error> {
|
||||
self.writer.shutdown()
|
||||
}
|
||||
|
||||
fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, io::Error>
|
||||
where Self: Sized
|
||||
{
|
||||
// Check passed Time
|
||||
let time_passed = self.last_check.elapsed();
|
||||
self.last_check = Instant::now();
|
||||
|
@ -174,17 +150,60 @@ impl<T: AsyncWrite> AsyncWrite for ThrottledWriter<T> {
|
|||
|
||||
// Check if Allowed
|
||||
if self.allowed < 1 {
|
||||
return Ok(Async::NotReady);
|
||||
return Err(io::Error::new(io::ErrorKind::WouldBlock, "Reached Allowed Write Limit"))
|
||||
}
|
||||
|
||||
// Write max allowed
|
||||
let ref mut lbuf = buf.by_ref().take(self.allowed);
|
||||
let res = self.writer.write_buf(lbuf);
|
||||
let buf = if buf.len() > self.allowed { &buf[0..self.allowed]} else { buf };
|
||||
let res = self.writer.write(buf);
|
||||
|
||||
// Decrement Allowed amount written
|
||||
if let Ok(Async::Ready(n)) = res {
|
||||
if let Ok(n) = res {
|
||||
self.allowed -= n;
|
||||
}
|
||||
res
|
||||
}
|
||||
|
||||
fn flush(&mut self) -> io::Result<()> {
|
||||
self.writer.flush()
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: AsyncWrite> AsyncWrite for ThrottledWriter<T> {
|
||||
fn shutdown(&mut self) -> Poll<(), io::Error> {
|
||||
self.writer.shutdown()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use super::*;
|
||||
use std::io::Cursor;
|
||||
|
||||
#[test]
|
||||
fn should_throttle_write() {
|
||||
let buf = vec![0; 64];
|
||||
let mut t_buf = ThrottledWriter::new(Cursor::new(buf), 8);
|
||||
|
||||
for _ in 0..16 {
|
||||
let _ = t_buf.write_buf(&mut Cursor::new(vec![1; 8]));
|
||||
}
|
||||
|
||||
let cursor = t_buf.into_inner();
|
||||
assert_eq!(cursor.position(), 8);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn should_throttle_read() {
|
||||
let buf = vec![1; 64];
|
||||
let mut t_buf = ThrottledReader::new(Cursor::new(buf), 8);
|
||||
|
||||
let mut dst = Cursor::new(vec![0; 64]);
|
||||
|
||||
for _ in 0..16 {
|
||||
let _ = t_buf.read_buf(&mut dst);
|
||||
}
|
||||
|
||||
assert_eq!(dst.position(), 8);
|
||||
}
|
||||
}
|
Loading…
Reference in a new issue