Introduce state to track sync stages (#1210)

* A new sync status enum encapsulated in a state struct allows tracking of where sync is at. Leveraging it in the TUI to provide more helpful messages.
* Percentage progression for most sync steps
This commit is contained in:
Ignotus Peverell 2018-07-01 22:33:47 +01:00 committed by GitHub
parent 9654ee237b
commit 8e1b602516
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 299 additions and 69 deletions

View file

@ -30,7 +30,10 @@ use grin_store::Error::NotFoundErr;
use pipe; use pipe;
use store; use store;
use txhashset; use txhashset;
use types::{BlockMarker, BlockSums, ChainAdapter, ChainStore, Error, Options, Tip}; use types::{
BlockMarker, BlockSums, ChainAdapter, ChainStore, Error, NoStatus, Options, Tip,
TxHashsetWriteStatus,
};
use util::secp::pedersen::{Commitment, RangeProof}; use util::secp::pedersen::{Commitment, RangeProof};
use util::LOGGER; use util::LOGGER;
@ -532,7 +535,7 @@ impl Chain {
txhashset::extending_readonly(&mut txhashset, |extension| { txhashset::extending_readonly(&mut txhashset, |extension| {
// TODO - is this rewind guaranteed to be redundant now? // TODO - is this rewind guaranteed to be redundant now?
extension.rewind(&header, &header)?; extension.rewind(&header, &header)?;
extension.validate(&header, skip_rproofs)?; extension.validate(&header, skip_rproofs, &NoStatus)?;
Ok(()) Ok(())
}) })
} }
@ -621,14 +624,19 @@ impl Chain {
/// If we're willing to accept that new state, the data stream will be /// If we're willing to accept that new state, the data stream will be
/// read as a zip file, unzipped and the resulting state files should be /// read as a zip file, unzipped and the resulting state files should be
/// rewound to the provided indexes. /// rewound to the provided indexes.
pub fn txhashset_write( pub fn txhashset_write<T>(
&self, &self,
h: Hash, h: Hash,
rewind_to_output: u64, rewind_to_output: u64,
rewind_to_kernel: u64, rewind_to_kernel: u64,
txhashset_data: File, txhashset_data: File,
) -> Result<(), Error> { status: &T,
let _lock = self.txhashset_lock.lock().unwrap(); ) -> Result<(), Error>
where
T: TxHashsetWriteStatus,
{
self.txhashset_lock.lock().unwrap();
status.on_setup();
let head = self.head().unwrap(); let head = self.head().unwrap();
let header_head = self.get_header_head().unwrap(); let header_head = self.get_header_head().unwrap();
if header_head.height - head.height < global::cut_through_horizon() as u64 { if header_head.height - head.height < global::cut_through_horizon() as u64 {
@ -655,16 +663,18 @@ impl Chain {
txhashset::TxHashSet::open(self.db_root.clone(), self.store.clone(), Some(&header))?; txhashset::TxHashSet::open(self.db_root.clone(), self.store.clone(), Some(&header))?;
// Note: we are validating against a writeable extension. // Note: we are validating against a writeable extension.
status.on_validation(0, 0, 0, 0);
txhashset::extending(&mut txhashset, |extension| { txhashset::extending(&mut txhashset, |extension| {
// TODO do we need to rewind here? We have no blocks to rewind // TODO do we need to rewind here? We have no blocks to rewind
// (and we need them for the pos to unremove) // (and we need them for the pos to unremove)
extension.rewind(&header, &header)?; extension.rewind(&header, &header)?;
let (output_sum, kernel_sum) = extension.validate(&header, false)?; let (output_sum, kernel_sum) = extension.validate(&header, false, status)?;
extension.save_latest_block_sums(&header, output_sum, kernel_sum)?; extension.save_latest_block_sums(&header, output_sum, kernel_sum)?;
extension.rebuild_index()?; extension.rebuild_index()?;
Ok(()) Ok(())
})?; })?;
status.on_save();
// replace the chain txhashset with the newly built one // replace the chain txhashset with the newly built one
{ {
let mut txhashset_ref = self.txhashset.write().unwrap(); let mut txhashset_ref = self.txhashset.write().unwrap();
@ -682,6 +692,7 @@ impl Chain {
self.check_orphans(header.height + 1); self.check_orphans(header.height + 1);
status.on_done();
Ok(()) Ok(())
} }

View file

@ -46,4 +46,4 @@ pub mod types;
// Re-export the base interface // Re-export the base interface
pub use chain::{Chain, MAX_ORPHAN_SIZE}; pub use chain::{Chain, MAX_ORPHAN_SIZE};
pub use types::{BlockSums, ChainAdapter, ChainStore, Error, Options, Tip}; pub use types::{BlockSums, ChainAdapter, ChainStore, Error, Options, Tip, TxHashsetWriteStatus};

View file

@ -29,15 +29,16 @@ use util::secp::pedersen::{Commitment, RangeProof};
use core::core::committed::Committed; use core::core::committed::Committed;
use core::core::hash::{Hash, Hashed}; use core::core::hash::{Hash, Hashed};
use core::core::pmmr::{self, MerkleProof, PMMR}; use core::core::pmmr::{self, MerkleProof, PMMR};
use core::core::{Block, BlockHeader, Input, Output, OutputFeatures, OutputIdentifier, Transaction, use core::core::{
TxKernel}; Block, BlockHeader, Input, Output, OutputFeatures, OutputIdentifier, Transaction, TxKernel,
};
use core::global; use core::global;
use core::ser::{PMMRIndexHashable, PMMRable}; use core::ser::{PMMRIndexHashable, PMMRable};
use grin_store; use grin_store;
use grin_store::pmmr::PMMRBackend; use grin_store::pmmr::PMMRBackend;
use grin_store::types::prune_noop; use grin_store::types::prune_noop;
use types::{BlockMarker, BlockSums, ChainStore, Error, TxHashSetRoots}; use types::{BlockMarker, BlockSums, ChainStore, Error, TxHashSetRoots, TxHashsetWriteStatus};
use util::{secp_static, zip, LOGGER}; use util::{secp_static, zip, LOGGER};
const TXHASHSET_SUBDIR: &'static str = "txhashset"; const TXHASHSET_SUBDIR: &'static str = "txhashset";
@ -832,7 +833,8 @@ impl<'a> Extension<'a> {
} }
let roots = self.roots(); let roots = self.roots();
if roots.output_root != header.output_root || roots.rproof_root != header.range_proof_root if roots.output_root != header.output_root
|| roots.rproof_root != header.range_proof_root
|| roots.kernel_root != header.kernel_root || roots.kernel_root != header.kernel_root
{ {
return Err(Error::InvalidRoot); return Err(Error::InvalidRoot);
@ -864,11 +866,15 @@ impl<'a> Extension<'a> {
} }
/// Validate the txhashset state against the provided block header. /// Validate the txhashset state against the provided block header.
pub fn validate( pub fn validate<T>(
&mut self, &mut self,
header: &BlockHeader, header: &BlockHeader,
skip_rproofs: bool, skip_rproofs: bool,
) -> Result<((Commitment, Commitment)), Error> { status: &T,
) -> Result<((Commitment, Commitment)), Error>
where
T: TxHashsetWriteStatus,
{
self.validate_mmrs()?; self.validate_mmrs()?;
self.validate_roots(header)?; self.validate_roots(header)?;
@ -877,9 +883,8 @@ impl<'a> Extension<'a> {
return Ok((zero_commit.clone(), zero_commit.clone())); return Ok((zero_commit.clone(), zero_commit.clone()));
} }
// The real magicking happens here. // The real magicking happens here. Sum of kernel excesses should equal sum
// Sum of kernel excesses should equal sum of // of unspent outputs minus total supply.
// unspent outputs minus total supply.
let (output_sum, kernel_sum) = self.verify_kernel_sums( let (output_sum, kernel_sum) = self.verify_kernel_sums(
header.total_overage(), header.total_overage(),
header.total_kernel_offset(), header.total_kernel_offset(),
@ -888,12 +893,12 @@ impl<'a> Extension<'a> {
)?; )?;
// This is an expensive verification step. // This is an expensive verification step.
self.verify_kernel_signatures()?; self.verify_kernel_signatures(status)?;
// Verify the rangeproof for each output in the sum above. // Verify the rangeproof for each output in the sum above.
// This is an expensive verification step (skip for faster verification). // This is an expensive verification step (skip for faster verification).
if !skip_rproofs { if !skip_rproofs {
self.verify_rangeproofs()?; self.verify_rangeproofs(status)?;
} }
Ok((output_sum, kernel_sum)) Ok((output_sum, kernel_sum))
@ -969,10 +974,14 @@ impl<'a> Extension<'a> {
) )
} }
fn verify_kernel_signatures(&self) -> Result<(), Error> { fn verify_kernel_signatures<T>(&self, status: &T) -> Result<(), Error>
where
T: TxHashsetWriteStatus,
{
let now = Instant::now(); let now = Instant::now();
let mut kern_count = 0; let mut kern_count = 0;
let total_kernels = pmmr::n_leaves(self.kernel_pmmr.unpruned_size());
for n in 1..self.kernel_pmmr.unpruned_size() + 1 { for n in 1..self.kernel_pmmr.unpruned_size() + 1 {
if pmmr::is_leaf(n) { if pmmr::is_leaf(n) {
if let Some(kernel) = self.kernel_pmmr.get_data(n) { if let Some(kernel) = self.kernel_pmmr.get_data(n) {
@ -980,6 +989,9 @@ impl<'a> Extension<'a> {
kern_count += 1; kern_count += 1;
} }
} }
if n % 20 == 0 {
status.on_validation(kern_count, total_kernels, 0, 0);
}
} }
debug!( debug!(
@ -993,10 +1005,14 @@ impl<'a> Extension<'a> {
Ok(()) Ok(())
} }
fn verify_rangeproofs(&self) -> Result<(), Error> { fn verify_rangeproofs<T>(&self, status: &T) -> Result<(), Error>
where
T: TxHashsetWriteStatus,
{
let now = Instant::now(); let now = Instant::now();
let mut proof_count = 0; let mut proof_count = 0;
let total_rproofs = pmmr::n_leaves(self.output_pmmr.unpruned_size());
for n in 1..self.output_pmmr.unpruned_size() + 1 { for n in 1..self.output_pmmr.unpruned_size() + 1 {
if pmmr::is_leaf(n) { if pmmr::is_leaf(n) {
if let Some(out) = self.output_pmmr.get_data(n) { if let Some(out) = self.output_pmmr.get_data(n) {
@ -1016,6 +1032,9 @@ impl<'a> Extension<'a> {
} }
} }
} }
if n % 20 == 0 {
status.on_validation(0, 0, proof_count, total_rproofs);
}
} }
debug!( debug!(
LOGGER, LOGGER,

View file

@ -375,6 +375,32 @@ pub trait ChainAdapter {
fn block_accepted(&self, b: &Block, opts: Options); fn block_accepted(&self, b: &Block, opts: Options);
} }
/// Inform the caller of the current status of a txhashset write operation,
/// as it can take quite a while to process. Each function is called in the
/// order defined below and can be used to provide some feedback to the
/// caller. Functions taking arguments can be called repeatedly to update
/// those values as the processing progresses.
pub trait TxHashsetWriteStatus {
/// First setup of the txhashset
fn on_setup(&self);
/// Starting validation
fn on_validation(&self, kernels: u64, kernel_total: u64, rproofs: u64, rproof_total: u64);
/// Starting to save the txhashset and related data
fn on_save(&self);
/// Done writing a new txhashset
fn on_done(&self);
}
/// Do-nothing implementation of TxHashsetWriteStatus
pub struct NoStatus;
impl TxHashsetWriteStatus for NoStatus {
fn on_setup(&self) {}
fn on_validation(&self, _ks: u64, _kts: u64, _rs: u64, _rt: u64) {}
fn on_save(&self) {}
fn on_done(&self) {}
}
/// Dummy adapter used as a placeholder for real implementations /// Dummy adapter used as a placeholder for real implementations
pub struct NoopAdapter {} pub struct NoopAdapter {}

View file

@ -19,13 +19,12 @@ use rand::{self, Rng};
use std::fs::File; use std::fs::File;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::ops::Deref; use std::ops::Deref;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock, Weak}; use std::sync::{Arc, RwLock, Weak};
use std::thread; use std::thread;
use std::time::Instant; use std::time::Instant;
use chain::{self, ChainAdapter, Options, Tip}; use chain::{self, ChainAdapter, Options, Tip};
use common::types::{ChainValidationMode, ServerConfig}; use common::types::{ChainValidationMode, ServerConfig, SyncState};
use core::core; use core::core;
use core::core::block::BlockHeader; use core::core::block::BlockHeader;
use core::core::hash::{Hash, Hashed}; use core::core::hash::{Hash, Hashed};
@ -51,7 +50,7 @@ fn wo<T>(weak_one: &OneTime<Weak<T>>) -> Arc<T> {
/// blocks and transactions are received and forwards to the chain and pool /// blocks and transactions are received and forwards to the chain and pool
/// implementations. /// implementations.
pub struct NetToChainAdapter { pub struct NetToChainAdapter {
currently_syncing: Arc<AtomicBool>, sync_state: Arc<SyncState>,
archive_mode: bool, archive_mode: bool,
chain: Weak<chain::Chain>, chain: Weak<chain::Chain>,
tx_pool: Arc<RwLock<pool::TransactionPool<PoolToChainAdapter>>>, tx_pool: Arc<RwLock<pool::TransactionPool<PoolToChainAdapter>>>,
@ -325,14 +324,17 @@ impl p2p::ChainAdapter for NetToChainAdapter {
_peer_addr: SocketAddr, _peer_addr: SocketAddr,
) -> bool { ) -> bool {
// TODO check whether we should accept any txhashset now // TODO check whether we should accept any txhashset now
if let Err(e) = if let Err(e) = w(&self.chain).txhashset_write(
w(&self.chain).txhashset_write(h, rewind_to_output, rewind_to_kernel, txhashset_data) h,
{ rewind_to_output,
rewind_to_kernel,
txhashset_data,
self.sync_state.as_ref(),
) {
error!(LOGGER, "Failed to save txhashset archive: {:?}", e); error!(LOGGER, "Failed to save txhashset archive: {:?}", e);
!e.is_bad_data() !e.is_bad_data()
} else { } else {
info!(LOGGER, "Received valid txhashset data for {}.", h); info!(LOGGER, "Received valid txhashset data for {}.", h);
self.currently_syncing.store(true, Ordering::Relaxed);
true true
} }
} }
@ -341,14 +343,14 @@ impl p2p::ChainAdapter for NetToChainAdapter {
impl NetToChainAdapter { impl NetToChainAdapter {
/// Construct a new NetToChainAdapter instance /// Construct a new NetToChainAdapter instance
pub fn new( pub fn new(
currently_syncing: Arc<AtomicBool>, sync_state: Arc<SyncState>,
archive_mode: bool, archive_mode: bool,
chain_ref: Weak<chain::Chain>, chain_ref: Weak<chain::Chain>,
tx_pool: Arc<RwLock<pool::TransactionPool<PoolToChainAdapter>>>, tx_pool: Arc<RwLock<pool::TransactionPool<PoolToChainAdapter>>>,
config: ServerConfig, config: ServerConfig,
) -> NetToChainAdapter { ) -> NetToChainAdapter {
NetToChainAdapter { NetToChainAdapter {
currently_syncing, sync_state,
archive_mode, archive_mode,
chain: chain_ref, chain: chain_ref,
tx_pool, tx_pool,
@ -412,7 +414,7 @@ impl NetToChainAdapter {
} }
Err(chain::Error::Orphan) => { Err(chain::Error::Orphan) => {
// make sure we did not miss the parent block // make sure we did not miss the parent block
if !chain.is_orphan(&prev_hash) && !self.currently_syncing.load(Ordering::Relaxed) { if !chain.is_orphan(&prev_hash) && !self.sync_state.is_syncing() {
debug!(LOGGER, "adapter: process_block: received an orphan block, checking the parent: {:}", prev_hash); debug!(LOGGER, "adapter: process_block: received an orphan block, checking the parent: {:}", prev_hash);
self.request_block_by_hash(prev_hash, &addr) self.request_block_by_hash(prev_hash, &addr)
} }
@ -448,7 +450,8 @@ impl NetToChainAdapter {
// down as soon as possible. // down as soon as possible.
// Skip this if we are currently syncing (too slow). // Skip this if we are currently syncing (too slow).
let chain = w(&self.chain); let chain = w(&self.chain);
if chain.head().unwrap().height > 0 && !self.currently_syncing.load(Ordering::Relaxed) if chain.head().unwrap().height > 0
&& !self.sync_state.is_syncing()
&& self.config.chain_validation_mode == ChainValidationMode::EveryBlock && self.config.chain_validation_mode == ChainValidationMode::EveryBlock
{ {
let now = Instant::now(); let now = Instant::now();
@ -473,7 +476,7 @@ impl NetToChainAdapter {
fn check_compact(&self, tip_res: Option<Tip>) { fn check_compact(&self, tip_res: Option<Tip>) {
// no compaction during sync or if we're in historical mode // no compaction during sync or if we're in historical mode
if self.archive_mode || self.currently_syncing.load(Ordering::Relaxed) { if self.archive_mode || self.sync_state.is_syncing() {
return; return;
} }
@ -541,7 +544,7 @@ impl NetToChainAdapter {
/// Prepare options for the chain pipeline /// Prepare options for the chain pipeline
fn chain_opts(&self) -> chain::Options { fn chain_opts(&self) -> chain::Options {
let opts = if self.currently_syncing.load(Ordering::Relaxed) { let opts = if self.sync_state.is_syncing() {
chain::Options::SYNC chain::Options::SYNC
} else { } else {
chain::Options::NONE chain::Options::NONE

View file

@ -20,6 +20,7 @@ use std::sync::{Arc, RwLock};
use std::time::SystemTime; use std::time::SystemTime;
use chain; use chain;
use common::types::SyncStatus;
use p2p; use p2p;
/// Server state info collection struct, to be passed around into internals /// Server state info collection struct, to be passed around into internals
@ -51,7 +52,7 @@ pub struct ServerStats {
/// sync header head /// sync header head
pub header_head: chain::Tip, pub header_head: chain::Tip,
/// Whether we're currently syncing /// Whether we're currently syncing
pub is_syncing: bool, pub sync_status: SyncStatus,
/// Whether we're awaiting peers /// Whether we're awaiting peers
pub awaiting_peers: bool, pub awaiting_peers: bool,
/// Handle to current stratum server stats /// Handle to current stratum server stats

View file

@ -15,6 +15,7 @@
//! Server types //! Server types
use std::convert::From; use std::convert::From;
use std::sync::RwLock;
use api; use api;
use chain; use chain;
@ -253,3 +254,120 @@ impl Default for StratumServerConfig {
} }
} }
} }
/// Various status sync can be in, whether it's fast sync or archival.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
#[allow(missing_docs)]
pub enum SyncStatus {
/// Not syncing
NoSync,
/// Downloading block headers
HeaderSync {
current_height: u64,
highest_height: u64,
},
/// Downloading the various txhashsets
TxHashsetDownload,
/// Setting up before validation
TxHashsetSetup,
/// Validating the full state
TxHashsetValidation {
kernels: u64,
kernel_total: u64,
rproofs: u64,
rproof_total: u64,
},
/// Finalizing the new state
TxHashsetSave,
/// Downloading blocks
BodySync {
current_height: u64,
highest_height: u64,
},
}
/// Current sync state. Encapsulates the current SyncStatus.
pub struct SyncState {
current: RwLock<SyncStatus>,
}
impl SyncState {
/// Return a new SyncState initialize to NoSync
pub fn new() -> SyncState {
SyncState {
current: RwLock::new(SyncStatus::NoSync),
}
}
/// Whether the current state matches any active syncing operation
pub fn is_syncing(&self) -> bool {
*self.current.read().unwrap() != SyncStatus::NoSync
}
/// Current syncing status
pub fn status(&self) -> SyncStatus {
*self.current.read().unwrap()
}
/// Update the syncing status
pub fn update(&self, new_status: SyncStatus) {
let mut status = self.current.write().unwrap();
*status = new_status;
}
}
impl chain::TxHashsetWriteStatus for SyncState {
fn on_setup(&self) {
self.update(SyncStatus::TxHashsetSetup);
}
fn on_validation(&self, vkernels: u64, vkernel_total: u64, vrproofs: u64, vrproof_total: u64) {
let mut status = self.current.write().unwrap();
match *status {
SyncStatus::TxHashsetValidation {
kernels,
kernel_total,
rproofs,
rproof_total,
} => {
let ks = if vkernels > 0 { vkernels } else { kernels };
let kt = if vkernel_total > 0 {
vkernel_total
} else {
kernel_total
};
let rps = if vrproofs > 0 { vrproofs } else { rproofs };
let rpt = if vrproof_total > 0 {
vrproof_total
} else {
rproof_total
};
*status = SyncStatus::TxHashsetValidation {
kernels: ks,
kernel_total: kt,
rproofs: rps,
rproof_total: rpt,
};
}
_ => {
*status = SyncStatus::TxHashsetValidation {
kernels: 0,
kernel_total: 0,
rproofs: 0,
rproof_total: 0,
}
}
}
}
fn on_save(&self) {
self.update(SyncStatus::TxHashsetSave);
}
fn on_done(&self) {
self.update(SyncStatus::BodySync {
current_height: 0,
highest_height: 0,
});
}
}

View file

@ -26,7 +26,7 @@ use chain;
use common::adapters::{ChainToPoolAndNetAdapter, NetToChainAdapter, PoolToChainAdapter, use common::adapters::{ChainToPoolAndNetAdapter, NetToChainAdapter, PoolToChainAdapter,
PoolToNetAdapter}; PoolToNetAdapter};
use common::stats::{DiffBlock, DiffStats, PeerStats, ServerStateInfo, ServerStats}; use common::stats::{DiffBlock, DiffStats, PeerStats, ServerStateInfo, ServerStats};
use common::types::{Error, Seeding, ServerConfig, StratumServerConfig}; use common::types::{Error, Seeding, ServerConfig, StratumServerConfig, SyncState};
use core::core::hash::Hashed; use core::core::hash::Hashed;
use core::core::target::Difficulty; use core::core::target::Difficulty;
use core::{consensus, genesis, global, pow}; use core::{consensus, genesis, global, pow};
@ -48,7 +48,7 @@ pub struct Server {
/// in-memory transaction pool /// in-memory transaction pool
tx_pool: Arc<RwLock<pool::TransactionPool<PoolToChainAdapter>>>, tx_pool: Arc<RwLock<pool::TransactionPool<PoolToChainAdapter>>>,
/// Whether we're currently syncing /// Whether we're currently syncing
currently_syncing: Arc<AtomicBool>, sync_state: Arc<SyncState>,
/// To be passed around to collect stats and info /// To be passed around to collect stats and info
state_info: ServerStateInfo, state_info: ServerStateInfo,
/// Stop flag /// Stop flag
@ -139,11 +139,11 @@ impl Server {
pool_adapter.set_chain(Arc::downgrade(&shared_chain)); pool_adapter.set_chain(Arc::downgrade(&shared_chain));
let currently_syncing = Arc::new(AtomicBool::new(true)); let sync_state = Arc::new(SyncState::new());
let awaiting_peers = Arc::new(AtomicBool::new(false)); let awaiting_peers = Arc::new(AtomicBool::new(false));
let net_adapter = Arc::new(NetToChainAdapter::new( let net_adapter = Arc::new(NetToChainAdapter::new(
currently_syncing.clone(), sync_state.clone(),
archive_mode, archive_mode,
Arc::downgrade(&shared_chain), Arc::downgrade(&shared_chain),
tx_pool.clone(), tx_pool.clone(),
@ -202,7 +202,7 @@ impl Server {
let syncer = sync::Syncer::new(); let syncer = sync::Syncer::new();
syncer.run_sync( syncer.run_sync(
currently_syncing.clone(), sync_state.clone(),
awaiting_peers.clone(), awaiting_peers.clone(),
p2p_server.peers.clone(), p2p_server.peers.clone(),
shared_chain.clone(), shared_chain.clone(),
@ -241,7 +241,7 @@ impl Server {
p2p: p2p_server, p2p: p2p_server,
chain: shared_chain, chain: shared_chain,
tx_pool: tx_pool, tx_pool: tx_pool,
currently_syncing: currently_syncing, sync_state,
state_info: ServerStateInfo { state_info: ServerStateInfo {
awaiting_peers: awaiting_peers, awaiting_peers: awaiting_peers,
..Default::default() ..Default::default()
@ -265,7 +265,7 @@ impl Server {
pub fn start_stratum_server(&self, config: StratumServerConfig) { pub fn start_stratum_server(&self, config: StratumServerConfig) {
let cuckoo_size = global::sizeshift(); let cuckoo_size = global::sizeshift();
let proof_size = global::proofsize(); let proof_size = global::proofsize();
let currently_syncing = self.currently_syncing.clone(); let sync_state = self.sync_state.clone();
let mut stratum_server = stratumserver::StratumServer::new( let mut stratum_server = stratumserver::StratumServer::new(
config.clone(), config.clone(),
@ -280,7 +280,7 @@ impl Server {
stratum_stats, stratum_stats,
cuckoo_size as u32, cuckoo_size as u32,
proof_size, proof_size,
currently_syncing, sync_state,
); );
}); });
} }
@ -289,7 +289,7 @@ impl Server {
/// internal miner, and should only be used for automated testing. Burns /// internal miner, and should only be used for automated testing. Burns
/// reward if wallet_listener_url is 'None' /// reward if wallet_listener_url is 'None'
pub fn start_test_miner(&self, wallet_listener_url: Option<String>) { pub fn start_test_miner(&self, wallet_listener_url: Option<String>) {
let currently_syncing = self.currently_syncing.clone(); let sync_state = self.sync_state.clone();
let config_wallet_url = match wallet_listener_url.clone() { let config_wallet_url = match wallet_listener_url.clone() {
Some(u) => u, Some(u) => u,
None => String::from("http://127.0.0.1:13415"), None => String::from("http://127.0.0.1:13415"),
@ -317,7 +317,7 @@ impl Server {
// TODO push this down in the run loop so miner gets paused anytime we // TODO push this down in the run loop so miner gets paused anytime we
// decide to sync again // decide to sync again
let secs_5 = time::Duration::from_secs(5); let secs_5 = time::Duration::from_secs(5);
while currently_syncing.load(Ordering::Relaxed) { while sync_state.is_syncing() {
thread::sleep(secs_5); thread::sleep(secs_5);
} }
miner.run_loop(wallet_listener_url); miner.run_loop(wallet_listener_url);
@ -405,7 +405,7 @@ impl Server {
peer_count: self.peer_count(), peer_count: self.peer_count(),
head: self.head(), head: self.head(),
header_head: self.header_head(), header_head: self.header_head(),
is_syncing: self.currently_syncing.load(Ordering::Relaxed), sync_status: self.sync_state.status(),
awaiting_peers: awaiting_peers, awaiting_peers: awaiting_peers,
stratum_stats: stratum_stats, stratum_stats: stratum_stats,
peer_stats: peer_stats, peer_stats: peer_stats,

View file

@ -19,7 +19,7 @@ use std::{cmp, thread};
use time; use time;
use chain; use chain;
use common::types::Error; use common::types::{Error, SyncState, SyncStatus};
use core::core::hash::{Hash, Hashed}; use core::core::hash::{Hash, Hashed};
use core::core::target::Difficulty; use core::core::target::Difficulty;
use core::global; use core::global;
@ -36,7 +36,7 @@ impl Syncer {
pub fn run_sync( pub fn run_sync(
&self, &self,
currently_syncing: Arc<AtomicBool>, sync_state: Arc<SyncState>,
awaiting_peers: Arc<AtomicBool>, awaiting_peers: Arc<AtomicBool>,
peers: Arc<p2p::Peers>, peers: Arc<p2p::Peers>,
chain: Arc<chain::Chain>, chain: Arc<chain::Chain>,
@ -45,7 +45,7 @@ impl Syncer {
stop: Arc<AtomicBool>, stop: Arc<AtomicBool>,
) { ) {
sync::run_sync( sync::run_sync(
currently_syncing, sync_state,
awaiting_peers, awaiting_peers,
peers, peers,
chain, chain,
@ -58,7 +58,7 @@ impl Syncer {
/// Starts the syncing loop, just spawns two threads that loop forever /// Starts the syncing loop, just spawns two threads that loop forever
pub fn run_sync( pub fn run_sync(
currently_syncing: Arc<AtomicBool>, sync_state: Arc<SyncState>,
awaiting_peers: Arc<AtomicBool>, awaiting_peers: Arc<AtomicBool>,
peers: Arc<p2p::Peers>, peers: Arc<p2p::Peers>,
chain: Arc<chain::Chain>, chain: Arc<chain::Chain>,
@ -98,7 +98,7 @@ pub fn run_sync(
// is syncing generally needed when we compare our state with others // is syncing generally needed when we compare our state with others
let (syncing, most_work_height) = let (syncing, most_work_height) =
needs_syncing(currently_syncing.as_ref(), peers.clone(), chain.clone()); needs_syncing(sync_state.as_ref(), peers.clone(), chain.clone());
if most_work_height > 0 { if most_work_height > 0 {
// we can occasionally get a most work height of 0 if read locks fail // we can occasionally get a most work height of 0 if read locks fail
@ -112,23 +112,26 @@ pub fn run_sync(
// run the header sync every 10s // run the header sync every 10s
if si.header_sync_due(&header_head) { if si.header_sync_due(&header_head) {
header_sync(peers.clone(), chain.clone()); header_sync(peers.clone(), chain.clone());
sync_state.update(SyncStatus::HeaderSync{current_height: header_head.height, highest_height: si.highest_height});
} }
if fast_sync_enabled { if fast_sync_enabled {
// run fast sync if applicable, every 5 min // run fast sync if applicable, every 5 min
if header_head.height == si.highest_height && si.fast_sync_due() { if header_head.height == si.highest_height && si.fast_sync_due() {
fast_sync(peers.clone(), chain.clone(), &header_head); fast_sync(peers.clone(), chain.clone(), &header_head);
sync_state.update(SyncStatus::TxHashsetDownload);
} }
} else { } else {
// run the body_sync every 5s // run the body_sync every 5s
if si.body_sync_due(&head) { if si.body_sync_due(&head) {
body_sync(peers.clone(), chain.clone()); body_sync(peers.clone(), chain.clone());
sync_state.update(SyncStatus::BodySync{current_height: head.height, highest_height: si.highest_height});
} }
} }
} else {
sync_state.update(SyncStatus::NoSync);
} }
currently_syncing.store(syncing, Ordering::Relaxed);
thread::sleep(Duration::from_secs(1)); thread::sleep(Duration::from_secs(1));
if stop.load(Ordering::Relaxed) { if stop.load(Ordering::Relaxed) {
@ -275,13 +278,13 @@ fn request_headers(peer: &Peer, chain: Arc<chain::Chain>) {
/// Whether we're currently syncing the chain or we're fully caught up and /// Whether we're currently syncing the chain or we're fully caught up and
/// just receiving blocks through gossip. /// just receiving blocks through gossip.
fn needs_syncing( fn needs_syncing(
currently_syncing: &AtomicBool, sync_state: &SyncState,
peers: Arc<Peers>, peers: Arc<Peers>,
chain: Arc<chain::Chain>, chain: Arc<chain::Chain>,
) -> (bool, u64) { ) -> (bool, u64) {
let local_diff = chain.total_difficulty(); let local_diff = chain.total_difficulty();
let peer = peers.most_work_peer(); let peer = peers.most_work_peer();
let is_syncing = currently_syncing.load(Ordering::Relaxed); let is_syncing = sync_state.is_syncing();
let mut most_work_height = 0; let mut most_work_height = 0;
// if we're already syncing, we're caught up if no peer has a higher // if we're already syncing, we're caught up if no peer has a higher

View file

@ -19,7 +19,6 @@ use serde_json::Value;
use std::error::Error; use std::error::Error;
use std::io::{BufRead, ErrorKind, Write}; use std::io::{BufRead, ErrorKind, Write};
use std::net::{TcpListener, TcpStream}; use std::net::{TcpListener, TcpStream};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex, RwLock}; use std::sync::{Arc, Mutex, RwLock};
use std::time::{Duration, SystemTime}; use std::time::{Duration, SystemTime};
use std::{cmp, thread}; use std::{cmp, thread};
@ -28,7 +27,7 @@ use time;
use chain; use chain;
use common::adapters::PoolToChainAdapter; use common::adapters::PoolToChainAdapter;
use common::stats::{StratumStats, WorkerStats}; use common::stats::{StratumStats, WorkerStats};
use common::types::StratumServerConfig; use common::types::{StratumServerConfig, SyncState};
use core::core::{Block, BlockHeader}; use core::core::{Block, BlockHeader};
use core::{consensus, pow}; use core::{consensus, pow};
use keychain; use keychain;
@ -236,7 +235,7 @@ pub struct StratumServer {
minimum_share_difficulty: u64, minimum_share_difficulty: u64,
current_key_id: Option<keychain::Identifier>, current_key_id: Option<keychain::Identifier>,
workers: Arc<Mutex<Vec<Worker>>>, workers: Arc<Mutex<Vec<Worker>>>,
currently_syncing: Arc<AtomicBool>, sync_state: Arc<SyncState>,
} }
impl StratumServer { impl StratumServer {
@ -256,7 +255,7 @@ impl StratumServer {
current_difficulty: <u64>::max_value(), current_difficulty: <u64>::max_value(),
current_key_id: None, current_key_id: None,
workers: Arc::new(Mutex::new(Vec::new())), workers: Arc::new(Mutex::new(Vec::new())),
currently_syncing: Arc::new(AtomicBool::new(false)), sync_state: Arc::new(SyncState::new()),
} }
} }
@ -322,7 +321,7 @@ impl StratumServer {
} }
"keepalive" => self.handle_keepalive(), "keepalive" => self.handle_keepalive(),
"getjobtemplate" => { "getjobtemplate" => {
if self.currently_syncing.load(Ordering::Relaxed) { if self.sync_state.is_syncing() {
let e = RpcError { let e = RpcError {
code: -32701, code: -32701,
message: "Node is syncing - Please wait".to_string(), message: "Node is syncing - Please wait".to_string(),
@ -625,7 +624,7 @@ impl StratumServer {
stratum_stats: Arc<RwLock<StratumStats>>, stratum_stats: Arc<RwLock<StratumStats>>,
cuckoo_size: u32, cuckoo_size: u32,
proof_size: usize, proof_size: usize,
currently_syncing: Arc<AtomicBool>, sync_state: Arc<SyncState>,
) { ) {
info!( info!(
LOGGER, LOGGER,
@ -635,7 +634,7 @@ impl StratumServer {
proof_size proof_size
); );
self.currently_syncing = currently_syncing; self.sync_state = sync_state;
// "globals" for this function // "globals" for this function
let attempt_time_per_block = self.config.attempt_time_per_block; let attempt_time_per_block = self.config.attempt_time_per_block;
@ -675,7 +674,7 @@ impl StratumServer {
loop { loop {
// If we're fallen into sync mode, (or are just starting up, // If we're fallen into sync mode, (or are just starting up,
// tell connected clients to stop what they're doing // tell connected clients to stop what they're doing
let mining_stopped = self.currently_syncing.load(Ordering::Relaxed); let mining_stopped = self.sync_state.is_syncing();
// Remove workers with failed connections // Remove workers with failed connections
num_workers = self.clean_workers(&mut stratum_stats.clone()); num_workers = self.clean_workers(&mut stratum_stats.clone());

View file

@ -23,6 +23,7 @@ use cursive::Cursive;
use tui::constants::VIEW_BASIC_STATUS; use tui::constants::VIEW_BASIC_STATUS;
use tui::types::TUIStatusListener; use tui::types::TUIStatusListener;
use servers::common::types::SyncStatus;
use servers::ServerStats; use servers::ServerStats;
pub struct TUIStatusView; pub struct TUIStatusView;
@ -76,14 +77,63 @@ impl TUIStatusListener for TUIStatusView {
fn update(c: &mut Cursive, stats: &ServerStats) { fn update(c: &mut Cursive, stats: &ServerStats) {
//find and update here as needed //find and update here as needed
let basic_status = { let basic_status = {
if stats.is_syncing { if stats.awaiting_peers {
if stats.awaiting_peers { "Waiting for peers".to_string()
"Waiting for peers".to_string()
} else {
format!("Syncing - Latest header: {}", stats.header_head.height).to_string()
}
} else { } else {
"Running".to_string() match stats.sync_status {
SyncStatus::NoSync => "Running".to_string(),
SyncStatus::HeaderSync {
current_height,
highest_height,
} => {
let percent = if highest_height == 0 {
0
} else {
current_height * 100 / highest_height
};
format!("Downloading headers: {}%, step 1/4", percent)
}
SyncStatus::TxHashsetDownload => {
"Downloading chain state for fast sync, step 2/4".to_string()
}
SyncStatus::TxHashsetSetup => {
"Preparing chain state for validation, step 3/4".to_string()
}
SyncStatus::TxHashsetValidation {
kernels,
kernel_total,
rproofs,
rproof_total,
} => {
// 10% of overall progress is attributed to kernel validation
// 90% to range proofs (which are much longer)
let mut percent = if kernel_total > 0 {
kernels * 10 / kernel_total
} else {
0
};
percent += if rproof_total > 0 {
rproofs * 90 / rproof_total
} else {
0
};
format!("Validating chain state: {}%, step 3/4", percent)
}
SyncStatus::TxHashsetSave => {
"Finalizing chain state for fast sync, step 3/4".to_string()
}
SyncStatus::BodySync {
current_height,
highest_height,
} => {
let percent = if highest_height == 0 {
0
} else {
current_height * 100 / highest_height
};
format!("Downloading blocks: {}%, step 4/4", percent)
}
}
} }
}; };
/*let basic_mining_config_status = { /*let basic_mining_config_status = {