diff --git a/p2p/src/conn.rs b/p2p/src/conn.rs index d3dabbd1f..1e704b0d7 100644 --- a/p2p/src/conn.rs +++ b/p2p/src/conn.rs @@ -45,7 +45,12 @@ const BODY_IO_TIMEOUT: Duration = Duration::from_millis(60000); /// A trait to be implemented in order to receive messages from the /// connection. Allows providing an optional response. pub trait MessageHandler: Send + 'static { - fn consume<'a>(&self, msg: Message<'a>, tracker: Arc) -> Result, Error>; + fn consume<'a>( + &self, + msg: Message<'a>, + stopped: Arc, + tracker: Arc, + ) -> Result, Error>; } // Macro to simplify the boilerplate around I/O and Grin error handling @@ -294,7 +299,11 @@ where // Increase received bytes counter reader_tracker.inc_received(MsgHeader::LEN as u64 + msg.header.msg_len); - let resp_msg = try_break!(handler.consume(msg, reader_tracker.clone())); + let resp_msg = try_break!(handler.consume( + msg, + reader_stopped.clone(), + reader_tracker.clone() + )); if let Some(Some(resp_msg)) = resp_msg { try_break!(conn_handle.send(resp_msg)); } diff --git a/p2p/src/protocol.rs b/p2p/src/protocol.rs index f03bd3408..2d36381e0 100644 --- a/p2p/src/protocol.rs +++ b/p2p/src/protocol.rs @@ -52,7 +52,12 @@ impl Protocol { } impl MessageHandler for Protocol { - fn consume(&self, mut msg: Message, tracker: Arc) -> Result, Error> { + fn consume( + &self, + mut msg: Message, + stopped: Arc, + tracker: Arc, + ) -> Result, Error> { let adapter = &self.adapter; // If we received a msg from a banned peer then log and drop it. @@ -395,7 +400,13 @@ impl MessageHandler for Protocol { } // Increase received bytes quietly (without affecting the counters). // Otherwise we risk banning a peer as "abusive". - tracker.inc_quiet_received(size as u64) + tracker.inc_quiet_received(size as u64); + + // check the close channel + if stopped.load(Ordering::Relaxed) { + debug!("stopping txhashset download early"); + return Err(Error::ConnectionClose); + } } debug!( "handle_payload: txhashset archive: {}/{} ... DONE",