From 6c54c901016a2a937e4f9ff94418aab9409a8b61 Mon Sep 17 00:00:00 2001
From: Antioch Peverell <apeverell@protonmail.com>
Date: Fri, 3 May 2019 15:35:43 +0100
Subject: [PATCH] remove the error_channel and simplify how we close peer
 connections (#2796)

---
 p2p/src/conn.rs  | 34 +++++++++++--------------
 p2p/src/peer.rs  | 65 +++++-------------------------------------------
 p2p/src/peers.rs | 19 ++++++++++----
 3 files changed, 34 insertions(+), 84 deletions(-)

diff --git a/p2p/src/conn.rs b/p2p/src/conn.rs
index f105c4db6..8a0177cfe 100644
--- a/p2p/src/conn.rs
+++ b/p2p/src/conn.rs
@@ -47,7 +47,7 @@ pub trait MessageHandler: Send + 'static {
 // Macro to simplify the boilerplate around async I/O error handling,
 // especially with WouldBlock kind of errors.
 macro_rules! try_break {
-	($chan:ident, $inner:expr) => {
+	($inner:expr) => {
 		match $inner {
 			Ok(v) => Some(v),
 			Err(Error::Connection(ref e)) if e.kind() == io::ErrorKind::WouldBlock => None,
@@ -55,8 +55,8 @@ macro_rules! try_break {
 			| Err(Error::Chain(_))
 			| Err(Error::Internal)
 			| Err(Error::NoDandelionRelay) => None,
-			Err(e) => {
-				let _ = $chan.send(e);
+			Err(ref e) => {
+				debug!("try_break: exit the loop: {:?}", e);
 				break;
 				}
 			}
@@ -171,8 +171,6 @@ pub struct Tracker {
 	pub send_channel: mpsc::SyncSender<Vec<u8>>,
 	/// Channel to close the connection
 	pub close_channel: mpsc::Sender<()>,
-	/// Channel to check for errors on the connection
-	pub error_channel: mpsc::Receiver<Error>,
 }
 
 impl Tracker {
@@ -201,7 +199,6 @@ where
 {
 	let (send_tx, send_rx) = mpsc::sync_channel(SEND_CHANNEL_CAP);
 	let (close_tx, close_rx) = mpsc::channel();
-	let (error_tx, error_rx) = mpsc::channel();
 
 	// Counter of number of bytes received
 	let received_bytes = Arc::new(RwLock::new(RateCounter::new()));
@@ -215,7 +212,6 @@ where
 		stream,
 		handler,
 		send_rx,
-		error_tx,
 		close_rx,
 		received_bytes.clone(),
 		sent_bytes.clone(),
@@ -226,7 +222,6 @@ where
 		received_bytes: received_bytes.clone(),
 		send_channel: send_tx,
 		close_channel: close_tx,
-		error_channel: error_rx,
 	}
 }
 
@@ -234,7 +229,6 @@ fn poll<H>(
 	conn: TcpStream,
 	handler: H,
 	send_rx: mpsc::Receiver<Vec<u8>>,
-	error_tx: mpsc::Sender<Error>,
 	close_rx: mpsc::Receiver<()>,
 	received_bytes: Arc<RwLock<RateCounter>>,
 	sent_bytes: Arc<RwLock<RateCounter>>,
@@ -252,7 +246,7 @@ fn poll<H>(
 			let mut retry_send = Err(());
 			loop {
 				// check the read end
-				if let Some(h) = try_break!(error_tx, read_header(&mut reader, None)) {
+				if let Some(h) = try_break!(read_header(&mut reader, None)) {
 					let msg = Message::from_header(h, &mut reader);
 
 					trace!(
@@ -269,9 +263,9 @@ fn poll<H>(
 					}
 
 					if let Some(Some(resp)) =
-						try_break!(error_tx, handler.consume(msg, &mut writer, received))
+						try_break!(handler.consume(msg, &mut writer, received))
 					{
-						try_break!(error_tx, resp.write(sent_bytes.clone()));
+						try_break!(resp.write(sent_bytes.clone()));
 					}
 				}
 
@@ -279,8 +273,7 @@ fn poll<H>(
 				let maybe_data = retry_send.or_else(|_| send_rx.try_recv());
 				retry_send = Err(());
 				if let Ok(data) = maybe_data {
-					let written =
-						try_break!(error_tx, writer.write_all(&data[..]).map_err(&From::from));
+					let written = try_break!(writer.write_all(&data[..]).map_err(&From::from));
 					if written.is_none() {
 						retry_send = Ok(data);
 					}
@@ -288,17 +281,18 @@ fn poll<H>(
 
 				// check the close channel
 				if let Ok(_) = close_rx.try_recv() {
-					debug!(
-						"Connection close with {} initiated by us",
-						conn.peer_addr()
-							.map(|a| a.to_string())
-							.unwrap_or("?".to_owned())
-					);
 					break;
 				}
 
 				thread::sleep(sleep_time);
 			}
+
+			debug!(
+				"Shutting down connection with {}",
+				conn.peer_addr()
+					.map(|a| a.to_string())
+					.unwrap_or("?".to_owned())
+			);
 			let _ = conn.shutdown(Shutdown::Both);
 		});
 }
diff --git a/p2p/src/peer.rs b/p2p/src/peer.rs
index 86b28591d..2a302db01 100644
--- a/p2p/src/peer.rs
+++ b/p2p/src/peer.rs
@@ -42,11 +42,7 @@ const MAX_PEER_MSG_PER_MIN: u64 = 500;
 ///   For example: 'Disconnected' state here could still be 'Healthy' and could reconnect in next loop.
 enum State {
 	Connected,
-	Disconnected,
 	Banned,
-	// Banned from Peers side, by ban_peer().
-	//   This could happen when error in block (or compact block) received, header(s) received,
-	//   or txhashset received.
 }
 
 pub struct Peer {
@@ -173,9 +169,12 @@ impl Peer {
 		false
 	}
 
-	/// Whether this peer is still connected.
+	/// Whether this peer is currently connected.
 	pub fn is_connected(&self) -> bool {
-		self.check_connection()
+		if self.connection.is_none() {
+			return false;
+		}
+		State::Connected == *self.state.read()
 	}
 
 	/// Whether this peer has been banned.
@@ -409,61 +408,9 @@ impl Peer {
 	/// Stops the peer, closing its connection
 	pub fn stop(&self) {
 		if let Some(conn) = self.connection.as_ref() {
-			stop_with_connection(&conn.lock());
+			let _ = conn.lock().close_channel.send(());
 		}
 	}
-
-	fn check_connection(&self) -> bool {
-		let connection = match self.connection.as_ref() {
-			Some(conn) => conn.lock(),
-			None => return false,
-		};
-		match connection.error_channel.try_recv() {
-			Ok(Error::Serialization(e)) => {
-				let need_stop = {
-					let mut state = self.state.write();
-					if State::Banned != *state {
-						*state = State::Disconnected;
-						true
-					} else {
-						false
-					}
-				};
-				if need_stop {
-					debug!(
-						"Client {} corrupted, will disconnect ({:?}).",
-						self.info.addr, e
-					);
-					stop_with_connection(&connection);
-				}
-				false
-			}
-			Ok(e) => {
-				let need_stop = {
-					let mut state = self.state.write();
-					if State::Disconnected != *state {
-						*state = State::Disconnected;
-						true
-					} else {
-						false
-					}
-				};
-				if need_stop {
-					debug!("Client {} connection lost: {:?}", self.info.addr, e);
-					stop_with_connection(&connection);
-				}
-				false
-			}
-			Err(_) => {
-				let state = self.state.read();
-				State::Connected == *state
-			}
-		}
-	}
-}
-
-fn stop_with_connection(connection: &conn::Tracker) {
-	let _ = connection.close_channel.send(());
 }
 
 /// Adapter implementation that forwards everything to an underlying adapter
diff --git a/p2p/src/peers.rs b/p2p/src/peers.rs
index 3865f8c45..3fe18f708 100644
--- a/p2p/src/peers.rs
+++ b/p2p/src/peers.rs
@@ -225,6 +225,7 @@ impl Peers {
 			};
 			peer.set_banned();
 			peer.stop();
+			self.peers.write().remove(&peer.info.addr);
 		}
 	}
 
@@ -257,7 +258,14 @@ impl Peers {
 			match inner(&p) {
 				Ok(true) => count += 1,
 				Ok(false) => (),
-				Err(e) => debug!("Error sending {} to peer: {:?}", obj_name, e),
+				Err(e) => {
+					debug!(
+						"Error sending {:?} to peer {:?}: {:?}",
+						obj_name, &p.info.addr, e
+					);
+					p.stop();
+					self.peers.write().remove(&p.info.addr);
+				}
 			}
 
 			if count >= num_peers {
@@ -319,10 +327,11 @@ impl Peers {
 	/// Ping all our connected peers. Always automatically expects a pong back
 	/// or disconnects. This acts as a liveness test.
 	pub fn check_all(&self, total_difficulty: Difficulty, height: u64) {
-		let peers_map = self.peers.read();
-		for p in peers_map.values() {
-			if p.is_connected() {
-				let _ = p.send_ping(total_difficulty, height);
+		for p in self.connected_peers().iter() {
+			if let Err(e) = p.send_ping(total_difficulty, height) {
+				debug!("Error pinging peer {:?}: {:?}", &p.info.addr, e);
+				p.stop();
+				self.peers.write().remove(&p.info.addr);
 			}
 		}
 	}