From 7f029cb4c03f0ae626069e23bffcf6c7c6ac2dc5 Mon Sep 17 00:00:00 2001 From: Ignotus Peverell Date: Wed, 1 Feb 2017 19:05:17 -0800 Subject: [PATCH] Added a Connection wrapper to handle timeouts when we want information from a peer in a request/response style. --- p2p/Cargo.toml | 1 + p2p/src/conn.rs | 172 +++++++++++++++++++++++++++++++++++--------- p2p/src/lib.rs | 1 + p2p/src/msg.rs | 2 +- p2p/src/protocol.rs | 101 ++++++++++++++------------ 5 files changed, 196 insertions(+), 81 deletions(-) diff --git a/p2p/Cargo.toml b/p2p/Cargo.toml index 4d949e47f..35172649b 100644 --- a/p2p/Cargo.toml +++ b/p2p/Cargo.toml @@ -11,6 +11,7 @@ log = "^0.3" net2 = "0.2.0" rand = "^0.3" tokio-core="^0.1.1" +tokio-timer="^0.1.0" time = "^0.1" enum_primitive = "^0.1.0" num = "^0.1.36" diff --git a/p2p/src/conn.rs b/p2p/src/conn.rs index 383871df9..0ff95a01a 100644 --- a/p2p/src/conn.rs +++ b/p2p/src/conn.rs @@ -12,11 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. -//! Provides a connection wrapper that handles the lower level tasks in sending or +//! Provides a connection wrapper that handles the lower level tasks in sending +//! or //! receiving data from the TCP socket, as well as dealing with timeouts. use std::iter; +use std::ops::Deref; use std::sync::{Mutex, Arc}; +use std::time::{Instant, Duration}; use futures; use futures::{Stream, Future}; @@ -24,30 +27,42 @@ use futures::stream; use futures::sync::mpsc::{Sender, UnboundedSender, UnboundedReceiver}; use tokio_core::io::{Io, WriteHalf, ReadHalf, write_all, read_exact}; use tokio_core::net::TcpStream; +use tokio_timer::{Timer, TimerError}; +use core::core::hash::{Hash, ZERO_HASH}; use core::ser; use msg::*; -/// Handler to provide to the connection, will be called back anytime a message is -/// received. The provided sender can be use to immediately send back another -/// message. +/// Handler to provide to the connection, will be called back anytime a message +/// is received. The provided sender can be use to immediately send back +/// another message. pub trait Handler: Sync + Send { - /// Handle function to implement to process incoming messages. A sender to reply - /// immediately as well as the message header and its unparsed body are provided. - fn handle(&self, sender: UnboundedSender>, header: MsgHeader, body: Vec) -> Result<(), ser::Error>; + /// Handle function to implement to process incoming messages. A sender to + /// reply immediately as well as the message header and its unparsed body + /// are provided. + fn handle(&self, + sender: UnboundedSender>, + header: MsgHeader, + body: Vec) + -> Result, ser::Error>; } impl Handler for F - where F: Fn(UnboundedSender>, MsgHeader, Vec) -> Result<(), ser::Error>, F: Sync + Send { - - fn handle(&self, sender: UnboundedSender>, header: MsgHeader, body: Vec) -> Result<(), ser::Error> { - self(sender, header, body) - } + where F: Fn(UnboundedSender>, MsgHeader, Vec) -> Result, ser::Error>, + F: Sync + Send +{ + fn handle(&self, + sender: UnboundedSender>, + header: MsgHeader, + body: Vec) + -> Result, ser::Error> { + self(sender, header, body) + } } -/// A higher level connection wrapping the TcpStream. Maintains the amount of data -/// transmitted and deals with the low-level task of sending and receiving -/// data, parsing message headers and timeouts. +/// A higher level connection wrapping the TcpStream. Maintains the amount of +/// data transmitted and deals with the low-level task of sending and +/// receiving data, parsing message headers and timeouts. pub struct Connection { // Channel to push bytes to the remote peer outbound_chan: UnboundedSender>, @@ -66,11 +81,14 @@ pub struct Connection { } impl Connection { - - /// Start listening on the provided connection and wraps it. Does not hang the - /// current thread, instead just returns a future and the Connection itself. - pub fn listen(conn: TcpStream, handler: F) -> (Connection, Box>) - where F: Handler + 'static { + /// Start listening on the provided connection and wraps it. Does not hang + /// the current thread, instead just returns a future and the Connection + /// itself. + pub fn listen(conn: TcpStream, + handler: F) + -> (Connection, Box>) + where F: Handler + 'static + { let (reader, writer) = conn.split(); @@ -97,11 +115,12 @@ impl Connection { let write_msg = me.write_msg(rx, writer).map(|_| ()); // select between our different futures and return them - let fut = Box::new(close_conn.select(read_msg.select(write_msg).map(|_| ()).map_err(|(e, _)| e)) - .map(|_| ()) - .map_err(|(e, _)| e)); + let fut = + Box::new(close_conn.select(read_msg.select(write_msg).map(|_| ()).map_err(|(e, _)| e)) + .map(|_| ()) + .map_err(|(e, _)| e)); - (me, fut) + (me, fut) } /// Prepares the future that gets message data produced by our system and @@ -128,11 +147,12 @@ impl Connection { /// Prepares the future reading from the peer connection, parsing each /// message and forwarding them appropriately based on their type fn read_msg(&self, - sender: UnboundedSender>, - reader: ReadHalf, - handler: F) - -> Box, Error = ser::Error>> - where F: Handler + 'static { + sender: UnboundedSender>, + reader: ReadHalf, + handler: F) + -> Box, Error = ser::Error>> + where F: Handler + 'static + { // infinite iterator stream so we repeat the message reading logic until the // peer is stopped @@ -140,12 +160,12 @@ impl Connection { // setup the reading future, getting messages from the peer and processing them let recv_bytes = self.received_bytes.clone(); - let handler = Arc::new(handler); + let handler = Arc::new(handler); let read_msg = iter.fold(reader, move |reader, _| { let recv_bytes = recv_bytes.clone(); - let handler = handler.clone(); - let sender_inner = sender.clone(); + let handler = handler.clone(); + let sender_inner = sender.clone(); // first read the message header read_exact(reader, vec![0u8; HEADER_LEN as usize]) @@ -166,7 +186,7 @@ impl Connection { *recv_bytes += header.serialized_len() + header.msg_len; // and handle the different message types - let msg_type = header.msg_type; + let msg_type = header.msg_type; if let Err(e) = handler.handle(sender_inner.clone(), header, buf) { debug!("Invalid {:?} message: {}", msg_type, e); } @@ -197,3 +217,89 @@ impl Connection { (sent, recv) } } + +/// Connection wrapper that handles a request/response oriented interaction with +/// a timeout. +pub struct TimeoutConnection { + underlying: Connection, + + expected_responses: Arc>>, +} + +impl TimeoutConnection { + /// Same as Connection + pub fn listen(conn: TcpStream, + handler: F) + -> (TimeoutConnection, Box>) + where F: Handler + 'static + { + + let expects = Arc::new(Mutex::new(vec![])); + + // Decorates the handler to remove the "subscription" from the expected + // responses. We got our replies, so no timeout should occur. + let exp = expects.clone(); + let (conn, fut) = Connection::listen(conn, move |sender, header: MsgHeader, data| { + let msg_type = header.msg_type; + let recv_h = try!(handler.handle(sender, header, data)); + + let mut expects = exp.lock().unwrap(); + let filtered = expects.iter() + .filter(|&&(typ, h, _)| msg_type != typ || recv_h.is_some() && recv_h.unwrap() != h) + .map(|&x| x) + .collect::>(); + *expects = filtered; + + Ok(recv_h) + }); + + // Registers a timer with the event loop to regularly check for timeouts. + let exp = expects.clone(); + let timer = Timer::default() + .interval(Duration::new(2, 0)) + .fold((), move |_, _| { + let exp = exp.lock().unwrap(); + for &(_, _, t) in exp.deref() { + if Instant::now() - t > Duration::new(2, 0) { + return Err(TimerError::TooLong); + } + } + Ok(()) + }) + .map_err(|_| ser::Error::CorruptedData); + + let me = TimeoutConnection { + underlying: conn, + expected_responses: expects, + }; + (me, Box::new(fut.join(timer).map(|_| ()))) + } + + /// Sends a request and registers a timer on the provided message type and + /// optionally the hash of the sent data. + pub fn send_request(&self, + t: Type, + body: &ser::Writeable, + expect_h: Option) + -> Result<(), ser::Error> { + let sent = try!(self.underlying.send_msg(t, body)); + + let mut expects = self.expected_responses.lock().unwrap(); + if let Some(h) = expect_h { + expects.push((t, h, Instant::now())); + } else { + expects.push((t, ZERO_HASH, Instant::now())); + } + Ok(()) + } + + /// Same as Connection + pub fn send_msg(&self, t: Type, body: &ser::Writeable) -> Result<(), ser::Error> { + self.underlying.send_msg(t, body) + } + + /// Same as Connection + pub fn transmitted_bytes(&self) -> (u64, u64) { + self.underlying.transmitted_bytes() + } +} diff --git a/p2p/src/lib.rs b/p2p/src/lib.rs index f061b8e07..499263269 100644 --- a/p2p/src/lib.rs +++ b/p2p/src/lib.rs @@ -32,6 +32,7 @@ extern crate log; extern crate futures; #[macro_use] extern crate tokio_core; +extern crate tokio_timer; extern crate rand; extern crate time; extern crate num; diff --git a/p2p/src/msg.rs b/p2p/src/msg.rs index 25339cd6e..094898af6 100644 --- a/p2p/src/msg.rs +++ b/p2p/src/msg.rs @@ -44,7 +44,7 @@ pub enum ErrCodes { /// Types of messages enum_from_primitive! { - #[derive(Debug, Clone, Copy)] + #[derive(Debug, Clone, Copy, PartialEq)] pub enum Type { Error, Hand, diff --git a/p2p/src/protocol.rs b/p2p/src/protocol.rs index 3af8f7757..6d47f86ea 100644 --- a/p2p/src/protocol.rs +++ b/p2p/src/protocol.rs @@ -23,24 +23,24 @@ use tokio_core::net::TcpStream; use core::core; use core::core::hash::Hash; use core::ser; -use conn::Connection; +use conn::TimeoutConnection; use msg::*; use types::*; use util::OneTime; pub struct ProtocolV1 { - conn: OneTime, + conn: OneTime, - expected_responses: Mutex>, + expected_responses: Mutex>, } impl ProtocolV1 { - pub fn new() -> ProtocolV1 { - ProtocolV1 { - conn: OneTime::new(), - expected_responses: Mutex::new(vec![]), - } - } + pub fn new() -> ProtocolV1 { + ProtocolV1 { + conn: OneTime::new(), + expected_responses: Mutex::new(vec![]), + } + } } impl Protocol for ProtocolV1 { @@ -50,25 +50,25 @@ impl Protocol for ProtocolV1 { adapter: Arc) -> Box> { - let (conn, listener) = Connection::listen(conn, move |sender, header, data| { - let adapt = adapter.as_ref(); - handle_payload(adapt, sender, header, data) - }); + let (conn, listener) = TimeoutConnection::listen(conn, move |sender, header, data| { + let adapt = adapter.as_ref(); + handle_payload(adapt, sender, header, data) + }); - self.conn.init(conn); + self.conn.init(conn); - listener + listener } /// Bytes sent and received. fn transmitted_bytes(&self) -> (u64, u64) { - self.conn.borrow().transmitted_bytes() + self.conn.borrow().transmitted_bytes() } /// Sends a ping message to the remote peer. Will panic if handle has never /// been called on this protocol. fn send_ping(&self) -> Result<(), ser::Error> { - self.send_msg(Type::Ping, &Empty {}) + self.send_request(Type::Ping, &Empty {}, None) } /// Serializes and sends a block to our remote peer @@ -88,46 +88,53 @@ impl Protocol for ProtocolV1 { } impl ProtocolV1 { - fn send_msg(&self, t: Type, body: &ser::Writeable) -> Result<(), ser::Error> { - self.conn.borrow().send_msg(t, body) - } + self.conn.borrow().send_msg(t, body) + } - fn send_request(&self, t: Type, body: &ser::Writeable, expect_resp: Option<(Type, Hash)>) -> Result<(), ser::Error> { - let sent = self.send_msg(t, body); + fn send_request(&self, + t: Type, + body: &ser::Writeable, + expect_resp: Option<(Type, Hash)>) + -> Result<(), ser::Error> { + let sent = self.send_msg(t, body); if let Err(e) = sent { warn!("Couldn't send message to remote peer: {}", e); } else if let Some(exp) = expect_resp { - let mut expects = self.expected_responses.lock().unwrap(); - expects.push(exp); - } - Ok(()) - } + let mut expects = self.expected_responses.lock().unwrap(); + expects.push(exp); + } + Ok(()) + } } fn handle_payload(adapter: &NetAdapter, sender: UnboundedSender>, header: MsgHeader, buf: Vec) - -> Result<(), ser::Error> { - match header.msg_type { - Type::Ping => { - let data = try!(ser::ser_vec(&MsgHeader::new(Type::Pong, 0))); - sender.send(data); - } - Type::Pong => {} - Type::Transaction => { - let tx = try!(ser::deserialize::(&mut &buf[..])); - adapter.transaction_received(tx); - } - Type::Block => { - let b = try!(ser::deserialize::(&mut &buf[..])); - adapter.block_received(b); - } - _ => { - debug!("unknown message type {:?}", header.msg_type); - } - }; - Ok(()) + -> Result, ser::Error> { + match header.msg_type { + Type::Ping => { + let data = try!(ser::ser_vec(&MsgHeader::new(Type::Pong, 0))); + sender.send(data); + Ok(None) + } + Type::Pong => Ok(None), + Type::Transaction => { + let tx = try!(ser::deserialize::(&mut &buf[..])); + adapter.transaction_received(tx); + Ok(None) + } + Type::Block => { + let b = try!(ser::deserialize::(&mut &buf[..])); + let bh = b.hash(); + adapter.block_received(b); + Ok(Some(bh)) + } + _ => { + debug!("unknown message type {:?}", header.msg_type); + Ok(None) + } + } }