TxHashSet Download Improvement (#2984)

* Ban on cannot get block header in txhashset_write

* Rusfmt

* Fix typo

* Missing error handling

* Rustfmt

* Only accept txhashset from corresponding peer

* Switch to AtomicBool instead of RwLock<bool>

* Rustfmt
This commit is contained in:
Quentin Le Sceller 2019-08-01 18:46:06 +02:00 committed by Antioch Peverell
parent 4bd3aa109d
commit f79d05ba53
5 changed files with 63 additions and 21 deletions

View file

@ -869,7 +869,7 @@ impl Chain {
h: Hash, h: Hash,
txhashset_data: File, txhashset_data: File,
status: &dyn TxHashsetWriteStatus, status: &dyn TxHashsetWriteStatus,
) -> Result<(), Error> { ) -> Result<bool, Error> {
status.on_setup(); status.on_setup();
// Initial check whether this txhashset is needed or not // Initial check whether this txhashset is needed or not
@ -879,7 +879,14 @@ impl Chain {
return Err(ErrorKind::InvalidTxHashSet("not needed".to_owned()).into()); return Err(ErrorKind::InvalidTxHashSet("not needed".to_owned()).into());
} }
let header = self.get_block_header(&h)?; let header = match self.get_block_header(&h) {
Ok(header) => header,
Err(_) => {
warn!("txhashset_write: cannot find block header");
// This is a bannable reason
return Ok(true);
}
};
// Write txhashset to sandbox (in the Grin specific tmp dir) // Write txhashset to sandbox (in the Grin specific tmp dir)
let sandbox_dir = self.get_tmp_dir(); let sandbox_dir = self.get_tmp_dir();
@ -977,7 +984,7 @@ impl Chain {
self.check_orphans(header.height + 1); self.check_orphans(header.height + 1);
status.on_done(); status.on_done();
Ok(()) Ok(false)
} }
/// Cleanup old blocks from the db. /// Cleanup old blocks from the db.

View file

