diff --git a/p2p/src/types.rs b/p2p/src/types.rs
index be2407f54..108448135 100644
--- a/p2p/src/types.rs
+++ b/p2p/src/types.rs
@@ -66,6 +66,7 @@ pub enum Error {
 		peer: Hash,
 	},
 	Send(String),
+	PeerException,
 }
 
 impl From<ser::Error> for Error {
diff --git a/servers/src/common/adapters.rs b/servers/src/common/adapters.rs
index 234da16a8..bf50c4164 100644
--- a/servers/src/common/adapters.rs
+++ b/servers/src/common/adapters.rs
@@ -24,7 +24,7 @@ use std::thread;
 use std::time::Instant;
 
 use chain::{self, ChainAdapter, Options, Tip};
-use common::types::{ChainValidationMode, ServerConfig, SyncState, SyncStatus};
+use common::types::{self, ChainValidationMode, ServerConfig, SyncState, SyncStatus};
 use core::{core, global};
 use core::core::block::BlockHeader;
 use core::core::hash::{Hash, Hashed};
@@ -361,7 +361,9 @@ impl p2p::ChainAdapter for NetToChainAdapter {
 			w(&self.chain).txhashset_write(h, txhashset_data, self.sync_state.as_ref())
 		{
 			error!(LOGGER, "Failed to save txhashset archive: {}", e);
-			!e.is_bad_data()
+			let is_good_data = !e.is_bad_data();
+			self.sync_state.set_sync_error(types::Error::Chain(e));
+			is_good_data
 		} else {
 			info!(LOGGER, "Received valid txhashset data for {}.", h);
 			true
diff --git a/servers/src/common/types.rs b/servers/src/common/types.rs
index a14d3c51b..73a1b9421 100644
--- a/servers/src/common/types.rs
+++ b/servers/src/common/types.rs
@@ -15,7 +15,7 @@
 //! Server types
 
 use std::convert::From;
-use std::sync::RwLock;
+use std::sync::{Arc, RwLock};
 
 use api;
 use chain;
@@ -296,6 +296,7 @@ pub enum SyncStatus {
 /// Current sync state. Encapsulates the current SyncStatus.
 pub struct SyncState {
 	current: RwLock<SyncStatus>,
+	sync_error: Arc<RwLock<Option<Error>>>,
 }
 
 impl SyncState {
@@ -303,6 +304,7 @@ impl SyncState {
 	pub fn new() -> SyncState {
 		SyncState {
 			current: RwLock::new(SyncStatus::Initial),
+			sync_error: Arc::new(RwLock::new(None)),
 		}
 	}
 
@@ -332,6 +334,22 @@
 
 		*status = new_status;
 	}
+
+	/// Communicate sync error
+	pub fn set_sync_error(&self, error: Error) {
+		*self.sync_error.write().unwrap() = Some(error);
+	}
+
+	/// Get sync error
+	pub fn sync_error(&self) -> Arc<RwLock<Option<Error>>> {
+		Arc::clone(&self.sync_error)
+	}
+
+	/// Clear sync error
+	pub fn clear_sync_error(&self) {
+		*self.sync_error.write().unwrap() = None;
+	}
+
 }
 
 impl chain::TxHashsetWriteStatus for SyncState {
diff --git a/servers/src/grin/sync.rs b/servers/src/grin/sync.rs
index c5e99f5e8..101dca5bc 100644
--- a/servers/src/grin/sync.rs
+++ b/servers/src/grin/sync.rs
@@ -120,21 +120,51 @@ pub fn run_sync(
 			// run the header sync every 10s
 			if si.header_sync_due(&header_head) {
+				header_sync(peers.clone(), chain.clone());
+
 				let status = sync_state.status();
 				match status{
 					SyncStatus::TxHashsetDownload => (),
 					_ => {
-						header_sync(peers.clone(), chain.clone());
 						sync_state.update(SyncStatus::HeaderSync{current_height: header_head.height, highest_height: si.highest_height});
 					}
 				};
 			}
 
 			if fast_sync_enabled {
 
-				// run fast sync if applicable, every 5 min
-				if header_head.height == si.highest_height && si.fast_sync_due() {
-					fast_sync(peers.clone(), chain.clone(), &header_head);
-					sync_state.update(SyncStatus::TxHashsetDownload);
+				// check sync error
+				{
+					let mut sync_error_need_clear = false;
+					{
+						let clone = sync_state.sync_error();
+						if let Some(ref sync_error) = *clone.read().unwrap() {
+							error!(LOGGER, "fast_sync: error = {:?}, restarting fast sync", sync_error);
+							si.fast_sync_reset();
+							sync_error_need_clear = true;
+						}
+						drop(clone);
+					}
+					if sync_error_need_clear {
+						sync_state.clear_sync_error();
+					}
+				}
+
+				// run fast sync if applicable; normally it only runs once, unless restarted after an error
+				if header_head.height == si.highest_height {
+					let (go, download_timeout) = si.fast_sync_due();
+
+					if go {
+						if let Err(e) = fast_sync(peers.clone(), chain.clone(), &header_head) {
+							sync_state.set_sync_error(Error::P2P(e));
+						}
+						sync_state.update(SyncStatus::TxHashsetDownload);
+					}
+
+					if SyncStatus::TxHashsetDownload == sync_state.status() && download_timeout {
+						error!(LOGGER, "fast_sync: TxHashsetDownload status timed out after 10 minutes!");
+						sync_state.set_sync_error(Error::P2P(p2p::Error::Timeout));
+					}
 				}
 			} else {
 				// run the body_sync every 5s
@@ -254,7 +284,7 @@
 	}
 }
 
-fn fast_sync(peers: Arc<Peers>, chain: Arc<chain::Chain>, header_head: &chain::Tip) {
+fn fast_sync(peers: Arc<Peers>, chain: Arc<chain::Chain>, header_head: &chain::Tip) -> Result<(), p2p::Error> {
 	let horizon = global::cut_through_horizon() as u64;
 
 	if let Some(peer) = peers.most_work_peer() {
@@ -275,10 +305,18 @@ fn fast_sync(peers: Arc<Peers>, chain: Arc<chain::Chain>, header_head: &chain::T
 				txhashset_head.height,
 				bhash
 			);
-			p.send_txhashset_request(txhashset_head.height, bhash)
-				.unwrap();
+			if let Err(e) = p.send_txhashset_request(txhashset_head.height, bhash) {
+				error!(
+					LOGGER,
+					"fast_sync: send_txhashset_request error: {:?}",
+					e
+				);
+				return Err(e);
+			}
+			return Ok(());
 		}
 	}
+	Err(p2p::Error::PeerException)
 }
 
 /// Request some block headers from a peer to advance us.
@@ -442,15 +480,28 @@ impl SyncInfo {
 	}
 
 	// For now this is a one-time thing (it can be slow) at initial startup.
-	fn fast_sync_due(&mut self) -> bool {
-		if let None = self.prev_fast_sync {
-			let now = Utc::now();
-			self.prev_fast_sync = Some(now);
-			true
-		} else {
-			false
+	fn fast_sync_due(&mut self) -> (bool, bool) {
+		let now = Utc::now();
+		let mut download_timeout = false;
+
+		match self.prev_fast_sync {
+			None => {
+				self.prev_fast_sync = Some(now);
+				(true, download_timeout)
+			}
+			Some(prev) => {
+				if now - prev > Duration::minutes(10) {
+					download_timeout = true;
+				}
+				(false, download_timeout)
+			}
 		}
 	}
+
+	fn fast_sync_reset(&mut self) {
+		self.prev_fast_sync = None;
+	}
+
 }
 
 #[cfg(test)]
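For reviewers unfamiliar with the pattern, below is a minimal, self-contained sketch of how the new `SyncState` error API (`set_sync_error` / `sync_error` / `clear_sync_error` from `servers/src/common/types.rs` above) is intended to be used. It is not grin code: the simplified `Error` enum and the `main` driver are illustrative stand-ins; in the server the writer is the adapter or the failing fast-sync call, and the reader is the error check in `sync.rs`.

```rust
use std::sync::{Arc, RwLock};

// Hypothetical stand-in for the server's error type; only used for illustration.
#[derive(Debug)]
enum Error {
	Timeout,
}

// Stripped-down SyncState mirroring the sync_error field added in this PR.
struct SyncState {
	sync_error: Arc<RwLock<Option<Error>>>,
}

impl SyncState {
	fn new() -> SyncState {
		SyncState {
			sync_error: Arc::new(RwLock::new(None)),
		}
	}

	/// Record a sync error for other threads (e.g. the sync loop) to observe.
	fn set_sync_error(&self, error: Error) {
		*self.sync_error.write().unwrap() = Some(error);
	}

	/// Hand out a clone of the Arc so callers can inspect the error
	/// without borrowing SyncState for the lifetime of the lock guard.
	fn sync_error(&self) -> Arc<RwLock<Option<Error>>> {
		Arc::clone(&self.sync_error)
	}

	/// Reset the error once the sync loop has reacted to it.
	fn clear_sync_error(&self) {
		*self.sync_error.write().unwrap() = None;
	}
}

fn main() {
	let state = SyncState::new();

	// A failing download records its error...
	state.set_sync_error(Error::Timeout);

	// ...the sync loop notices it on the next iteration, reacts, then clears it.
	if let Some(ref e) = *state.sync_error().read().unwrap() {
		println!("fast_sync error observed: {:?}", e);
	}
	state.clear_sync_error();
	assert!(state.sync_error().read().unwrap().is_none());
}
```

Returning the cloned `Arc<RwLock<Option<Error>>>` from `sync_error()` is what lets the sync loop hold the handle across iterations while other components write to the same slot.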