Set longer timeout for msg body read/write operations (#3145)

* Set longer timeout for msg body read operations

* Introduce a channel timeout
This commit is contained in:
hashmap 2019-12-02 13:00:30 +01:00 committed by Antioch Peverell
parent 7f7d51a748
commit cbc17ff5f7

View file

@ -38,7 +38,9 @@ use std::{
thread::{self, JoinHandle}, thread::{self, JoinHandle},
}; };
const IO_TIMEOUT: Duration = Duration::from_millis(10000); const HEADER_IO_TIMEOUT: Duration = Duration::from_millis(2000);
const CHANNEL_TIMEOUT: Duration = Duration::from_millis(1000);
const BODY_IO_TIMEOUT: Duration = Duration::from_millis(60000);
/// A trait to be implemented in order to receive messages from the /// A trait to be implemented in order to receive messages from the
/// connection. Allows providing an optional response. /// connection. Allows providing an optional response.
@ -46,8 +48,7 @@ pub trait MessageHandler: Send + 'static {
fn consume<'a>(&self, msg: Message<'a>, tracker: Arc<Tracker>) -> Result<Option<Msg>, Error>; fn consume<'a>(&self, msg: Message<'a>, tracker: Arc<Tracker>) -> Result<Option<Msg>, Error>;
} }
// Macro to simplify the boilerplate around async I/O error handling, // Macro to simplify the boilerplate around I/O and Grin error handling
// especially with WouldBlock kind of errors.
macro_rules! try_break { macro_rules! try_break {
($inner:expr) => { ($inner:expr) => {
match $inner { match $inner {
@ -70,6 +71,15 @@ macro_rules! try_break {
}; };
} }
macro_rules! try_header {
($res:expr, $conn: expr) => {{
$conn
.set_read_timeout(Some(HEADER_IO_TIMEOUT))
.expect("set timeout");
try_break!($res)
}};
}
/// A message as received by the connection. Provides access to the message /// A message as received by the connection. Provides access to the message
/// header lazily consumes the message body, handling its deserialization. /// header lazily consumes the message body, handling its deserialization.
pub struct Message<'a> { pub struct Message<'a> {
@ -217,13 +227,6 @@ where
{ {
let (send_tx, send_rx) = mpsc::sync_channel(SEND_CHANNEL_CAP); let (send_tx, send_rx) = mpsc::sync_channel(SEND_CHANNEL_CAP);
stream
.set_read_timeout(Some(IO_TIMEOUT))
.expect("can't set read timeout");
stream
.set_write_timeout(Some(IO_TIMEOUT))
.expect("can't set read timeout");
let stopped = Arc::new(AtomicBool::new(false)); let stopped = Arc::new(AtomicBool::new(false));
let conn_handle = ConnHandle { let conn_handle = ConnHandle {
@ -275,8 +278,11 @@ where
.spawn(move || { .spawn(move || {
loop { loop {
// check the read end // check the read end
match try_break!(read_header(&mut reader, version)) { match try_header!(read_header(&mut reader, version), &mut reader) {
Some(MsgHeaderWrapper::Known(header)) => { Some(MsgHeaderWrapper::Known(header)) => {
reader
.set_read_timeout(Some(BODY_IO_TIMEOUT))
.expect("set timeout");
let msg = Message::from_header(header, &mut reader, version); let msg = Message::from_header(header, &mut reader, version);
trace!( trace!(
@ -326,8 +332,11 @@ where
.name("peer_write".to_string()) .name("peer_write".to_string())
.spawn(move || { .spawn(move || {
let mut retry_send = Err(()); let mut retry_send = Err(());
writer
.set_write_timeout(Some(BODY_IO_TIMEOUT))
.expect("set timeout");
loop { loop {
let maybe_data = retry_send.or_else(|_| send_rx.recv_timeout(IO_TIMEOUT)); let maybe_data = retry_send.or_else(|_| send_rx.recv_timeout(CHANNEL_TIMEOUT));
retry_send = Err(()); retry_send = Err(());
if let Ok(data) = maybe_data { if let Ok(data) = maybe_data {
let written = let written =