Added a Connection wrapper to handle timeouts when we want information from a peer in a request/response style.

This commit is contained in:
Ignotus Peverell 2017-02-01 19:05:17 -08:00
parent 572c1951e1
commit 7f029cb4c0
No known key found for this signature in database
GPG key ID: 99CD25F39F8F8211
5 changed files with 196 additions and 81 deletions

View file

@ -11,6 +11,7 @@ log = "^0.3"
net2 = "0.2.0" net2 = "0.2.0"
rand = "^0.3" rand = "^0.3"
tokio-core="^0.1.1" tokio-core="^0.1.1"
tokio-timer="^0.1.0"
time = "^0.1" time = "^0.1"
enum_primitive = "^0.1.0" enum_primitive = "^0.1.0"
num = "^0.1.36" num = "^0.1.36"

View file

@ -12,11 +12,14 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
//! Provides a connection wrapper that handles the lower level tasks in sending or //! Provides a connection wrapper that handles the lower level tasks in sending
//! or
//! receiving data from the TCP socket, as well as dealing with timeouts. //! receiving data from the TCP socket, as well as dealing with timeouts.
use std::iter; use std::iter;
use std::ops::Deref;
use std::sync::{Mutex, Arc}; use std::sync::{Mutex, Arc};
use std::time::{Instant, Duration};
use futures; use futures;
use futures::{Stream, Future}; use futures::{Stream, Future};
@ -24,30 +27,42 @@ use futures::stream;
use futures::sync::mpsc::{Sender, UnboundedSender, UnboundedReceiver}; use futures::sync::mpsc::{Sender, UnboundedSender, UnboundedReceiver};
use tokio_core::io::{Io, WriteHalf, ReadHalf, write_all, read_exact}; use tokio_core::io::{Io, WriteHalf, ReadHalf, write_all, read_exact};
use tokio_core::net::TcpStream; use tokio_core::net::TcpStream;
use tokio_timer::{Timer, TimerError};
use core::core::hash::{Hash, ZERO_HASH};
use core::ser; use core::ser;
use msg::*; use msg::*;
/// Handler to provide to the connection, will be called back anytime a message is /// Handler to provide to the connection, will be called back anytime a message
/// received. The provided sender can be use to immediately send back another /// is received. The provided sender can be use to immediately send back
/// message. /// another message.
pub trait Handler: Sync + Send { pub trait Handler: Sync + Send {
/// Handle function to implement to process incoming messages. A sender to reply /// Handle function to implement to process incoming messages. A sender to
/// immediately as well as the message header and its unparsed body are provided. /// reply immediately as well as the message header and its unparsed body
fn handle(&self, sender: UnboundedSender<Vec<u8>>, header: MsgHeader, body: Vec<u8>) -> Result<(), ser::Error>; /// are provided.
fn handle(&self,
sender: UnboundedSender<Vec<u8>>,
header: MsgHeader,
body: Vec<u8>)
-> Result<Option<Hash>, ser::Error>;
} }
impl<F> Handler for F impl<F> Handler for F
where F: Fn(UnboundedSender<Vec<u8>>, MsgHeader, Vec<u8>) -> Result<(), ser::Error>, F: Sync + Send { where F: Fn(UnboundedSender<Vec<u8>>, MsgHeader, Vec<u8>) -> Result<Option<Hash>, ser::Error>,
F: Sync + Send
fn handle(&self, sender: UnboundedSender<Vec<u8>>, header: MsgHeader, body: Vec<u8>) -> Result<(), ser::Error> { {
self(sender, header, body) fn handle(&self,
} sender: UnboundedSender<Vec<u8>>,
header: MsgHeader,
body: Vec<u8>)
-> Result<Option<Hash>, ser::Error> {
self(sender, header, body)
}
} }
/// A higher level connection wrapping the TcpStream. Maintains the amount of data /// A higher level connection wrapping the TcpStream. Maintains the amount of
/// transmitted and deals with the low-level task of sending and receiving /// data transmitted and deals with the low-level task of sending and
/// data, parsing message headers and timeouts. /// receiving data, parsing message headers and timeouts.
pub struct Connection { pub struct Connection {
// Channel to push bytes to the remote peer // Channel to push bytes to the remote peer
outbound_chan: UnboundedSender<Vec<u8>>, outbound_chan: UnboundedSender<Vec<u8>>,
@ -66,11 +81,14 @@ pub struct Connection {
} }
impl Connection { impl Connection {
/// Start listening on the provided connection and wraps it. Does not hang
/// Start listening on the provided connection and wraps it. Does not hang the /// the current thread, instead just returns a future and the Connection
/// current thread, instead just returns a future and the Connection itself. /// itself.
pub fn listen<F>(conn: TcpStream, handler: F) -> (Connection, Box<Future<Item = (), Error = ser::Error>>) pub fn listen<F>(conn: TcpStream,
where F: Handler + 'static { handler: F)
-> (Connection, Box<Future<Item = (), Error = ser::Error>>)
where F: Handler + 'static
{
let (reader, writer) = conn.split(); let (reader, writer) = conn.split();
@ -97,11 +115,12 @@ impl Connection {
let write_msg = me.write_msg(rx, writer).map(|_| ()); let write_msg = me.write_msg(rx, writer).map(|_| ());
// select between our different futures and return them // select between our different futures and return them
let fut = Box::new(close_conn.select(read_msg.select(write_msg).map(|_| ()).map_err(|(e, _)| e)) let fut =
.map(|_| ()) Box::new(close_conn.select(read_msg.select(write_msg).map(|_| ()).map_err(|(e, _)| e))
.map_err(|(e, _)| e)); .map(|_| ())
.map_err(|(e, _)| e));
(me, fut) (me, fut)
} }
/// Prepares the future that gets message data produced by our system and /// Prepares the future that gets message data produced by our system and
@ -128,11 +147,12 @@ impl Connection {
/// Prepares the future reading from the peer connection, parsing each /// Prepares the future reading from the peer connection, parsing each
/// message and forwarding them appropriately based on their type /// message and forwarding them appropriately based on their type
fn read_msg<F>(&self, fn read_msg<F>(&self,
sender: UnboundedSender<Vec<u8>>, sender: UnboundedSender<Vec<u8>>,
reader: ReadHalf<TcpStream>, reader: ReadHalf<TcpStream>,
handler: F) handler: F)
-> Box<Future<Item = ReadHalf<TcpStream>, Error = ser::Error>> -> Box<Future<Item = ReadHalf<TcpStream>, Error = ser::Error>>
where F: Handler + 'static { where F: Handler + 'static
{
// infinite iterator stream so we repeat the message reading logic until the // infinite iterator stream so we repeat the message reading logic until the
// peer is stopped // peer is stopped
@ -140,12 +160,12 @@ impl Connection {
// setup the reading future, getting messages from the peer and processing them // setup the reading future, getting messages from the peer and processing them
let recv_bytes = self.received_bytes.clone(); let recv_bytes = self.received_bytes.clone();
let handler = Arc::new(handler); let handler = Arc::new(handler);
let read_msg = iter.fold(reader, move |reader, _| { let read_msg = iter.fold(reader, move |reader, _| {
let recv_bytes = recv_bytes.clone(); let recv_bytes = recv_bytes.clone();
let handler = handler.clone(); let handler = handler.clone();
let sender_inner = sender.clone(); let sender_inner = sender.clone();
// first read the message header // first read the message header
read_exact(reader, vec![0u8; HEADER_LEN as usize]) read_exact(reader, vec![0u8; HEADER_LEN as usize])
@ -166,7 +186,7 @@ impl Connection {
*recv_bytes += header.serialized_len() + header.msg_len; *recv_bytes += header.serialized_len() + header.msg_len;
// and handle the different message types // and handle the different message types
let msg_type = header.msg_type; let msg_type = header.msg_type;
if let Err(e) = handler.handle(sender_inner.clone(), header, buf) { if let Err(e) = handler.handle(sender_inner.clone(), header, buf) {
debug!("Invalid {:?} message: {}", msg_type, e); debug!("Invalid {:?} message: {}", msg_type, e);
} }
@ -197,3 +217,89 @@ impl Connection {
(sent, recv) (sent, recv)
} }
} }
/// Connection wrapper that handles a request/response oriented interaction with
/// a timeout.
pub struct TimeoutConnection {
underlying: Connection,
expected_responses: Arc<Mutex<Vec<(Type, Hash, Instant)>>>,
}
impl TimeoutConnection {
/// Same as Connection
pub fn listen<F>(conn: TcpStream,
handler: F)
-> (TimeoutConnection, Box<Future<Item = (), Error = ser::Error>>)
where F: Handler + 'static
{
let expects = Arc::new(Mutex::new(vec![]));
// Decorates the handler to remove the "subscription" from the expected
// responses. We got our replies, so no timeout should occur.
let exp = expects.clone();
let (conn, fut) = Connection::listen(conn, move |sender, header: MsgHeader, data| {
let msg_type = header.msg_type;
let recv_h = try!(handler.handle(sender, header, data));
let mut expects = exp.lock().unwrap();
let filtered = expects.iter()
.filter(|&&(typ, h, _)| msg_type != typ || recv_h.is_some() && recv_h.unwrap() != h)
.map(|&x| x)
.collect::<Vec<_>>();
*expects = filtered;
Ok(recv_h)
});
// Registers a timer with the event loop to regularly check for timeouts.
let exp = expects.clone();
let timer = Timer::default()
.interval(Duration::new(2, 0))
.fold((), move |_, _| {
let exp = exp.lock().unwrap();
for &(_, _, t) in exp.deref() {
if Instant::now() - t > Duration::new(2, 0) {
return Err(TimerError::TooLong);
}
}
Ok(())
})
.map_err(|_| ser::Error::CorruptedData);
let me = TimeoutConnection {
underlying: conn,
expected_responses: expects,
};
(me, Box::new(fut.join(timer).map(|_| ())))
}
/// Sends a request and registers a timer on the provided message type and
/// optionally the hash of the sent data.
pub fn send_request(&self,
t: Type,
body: &ser::Writeable,
expect_h: Option<Hash>)
-> Result<(), ser::Error> {
let sent = try!(self.underlying.send_msg(t, body));
let mut expects = self.expected_responses.lock().unwrap();
if let Some(h) = expect_h {
expects.push((t, h, Instant::now()));
} else {
expects.push((t, ZERO_HASH, Instant::now()));
}
Ok(())
}
/// Same as Connection
pub fn send_msg(&self, t: Type, body: &ser::Writeable) -> Result<(), ser::Error> {
self.underlying.send_msg(t, body)
}
/// Same as Connection
pub fn transmitted_bytes(&self) -> (u64, u64) {
self.underlying.transmitted_bytes()
}
}

View file

@ -32,6 +32,7 @@ extern crate log;
extern crate futures; extern crate futures;
#[macro_use] #[macro_use]
extern crate tokio_core; extern crate tokio_core;
extern crate tokio_timer;
extern crate rand; extern crate rand;
extern crate time; extern crate time;
extern crate num; extern crate num;

View file

@ -44,7 +44,7 @@ pub enum ErrCodes {
/// Types of messages /// Types of messages
enum_from_primitive! { enum_from_primitive! {
#[derive(Debug, Clone, Copy)] #[derive(Debug, Clone, Copy, PartialEq)]
pub enum Type { pub enum Type {
Error, Error,
Hand, Hand,

View file

@ -23,24 +23,24 @@ use tokio_core::net::TcpStream;
use core::core; use core::core;
use core::core::hash::Hash; use core::core::hash::Hash;
use core::ser; use core::ser;
use conn::Connection; use conn::TimeoutConnection;
use msg::*; use msg::*;
use types::*; use types::*;
use util::OneTime; use util::OneTime;
pub struct ProtocolV1 { pub struct ProtocolV1 {
conn: OneTime<Connection>, conn: OneTime<TimeoutConnection>,
expected_responses: Mutex<Vec<(Type, Hash)>>, expected_responses: Mutex<Vec<(Type, Hash)>>,
} }
impl ProtocolV1 { impl ProtocolV1 {
pub fn new() -> ProtocolV1 { pub fn new() -> ProtocolV1 {
ProtocolV1 { ProtocolV1 {
conn: OneTime::new(), conn: OneTime::new(),
expected_responses: Mutex::new(vec![]), expected_responses: Mutex::new(vec![]),
} }
} }
} }
impl Protocol for ProtocolV1 { impl Protocol for ProtocolV1 {
@ -50,25 +50,25 @@ impl Protocol for ProtocolV1 {
adapter: Arc<NetAdapter>) adapter: Arc<NetAdapter>)
-> Box<Future<Item = (), Error = ser::Error>> { -> Box<Future<Item = (), Error = ser::Error>> {
let (conn, listener) = Connection::listen(conn, move |sender, header, data| { let (conn, listener) = TimeoutConnection::listen(conn, move |sender, header, data| {
let adapt = adapter.as_ref(); let adapt = adapter.as_ref();
handle_payload(adapt, sender, header, data) handle_payload(adapt, sender, header, data)
}); });
self.conn.init(conn); self.conn.init(conn);
listener listener
} }
/// Bytes sent and received. /// Bytes sent and received.
fn transmitted_bytes(&self) -> (u64, u64) { fn transmitted_bytes(&self) -> (u64, u64) {
self.conn.borrow().transmitted_bytes() self.conn.borrow().transmitted_bytes()
} }
/// Sends a ping message to the remote peer. Will panic if handle has never /// Sends a ping message to the remote peer. Will panic if handle has never
/// been called on this protocol. /// been called on this protocol.
fn send_ping(&self) -> Result<(), ser::Error> { fn send_ping(&self) -> Result<(), ser::Error> {
self.send_msg(Type::Ping, &Empty {}) self.send_request(Type::Ping, &Empty {}, None)
} }
/// Serializes and sends a block to our remote peer /// Serializes and sends a block to our remote peer
@ -88,46 +88,53 @@ impl Protocol for ProtocolV1 {
} }
impl ProtocolV1 { impl ProtocolV1 {
fn send_msg(&self, t: Type, body: &ser::Writeable) -> Result<(), ser::Error> { fn send_msg(&self, t: Type, body: &ser::Writeable) -> Result<(), ser::Error> {
self.conn.borrow().send_msg(t, body) self.conn.borrow().send_msg(t, body)
} }
fn send_request(&self, t: Type, body: &ser::Writeable, expect_resp: Option<(Type, Hash)>) -> Result<(), ser::Error> { fn send_request(&self,
let sent = self.send_msg(t, body); t: Type,
body: &ser::Writeable,
expect_resp: Option<(Type, Hash)>)
-> Result<(), ser::Error> {
let sent = self.send_msg(t, body);
if let Err(e) = sent { if let Err(e) = sent {
warn!("Couldn't send message to remote peer: {}", e); warn!("Couldn't send message to remote peer: {}", e);
} else if let Some(exp) = expect_resp { } else if let Some(exp) = expect_resp {
let mut expects = self.expected_responses.lock().unwrap(); let mut expects = self.expected_responses.lock().unwrap();
expects.push(exp); expects.push(exp);
} }
Ok(()) Ok(())
} }
} }
fn handle_payload(adapter: &NetAdapter, fn handle_payload(adapter: &NetAdapter,
sender: UnboundedSender<Vec<u8>>, sender: UnboundedSender<Vec<u8>>,
header: MsgHeader, header: MsgHeader,
buf: Vec<u8>) buf: Vec<u8>)
-> Result<(), ser::Error> { -> Result<Option<Hash>, ser::Error> {
match header.msg_type { match header.msg_type {
Type::Ping => { Type::Ping => {
let data = try!(ser::ser_vec(&MsgHeader::new(Type::Pong, 0))); let data = try!(ser::ser_vec(&MsgHeader::new(Type::Pong, 0)));
sender.send(data); sender.send(data);
} Ok(None)
Type::Pong => {} }
Type::Transaction => { Type::Pong => Ok(None),
let tx = try!(ser::deserialize::<core::Transaction>(&mut &buf[..])); Type::Transaction => {
adapter.transaction_received(tx); let tx = try!(ser::deserialize::<core::Transaction>(&mut &buf[..]));
} adapter.transaction_received(tx);
Type::Block => { Ok(None)
let b = try!(ser::deserialize::<core::Block>(&mut &buf[..])); }
adapter.block_received(b); Type::Block => {
} let b = try!(ser::deserialize::<core::Block>(&mut &buf[..]));
_ => { let bh = b.hash();
debug!("unknown message type {:?}", header.msg_type); adapter.block_received(b);
} Ok(Some(bh))
}; }
Ok(()) _ => {
debug!("unknown message type {:?}", header.msg_type);
Ok(None)
}
}
} }