handle mpsc channel disconnect from peer_write thread (#3241)

* handle mpsc channel disconnect from peer_write thread
also actually shutdown the writer when we say we are going to

* fix - we need to break here
This commit is contained in:
Antioch Peverell 2020-02-25 19:15:27 +00:00 committed by GitHub
parent 6bdeefd27e
commit 6855241a56
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -31,6 +31,7 @@ use crate::util::{RateCounter, RwLock};
use std::io::{self, Read, Write}; use std::io::{self, Read, Write};
use std::net::{Shutdown, TcpStream}; use std::net::{Shutdown, TcpStream};
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::RecvTimeoutError;
use std::sync::{mpsc, Arc}; use std::sync::{mpsc, Arc};
use std::time::Duration; use std::time::Duration;
use std::{ use std::{
@ -362,13 +363,21 @@ where
loop { loop {
let maybe_data = retry_send.or_else(|_| send_rx.recv_timeout(CHANNEL_TIMEOUT)); let maybe_data = retry_send.or_else(|_| send_rx.recv_timeout(CHANNEL_TIMEOUT));
retry_send = Err(()); retry_send = Err(());
if let Ok(data) = maybe_data { match maybe_data {
let written = Ok(data) => {
try_break!(write_message(&mut writer, &data, writer_tracker.clone())); let written =
if written.is_none() { try_break!(write_message(&mut writer, &data, writer_tracker.clone()));
retry_send = Ok(data); if written.is_none() {
retry_send = Ok(data);
}
} }
Err(RecvTimeoutError::Disconnected) => {
debug!("peer_write: mpsc channel disconnected during recv_timeout");
break;
}
Err(RecvTimeoutError::Timeout) => {}
} }
// check the close channel // check the close channel
if stopped.load(Ordering::Relaxed) { if stopped.load(Ordering::Relaxed) {
break; break;
@ -382,6 +391,7 @@ where
.map(|a| a.to_string()) .map(|a| a.to_string())
.unwrap_or_else(|_| "?".to_owned()) .unwrap_or_else(|_| "?".to_owned())
); );
let _ = writer.shutdown(Shutdown::Both);
})?; })?;
Ok((reader_thread, writer_thread)) Ok((reader_thread, writer_thread))
} }