diff --git a/p2p/src/conn.rs b/p2p/src/conn.rs index 1402d1640..d3dabbd1f 100644 --- a/p2p/src/conn.rs +++ b/p2p/src/conn.rs @@ -38,7 +38,9 @@ use std::{ thread::{self, JoinHandle}, }; -const IO_TIMEOUT: Duration = Duration::from_millis(10000); +const HEADER_IO_TIMEOUT: Duration = Duration::from_millis(2000); +const CHANNEL_TIMEOUT: Duration = Duration::from_millis(1000); +const BODY_IO_TIMEOUT: Duration = Duration::from_millis(60000); /// A trait to be implemented in order to receive messages from the /// connection. Allows providing an optional response. @@ -46,8 +48,7 @@ pub trait MessageHandler: Send + 'static { fn consume<'a>(&self, msg: Message<'a>, tracker: Arc) -> Result, Error>; } -// Macro to simplify the boilerplate around async I/O error handling, -// especially with WouldBlock kind of errors. +// Macro to simplify the boilerplate around I/O and Grin error handling macro_rules! try_break { ($inner:expr) => { match $inner { @@ -70,6 +71,15 @@ macro_rules! try_break { }; } +macro_rules! try_header { + ($res:expr, $conn: expr) => {{ + $conn + .set_read_timeout(Some(HEADER_IO_TIMEOUT)) + .expect("set timeout"); + try_break!($res) + }}; +} + /// A message as received by the connection. Provides access to the message /// header lazily consumes the message body, handling its deserialization. pub struct Message<'a> { @@ -217,13 +227,6 @@ where { let (send_tx, send_rx) = mpsc::sync_channel(SEND_CHANNEL_CAP); - stream - .set_read_timeout(Some(IO_TIMEOUT)) - .expect("can't set read timeout"); - stream - .set_write_timeout(Some(IO_TIMEOUT)) - .expect("can't set read timeout"); - let stopped = Arc::new(AtomicBool::new(false)); let conn_handle = ConnHandle { @@ -275,8 +278,11 @@ where .spawn(move || { loop { // check the read end - match try_break!(read_header(&mut reader, version)) { + match try_header!(read_header(&mut reader, version), &mut reader) { Some(MsgHeaderWrapper::Known(header)) => { + reader + .set_read_timeout(Some(BODY_IO_TIMEOUT)) + .expect("set timeout"); let msg = Message::from_header(header, &mut reader, version); trace!( @@ -326,8 +332,11 @@ where .name("peer_write".to_string()) .spawn(move || { let mut retry_send = Err(()); + writer + .set_write_timeout(Some(BODY_IO_TIMEOUT)) + .expect("set timeout"); loop { - let maybe_data = retry_send.or_else(|_| send_rx.recv_timeout(IO_TIMEOUT)); + let maybe_data = retry_send.or_else(|_| send_rx.recv_timeout(CHANNEL_TIMEOUT)); retry_send = Err(()); if let Ok(data) = maybe_data { let written =