diff --git a/chain/src/chain.rs b/chain/src/chain.rs index b5001d62e..7c8d8be3f 100644 --- a/chain/src/chain.rs +++ b/chain/src/chain.rs @@ -30,7 +30,10 @@ use grin_store::Error::NotFoundErr; use pipe; use store; use txhashset; -use types::{BlockMarker, BlockSums, ChainAdapter, ChainStore, Error, Options, Tip}; +use types::{ + BlockMarker, BlockSums, ChainAdapter, ChainStore, Error, NoStatus, Options, Tip, + TxHashsetWriteStatus, +}; use util::secp::pedersen::{Commitment, RangeProof}; use util::LOGGER; @@ -532,7 +535,7 @@ impl Chain { txhashset::extending_readonly(&mut txhashset, |extension| { // TODO - is this rewind guaranteed to be redundant now? extension.rewind(&header, &header)?; - extension.validate(&header, skip_rproofs)?; + extension.validate(&header, skip_rproofs, &NoStatus)?; Ok(()) }) } @@ -621,14 +624,19 @@ impl Chain { /// If we're willing to accept that new state, the data stream will be /// read as a zip file, unzipped and the resulting state files should be /// rewound to the provided indexes. - pub fn txhashset_write( + pub fn txhashset_write( &self, h: Hash, rewind_to_output: u64, rewind_to_kernel: u64, txhashset_data: File, - ) -> Result<(), Error> { - let _lock = self.txhashset_lock.lock().unwrap(); + status: &T, + ) -> Result<(), Error> + where + T: TxHashsetWriteStatus, + { + self.txhashset_lock.lock().unwrap(); + status.on_setup(); let head = self.head().unwrap(); let header_head = self.get_header_head().unwrap(); if header_head.height - head.height < global::cut_through_horizon() as u64 { @@ -655,16 +663,18 @@ impl Chain { txhashset::TxHashSet::open(self.db_root.clone(), self.store.clone(), Some(&header))?; // Note: we are validating against a writeable extension. + status.on_validation(0, 0, 0, 0); txhashset::extending(&mut txhashset, |extension| { // TODO do we need to rewind here? We have no blocks to rewind // (and we need them for the pos to unremove) extension.rewind(&header, &header)?; - let (output_sum, kernel_sum) = extension.validate(&header, false)?; + let (output_sum, kernel_sum) = extension.validate(&header, false, status)?; extension.save_latest_block_sums(&header, output_sum, kernel_sum)?; extension.rebuild_index()?; Ok(()) })?; + status.on_save(); // replace the chain txhashset with the newly built one { let mut txhashset_ref = self.txhashset.write().unwrap(); @@ -682,6 +692,7 @@ impl Chain { self.check_orphans(header.height + 1); + status.on_done(); Ok(()) } diff --git a/chain/src/lib.rs b/chain/src/lib.rs index cdd0b44bc..3319ff264 100644 --- a/chain/src/lib.rs +++ b/chain/src/lib.rs @@ -46,4 +46,4 @@ pub mod types; // Re-export the base interface pub use chain::{Chain, MAX_ORPHAN_SIZE}; -pub use types::{BlockSums, ChainAdapter, ChainStore, Error, Options, Tip}; +pub use types::{BlockSums, ChainAdapter, ChainStore, Error, Options, Tip, TxHashsetWriteStatus}; diff --git a/chain/src/txhashset.rs b/chain/src/txhashset.rs index 336dfd529..3ee595b7b 100644 --- a/chain/src/txhashset.rs +++ b/chain/src/txhashset.rs @@ -29,15 +29,16 @@ use util::secp::pedersen::{Commitment, RangeProof}; use core::core::committed::Committed; use core::core::hash::{Hash, Hashed}; use core::core::pmmr::{self, MerkleProof, PMMR}; -use core::core::{Block, BlockHeader, Input, Output, OutputFeatures, OutputIdentifier, Transaction, - TxKernel}; +use core::core::{ + Block, BlockHeader, Input, Output, OutputFeatures, OutputIdentifier, Transaction, TxKernel, +}; use core::global; use core::ser::{PMMRIndexHashable, PMMRable}; use grin_store; use grin_store::pmmr::PMMRBackend; use grin_store::types::prune_noop; -use types::{BlockMarker, BlockSums, ChainStore, Error, TxHashSetRoots}; +use types::{BlockMarker, BlockSums, ChainStore, Error, TxHashSetRoots, TxHashsetWriteStatus}; use util::{secp_static, zip, LOGGER}; const TXHASHSET_SUBDIR: &'static str = "txhashset"; @@ -832,7 +833,8 @@ impl<'a> Extension<'a> { } let roots = self.roots(); - if roots.output_root != header.output_root || roots.rproof_root != header.range_proof_root + if roots.output_root != header.output_root + || roots.rproof_root != header.range_proof_root || roots.kernel_root != header.kernel_root { return Err(Error::InvalidRoot); @@ -864,11 +866,15 @@ impl<'a> Extension<'a> { } /// Validate the txhashset state against the provided block header. - pub fn validate( + pub fn validate( &mut self, header: &BlockHeader, skip_rproofs: bool, - ) -> Result<((Commitment, Commitment)), Error> { + status: &T, + ) -> Result<((Commitment, Commitment)), Error> + where + T: TxHashsetWriteStatus, + { self.validate_mmrs()?; self.validate_roots(header)?; @@ -877,9 +883,8 @@ impl<'a> Extension<'a> { return Ok((zero_commit.clone(), zero_commit.clone())); } - // The real magicking happens here. - // Sum of kernel excesses should equal sum of - // unspent outputs minus total supply. + // The real magicking happens here. Sum of kernel excesses should equal sum + // of unspent outputs minus total supply. let (output_sum, kernel_sum) = self.verify_kernel_sums( header.total_overage(), header.total_kernel_offset(), @@ -888,12 +893,12 @@ impl<'a> Extension<'a> { )?; // This is an expensive verification step. - self.verify_kernel_signatures()?; + self.verify_kernel_signatures(status)?; // Verify the rangeproof for each output in the sum above. // This is an expensive verification step (skip for faster verification). if !skip_rproofs { - self.verify_rangeproofs()?; + self.verify_rangeproofs(status)?; } Ok((output_sum, kernel_sum)) @@ -969,10 +974,14 @@ impl<'a> Extension<'a> { ) } - fn verify_kernel_signatures(&self) -> Result<(), Error> { + fn verify_kernel_signatures(&self, status: &T) -> Result<(), Error> + where + T: TxHashsetWriteStatus, + { let now = Instant::now(); let mut kern_count = 0; + let total_kernels = pmmr::n_leaves(self.kernel_pmmr.unpruned_size()); for n in 1..self.kernel_pmmr.unpruned_size() + 1 { if pmmr::is_leaf(n) { if let Some(kernel) = self.kernel_pmmr.get_data(n) { @@ -980,6 +989,9 @@ impl<'a> Extension<'a> { kern_count += 1; } } + if n % 20 == 0 { + status.on_validation(kern_count, total_kernels, 0, 0); + } } debug!( @@ -993,10 +1005,14 @@ impl<'a> Extension<'a> { Ok(()) } - fn verify_rangeproofs(&self) -> Result<(), Error> { + fn verify_rangeproofs(&self, status: &T) -> Result<(), Error> + where + T: TxHashsetWriteStatus, + { let now = Instant::now(); let mut proof_count = 0; + let total_rproofs = pmmr::n_leaves(self.output_pmmr.unpruned_size()); for n in 1..self.output_pmmr.unpruned_size() + 1 { if pmmr::is_leaf(n) { if let Some(out) = self.output_pmmr.get_data(n) { @@ -1016,6 +1032,9 @@ impl<'a> Extension<'a> { } } } + if n % 20 == 0 { + status.on_validation(0, 0, proof_count, total_rproofs); + } } debug!( LOGGER, diff --git a/chain/src/types.rs b/chain/src/types.rs index bb45566c0..d37b4271a 100644 --- a/chain/src/types.rs +++ b/chain/src/types.rs @@ -375,6 +375,32 @@ pub trait ChainAdapter { fn block_accepted(&self, b: &Block, opts: Options); } +/// Inform the caller of the current status of a txhashset write operation, +/// as it can take quite a while to process. Each function is called in the +/// order defined below and can be used to provide some feedback to the +/// caller. Functions taking arguments can be called repeatedly to update +/// those values as the processing progresses. +pub trait TxHashsetWriteStatus { + /// First setup of the txhashset + fn on_setup(&self); + /// Starting validation + fn on_validation(&self, kernels: u64, kernel_total: u64, rproofs: u64, rproof_total: u64); + /// Starting to save the txhashset and related data + fn on_save(&self); + /// Done writing a new txhashset + fn on_done(&self); +} + +/// Do-nothing implementation of TxHashsetWriteStatus +pub struct NoStatus; + +impl TxHashsetWriteStatus for NoStatus { + fn on_setup(&self) {} + fn on_validation(&self, _ks: u64, _kts: u64, _rs: u64, _rt: u64) {} + fn on_save(&self) {} + fn on_done(&self) {} +} + /// Dummy adapter used as a placeholder for real implementations pub struct NoopAdapter {} diff --git a/servers/src/common/adapters.rs b/servers/src/common/adapters.rs index 0e789e49e..dc2133e94 100644 --- a/servers/src/common/adapters.rs +++ b/servers/src/common/adapters.rs @@ -19,13 +19,12 @@ use rand::{self, Rng}; use std::fs::File; use std::net::SocketAddr; use std::ops::Deref; -use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, RwLock, Weak}; use std::thread; use std::time::Instant; use chain::{self, ChainAdapter, Options, Tip}; -use common::types::{ChainValidationMode, ServerConfig}; +use common::types::{ChainValidationMode, ServerConfig, SyncState}; use core::core; use core::core::block::BlockHeader; use core::core::hash::{Hash, Hashed}; @@ -51,7 +50,7 @@ fn wo(weak_one: &OneTime>) -> Arc { /// blocks and transactions are received and forwards to the chain and pool /// implementations. pub struct NetToChainAdapter { - currently_syncing: Arc, + sync_state: Arc, archive_mode: bool, chain: Weak, tx_pool: Arc>>, @@ -325,14 +324,17 @@ impl p2p::ChainAdapter for NetToChainAdapter { _peer_addr: SocketAddr, ) -> bool { // TODO check whether we should accept any txhashset now - if let Err(e) = - w(&self.chain).txhashset_write(h, rewind_to_output, rewind_to_kernel, txhashset_data) - { + if let Err(e) = w(&self.chain).txhashset_write( + h, + rewind_to_output, + rewind_to_kernel, + txhashset_data, + self.sync_state.as_ref(), + ) { error!(LOGGER, "Failed to save txhashset archive: {:?}", e); !e.is_bad_data() } else { info!(LOGGER, "Received valid txhashset data for {}.", h); - self.currently_syncing.store(true, Ordering::Relaxed); true } } @@ -341,14 +343,14 @@ impl p2p::ChainAdapter for NetToChainAdapter { impl NetToChainAdapter { /// Construct a new NetToChainAdapter instance pub fn new( - currently_syncing: Arc, + sync_state: Arc, archive_mode: bool, chain_ref: Weak, tx_pool: Arc>>, config: ServerConfig, ) -> NetToChainAdapter { NetToChainAdapter { - currently_syncing, + sync_state, archive_mode, chain: chain_ref, tx_pool, @@ -412,7 +414,7 @@ impl NetToChainAdapter { } Err(chain::Error::Orphan) => { // make sure we did not miss the parent block - if !chain.is_orphan(&prev_hash) && !self.currently_syncing.load(Ordering::Relaxed) { + if !chain.is_orphan(&prev_hash) && !self.sync_state.is_syncing() { debug!(LOGGER, "adapter: process_block: received an orphan block, checking the parent: {:}", prev_hash); self.request_block_by_hash(prev_hash, &addr) } @@ -448,7 +450,8 @@ impl NetToChainAdapter { // down as soon as possible. // Skip this if we are currently syncing (too slow). let chain = w(&self.chain); - if chain.head().unwrap().height > 0 && !self.currently_syncing.load(Ordering::Relaxed) + if chain.head().unwrap().height > 0 + && !self.sync_state.is_syncing() && self.config.chain_validation_mode == ChainValidationMode::EveryBlock { let now = Instant::now(); @@ -473,7 +476,7 @@ impl NetToChainAdapter { fn check_compact(&self, tip_res: Option) { // no compaction during sync or if we're in historical mode - if self.archive_mode || self.currently_syncing.load(Ordering::Relaxed) { + if self.archive_mode || self.sync_state.is_syncing() { return; } @@ -541,7 +544,7 @@ impl NetToChainAdapter { /// Prepare options for the chain pipeline fn chain_opts(&self) -> chain::Options { - let opts = if self.currently_syncing.load(Ordering::Relaxed) { + let opts = if self.sync_state.is_syncing() { chain::Options::SYNC } else { chain::Options::NONE diff --git a/servers/src/common/stats.rs b/servers/src/common/stats.rs index b09abc9a9..021b119d9 100644 --- a/servers/src/common/stats.rs +++ b/servers/src/common/stats.rs @@ -20,6 +20,7 @@ use std::sync::{Arc, RwLock}; use std::time::SystemTime; use chain; +use common::types::SyncStatus; use p2p; /// Server state info collection struct, to be passed around into internals @@ -51,7 +52,7 @@ pub struct ServerStats { /// sync header head pub header_head: chain::Tip, /// Whether we're currently syncing - pub is_syncing: bool, + pub sync_status: SyncStatus, /// Whether we're awaiting peers pub awaiting_peers: bool, /// Handle to current stratum server stats diff --git a/servers/src/common/types.rs b/servers/src/common/types.rs index cd901bac1..f3a176a52 100644 --- a/servers/src/common/types.rs +++ b/servers/src/common/types.rs @@ -15,6 +15,7 @@ //! Server types use std::convert::From; +use std::sync::RwLock; use api; use chain; @@ -253,3 +254,120 @@ impl Default for StratumServerConfig { } } } + +/// Various status sync can be in, whether it's fast sync or archival. +#[derive(Debug, Clone, Copy, Eq, PartialEq)] +#[allow(missing_docs)] +pub enum SyncStatus { + /// Not syncing + NoSync, + /// Downloading block headers + HeaderSync { + current_height: u64, + highest_height: u64, + }, + /// Downloading the various txhashsets + TxHashsetDownload, + /// Setting up before validation + TxHashsetSetup, + /// Validating the full state + TxHashsetValidation { + kernels: u64, + kernel_total: u64, + rproofs: u64, + rproof_total: u64, + }, + /// Finalizing the new state + TxHashsetSave, + /// Downloading blocks + BodySync { + current_height: u64, + highest_height: u64, + }, +} + +/// Current sync state. Encapsulates the current SyncStatus. +pub struct SyncState { + current: RwLock, +} + +impl SyncState { + /// Return a new SyncState initialize to NoSync + pub fn new() -> SyncState { + SyncState { + current: RwLock::new(SyncStatus::NoSync), + } + } + + /// Whether the current state matches any active syncing operation + pub fn is_syncing(&self) -> bool { + *self.current.read().unwrap() != SyncStatus::NoSync + } + + /// Current syncing status + pub fn status(&self) -> SyncStatus { + *self.current.read().unwrap() + } + + /// Update the syncing status + pub fn update(&self, new_status: SyncStatus) { + let mut status = self.current.write().unwrap(); + *status = new_status; + } +} + +impl chain::TxHashsetWriteStatus for SyncState { + fn on_setup(&self) { + self.update(SyncStatus::TxHashsetSetup); + } + + fn on_validation(&self, vkernels: u64, vkernel_total: u64, vrproofs: u64, vrproof_total: u64) { + let mut status = self.current.write().unwrap(); + match *status { + SyncStatus::TxHashsetValidation { + kernels, + kernel_total, + rproofs, + rproof_total, + } => { + let ks = if vkernels > 0 { vkernels } else { kernels }; + let kt = if vkernel_total > 0 { + vkernel_total + } else { + kernel_total + }; + let rps = if vrproofs > 0 { vrproofs } else { rproofs }; + let rpt = if vrproof_total > 0 { + vrproof_total + } else { + rproof_total + }; + *status = SyncStatus::TxHashsetValidation { + kernels: ks, + kernel_total: kt, + rproofs: rps, + rproof_total: rpt, + }; + } + _ => { + *status = SyncStatus::TxHashsetValidation { + kernels: 0, + kernel_total: 0, + rproofs: 0, + rproof_total: 0, + } + } + } + } + + fn on_save(&self) { + self.update(SyncStatus::TxHashsetSave); + } + + fn on_done(&self) { + self.update(SyncStatus::BodySync { + current_height: 0, + highest_height: 0, + }); + } +} diff --git a/servers/src/grin/server.rs b/servers/src/grin/server.rs index 697caf462..9ae3391fa 100644 --- a/servers/src/grin/server.rs +++ b/servers/src/grin/server.rs @@ -26,7 +26,7 @@ use chain; use common::adapters::{ChainToPoolAndNetAdapter, NetToChainAdapter, PoolToChainAdapter, PoolToNetAdapter}; use common::stats::{DiffBlock, DiffStats, PeerStats, ServerStateInfo, ServerStats}; -use common::types::{Error, Seeding, ServerConfig, StratumServerConfig}; +use common::types::{Error, Seeding, ServerConfig, StratumServerConfig, SyncState}; use core::core::hash::Hashed; use core::core::target::Difficulty; use core::{consensus, genesis, global, pow}; @@ -48,7 +48,7 @@ pub struct Server { /// in-memory transaction pool tx_pool: Arc>>, /// Whether we're currently syncing - currently_syncing: Arc, + sync_state: Arc, /// To be passed around to collect stats and info state_info: ServerStateInfo, /// Stop flag @@ -139,11 +139,11 @@ impl Server { pool_adapter.set_chain(Arc::downgrade(&shared_chain)); - let currently_syncing = Arc::new(AtomicBool::new(true)); + let sync_state = Arc::new(SyncState::new()); let awaiting_peers = Arc::new(AtomicBool::new(false)); let net_adapter = Arc::new(NetToChainAdapter::new( - currently_syncing.clone(), + sync_state.clone(), archive_mode, Arc::downgrade(&shared_chain), tx_pool.clone(), @@ -202,7 +202,7 @@ impl Server { let syncer = sync::Syncer::new(); syncer.run_sync( - currently_syncing.clone(), + sync_state.clone(), awaiting_peers.clone(), p2p_server.peers.clone(), shared_chain.clone(), @@ -241,7 +241,7 @@ impl Server { p2p: p2p_server, chain: shared_chain, tx_pool: tx_pool, - currently_syncing: currently_syncing, + sync_state, state_info: ServerStateInfo { awaiting_peers: awaiting_peers, ..Default::default() @@ -265,7 +265,7 @@ impl Server { pub fn start_stratum_server(&self, config: StratumServerConfig) { let cuckoo_size = global::sizeshift(); let proof_size = global::proofsize(); - let currently_syncing = self.currently_syncing.clone(); + let sync_state = self.sync_state.clone(); let mut stratum_server = stratumserver::StratumServer::new( config.clone(), @@ -280,7 +280,7 @@ impl Server { stratum_stats, cuckoo_size as u32, proof_size, - currently_syncing, + sync_state, ); }); } @@ -289,7 +289,7 @@ impl Server { /// internal miner, and should only be used for automated testing. Burns /// reward if wallet_listener_url is 'None' pub fn start_test_miner(&self, wallet_listener_url: Option) { - let currently_syncing = self.currently_syncing.clone(); + let sync_state = self.sync_state.clone(); let config_wallet_url = match wallet_listener_url.clone() { Some(u) => u, None => String::from("http://127.0.0.1:13415"), @@ -317,7 +317,7 @@ impl Server { // TODO push this down in the run loop so miner gets paused anytime we // decide to sync again let secs_5 = time::Duration::from_secs(5); - while currently_syncing.load(Ordering::Relaxed) { + while sync_state.is_syncing() { thread::sleep(secs_5); } miner.run_loop(wallet_listener_url); @@ -405,7 +405,7 @@ impl Server { peer_count: self.peer_count(), head: self.head(), header_head: self.header_head(), - is_syncing: self.currently_syncing.load(Ordering::Relaxed), + sync_status: self.sync_state.status(), awaiting_peers: awaiting_peers, stratum_stats: stratum_stats, peer_stats: peer_stats, diff --git a/servers/src/grin/sync.rs b/servers/src/grin/sync.rs index ab8021d71..fda2ce8cf 100644 --- a/servers/src/grin/sync.rs +++ b/servers/src/grin/sync.rs @@ -19,7 +19,7 @@ use std::{cmp, thread}; use time; use chain; -use common::types::Error; +use common::types::{Error, SyncState, SyncStatus}; use core::core::hash::{Hash, Hashed}; use core::core::target::Difficulty; use core::global; @@ -36,7 +36,7 @@ impl Syncer { pub fn run_sync( &self, - currently_syncing: Arc, + sync_state: Arc, awaiting_peers: Arc, peers: Arc, chain: Arc, @@ -45,7 +45,7 @@ impl Syncer { stop: Arc, ) { sync::run_sync( - currently_syncing, + sync_state, awaiting_peers, peers, chain, @@ -58,7 +58,7 @@ impl Syncer { /// Starts the syncing loop, just spawns two threads that loop forever pub fn run_sync( - currently_syncing: Arc, + sync_state: Arc, awaiting_peers: Arc, peers: Arc, chain: Arc, @@ -98,7 +98,7 @@ pub fn run_sync( // is syncing generally needed when we compare our state with others let (syncing, most_work_height) = - needs_syncing(currently_syncing.as_ref(), peers.clone(), chain.clone()); + needs_syncing(sync_state.as_ref(), peers.clone(), chain.clone()); if most_work_height > 0 { // we can occasionally get a most work height of 0 if read locks fail @@ -112,23 +112,26 @@ pub fn run_sync( // run the header sync every 10s if si.header_sync_due(&header_head) { header_sync(peers.clone(), chain.clone()); + sync_state.update(SyncStatus::HeaderSync{current_height: header_head.height, highest_height: si.highest_height}); } if fast_sync_enabled { // run fast sync if applicable, every 5 min if header_head.height == si.highest_height && si.fast_sync_due() { fast_sync(peers.clone(), chain.clone(), &header_head); + sync_state.update(SyncStatus::TxHashsetDownload); } } else { // run the body_sync every 5s if si.body_sync_due(&head) { body_sync(peers.clone(), chain.clone()); + sync_state.update(SyncStatus::BodySync{current_height: head.height, highest_height: si.highest_height}); } } + } else { + sync_state.update(SyncStatus::NoSync); } - currently_syncing.store(syncing, Ordering::Relaxed); - thread::sleep(Duration::from_secs(1)); if stop.load(Ordering::Relaxed) { @@ -275,13 +278,13 @@ fn request_headers(peer: &Peer, chain: Arc) { /// Whether we're currently syncing the chain or we're fully caught up and /// just receiving blocks through gossip. fn needs_syncing( - currently_syncing: &AtomicBool, + sync_state: &SyncState, peers: Arc, chain: Arc, ) -> (bool, u64) { let local_diff = chain.total_difficulty(); let peer = peers.most_work_peer(); - let is_syncing = currently_syncing.load(Ordering::Relaxed); + let is_syncing = sync_state.is_syncing(); let mut most_work_height = 0; // if we're already syncing, we're caught up if no peer has a higher diff --git a/servers/src/mining/stratumserver.rs b/servers/src/mining/stratumserver.rs index 9c5d3cf9a..d22a6d322 100644 --- a/servers/src/mining/stratumserver.rs +++ b/servers/src/mining/stratumserver.rs @@ -19,7 +19,6 @@ use serde_json::Value; use std::error::Error; use std::io::{BufRead, ErrorKind, Write}; use std::net::{TcpListener, TcpStream}; -use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Mutex, RwLock}; use std::time::{Duration, SystemTime}; use std::{cmp, thread}; @@ -28,7 +27,7 @@ use time; use chain; use common::adapters::PoolToChainAdapter; use common::stats::{StratumStats, WorkerStats}; -use common::types::StratumServerConfig; +use common::types::{StratumServerConfig, SyncState}; use core::core::{Block, BlockHeader}; use core::{consensus, pow}; use keychain; @@ -236,7 +235,7 @@ pub struct StratumServer { minimum_share_difficulty: u64, current_key_id: Option, workers: Arc>>, - currently_syncing: Arc, + sync_state: Arc, } impl StratumServer { @@ -256,7 +255,7 @@ impl StratumServer { current_difficulty: ::max_value(), current_key_id: None, workers: Arc::new(Mutex::new(Vec::new())), - currently_syncing: Arc::new(AtomicBool::new(false)), + sync_state: Arc::new(SyncState::new()), } } @@ -322,7 +321,7 @@ impl StratumServer { } "keepalive" => self.handle_keepalive(), "getjobtemplate" => { - if self.currently_syncing.load(Ordering::Relaxed) { + if self.sync_state.is_syncing() { let e = RpcError { code: -32701, message: "Node is syncing - Please wait".to_string(), @@ -625,7 +624,7 @@ impl StratumServer { stratum_stats: Arc>, cuckoo_size: u32, proof_size: usize, - currently_syncing: Arc, + sync_state: Arc, ) { info!( LOGGER, @@ -635,7 +634,7 @@ impl StratumServer { proof_size ); - self.currently_syncing = currently_syncing; + self.sync_state = sync_state; // "globals" for this function let attempt_time_per_block = self.config.attempt_time_per_block; @@ -675,7 +674,7 @@ impl StratumServer { loop { // If we're fallen into sync mode, (or are just starting up, // tell connected clients to stop what they're doing - let mining_stopped = self.currently_syncing.load(Ordering::Relaxed); + let mining_stopped = self.sync_state.is_syncing(); // Remove workers with failed connections num_workers = self.clean_workers(&mut stratum_stats.clone()); diff --git a/src/bin/tui/status.rs b/src/bin/tui/status.rs index 95bc04ccc..0ba1133d4 100644 --- a/src/bin/tui/status.rs +++ b/src/bin/tui/status.rs @@ -23,6 +23,7 @@ use cursive::Cursive; use tui::constants::VIEW_BASIC_STATUS; use tui::types::TUIStatusListener; +use servers::common::types::SyncStatus; use servers::ServerStats; pub struct TUIStatusView; @@ -76,14 +77,63 @@ impl TUIStatusListener for TUIStatusView { fn update(c: &mut Cursive, stats: &ServerStats) { //find and update here as needed let basic_status = { - if stats.is_syncing { - if stats.awaiting_peers { - "Waiting for peers".to_string() - } else { - format!("Syncing - Latest header: {}", stats.header_head.height).to_string() - } + if stats.awaiting_peers { + "Waiting for peers".to_string() } else { - "Running".to_string() + match stats.sync_status { + SyncStatus::NoSync => "Running".to_string(), + SyncStatus::HeaderSync { + current_height, + highest_height, + } => { + let percent = if highest_height == 0 { + 0 + } else { + current_height * 100 / highest_height + }; + format!("Downloading headers: {}%, step 1/4", percent) + } + SyncStatus::TxHashsetDownload => { + "Downloading chain state for fast sync, step 2/4".to_string() + } + SyncStatus::TxHashsetSetup => { + "Preparing chain state for validation, step 3/4".to_string() + } + SyncStatus::TxHashsetValidation { + kernels, + kernel_total, + rproofs, + rproof_total, + } => { + // 10% of overall progress is attributed to kernel validation + // 90% to range proofs (which are much longer) + let mut percent = if kernel_total > 0 { + kernels * 10 / kernel_total + } else { + 0 + }; + percent += if rproof_total > 0 { + rproofs * 90 / rproof_total + } else { + 0 + }; + format!("Validating chain state: {}%, step 3/4", percent) + } + SyncStatus::TxHashsetSave => { + "Finalizing chain state for fast sync, step 3/4".to_string() + } + SyncStatus::BodySync { + current_height, + highest_height, + } => { + let percent = if highest_height == 0 { + 0 + } else { + current_height * 100 / highest_height + }; + format!("Downloading blocks: {}%, step 4/4", percent) + } + } } }; /*let basic_mining_config_status = {