@ -18,6 +18,7 @@ use std::fs::File;
use std::io::Read; use std::io::Read;
use std::net::{Shutdown, TcpStream}; use std::net::{Shutdown, TcpStream};
use std::path::PathBuf; use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc; use std::sync::Arc;
use crate::chain; use crate::chain;
@ -60,6 +61,8 @@ pub struct Peer {
// because it may be locked by different reasons, so we should wait for that, close // because it may be locked by different reasons, so we should wait for that, close
// mutex can be taken only during shutdown, it happens once // mutex can be taken only during shutdown, it happens once
stop_handle: Mutex<conn::StopHandle>, stop_handle: Mutex<conn::StopHandle>,
// Whether or not we requested a txhashset from this peer
state_sync_requested: Arc<AtomicBool>,
} }
impl fmt::Debug for Peer { impl fmt::Debug for Peer {
@ -72,8 +75,13 @@ impl Peer {
// Only accept and connect can be externally used to build a peer // Only accept and connect can be externally used to build a peer
fn new(info: PeerInfo, conn: TcpStream, adapter: Arc<dyn NetAdapter>) -> std::io::Result<Peer> { fn new(info: PeerInfo, conn: TcpStream, adapter: Arc<dyn NetAdapter>) -> std::io::Result<Peer> {
let state = Arc::new(RwLock::new(State::Connected)); let state = Arc::new(RwLock::new(State::Connected));
let state_sync_requested = Arc::new(AtomicBool::new(false));
let tracking_adapter = TrackingAdapter::new(adapter); let tracking_adapter = TrackingAdapter::new(adapter);
let handler = Protocol::new(Arc::new(tracking_adapter.clone()), info.clone()); let handler = Protocol::new(
Arc::new(tracking_adapter.clone()),
info.clone(),
state_sync_requested.clone(),
);
let tracker = Arc::new(conn::Tracker::new()); let tracker = Arc::new(conn::Tracker::new());
let (sendh, stoph) = conn::listen(conn, info.version, tracker.clone(), handler)?; let (sendh, stoph) = conn::listen(conn, info.version, tracker.clone(), handler)?;
let send_handle = Mutex::new(sendh); let send_handle = Mutex::new(sendh);
@ -85,6 +93,7 @@ impl Peer {
tracker, tracker,
send_handle, send_handle,
stop_handle, stop_handle,
state_sync_requested,
}) })
} }
@ -387,6 +396,7 @@ impl Peer {
"Asking {} for txhashset archive at {} {}.", "Asking {} for txhashset archive at {} {}.",
self.info.addr, height, hash self.info.addr, height, hash
); );
self.state_sync_requested.store(true, Ordering::Relaxed);
self.send( self.send(
&TxHashSetRequest { hash, height }, &TxHashSetRequest { hash, height },
msg::Type::TxHashSetRequest, msg::Type::TxHashSetRequest,

View file

@ -688,15 +688,15 @@ impl ChainAdapter for Peers {
txhashset_data: File, txhashset_data: File,
peer_info: &PeerInfo, peer_info: &PeerInfo,
) -> Result<bool, chain::Error> { ) -> Result<bool, chain::Error> {
if !self.adapter.txhashset_write(h, txhashset_data, peer_info)? { if self.adapter.txhashset_write(h, txhashset_data, peer_info)? {
debug!( debug!(
"Received a bad txhashset data from {}, the peer will be banned", "Received a bad txhashset data from {}, the peer will be banned",
peer_info.addr peer_info.addr
); );
self.ban_peer(peer_info.addr, ReasonForBan::BadTxHashSet); self.ban_peer(peer_info.addr, ReasonForBan::BadTxHashSet);
Ok(false)
} else {
Ok(true) Ok(true)
} else {
Ok(false)
} }
} }

View file

@ -25,6 +25,7 @@ use rand::{thread_rng, Rng};
use std::cmp; use std::cmp;
use std::fs::{self, File, OpenOptions}; use std::fs::{self, File, OpenOptions};
use std::io::{BufWriter, Seek, SeekFrom, Write}; use std::io::{BufWriter, Seek, SeekFrom, Write};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc; use std::sync::Arc;
use std::time::Instant; use std::time::Instant;
use tempfile::tempfile; use tempfile::tempfile;
@ -32,11 +33,20 @@ use tempfile::tempfile;
pub struct Protocol { pub struct Protocol {
adapter: Arc<dyn NetAdapter>, adapter: Arc<dyn NetAdapter>,
peer_info: PeerInfo, peer_info: PeerInfo,
state_sync_requested: Arc<AtomicBool>,
} }
impl Protocol { impl Protocol {
pub fn new(adapter: Arc<dyn NetAdapter>, peer_info: PeerInfo) -> Protocol { pub fn new(
Protocol { adapter, peer_info } adapter: Arc<dyn NetAdapter>,
peer_info: PeerInfo,
state_sync_requested: Arc<AtomicBool>,
) -> Protocol {
Protocol {
adapter,
peer_info,
state_sync_requested,
}
} }
} }
@ -356,6 +366,12 @@ impl MessageHandler for Protocol {
); );
return Err(Error::BadMessage); return Err(Error::BadMessage);
} }
if !self.state_sync_requested.load(Ordering::Relaxed) {
error!("handle_payload: txhashset archive received but from the wrong peer",);
return Err(Error::BadMessage);
}
// Update the sync state requested status
self.state_sync_requested.store(false, Ordering::Relaxed);
let download_start_time = Utc::now(); let download_start_time = Utc::now();
self.adapter self.adapter
@ -425,7 +441,7 @@ impl MessageHandler for Protocol {
debug!( debug!(
"handle_payload: txhashset archive for {} at {}, DONE. Data Ok: {}", "handle_payload: txhashset archive for {} at {}, DONE. Data Ok: {}",
sm_arch.hash, sm_arch.height, res sm_arch.hash, sm_arch.height, !res
); );
if let Err(e) = fs::remove_file(tmp.clone()) { if let Err(e) = fs::remove_file(tmp.clone()) {

View file

@ -419,22 +419,31 @@ impl p2p::ChainAdapter for NetToChainAdapter {
// check status again after download, in case 2 txhashsets made it somehow // check status again after download, in case 2 txhashsets made it somehow
if let SyncStatus::TxHashsetDownload { .. } = self.sync_state.status() { if let SyncStatus::TxHashsetDownload { .. } = self.sync_state.status() {
} else { } else {
return Ok(true); return Ok(false);
} }
if let Err(e) = self match self
.chain() .chain()
.txhashset_write(h, txhashset_data, self.sync_state.as_ref()) .txhashset_write(h, txhashset_data, self.sync_state.as_ref())
{ {
self.chain().clean_txhashset_sandbox(); Ok(is_bad_data) => {
error!("Failed to save txhashset archive: {}", e); if is_bad_data {
self.chain().clean_txhashset_sandbox();
let is_good_data = !e.is_bad_data(); error!("Failed to save txhashset archive: bad data");
self.sync_state.set_sync_error(e); self.sync_state.set_sync_error(
Ok(is_good_data) chain::ErrorKind::TxHashSetErr("bad txhashset data".to_string()).into(),
} else { );
info!("Received valid txhashset data for {}.", h); } else {
Ok(true) info!("Received valid txhashset data for {}.", h);
}
Ok(is_bad_data)
}
Err(e) => {
self.chain().clean_txhashset_sandbox();
error!("Failed to save txhashset archive: {}", e);
self.sync_state.set_sync_error(e);
Ok(false)
}
} }
} }