diff --git a/chain/src/error.rs b/chain/src/error.rs index 613dd4277..b35c88a3f 100644 --- a/chain/src/error.rs +++ b/chain/src/error.rs @@ -157,6 +157,10 @@ pub enum ErrorKind { /// PIBD segment related error #[fail(display = "Segment error")] SegmentError(segment::SegmentError), + /// We've decided to halt the PIBD process due to lack of supporting peers or + /// otherwise failing to progress for a certain amount of time + #[fail(display = "Aborting PIBD error")] + AbortingPIBDError, /// The segmenter is associated to a different block header #[fail(display = "Segmenter header mismatch")] SegmenterHeaderMismatch, diff --git a/chain/src/types.rs b/chain/src/types.rs index 0a56b04bc..602843e98 100644 --- a/chain/src/types.rs +++ b/chain/src/types.rs @@ -15,6 +15,7 @@ //! Base types that the block chain pipeline requires. use chrono::prelude::{DateTime, Utc}; +use chrono::Duration; use crate::core::core::hash::{Hash, Hashed, ZERO_HASH}; use crate::core::core::{pmmr, Block, BlockHeader, HeaderVersion, SegmentTypeIdentifier}; @@ -176,7 +177,6 @@ pub struct SyncState { /// but it's currently the only place that makes the info /// available where it will be needed (both in the adapter /// and the sync loop) - /// TODO: Better struct for this, perhaps hash identifiers requested_pibd_segments: RwLock>, } @@ -291,11 +291,15 @@ impl SyncState { .retain(|i| &i.identifier != id); } - /// Remove segments with request timestamps less cutoff time - pub fn remove_stale_pibd_requests(&self, cutoff_time: DateTime) { - self.requested_pibd_segments - .write() - .retain(|i| i.request_time < cutoff_time); + /// Remove segments with request timestamps less than cutoff time + pub fn remove_stale_pibd_requests(&self, timeout_seconds: i64) { + let cutoff_time = Utc::now() - Duration::seconds(timeout_seconds); + self.requested_pibd_segments.write().retain(|i| { + if i.request_time <= cutoff_time { + debug!("Removing + retrying PIBD request after timeout: {:?}", i) + }; + i.request_time > cutoff_time + }); } /// Check whether segment is in request list diff --git a/servers/src/grin/sync/state_sync.rs b/servers/src/grin/sync/state_sync.rs index a580cf42b..08358b52c 100644 --- a/servers/src/grin/sync/state_sync.rs +++ b/servers/src/grin/sync/state_sync.rs @@ -36,6 +36,9 @@ pub struct StateSync { prev_state_sync: Option>, state_sync_peer: Option>, + + pibd_aborted: bool, + earliest_zero_pibd_peer_time: Option>, } impl StateSync { @@ -50,9 +53,22 @@ impl StateSync { chain, prev_state_sync: None, state_sync_peer: None, + pibd_aborted: false, + earliest_zero_pibd_peer_time: None, } } + /// Flag to abort PIBD process + pub fn set_pibd_aborted(&mut self) { + self.pibd_aborted = true; + } + + /// Record earliest time at which we had no suitable + /// peers for continuing PIBD + pub fn set_earliest_zero_pibd_peer_time(&mut self, t: Option>) { + self.earliest_zero_pibd_peer_time = t; + } + /// Check whether state sync should run and triggers a state download when /// it's time (we have all headers). Returns true as long as state sync /// needs monitoring, false when it's either done or turned off. @@ -81,6 +97,8 @@ impl StateSync { let using_pibd = if let SyncStatus::TxHashsetPibd { aborted: true, .. } = self.sync_state.status() { false + } else if self.pibd_aborted { + false } else { // Only on testing chains for now if global::get_chain_type() != global::ChainTypes::Mainnet { @@ -240,10 +258,9 @@ impl StateSync { let archive_header = self.chain.txhashset_archive_header_header_only().unwrap(); let desegmenter = self.chain.desegmenter(&archive_header).unwrap(); - // Remove stale requests + // Remove stale requests, if we haven't recieved the segment within a minute re-request // TODO: verify timing - let timeout_time = Utc::now() + Duration::seconds(15); - self.sync_state.remove_stale_pibd_requests(timeout_time); + self.sync_state.remove_stale_pibd_requests(60); // Apply segments... TODO: figure out how this should be called, might // need to be a separate thread. @@ -280,22 +297,50 @@ impl StateSync { continue; } - let peers_iter = || { - self.peers - .iter() + // TODO: urg + let peers = self.peers.clone(); + + // First, get max difficulty or greater peers + let peers_iter = || peers.iter().connected(); + let max_diff = peers_iter().max_difficulty().unwrap_or(Difficulty::zero()); + let peers_iter_max = || peers_iter().with_difficulty(|x| x >= max_diff); + + // Then, further filter by PIBD capabilities + let peers_iter_pibd = || { + peers_iter_max() .with_capabilities(Capabilities::PIBD_HIST) .connected() }; - // Filter peers further based on max difficulty. - let max_diff = peers_iter().max_difficulty().unwrap_or(Difficulty::zero()); - let peers_iter = || peers_iter().with_difficulty(|x| x >= max_diff); + // If there are no suitable PIBD-Enabled peers, AND there hasn't been one for a minute, + // abort PIBD and fall back to txhashset download + // Waiting a minute helps ensures that the cancellation isn't simply due to a single non-PIBD enabled + // peer having the max difficulty + if peers_iter_pibd().count() == 0 { + if let None = self.earliest_zero_pibd_peer_time { + self.set_earliest_zero_pibd_peer_time(Some(Utc::now())); + } + if self.earliest_zero_pibd_peer_time.unwrap() + Duration::seconds(60) < Utc::now() { + // random abort test + info!("No PIBD-enabled max-difficulty peers for the past minute - Aborting PIBD and falling back to TxHashset.zip download"); + self.sync_state + .update_pibd_progress(true, true, 0, 1, &archive_header); + self.sync_state + .set_sync_error(chain::ErrorKind::AbortingPIBDError.into()); + self.set_pibd_aborted(); + return false; + } + } else { + self.set_earliest_zero_pibd_peer_time(None) + } + // Choose a random "most work" peer, preferring outbound if at all possible. - let peer = peers_iter().outbound().choose_random().or_else(|| { + let peer = peers_iter_pibd().outbound().choose_random().or_else(|| { warn!("no suitable outbound peer for pibd message, considering inbound"); - peers_iter().inbound().choose_random() + peers_iter_pibd().inbound().choose_random() }); trace!("Chosen peer is {:?}", peer); + if let Some(p) = peer { // add to list of segments that are being tracked self.sync_state.add_pibd_segment(seg_id);