mirror of
https://github.com/mimblewimble/grin.git
synced 2025-02-01 17:01:09 +03:00
add pibd abort timeout case (#3704)
This commit is contained in:
parent
eda31ab9e5
commit
aa2a2a98df
3 changed files with 70 additions and 17 deletions
|
@ -157,6 +157,10 @@ pub enum ErrorKind {
|
||||||
/// PIBD segment related error
|
/// PIBD segment related error
|
||||||
#[fail(display = "Segment error")]
|
#[fail(display = "Segment error")]
|
||||||
SegmentError(segment::SegmentError),
|
SegmentError(segment::SegmentError),
|
||||||
|
/// We've decided to halt the PIBD process due to lack of supporting peers or
|
||||||
|
/// otherwise failing to progress for a certain amount of time
|
||||||
|
#[fail(display = "Aborting PIBD error")]
|
||||||
|
AbortingPIBDError,
|
||||||
/// The segmenter is associated to a different block header
|
/// The segmenter is associated to a different block header
|
||||||
#[fail(display = "Segmenter header mismatch")]
|
#[fail(display = "Segmenter header mismatch")]
|
||||||
SegmenterHeaderMismatch,
|
SegmenterHeaderMismatch,
|
||||||
|
|
|
@ -15,6 +15,7 @@
|
||||||
//! Base types that the block chain pipeline requires.
|
//! Base types that the block chain pipeline requires.
|
||||||
|
|
||||||
use chrono::prelude::{DateTime, Utc};
|
use chrono::prelude::{DateTime, Utc};
|
||||||
|
use chrono::Duration;
|
||||||
|
|
||||||
use crate::core::core::hash::{Hash, Hashed, ZERO_HASH};
|
use crate::core::core::hash::{Hash, Hashed, ZERO_HASH};
|
||||||
use crate::core::core::{pmmr, Block, BlockHeader, HeaderVersion, SegmentTypeIdentifier};
|
use crate::core::core::{pmmr, Block, BlockHeader, HeaderVersion, SegmentTypeIdentifier};
|
||||||
|
@ -176,7 +177,6 @@ pub struct SyncState {
|
||||||
/// but it's currently the only place that makes the info
|
/// but it's currently the only place that makes the info
|
||||||
/// available where it will be needed (both in the adapter
|
/// available where it will be needed (both in the adapter
|
||||||
/// and the sync loop)
|
/// and the sync loop)
|
||||||
/// TODO: Better struct for this, perhaps hash identifiers
|
|
||||||
requested_pibd_segments: RwLock<Vec<PIBDSegmentContainer>>,
|
requested_pibd_segments: RwLock<Vec<PIBDSegmentContainer>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -291,11 +291,15 @@ impl SyncState {
|
||||||
.retain(|i| &i.identifier != id);
|
.retain(|i| &i.identifier != id);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Remove segments with request timestamps less cutoff time
|
/// Remove segments with request timestamps less than cutoff time
|
||||||
pub fn remove_stale_pibd_requests(&self, cutoff_time: DateTime<Utc>) {
|
pub fn remove_stale_pibd_requests(&self, timeout_seconds: i64) {
|
||||||
self.requested_pibd_segments
|
let cutoff_time = Utc::now() - Duration::seconds(timeout_seconds);
|
||||||
.write()
|
self.requested_pibd_segments.write().retain(|i| {
|
||||||
.retain(|i| i.request_time < cutoff_time);
|
if i.request_time <= cutoff_time {
|
||||||
|
debug!("Removing + retrying PIBD request after timeout: {:?}", i)
|
||||||
|
};
|
||||||
|
i.request_time > cutoff_time
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Check whether segment is in request list
|
/// Check whether segment is in request list
|
||||||
|
|
|
@ -36,6 +36,9 @@ pub struct StateSync {
|
||||||
|
|
||||||
prev_state_sync: Option<DateTime<Utc>>,
|
prev_state_sync: Option<DateTime<Utc>>,
|
||||||
state_sync_peer: Option<Arc<Peer>>,
|
state_sync_peer: Option<Arc<Peer>>,
|
||||||
|
|
||||||
|
pibd_aborted: bool,
|
||||||
|
earliest_zero_pibd_peer_time: Option<DateTime<Utc>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl StateSync {
|
impl StateSync {
|
||||||
|
@ -50,9 +53,22 @@ impl StateSync {
|
||||||
chain,
|
chain,
|
||||||
prev_state_sync: None,
|
prev_state_sync: None,
|
||||||
state_sync_peer: None,
|
state_sync_peer: None,
|
||||||
|
pibd_aborted: false,
|
||||||
|
earliest_zero_pibd_peer_time: None,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Flag to abort PIBD process
|
||||||
|
pub fn set_pibd_aborted(&mut self) {
|
||||||
|
self.pibd_aborted = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Record earliest time at which we had no suitable
|
||||||
|
/// peers for continuing PIBD
|
||||||
|
pub fn set_earliest_zero_pibd_peer_time(&mut self, t: Option<DateTime<Utc>>) {
|
||||||
|
self.earliest_zero_pibd_peer_time = t;
|
||||||
|
}
|
||||||
|
|
||||||
/// Check whether state sync should run and triggers a state download when
|
/// Check whether state sync should run and triggers a state download when
|
||||||
/// it's time (we have all headers). Returns true as long as state sync
|
/// it's time (we have all headers). Returns true as long as state sync
|
||||||
/// needs monitoring, false when it's either done or turned off.
|
/// needs monitoring, false when it's either done or turned off.
|
||||||
|
@ -81,6 +97,8 @@ impl StateSync {
|
||||||
let using_pibd =
|
let using_pibd =
|
||||||
if let SyncStatus::TxHashsetPibd { aborted: true, .. } = self.sync_state.status() {
|
if let SyncStatus::TxHashsetPibd { aborted: true, .. } = self.sync_state.status() {
|
||||||
false
|
false
|
||||||
|
} else if self.pibd_aborted {
|
||||||
|
false
|
||||||
} else {
|
} else {
|
||||||
// Only on testing chains for now
|
// Only on testing chains for now
|
||||||
if global::get_chain_type() != global::ChainTypes::Mainnet {
|
if global::get_chain_type() != global::ChainTypes::Mainnet {
|
||||||
|
@ -240,10 +258,9 @@ impl StateSync {
|
||||||
let archive_header = self.chain.txhashset_archive_header_header_only().unwrap();
|
let archive_header = self.chain.txhashset_archive_header_header_only().unwrap();
|
||||||
let desegmenter = self.chain.desegmenter(&archive_header).unwrap();
|
let desegmenter = self.chain.desegmenter(&archive_header).unwrap();
|
||||||
|
|
||||||
// Remove stale requests
|
// Remove stale requests, if we haven't recieved the segment within a minute re-request
|
||||||
// TODO: verify timing
|
// TODO: verify timing
|
||||||
let timeout_time = Utc::now() + Duration::seconds(15);
|
self.sync_state.remove_stale_pibd_requests(60);
|
||||||
self.sync_state.remove_stale_pibd_requests(timeout_time);
|
|
||||||
|
|
||||||
// Apply segments... TODO: figure out how this should be called, might
|
// Apply segments... TODO: figure out how this should be called, might
|
||||||
// need to be a separate thread.
|
// need to be a separate thread.
|
||||||
|
@ -280,22 +297,50 @@ impl StateSync {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
let peers_iter = || {
|
// TODO: urg
|
||||||
self.peers
|
let peers = self.peers.clone();
|
||||||
.iter()
|
|
||||||
|
// First, get max difficulty or greater peers
|
||||||
|
let peers_iter = || peers.iter().connected();
|
||||||
|
let max_diff = peers_iter().max_difficulty().unwrap_or(Difficulty::zero());
|
||||||
|
let peers_iter_max = || peers_iter().with_difficulty(|x| x >= max_diff);
|
||||||
|
|
||||||
|
// Then, further filter by PIBD capabilities
|
||||||
|
let peers_iter_pibd = || {
|
||||||
|
peers_iter_max()
|
||||||
.with_capabilities(Capabilities::PIBD_HIST)
|
.with_capabilities(Capabilities::PIBD_HIST)
|
||||||
.connected()
|
.connected()
|
||||||
};
|
};
|
||||||
|
|
||||||
// Filter peers further based on max difficulty.
|
// If there are no suitable PIBD-Enabled peers, AND there hasn't been one for a minute,
|
||||||
let max_diff = peers_iter().max_difficulty().unwrap_or(Difficulty::zero());
|
// abort PIBD and fall back to txhashset download
|
||||||
let peers_iter = || peers_iter().with_difficulty(|x| x >= max_diff);
|
// Waiting a minute helps ensures that the cancellation isn't simply due to a single non-PIBD enabled
|
||||||
|
// peer having the max difficulty
|
||||||
|
if peers_iter_pibd().count() == 0 {
|
||||||
|
if let None = self.earliest_zero_pibd_peer_time {
|
||||||
|
self.set_earliest_zero_pibd_peer_time(Some(Utc::now()));
|
||||||
|
}
|
||||||
|
if self.earliest_zero_pibd_peer_time.unwrap() + Duration::seconds(60) < Utc::now() {
|
||||||
|
// random abort test
|
||||||
|
info!("No PIBD-enabled max-difficulty peers for the past minute - Aborting PIBD and falling back to TxHashset.zip download");
|
||||||
|
self.sync_state
|
||||||
|
.update_pibd_progress(true, true, 0, 1, &archive_header);
|
||||||
|
self.sync_state
|
||||||
|
.set_sync_error(chain::ErrorKind::AbortingPIBDError.into());
|
||||||
|
self.set_pibd_aborted();
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
self.set_earliest_zero_pibd_peer_time(None)
|
||||||
|
}
|
||||||
|
|
||||||
// Choose a random "most work" peer, preferring outbound if at all possible.
|
// Choose a random "most work" peer, preferring outbound if at all possible.
|
||||||
let peer = peers_iter().outbound().choose_random().or_else(|| {
|
let peer = peers_iter_pibd().outbound().choose_random().or_else(|| {
|
||||||
warn!("no suitable outbound peer for pibd message, considering inbound");
|
warn!("no suitable outbound peer for pibd message, considering inbound");
|
||||||
peers_iter().inbound().choose_random()
|
peers_iter_pibd().inbound().choose_random()
|
||||||
});
|
});
|
||||||
trace!("Chosen peer is {:?}", peer);
|
trace!("Chosen peer is {:?}", peer);
|
||||||
|
|
||||||
if let Some(p) = peer {
|
if let Some(p) = peer {
|
||||||
// add to list of segments that are being tracked
|
// add to list of segments that are being tracked
|
||||||
self.sync_state.add_pibd_segment(seg_id);
|
self.sync_state.add_pibd_segment(seg_id);
|
||||||
|
|
Loading…
Reference in a new issue