diff --git a/p2p/src/conn.rs b/p2p/src/conn.rs index a0c1cdf55..55a90a737 100644 --- a/p2p/src/conn.rs +++ b/p2p/src/conn.rs @@ -31,6 +31,7 @@ use crate::util::{RateCounter, RwLock}; use std::io::{self, Read, Write}; use std::net::{Shutdown, TcpStream}; use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::mpsc::RecvTimeoutError; use std::sync::{mpsc, Arc}; use std::time::Duration; use std::{ @@ -362,13 +363,21 @@ where loop { let maybe_data = retry_send.or_else(|_| send_rx.recv_timeout(CHANNEL_TIMEOUT)); retry_send = Err(()); - if let Ok(data) = maybe_data { - let written = - try_break!(write_message(&mut writer, &data, writer_tracker.clone())); - if written.is_none() { - retry_send = Ok(data); + match maybe_data { + Ok(data) => { + let written = + try_break!(write_message(&mut writer, &data, writer_tracker.clone())); + if written.is_none() { + retry_send = Ok(data); + } } + Err(RecvTimeoutError::Disconnected) => { + debug!("peer_write: mpsc channel disconnected during recv_timeout"); + break; + } + Err(RecvTimeoutError::Timeout) => {} } + // check the close channel if stopped.load(Ordering::Relaxed) { break; @@ -382,6 +391,7 @@ where .map(|a| a.to_string()) .unwrap_or_else(|_| "?".to_owned()) ); + let _ = writer.shutdown(Shutdown::Both); })?; Ok((reader_thread, writer_thread)) }