remove the error_channel and simplify how we close peer connections (#2796)

This commit is contained in:
Antioch Peverell 2019-05-03 15:35:43 +01:00 committed by GitHub
parent d3b4526e03
commit 6c54c90101
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 34 additions and 84 deletions

View file

@ -47,7 +47,7 @@ pub trait MessageHandler: Send + 'static {
// Macro to simplify the boilerplate around async I/O error handling, // Macro to simplify the boilerplate around async I/O error handling,
// especially with WouldBlock kind of errors. // especially with WouldBlock kind of errors.
macro_rules! try_break { macro_rules! try_break {
($chan:ident, $inner:expr) => { ($inner:expr) => {
match $inner { match $inner {
Ok(v) => Some(v), Ok(v) => Some(v),
Err(Error::Connection(ref e)) if e.kind() == io::ErrorKind::WouldBlock => None, Err(Error::Connection(ref e)) if e.kind() == io::ErrorKind::WouldBlock => None,
@ -55,8 +55,8 @@ macro_rules! try_break {
| Err(Error::Chain(_)) | Err(Error::Chain(_))
| Err(Error::Internal) | Err(Error::Internal)
| Err(Error::NoDandelionRelay) => None, | Err(Error::NoDandelionRelay) => None,
Err(e) => { Err(ref e) => {
let _ = $chan.send(e); debug!("try_break: exit the loop: {:?}", e);
break; break;
} }
} }
@ -171,8 +171,6 @@ pub struct Tracker {
pub send_channel: mpsc::SyncSender<Vec<u8>>, pub send_channel: mpsc::SyncSender<Vec<u8>>,
/// Channel to close the connection /// Channel to close the connection
pub close_channel: mpsc::Sender<()>, pub close_channel: mpsc::Sender<()>,
/// Channel to check for errors on the connection
pub error_channel: mpsc::Receiver<Error>,
} }
impl Tracker { impl Tracker {
@ -201,7 +199,6 @@ where
{ {
let (send_tx, send_rx) = mpsc::sync_channel(SEND_CHANNEL_CAP); let (send_tx, send_rx) = mpsc::sync_channel(SEND_CHANNEL_CAP);
let (close_tx, close_rx) = mpsc::channel(); let (close_tx, close_rx) = mpsc::channel();
let (error_tx, error_rx) = mpsc::channel();
// Counter of number of bytes received // Counter of number of bytes received
let received_bytes = Arc::new(RwLock::new(RateCounter::new())); let received_bytes = Arc::new(RwLock::new(RateCounter::new()));
@ -215,7 +212,6 @@ where
stream, stream,
handler, handler,
send_rx, send_rx,
error_tx,
close_rx, close_rx,
received_bytes.clone(), received_bytes.clone(),
sent_bytes.clone(), sent_bytes.clone(),
@ -226,7 +222,6 @@ where
received_bytes: received_bytes.clone(), received_bytes: received_bytes.clone(),
send_channel: send_tx, send_channel: send_tx,
close_channel: close_tx, close_channel: close_tx,
error_channel: error_rx,
} }
} }
@ -234,7 +229,6 @@ fn poll<H>(
conn: TcpStream, conn: TcpStream,
handler: H, handler: H,
send_rx: mpsc::Receiver<Vec<u8>>, send_rx: mpsc::Receiver<Vec<u8>>,
error_tx: mpsc::Sender<Error>,
close_rx: mpsc::Receiver<()>, close_rx: mpsc::Receiver<()>,
received_bytes: Arc<RwLock<RateCounter>>, received_bytes: Arc<RwLock<RateCounter>>,
sent_bytes: Arc<RwLock<RateCounter>>, sent_bytes: Arc<RwLock<RateCounter>>,
@ -252,7 +246,7 @@ fn poll<H>(
let mut retry_send = Err(()); let mut retry_send = Err(());
loop { loop {
// check the read end // check the read end
if let Some(h) = try_break!(error_tx, read_header(&mut reader, None)) { if let Some(h) = try_break!(read_header(&mut reader, None)) {
let msg = Message::from_header(h, &mut reader); let msg = Message::from_header(h, &mut reader);
trace!( trace!(
@ -269,9 +263,9 @@ fn poll<H>(
} }
if let Some(Some(resp)) = if let Some(Some(resp)) =
try_break!(error_tx, handler.consume(msg, &mut writer, received)) try_break!(handler.consume(msg, &mut writer, received))
{ {
try_break!(error_tx, resp.write(sent_bytes.clone())); try_break!(resp.write(sent_bytes.clone()));
} }
} }
@ -279,8 +273,7 @@ fn poll<H>(
let maybe_data = retry_send.or_else(|_| send_rx.try_recv()); let maybe_data = retry_send.or_else(|_| send_rx.try_recv());
retry_send = Err(()); retry_send = Err(());
if let Ok(data) = maybe_data { if let Ok(data) = maybe_data {
let written = let written = try_break!(writer.write_all(&data[..]).map_err(&From::from));
try_break!(error_tx, writer.write_all(&data[..]).map_err(&From::from));
if written.is_none() { if written.is_none() {
retry_send = Ok(data); retry_send = Ok(data);
} }
@ -288,17 +281,18 @@ fn poll<H>(
// check the close channel // check the close channel
if let Ok(_) = close_rx.try_recv() { if let Ok(_) = close_rx.try_recv() {
debug!(
"Connection close with {} initiated by us",
conn.peer_addr()
.map(|a| a.to_string())
.unwrap_or("?".to_owned())
);
break; break;
} }
thread::sleep(sleep_time); thread::sleep(sleep_time);
} }
debug!(
"Shutting down connection with {}",
conn.peer_addr()
.map(|a| a.to_string())
.unwrap_or("?".to_owned())
);
let _ = conn.shutdown(Shutdown::Both); let _ = conn.shutdown(Shutdown::Both);
}); });
} }

View file

@ -42,11 +42,7 @@ const MAX_PEER_MSG_PER_MIN: u64 = 500;
/// For example: 'Disconnected' state here could still be 'Healthy' and could reconnect in next loop. /// For example: 'Disconnected' state here could still be 'Healthy' and could reconnect in next loop.
enum State { enum State {
Connected, Connected,
Disconnected,
Banned, Banned,
// Banned from Peers side, by ban_peer().
// This could happen when error in block (or compact block) received, header(s) received,
// or txhashset received.
} }
pub struct Peer { pub struct Peer {
@ -173,9 +169,12 @@ impl Peer {
false false
} }
/// Whether this peer is still connected. /// Whether this peer is currently connected.
pub fn is_connected(&self) -> bool { pub fn is_connected(&self) -> bool {
self.check_connection() if self.connection.is_none() {
return false;
}
State::Connected == *self.state.read()
} }
/// Whether this peer has been banned. /// Whether this peer has been banned.
@ -409,61 +408,9 @@ impl Peer {
/// Stops the peer, closing its connection /// Stops the peer, closing its connection
pub fn stop(&self) { pub fn stop(&self) {
if let Some(conn) = self.connection.as_ref() { if let Some(conn) = self.connection.as_ref() {
stop_with_connection(&conn.lock()); let _ = conn.lock().close_channel.send(());
} }
} }
fn check_connection(&self) -> bool {
let connection = match self.connection.as_ref() {
Some(conn) => conn.lock(),
None => return false,
};
match connection.error_channel.try_recv() {
Ok(Error::Serialization(e)) => {
let need_stop = {
let mut state = self.state.write();
if State::Banned != *state {
*state = State::Disconnected;
true
} else {
false
}
};
if need_stop {
debug!(
"Client {} corrupted, will disconnect ({:?}).",
self.info.addr, e
);
stop_with_connection(&connection);
}
false
}
Ok(e) => {
let need_stop = {
let mut state = self.state.write();
if State::Disconnected != *state {
*state = State::Disconnected;
true
} else {
false
}
};
if need_stop {
debug!("Client {} connection lost: {:?}", self.info.addr, e);
stop_with_connection(&connection);
}
false
}
Err(_) => {
let state = self.state.read();
State::Connected == *state
}
}
}
}
fn stop_with_connection(connection: &conn::Tracker) {
let _ = connection.close_channel.send(());
} }
/// Adapter implementation that forwards everything to an underlying adapter /// Adapter implementation that forwards everything to an underlying adapter

View file

@ -225,6 +225,7 @@ impl Peers {
}; };
peer.set_banned(); peer.set_banned();
peer.stop(); peer.stop();
self.peers.write().remove(&peer.info.addr);
} }
} }
@ -257,7 +258,14 @@ impl Peers {
match inner(&p) { match inner(&p) {
Ok(true) => count += 1, Ok(true) => count += 1,
Ok(false) => (), Ok(false) => (),
Err(e) => debug!("Error sending {} to peer: {:?}", obj_name, e), Err(e) => {
debug!(
"Error sending {:?} to peer {:?}: {:?}",
obj_name, &p.info.addr, e
);
p.stop();
self.peers.write().remove(&p.info.addr);
}
} }
if count >= num_peers { if count >= num_peers {
@ -319,10 +327,11 @@ impl Peers {
/// Ping all our connected peers. Always automatically expects a pong back /// Ping all our connected peers. Always automatically expects a pong back
/// or disconnects. This acts as a liveness test. /// or disconnects. This acts as a liveness test.
pub fn check_all(&self, total_difficulty: Difficulty, height: u64) { pub fn check_all(&self, total_difficulty: Difficulty, height: u64) {
let peers_map = self.peers.read(); for p in self.connected_peers().iter() {
for p in peers_map.values() { if let Err(e) = p.send_ping(total_difficulty, height) {
if p.is_connected() { debug!("Error pinging peer {:?}: {:?}", &p.info.addr, e);
let _ = p.send_ping(total_difficulty, height); p.stop();
self.peers.write().remove(&p.info.addr);
} }
} }
} }