Refactor SyncState (#3297)

* Refactor SyncState

Method sync_error() retrun type was simplified.
update_txhashset_download() was  made type safe, which eliminates a runtime enum variant's  check, added an atomic status update
This commit is contained in:
hashmap 2020-04-20 12:30:04 +02:00 committed by GitHub
parent e64e90623b
commit d8c6eef485
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 130 additions and 106 deletions

View file

@ -79,16 +79,11 @@ fn sync_status_to_api(sync_status: SyncStatus) -> (String, Option<serde_json::Va
"header_sync".to_string(),
Some(json!({ "current_height": current_height, "highest_height": highest_height })),
),
SyncStatus::TxHashsetDownload {
start_time: _,
prev_update_time: _,
update_time: _,
prev_downloaded_size: _,
downloaded_size,
total_size,
} => (
SyncStatus::TxHashsetDownload(stats) => (
"txhashset_download".to_string(),
Some(json!({ "downloaded_size": downloaded_size, "total_size": total_size })),
Some(
json!({ "downloaded_size": stats.downloaded_size, "total_size": stats.total_size }),
),
),
SyncStatus::TxHashsetRangeProofsValidation {
rproofs,

View file

@ -46,5 +46,6 @@ pub use crate::chain::{Chain, MAX_ORPHAN_SIZE};
pub use crate::error::{Error, ErrorKind};
pub use crate::store::ChainStore;
pub use crate::types::{
BlockStatus, ChainAdapter, Options, SyncState, SyncStatus, Tip, TxHashsetWriteStatus,
BlockStatus, ChainAdapter, Options, SyncState, SyncStatus, Tip, TxHashsetDownloadStats,
TxHashsetWriteStatus,
};

View file

@ -15,14 +15,13 @@
//! Base types that the block chain pipeline requires.
use chrono::prelude::{DateTime, Utc};
use std::sync::Arc;
use crate::core::core::hash::{Hash, Hashed, ZERO_HASH};
use crate::core::core::{Block, BlockHeader, HeaderVersion};
use crate::core::pow::Difficulty;
use crate::core::ser::{self, PMMRIndexHashable, Readable, Reader, Writeable, Writer};
use crate::error::{Error, ErrorKind};
use crate::util::RwLock;
use crate::util::{RwLock, RwLockWriteGuard};
bitflags! {
/// Options for block validation
@ -40,7 +39,6 @@ bitflags! {
/// Various status sync can be in, whether it's fast sync or archival.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Deserialize, Serialize)]
#[allow(missing_docs)]
pub enum SyncStatus {
/// Initial State (we do not yet know if we are/should be syncing)
Initial,
@ -51,28 +49,27 @@ pub enum SyncStatus {
AwaitingPeers(bool),
/// Downloading block headers
HeaderSync {
/// current node height
current_height: u64,
/// height of the most advanced peer
highest_height: u64,
},
/// Downloading the various txhashsets
TxHashsetDownload {
start_time: DateTime<Utc>,
prev_update_time: DateTime<Utc>,
update_time: DateTime<Utc>,
prev_downloaded_size: u64,
downloaded_size: u64,
total_size: u64,
},
TxHashsetDownload(TxHashsetDownloadStats),
/// Setting up before validation
TxHashsetSetup,
/// Validating the kernels
TxHashsetKernelsValidation {
/// kernels validated
kernels: u64,
/// kernels in total
kernels_total: u64,
},
/// Validating the range proofs
TxHashsetRangeProofsValidation {
/// range proofs validated
rproofs: u64,
/// range proofs in total
rproofs_total: u64,
},
/// Finalizing the new state
@ -81,16 +78,49 @@ pub enum SyncStatus {
TxHashsetDone,
/// Downloading blocks
BodySync {
/// current node height
current_height: u64,
/// height of the most advanced peer
highest_height: u64,
},
/// Shutdown
Shutdown,
}
/// Stats for TxHashsetDownload stage
#[derive(Debug, Clone, Copy, Eq, PartialEq, Deserialize, Serialize)]
pub struct TxHashsetDownloadStats {
/// when download started
pub start_time: DateTime<Utc>,
/// time of the previous update
pub prev_update_time: DateTime<Utc>,
/// time of the latest update
pub update_time: DateTime<Utc>,
/// size of the previous chunk
pub prev_downloaded_size: u64,
/// size of the the latest chunk
pub downloaded_size: u64,
/// downloaded since the start
pub total_size: u64,
}
impl Default for TxHashsetDownloadStats {
fn default() -> Self {
TxHashsetDownloadStats {
start_time: Utc::now(),
update_time: Utc::now(),
prev_update_time: Utc::now(),
prev_downloaded_size: 0,
downloaded_size: 0,
total_size: 0,
}
}
}
/// Current sync state. Encapsulates the current SyncStatus.
pub struct SyncState {
current: RwLock<SyncStatus>,
sync_error: Arc<RwLock<Option<Error>>>,
sync_error: RwLock<Option<Error>>,
}
impl SyncState {
@ -98,7 +128,7 @@ impl SyncState {
pub fn new() -> SyncState {
SyncState {
current: RwLock::new(SyncStatus::Initial),
sync_error: Arc::new(RwLock::new(None)),
sync_error: RwLock::new(None),
}
}
@ -114,37 +144,54 @@ impl SyncState {
}
/// Update the syncing status
pub fn update(&self, new_status: SyncStatus) {
if self.status() == new_status {
return;
}
let mut status = self.current.write();
debug!("sync_state: sync_status: {:?} -> {:?}", *status, new_status,);
*status = new_status;
pub fn update(&self, new_status: SyncStatus) -> bool {
let status = self.current.write();
self.update_with_guard(new_status, status)
}
/// Update txhashset downloading progress
pub fn update_txhashset_download(&self, new_status: SyncStatus) -> bool {
if let SyncStatus::TxHashsetDownload { .. } = new_status {
let mut status = self.current.write();
*status = new_status;
true
fn update_with_guard(
&self,
new_status: SyncStatus,
mut status: RwLockWriteGuard<SyncStatus>,
) -> bool {
if *status == new_status {
return false;
}
debug!("sync_state: sync_status: {:?} -> {:?}", *status, new_status,);
*status = new_status;
true
}
/// Update the syncing status if predicate f is satisfied
pub fn update_if<F>(&self, new_status: SyncStatus, f: F) -> bool
where
F: Fn(SyncStatus) -> bool,
{
let status = self.current.write();
if f(*status) {
self.update_with_guard(new_status, status)
} else {
false
}
}
/// Update txhashset downloading progress
pub fn update_txhashset_download(&self, stats: TxHashsetDownloadStats) {
*self.current.write() = SyncStatus::TxHashsetDownload(stats);
}
/// Communicate sync error
pub fn set_sync_error(&self, error: Error) {
*self.sync_error.write() = Some(error);
}
/// Get sync error
pub fn sync_error(&self) -> Arc<RwLock<Option<Error>>> {
Arc::clone(&self.sync_error)
pub fn sync_error(&self) -> Option<String> {
self.sync_error
.read()
.as_ref()
.and_then(|e| Some(e.to_string()))
}
/// Clear sync error

View file

@ -23,7 +23,9 @@ use std::sync::{Arc, Weak};
use std::thread;
use std::time::Instant;
use crate::chain::{self, BlockStatus, ChainAdapter, Options, SyncState, SyncStatus};
use crate::chain::{
self, BlockStatus, ChainAdapter, Options, SyncState, SyncStatus, TxHashsetDownloadStats,
};
use crate::common::hooks::{ChainEvents, NetEvents};
use crate::common::types::{ChainValidationMode, DandelionEpoch, ServerConfig};
use crate::core::core::hash::{Hash, Hashed};
@ -399,20 +401,18 @@ impl p2p::ChainAdapter for NetToChainAdapter {
total_size: u64,
) -> bool {
match self.sync_state.status() {
SyncStatus::TxHashsetDownload {
update_time: old_update_time,
downloaded_size: old_downloaded_size,
..
} => self
.sync_state
.update_txhashset_download(SyncStatus::TxHashsetDownload {
start_time,
prev_update_time: old_update_time,
update_time: Utc::now(),
prev_downloaded_size: old_downloaded_size,
downloaded_size,
total_size,
}),
SyncStatus::TxHashsetDownload(prev) => {
self.sync_state
.update_txhashset_download(TxHashsetDownloadStats {
start_time,
prev_update_time: prev.update_time,
update_time: Utc::now(),
prev_downloaded_size: prev.downloaded_size,
downloaded_size,
total_size,
});
true
}
_ => false,
}
}

View file

@ -68,13 +68,9 @@ impl StateSync {
let mut sync_need_restart = false;
// check sync error
{
let clone = self.sync_state.sync_error();
if let Some(ref sync_error) = *clone.read() {
error!("state_sync: error = {:?}. restart fast sync", sync_error);
sync_need_restart = true;
}
drop(clone);
if let Some(sync_error) = self.sync_state.sync_error() {
error!("state_sync: error = {}. restart fast sync", sync_error);
sync_need_restart = true;
}
// check peer connection status of this sync
@ -92,15 +88,16 @@ impl StateSync {
// if txhashset downloaded and validated successfully, we switch to BodySync state,
// and we need call state_sync_reset() to make it ready for next possible state sync.
let done = if let SyncStatus::TxHashsetDone = self.sync_state.status() {
self.sync_state.update(SyncStatus::BodySync {
let done = self.sync_state.update_if(
SyncStatus::BodySync {
current_height: 0,
highest_height: 0,
});
true
} else {
false
};
},
|s| match s {
SyncStatus::TxHashsetDone => true,
_ => false,
},
);
if sync_need_restart || done {
self.state_sync_reset();
@ -137,24 +134,19 @@ impl StateSync {
// to avoid the confusing log,
// update the final HeaderSync state mainly for 'current_height'
{
let status = self.sync_state.status();
if let SyncStatus::HeaderSync { .. } = status {
self.sync_state.update(SyncStatus::HeaderSync {
current_height: header_head.height,
highest_height,
});
}
}
self.sync_state.update_if(
SyncStatus::HeaderSync {
current_height: header_head.height,
highest_height,
},
|s| match s {
SyncStatus::HeaderSync { .. } => true,
_ => false,
},
);
self.sync_state.update(SyncStatus::TxHashsetDownload {
start_time: Utc::now(),
prev_update_time: Utc::now(),
update_time: Utc::now(),
prev_downloaded_size: 0,
downloaded_size: 0,
total_size: 0,
});
self.sync_state
.update(SyncStatus::TxHashsetDownload(Default::default()));
}
}
true

View file

@ -48,31 +48,20 @@ impl TUIStatusView {
};
format!("Sync step 1/7: Downloading headers: {}%", percent)
}
SyncStatus::TxHashsetDownload {
start_time,
prev_update_time,
update_time: _,
prev_downloaded_size,
downloaded_size,
total_size,
} => {
if total_size > 0 {
let percent = if total_size > 0 {
downloaded_size * 100 / total_size
} else {
0
};
let start = prev_update_time.timestamp_nanos();
SyncStatus::TxHashsetDownload(stat) => {
if stat.total_size > 0 {
let percent = stat.downloaded_size * 100 / stat.total_size;
let start = stat.prev_update_time.timestamp_nanos();
let fin = Utc::now().timestamp_nanos();
let dur_ms = (fin - start) as f64 * NANO_TO_MILLIS;
format!("Sync step 2/7: Downloading {}(MB) chain state for state sync: {}% at {:.1?}(kB/s)",
total_size / 1_000_000,
stat.total_size / 1_000_000,
percent,
if dur_ms > 1.0f64 { downloaded_size.saturating_sub(prev_downloaded_size) as f64 / dur_ms as f64 } else { 0f64 },
if dur_ms > 1.0f64 { stat.downloaded_size.saturating_sub(stat.prev_downloaded_size) as f64 / dur_ms as f64 } else { 0f64 },
)
} else {
let start = start_time.timestamp_millis();
let start = stat.start_time.timestamp_millis();
let fin = Utc::now().timestamp_millis();
let dur_secs = (fin - start) / 1000;

View file

@ -29,7 +29,7 @@ extern crate lazy_static;
extern crate serde_derive;
// Re-export so only has to be included once
pub use parking_lot::Mutex;
pub use parking_lot::{RwLock, RwLockReadGuard};
pub use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
// Re-export so only has to be included once
pub use secp256k1zkp as secp;