From d8c6eef4855cdba60e241ea7caf497c7185ea9f4 Mon Sep 17 00:00:00 2001 From: hashmap Date: Mon, 20 Apr 2020 12:30:04 +0200 Subject: [PATCH] Refactor SyncState (#3297) * Refactor SyncState Method sync_error() retrun type was simplified. update_txhashset_download() was made type safe, which eliminates a runtime enum variant's check, added an atomic status update --- api/src/handlers/server_api.rs | 13 +--- chain/src/lib.rs | 3 +- chain/src/types.rs | 109 ++++++++++++++++++++-------- servers/src/common/adapters.rs | 30 ++++---- servers/src/grin/sync/state_sync.rs | 54 ++++++-------- src/bin/tui/status.rs | 25 ++----- util/src/lib.rs | 2 +- 7 files changed, 130 insertions(+), 106 deletions(-) diff --git a/api/src/handlers/server_api.rs b/api/src/handlers/server_api.rs index d4e9808f2..5b14557d0 100644 --- a/api/src/handlers/server_api.rs +++ b/api/src/handlers/server_api.rs @@ -79,16 +79,11 @@ fn sync_status_to_api(sync_status: SyncStatus) -> (String, Option ( + SyncStatus::TxHashsetDownload(stats) => ( "txhashset_download".to_string(), - Some(json!({ "downloaded_size": downloaded_size, "total_size": total_size })), + Some( + json!({ "downloaded_size": stats.downloaded_size, "total_size": stats.total_size }), + ), ), SyncStatus::TxHashsetRangeProofsValidation { rproofs, diff --git a/chain/src/lib.rs b/chain/src/lib.rs index 90ff59ce7..78da144bc 100644 --- a/chain/src/lib.rs +++ b/chain/src/lib.rs @@ -46,5 +46,6 @@ pub use crate::chain::{Chain, MAX_ORPHAN_SIZE}; pub use crate::error::{Error, ErrorKind}; pub use crate::store::ChainStore; pub use crate::types::{ - BlockStatus, ChainAdapter, Options, SyncState, SyncStatus, Tip, TxHashsetWriteStatus, + BlockStatus, ChainAdapter, Options, SyncState, SyncStatus, Tip, TxHashsetDownloadStats, + TxHashsetWriteStatus, }; diff --git a/chain/src/types.rs b/chain/src/types.rs index b07e100fd..0cc9d3273 100644 --- a/chain/src/types.rs +++ b/chain/src/types.rs @@ -15,14 +15,13 @@ //! Base types that the block chain pipeline requires. use chrono::prelude::{DateTime, Utc}; -use std::sync::Arc; use crate::core::core::hash::{Hash, Hashed, ZERO_HASH}; use crate::core::core::{Block, BlockHeader, HeaderVersion}; use crate::core::pow::Difficulty; use crate::core::ser::{self, PMMRIndexHashable, Readable, Reader, Writeable, Writer}; use crate::error::{Error, ErrorKind}; -use crate::util::RwLock; +use crate::util::{RwLock, RwLockWriteGuard}; bitflags! { /// Options for block validation @@ -40,7 +39,6 @@ bitflags! { /// Various status sync can be in, whether it's fast sync or archival. #[derive(Debug, Clone, Copy, Eq, PartialEq, Deserialize, Serialize)] -#[allow(missing_docs)] pub enum SyncStatus { /// Initial State (we do not yet know if we are/should be syncing) Initial, @@ -51,28 +49,27 @@ pub enum SyncStatus { AwaitingPeers(bool), /// Downloading block headers HeaderSync { + /// current node height current_height: u64, + /// height of the most advanced peer highest_height: u64, }, /// Downloading the various txhashsets - TxHashsetDownload { - start_time: DateTime, - prev_update_time: DateTime, - update_time: DateTime, - prev_downloaded_size: u64, - downloaded_size: u64, - total_size: u64, - }, + TxHashsetDownload(TxHashsetDownloadStats), /// Setting up before validation TxHashsetSetup, /// Validating the kernels TxHashsetKernelsValidation { + /// kernels validated kernels: u64, + /// kernels in total kernels_total: u64, }, /// Validating the range proofs TxHashsetRangeProofsValidation { + /// range proofs validated rproofs: u64, + /// range proofs in total rproofs_total: u64, }, /// Finalizing the new state @@ -81,16 +78,49 @@ pub enum SyncStatus { TxHashsetDone, /// Downloading blocks BodySync { + /// current node height current_height: u64, + /// height of the most advanced peer highest_height: u64, }, + /// Shutdown Shutdown, } +/// Stats for TxHashsetDownload stage +#[derive(Debug, Clone, Copy, Eq, PartialEq, Deserialize, Serialize)] +pub struct TxHashsetDownloadStats { + /// when download started + pub start_time: DateTime, + /// time of the previous update + pub prev_update_time: DateTime, + /// time of the latest update + pub update_time: DateTime, + /// size of the previous chunk + pub prev_downloaded_size: u64, + /// size of the the latest chunk + pub downloaded_size: u64, + /// downloaded since the start + pub total_size: u64, +} + +impl Default for TxHashsetDownloadStats { + fn default() -> Self { + TxHashsetDownloadStats { + start_time: Utc::now(), + update_time: Utc::now(), + prev_update_time: Utc::now(), + prev_downloaded_size: 0, + downloaded_size: 0, + total_size: 0, + } + } +} + /// Current sync state. Encapsulates the current SyncStatus. pub struct SyncState { current: RwLock, - sync_error: Arc>>, + sync_error: RwLock>, } impl SyncState { @@ -98,7 +128,7 @@ impl SyncState { pub fn new() -> SyncState { SyncState { current: RwLock::new(SyncStatus::Initial), - sync_error: Arc::new(RwLock::new(None)), + sync_error: RwLock::new(None), } } @@ -114,37 +144,54 @@ impl SyncState { } /// Update the syncing status - pub fn update(&self, new_status: SyncStatus) { - if self.status() == new_status { - return; - } - - let mut status = self.current.write(); - - debug!("sync_state: sync_status: {:?} -> {:?}", *status, new_status,); - - *status = new_status; + pub fn update(&self, new_status: SyncStatus) -> bool { + let status = self.current.write(); + self.update_with_guard(new_status, status) } - /// Update txhashset downloading progress - pub fn update_txhashset_download(&self, new_status: SyncStatus) -> bool { - if let SyncStatus::TxHashsetDownload { .. } = new_status { - let mut status = self.current.write(); - *status = new_status; - true + fn update_with_guard( + &self, + new_status: SyncStatus, + mut status: RwLockWriteGuard, + ) -> bool { + if *status == new_status { + return false; + } + + debug!("sync_state: sync_status: {:?} -> {:?}", *status, new_status,); + *status = new_status; + true + } + + /// Update the syncing status if predicate f is satisfied + pub fn update_if(&self, new_status: SyncStatus, f: F) -> bool + where + F: Fn(SyncStatus) -> bool, + { + let status = self.current.write(); + if f(*status) { + self.update_with_guard(new_status, status) } else { false } } + /// Update txhashset downloading progress + pub fn update_txhashset_download(&self, stats: TxHashsetDownloadStats) { + *self.current.write() = SyncStatus::TxHashsetDownload(stats); + } + /// Communicate sync error pub fn set_sync_error(&self, error: Error) { *self.sync_error.write() = Some(error); } /// Get sync error - pub fn sync_error(&self) -> Arc>> { - Arc::clone(&self.sync_error) + pub fn sync_error(&self) -> Option { + self.sync_error + .read() + .as_ref() + .and_then(|e| Some(e.to_string())) } /// Clear sync error diff --git a/servers/src/common/adapters.rs b/servers/src/common/adapters.rs index 8680c009f..8af204788 100644 --- a/servers/src/common/adapters.rs +++ b/servers/src/common/adapters.rs @@ -23,7 +23,9 @@ use std::sync::{Arc, Weak}; use std::thread; use std::time::Instant; -use crate::chain::{self, BlockStatus, ChainAdapter, Options, SyncState, SyncStatus}; +use crate::chain::{ + self, BlockStatus, ChainAdapter, Options, SyncState, SyncStatus, TxHashsetDownloadStats, +}; use crate::common::hooks::{ChainEvents, NetEvents}; use crate::common::types::{ChainValidationMode, DandelionEpoch, ServerConfig}; use crate::core::core::hash::{Hash, Hashed}; @@ -399,20 +401,18 @@ impl p2p::ChainAdapter for NetToChainAdapter { total_size: u64, ) -> bool { match self.sync_state.status() { - SyncStatus::TxHashsetDownload { - update_time: old_update_time, - downloaded_size: old_downloaded_size, - .. - } => self - .sync_state - .update_txhashset_download(SyncStatus::TxHashsetDownload { - start_time, - prev_update_time: old_update_time, - update_time: Utc::now(), - prev_downloaded_size: old_downloaded_size, - downloaded_size, - total_size, - }), + SyncStatus::TxHashsetDownload(prev) => { + self.sync_state + .update_txhashset_download(TxHashsetDownloadStats { + start_time, + prev_update_time: prev.update_time, + update_time: Utc::now(), + prev_downloaded_size: prev.downloaded_size, + downloaded_size, + total_size, + }); + true + } _ => false, } } diff --git a/servers/src/grin/sync/state_sync.rs b/servers/src/grin/sync/state_sync.rs index 23a87bb39..6ea354204 100644 --- a/servers/src/grin/sync/state_sync.rs +++ b/servers/src/grin/sync/state_sync.rs @@ -68,13 +68,9 @@ impl StateSync { let mut sync_need_restart = false; // check sync error - { - let clone = self.sync_state.sync_error(); - if let Some(ref sync_error) = *clone.read() { - error!("state_sync: error = {:?}. restart fast sync", sync_error); - sync_need_restart = true; - } - drop(clone); + if let Some(sync_error) = self.sync_state.sync_error() { + error!("state_sync: error = {}. restart fast sync", sync_error); + sync_need_restart = true; } // check peer connection status of this sync @@ -92,15 +88,16 @@ impl StateSync { // if txhashset downloaded and validated successfully, we switch to BodySync state, // and we need call state_sync_reset() to make it ready for next possible state sync. - let done = if let SyncStatus::TxHashsetDone = self.sync_state.status() { - self.sync_state.update(SyncStatus::BodySync { + let done = self.sync_state.update_if( + SyncStatus::BodySync { current_height: 0, highest_height: 0, - }); - true - } else { - false - }; + }, + |s| match s { + SyncStatus::TxHashsetDone => true, + _ => false, + }, + ); if sync_need_restart || done { self.state_sync_reset(); @@ -137,24 +134,19 @@ impl StateSync { // to avoid the confusing log, // update the final HeaderSync state mainly for 'current_height' - { - let status = self.sync_state.status(); - if let SyncStatus::HeaderSync { .. } = status { - self.sync_state.update(SyncStatus::HeaderSync { - current_height: header_head.height, - highest_height, - }); - } - } + self.sync_state.update_if( + SyncStatus::HeaderSync { + current_height: header_head.height, + highest_height, + }, + |s| match s { + SyncStatus::HeaderSync { .. } => true, + _ => false, + }, + ); - self.sync_state.update(SyncStatus::TxHashsetDownload { - start_time: Utc::now(), - prev_update_time: Utc::now(), - update_time: Utc::now(), - prev_downloaded_size: 0, - downloaded_size: 0, - total_size: 0, - }); + self.sync_state + .update(SyncStatus::TxHashsetDownload(Default::default())); } } true diff --git a/src/bin/tui/status.rs b/src/bin/tui/status.rs index ace873adb..d3ad1ff4f 100644 --- a/src/bin/tui/status.rs +++ b/src/bin/tui/status.rs @@ -48,31 +48,20 @@ impl TUIStatusView { }; format!("Sync step 1/7: Downloading headers: {}%", percent) } - SyncStatus::TxHashsetDownload { - start_time, - prev_update_time, - update_time: _, - prev_downloaded_size, - downloaded_size, - total_size, - } => { - if total_size > 0 { - let percent = if total_size > 0 { - downloaded_size * 100 / total_size - } else { - 0 - }; - let start = prev_update_time.timestamp_nanos(); + SyncStatus::TxHashsetDownload(stat) => { + if stat.total_size > 0 { + let percent = stat.downloaded_size * 100 / stat.total_size; + let start = stat.prev_update_time.timestamp_nanos(); let fin = Utc::now().timestamp_nanos(); let dur_ms = (fin - start) as f64 * NANO_TO_MILLIS; format!("Sync step 2/7: Downloading {}(MB) chain state for state sync: {}% at {:.1?}(kB/s)", - total_size / 1_000_000, + stat.total_size / 1_000_000, percent, - if dur_ms > 1.0f64 { downloaded_size.saturating_sub(prev_downloaded_size) as f64 / dur_ms as f64 } else { 0f64 }, + if dur_ms > 1.0f64 { stat.downloaded_size.saturating_sub(stat.prev_downloaded_size) as f64 / dur_ms as f64 } else { 0f64 }, ) } else { - let start = start_time.timestamp_millis(); + let start = stat.start_time.timestamp_millis(); let fin = Utc::now().timestamp_millis(); let dur_secs = (fin - start) / 1000; diff --git a/util/src/lib.rs b/util/src/lib.rs index 669f7751a..dd1c4087d 100644 --- a/util/src/lib.rs +++ b/util/src/lib.rs @@ -29,7 +29,7 @@ extern crate lazy_static; extern crate serde_derive; // Re-export so only has to be included once pub use parking_lot::Mutex; -pub use parking_lot::{RwLock, RwLockReadGuard}; +pub use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard}; // Re-export so only has to be included once pub use secp256k1zkp as secp;