pass stopped into consume so we can halt txhashset mid download (#3157)

This commit is contained in:
Antioch Peverell 2019-12-05 12:14:41 +00:00 committed by GitHub
parent 0b21ee607a
commit 8b8f0a0abd
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 24 additions and 4 deletions

View file

@ -45,7 +45,12 @@ const BODY_IO_TIMEOUT: Duration = Duration::from_millis(60000);
/// A trait to be implemented in order to receive messages from the
/// connection. Allows providing an optional response.
pub trait MessageHandler: Send + 'static {
fn consume<'a>(&self, msg: Message<'a>, tracker: Arc<Tracker>) -> Result<Option<Msg>, Error>;
fn consume<'a>(
&self,
msg: Message<'a>,
stopped: Arc<AtomicBool>,
tracker: Arc<Tracker>,
) -> Result<Option<Msg>, Error>;
}
// Macro to simplify the boilerplate around I/O and Grin error handling
@ -294,7 +299,11 @@ where
// Increase received bytes counter
reader_tracker.inc_received(MsgHeader::LEN as u64 + msg.header.msg_len);
let resp_msg = try_break!(handler.consume(msg, reader_tracker.clone()));
let resp_msg = try_break!(handler.consume(
msg,
reader_stopped.clone(),
reader_tracker.clone()
));
if let Some(Some(resp_msg)) = resp_msg {
try_break!(conn_handle.send(resp_msg));
}

View file

@ -52,7 +52,12 @@ impl Protocol {
}
impl MessageHandler for Protocol {
fn consume(&self, mut msg: Message, tracker: Arc<Tracker>) -> Result<Option<Msg>, Error> {
fn consume(
&self,
mut msg: Message,
stopped: Arc<AtomicBool>,
tracker: Arc<Tracker>,
) -> Result<Option<Msg>, Error> {
let adapter = &self.adapter;
// If we received a msg from a banned peer then log and drop it.
@ -395,7 +400,13 @@ impl MessageHandler for Protocol {
}
// Increase received bytes quietly (without affecting the counters).
// Otherwise we risk banning a peer as "abusive".
tracker.inc_quiet_received(size as u64)
tracker.inc_quiet_received(size as u64);
// check the close channel
if stopped.load(Ordering::Relaxed) {
debug!("stopping txhashset download early");
return Err(Error::ConnectionClose);
}
}
debug!(
"handle_payload: txhashset archive: {}/{} ... DONE",