fix: in case txhashset validation fails, 'TxHashsetDownload' should retry () ()

* fix: in case txhashset validation fails, 'TxHashsetDownload' should retry ()

Also add two more fixes:
1. handle a failure to send the request in 'send_txhashset_request';
2. use a 10-minute timeout to handle a no-response failure, in case a peer ignores our txhashset request, or in case a peer fails to send the file for some reason.
This commit is contained in:
Gary Yu 2018-08-08 23:14:54 +08:00 committed by hashmap
parent 328d832bd6
commit 63a4d95df1
4 changed files with 90 additions and 18 deletions
p2p/src
servers/src

View file

@ -66,6 +66,7 @@ pub enum Error {
peer: Hash, peer: Hash,
}, },
Send(String), Send(String),
PeerException,
} }
impl From<ser::Error> for Error { impl From<ser::Error> for Error {

View file

@ -24,7 +24,7 @@ use std::thread;
use std::time::Instant; use std::time::Instant;
use chain::{self, ChainAdapter, Options, Tip}; use chain::{self, ChainAdapter, Options, Tip};
use common::types::{ChainValidationMode, ServerConfig, SyncState, SyncStatus}; use common::types::{self, ChainValidationMode, ServerConfig, SyncState, SyncStatus};
use core::{core, global}; use core::{core, global};
use core::core::block::BlockHeader; use core::core::block::BlockHeader;
use core::core::hash::{Hash, Hashed}; use core::core::hash::{Hash, Hashed};
@ -361,7 +361,9 @@ impl p2p::ChainAdapter for NetToChainAdapter {
w(&self.chain).txhashset_write(h, txhashset_data, self.sync_state.as_ref()) w(&self.chain).txhashset_write(h, txhashset_data, self.sync_state.as_ref())
{ {
error!(LOGGER, "Failed to save txhashset archive: {}", e); error!(LOGGER, "Failed to save txhashset archive: {}", e);
!e.is_bad_data() let is_good_data = !e.is_bad_data();
self.sync_state.set_sync_error(types::Error::Chain(e));
is_good_data
} else { } else {
info!(LOGGER, "Received valid txhashset data for {}.", h); info!(LOGGER, "Received valid txhashset data for {}.", h);
true true

View file

@ -15,7 +15,7 @@
//! Server types //! Server types
use std::convert::From; use std::convert::From;
use std::sync::RwLock; use std::sync::{Arc, RwLock};
use api; use api;
use chain; use chain;
@ -296,6 +296,7 @@ pub enum SyncStatus {
/// Current sync state. Encapsulates the current SyncStatus. /// Current sync state. Encapsulates the current SyncStatus.
pub struct SyncState { pub struct SyncState {
current: RwLock<SyncStatus>, current: RwLock<SyncStatus>,
sync_error: Arc<RwLock<Option<Error>>>,
} }
impl SyncState { impl SyncState {
@ -303,6 +304,7 @@ impl SyncState {
pub fn new() -> SyncState { pub fn new() -> SyncState {
SyncState { SyncState {
current: RwLock::new(SyncStatus::Initial), current: RwLock::new(SyncStatus::Initial),
sync_error: Arc::new(RwLock::new(None)),
} }
} }
@ -332,6 +334,22 @@ impl SyncState {
*status = new_status; *status = new_status;
} }
/// Communicate sync error
pub fn set_sync_error(&self, error: Error){
*self.sync_error.write().unwrap() = Some(error);
}
/// Returns another handle to the shared slot that holds the most recently
/// recorded sync error (`None` when nothing is stored).
pub fn sync_error(&self) -> Arc<RwLock<Option<Error>>> {
    self.sync_error.clone()
}
/// Resets the stored sync error back to `None`.
///
/// Panics if the lock guarding the error slot has been poisoned.
pub fn clear_sync_error(&self) {
    let mut slot = self.sync_error.write().unwrap();
    *slot = None;
}
} }
impl chain::TxHashsetWriteStatus for SyncState { impl chain::TxHashsetWriteStatus for SyncState {

View file

@ -120,21 +120,51 @@ pub fn run_sync(
// run the header sync every 10s // run the header sync every 10s
if si.header_sync_due(&header_head) { if si.header_sync_due(&header_head) {
header_sync(peers.clone(), chain.clone());
let status = sync_state.status(); let status = sync_state.status();
match status{ match status{
SyncStatus::TxHashsetDownload => (), SyncStatus::TxHashsetDownload => (),
_ => { _ => {
header_sync(peers.clone(), chain.clone());
sync_state.update(SyncStatus::HeaderSync{current_height: header_head.height, highest_height: si.highest_height}); sync_state.update(SyncStatus::HeaderSync{current_height: header_head.height, highest_height: si.highest_height});
} }
}; };
} }
if fast_sync_enabled { if fast_sync_enabled {
// run fast sync if applicable, every 5 min
if header_head.height == si.highest_height && si.fast_sync_due() { // check sync error
fast_sync(peers.clone(), chain.clone(), &header_head); {
sync_state.update(SyncStatus::TxHashsetDownload); let mut sync_error_need_clear = false;
{
let clone = sync_state.sync_error();
if let Some(ref sync_error) = *clone.read().unwrap() {
error!(LOGGER, "fast_sync: error = {:?}. restart fast sync", sync_error);
si.fast_sync_reset();
sync_error_need_clear = true;
}
drop(clone);
}
if sync_error_need_clear {
sync_state.clear_sync_error();
}
}
// run fast sync if applicable, normally only run one-time, except restart in error
if header_head.height == si.highest_height {
let (go,download_timeout) = si.fast_sync_due();
if go {
if let Err(e) = fast_sync(peers.clone(), chain.clone(), &header_head) {
sync_state.set_sync_error(Error::P2P(e));
}
sync_state.update(SyncStatus::TxHashsetDownload);
}
if SyncStatus::TxHashsetDownload == sync_state.status() && download_timeout {
error!(LOGGER, "fast_sync: TxHashsetDownload status timeout in 10 minutes!");
sync_state.set_sync_error(Error::P2P(p2p::Error::Timeout));
}
} }
} else { } else {
// run the body_sync every 5s // run the body_sync every 5s
@ -254,7 +284,7 @@ fn header_sync(peers: Arc<Peers>, chain: Arc<chain::Chain>) {
} }
} }
fn fast_sync(peers: Arc<Peers>, chain: Arc<chain::Chain>, header_head: &chain::Tip) { fn fast_sync(peers: Arc<Peers>, chain: Arc<chain::Chain>, header_head: &chain::Tip) -> Result<(), p2p::Error> {
let horizon = global::cut_through_horizon() as u64; let horizon = global::cut_through_horizon() as u64;
if let Some(peer) = peers.most_work_peer() { if let Some(peer) = peers.most_work_peer() {
@ -275,10 +305,18 @@ fn fast_sync(peers: Arc<Peers>, chain: Arc<chain::Chain>, header_head: &chain::T
txhashset_head.height, txhashset_head.height,
bhash bhash
); );
p.send_txhashset_request(txhashset_head.height, bhash) if let Err(e) = p.send_txhashset_request(txhashset_head.height, bhash) {
.unwrap(); error!(
LOGGER,
"fast_sync: send_txhashset_request err! {:?}",
e
);
return Err(e);
}
return Ok(());
} }
} }
Err(p2p::Error::PeerException)
} }
/// Request some block headers from a peer to advance us. /// Request some block headers from a peer to advance us.
@ -442,15 +480,28 @@ impl SyncInfo {
} }
// For now this is a one-time thing (it can be slow) at initial startup. // For now this is a one-time thing (it can be slow) at initial startup.
fn fast_sync_due(&mut self) -> bool { fn fast_sync_due(&mut self) -> (bool,bool) {
if let None = self.prev_fast_sync { let now = Utc::now();
let now = Utc::now(); let mut download_timeout = false;
self.prev_fast_sync = Some(now);
true match self.prev_fast_sync {
} else { None => {
false self.prev_fast_sync = Some(now);
(true,download_timeout)
}
Some(prev) => {
if now - prev > Duration::minutes(10) {
download_timeout = true;
}
(false,download_timeout)
}
} }
} }
/// Forgets the timestamp of the previous fast sync attempt by setting
/// `prev_fast_sync` back to `None`.
fn fast_sync_reset(&mut self) {
    // `take` leaves `None` in the field; the removed value is simply dropped.
    self.prev_fast_sync.take();
}
} }
#[cfg(test)] #[cfg(test)]