From 6855241a56305a3b1822de974d40a649c7a5337b Mon Sep 17 00:00:00 2001 From: Antioch Peverell Date: Tue, 25 Feb 2020 19:15:27 +0000 Subject: [PATCH] handle mpsc channel disconnect from peer_write thread (#3241) * handle mpsc channel disconnect from peer_write thread also actually shutdown the writer when we say we are going to * fix - we need to break here --- p2p/src/conn.rs | 20 +++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/p2p/src/conn.rs b/p2p/src/conn.rs index a0c1cdf55..55a90a737 100644 --- a/p2p/src/conn.rs +++ b/p2p/src/conn.rs @@ -31,6 +31,7 @@ use crate::util::{RateCounter, RwLock}; use std::io::{self, Read, Write}; use std::net::{Shutdown, TcpStream}; use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::mpsc::RecvTimeoutError; use std::sync::{mpsc, Arc}; use std::time::Duration; use std::{ @@ -362,13 +363,21 @@ where loop { let maybe_data = retry_send.or_else(|_| send_rx.recv_timeout(CHANNEL_TIMEOUT)); retry_send = Err(()); - if let Ok(data) = maybe_data { - let written = - try_break!(write_message(&mut writer, &data, writer_tracker.clone())); - if written.is_none() { - retry_send = Ok(data); + match maybe_data { + Ok(data) => { + let written = + try_break!(write_message(&mut writer, &data, writer_tracker.clone())); + if written.is_none() { + retry_send = Ok(data); + } } + Err(RecvTimeoutError::Disconnected) => { + debug!("peer_write: mpsc channel disconnected during recv_timeout"); + break; + } + Err(RecvTimeoutError::Timeout) => {} } + // check the close channel if stopped.load(Ordering::Relaxed) { break; @@ -382,6 +391,7 @@ where .map(|a| a.to_string()) .unwrap_or_else(|_| "?".to_owned()) ); + let _ = writer.shutdown(Shutdown::Both); })?; Ok((reader_thread, writer_thread)) }