diff --git a/servers/src/common/stats.rs b/servers/src/common/stats.rs index 571b6a9ca..b5e0a4c26 100644 --- a/servers/src/common/stats.rs +++ b/servers/src/common/stats.rs @@ -15,7 +15,6 @@ //! Server stat collection types, to be used by tests, logging or GUI/TUI //! to collect information about server status -use std::sync::atomic::AtomicBool; use std::sync::Arc; use std::time::SystemTime; use util::RwLock; @@ -32,8 +31,6 @@ use p2p; /// and populated when required #[derive(Clone)] pub struct ServerStateInfo { - /// whether we're in a state of waiting for peers at startup - pub awaiting_peers: Arc, /// Stratum stats pub stratum_stats: Arc>, } @@ -41,7 +38,6 @@ pub struct ServerStateInfo { impl Default for ServerStateInfo { fn default() -> ServerStateInfo { ServerStateInfo { - awaiting_peers: Arc::new(AtomicBool::new(false)), stratum_stats: Arc::new(RwLock::new(StratumStats::default())), } } @@ -58,8 +54,6 @@ pub struct ServerStats { pub header_head: chain::Tip, /// Whether we're currently syncing pub sync_status: SyncStatus, - /// Whether we're awaiting peers - pub awaiting_peers: bool, /// Handle to current stratum server stats pub stratum_stats: StratumStats, /// Peer stats diff --git a/servers/src/common/types.rs b/servers/src/common/types.rs index e492f0fcc..6ee10cd19 100644 --- a/servers/src/common/types.rs +++ b/servers/src/common/types.rs @@ -250,6 +250,9 @@ pub enum SyncStatus { Initial, /// Not syncing NoSync, + /// Not enough peers to do anything yet, boolean indicates whether + /// we should wait at all or ignore and start ASAP + AwaitingPeers(bool), /// Downloading block headers HeaderSync { current_height: u64, diff --git a/servers/src/grin/server.rs b/servers/src/grin/server.rs index 36cb854bc..1a4f1acae 100644 --- a/servers/src/grin/server.rs +++ b/servers/src/grin/server.rs @@ -28,7 +28,7 @@ use common::adapters::{ ChainToPoolAndNetAdapter, NetToChainAdapter, PoolToChainAdapter, PoolToNetAdapter, }; use common::stats::{DiffBlock, DiffStats, PeerStats, ServerStateInfo, ServerStats}; -use common::types::{Error, ServerConfig, StratumServerConfig, SyncState}; +use common::types::{Error, ServerConfig, StratumServerConfig, SyncState, SyncStatus}; use core::core::hash::Hashed; use core::core::verifier_cache::{LruVerifierCache, VerifierCache}; use core::{consensus, genesis, global, pow}; @@ -170,8 +170,6 @@ impl Server { pool_adapter.set_chain(shared_chain.clone()); - let awaiting_peers = Arc::new(AtomicBool::new(false)); - let net_adapter = Arc::new(NetToChainAdapter::new( sync_state.clone(), archive_mode, @@ -231,17 +229,13 @@ impl Server { // Defaults to None (optional) in config file. // This translates to false here so we do not skip by default. - let skip_sync_wait = match config.skip_sync_wait { - None => false, - Some(b) => b, - }; + let skip_sync_wait = config.skip_sync_wait.unwrap_or(false); + sync_state.update(SyncStatus::AwaitingPeers(!skip_sync_wait)); sync::run_sync( sync_state.clone(), - awaiting_peers.clone(), p2p_server.peers.clone(), shared_chain.clone(), - skip_sync_wait, archive_mode, stop.clone(), ); @@ -279,7 +273,6 @@ impl Server { verifier_cache, sync_state, state_info: ServerStateInfo { - awaiting_peers: awaiting_peers, ..Default::default() }, stop, @@ -383,7 +376,6 @@ impl Server { /// consumers pub fn get_server_stats(&self) -> Result { let stratum_stats = self.state_info.stratum_stats.read().clone(); - let awaiting_peers = self.state_info.awaiting_peers.load(Ordering::Relaxed); // Fill out stats on our current difficulty calculation // TODO: check the overhead of calculating this again isn't too much @@ -443,7 +435,6 @@ impl Server { head: self.head(), header_head: self.header_head(), sync_status: self.sync_state.status(), - awaiting_peers: awaiting_peers, stratum_stats: stratum_stats, peer_stats: peer_stats, diff_stats: diff_stats, diff --git a/servers/src/grin/sync/header_sync.rs b/servers/src/grin/sync/header_sync.rs index e56f28448..1989a9e9b 100644 --- a/servers/src/grin/sync/header_sync.rs +++ b/servers/src/grin/sync/header_sync.rs @@ -54,7 +54,7 @@ impl HeaderSync { let enable_header_sync = match status { SyncStatus::BodySync { .. } | SyncStatus::HeaderSync { .. } => true, - SyncStatus::NoSync | SyncStatus::Initial => { + SyncStatus::NoSync | SyncStatus::Initial | SyncStatus::AwaitingPeers(_) => { // Reset sync_head to header_head on transition to HeaderSync, // but ONLY on initial transition to HeaderSync state. let sync_head = self.chain.get_sync_head().unwrap(); @@ -102,7 +102,7 @@ impl HeaderSync { // always enable header sync on initial state transition from NoSync / Initial let force_sync = match self.sync_state.status() { - SyncStatus::NoSync | SyncStatus::Initial => true, + SyncStatus::NoSync | SyncStatus::Initial | SyncStatus::AwaitingPeers(_) => true, _ => false, }; diff --git a/servers/src/grin/sync/syncer.rs b/servers/src/grin/sync/syncer.rs index 91ae2ed6a..17f18089e 100644 --- a/servers/src/grin/sync/syncer.rs +++ b/servers/src/grin/sync/syncer.rs @@ -23,181 +23,193 @@ use core::pow::Difficulty; use grin::sync::body_sync::BodySync; use grin::sync::header_sync::HeaderSync; use grin::sync::state_sync::StateSync; -use p2p::{self, Peers}; +use p2p; pub fn run_sync( sync_state: Arc, - awaiting_peers: Arc, peers: Arc, chain: Arc, - skip_sync_wait: bool, archive_mode: bool, stop: Arc, ) { let _ = thread::Builder::new() .name("sync".to_string()) .spawn(move || { - sync_loop( - sync_state, - awaiting_peers, - peers, - chain, - skip_sync_wait, - archive_mode, - stop, - ) + let runner = SyncRunner::new(sync_state, peers, chain, archive_mode, stop); + runner.sync_loop(); }); } -fn wait_for_min_peers( - awaiting_peers: Arc, - peers: Arc, - chain: Arc, - skip_sync_wait: bool, -) { - // Initial sleep to give us time to peer with some nodes. - // Note: Even if we have "skip_sync_wait" we need to wait a - // short period of time for tests to do the right thing. - let wait_secs = if skip_sync_wait { 3 } else { 30 }; - - let head = chain.head().unwrap(); - - awaiting_peers.store(true, Ordering::Relaxed); - let mut n = 0; - const MIN_PEERS: usize = 3; - loop { - let wp = peers.more_work_peers(); - // exit loop when: - // * we have more than MIN_PEERS more_work peers - // * we are synced already, e.g. grin was quickly restarted - // * timeout - if wp.len() > MIN_PEERS - || (wp.len() == 0 && peers.enough_peers() && head.total_difficulty > Difficulty::zero()) - || n > wait_secs - { - break; - } - thread::sleep(time::Duration::from_secs(1)); - n += 1; - } - awaiting_peers.store(false, Ordering::Relaxed); -} - -/// Starts the syncing loop, just spawns two threads that loop forever -fn sync_loop( +pub struct SyncRunner { sync_state: Arc, - awaiting_peers: Arc, peers: Arc, chain: Arc, - skip_sync_wait: bool, archive_mode: bool, stop: Arc, -) { - // Wait for connections reach at least MIN_PEERS - wait_for_min_peers(awaiting_peers, peers.clone(), chain.clone(), skip_sync_wait); - - // Our 3 main sync stages - let mut header_sync = HeaderSync::new(sync_state.clone(), peers.clone(), chain.clone()); - let mut body_sync = BodySync::new(sync_state.clone(), peers.clone(), chain.clone()); - let mut state_sync = StateSync::new( - sync_state.clone(), - peers.clone(), - chain.clone(), - archive_mode, - ); - - // Highest height seen on the network, generally useful for a fast test on - // whether some sync is needed - let mut highest_height = 0; - - // Main syncing loop - while !stop.load(Ordering::Relaxed) { - thread::sleep(time::Duration::from_millis(10)); - - // check whether syncing is generally needed, when we compare our state with others - let (syncing, most_work_height) = - needs_syncing(sync_state.as_ref(), peers.clone(), chain.clone()); - - if most_work_height > 0 { - // we can occasionally get a most work height of 0 if read locks fail - highest_height = most_work_height; - } - - // quick short-circuit (and a decent sleep) if no syncing is needed - if !syncing { - sync_state.update(SyncStatus::NoSync); - thread::sleep(time::Duration::from_secs(10)); - continue; - } - - // if syncing is needed - let head = chain.head().unwrap(); - let header_head = chain.header_head().unwrap(); - - // run each sync stage, each of them deciding whether they're needed - // except for body sync that only runs if state sync is off or done - header_sync.check_run(&header_head, highest_height); - if !state_sync.check_run(&header_head, &head, highest_height) { - body_sync.check_run(&head, highest_height); - } - } } -/// Whether we're currently syncing the chain or we're fully caught up and -/// just receiving blocks through gossip. -fn needs_syncing( - sync_state: &SyncState, - peers: Arc, - chain: Arc, -) -> (bool, u64) { - let local_diff = chain.head().unwrap().total_difficulty; - let peer = peers.most_work_peer(); - let is_syncing = sync_state.is_syncing(); - let mut most_work_height = 0; +impl SyncRunner { + fn new( + sync_state: Arc, + peers: Arc, + chain: Arc, + archive_mode: bool, + stop: Arc, + ) -> SyncRunner { + SyncRunner { + sync_state, + peers, + chain, + archive_mode, + stop, + } + } - // if we're already syncing, we're caught up if no peer has a higher - // difficulty than us - if is_syncing { - if let Some(peer) = peer { - most_work_height = peer.info.height(); - if peer.info.total_difficulty() <= local_diff { - let ch = chain.head().unwrap(); - info!( - "synchronized at {} @ {} [{}]", - local_diff.to_num(), - ch.height, - ch.last_block_h - ); + fn wait_for_min_peers(&self) { + // Initial sleep to give us time to peer with some nodes. + // Note: Even if we have skip peer wait we need to wait a + // short period of time for tests to do the right thing. + let wait_secs = if let SyncStatus::AwaitingPeers(true) = self.sync_state.status() { + 30 + } else { + 3 + }; - let _ = chain.reset_head(); - return (false, most_work_height); + let head = self.chain.head().unwrap(); + + let mut n = 0; + const MIN_PEERS: usize = 3; + loop { + let wp = self.peers.more_work_peers(); + // exit loop when: + // * we have more than MIN_PEERS more_work peers + // * we are synced already, e.g. grin was quickly restarted + // * timeout + if wp.len() > MIN_PEERS + || (wp.len() == 0 + && self.peers.enough_peers() + && head.total_difficulty > Difficulty::zero()) + || n > wait_secs + { + break; + } + thread::sleep(time::Duration::from_secs(1)); + n += 1; + } + } + + /// Starts the syncing loop, just spawns two threads that loop forever + fn sync_loop(&self) { + // Wait for connections reach at least MIN_PEERS + self.wait_for_min_peers(); + + // Our 3 main sync stages + let mut header_sync = HeaderSync::new( + self.sync_state.clone(), + self.peers.clone(), + self.chain.clone(), + ); + let mut body_sync = BodySync::new( + self.sync_state.clone(), + self.peers.clone(), + self.chain.clone(), + ); + let mut state_sync = StateSync::new( + self.sync_state.clone(), + self.peers.clone(), + self.chain.clone(), + self.archive_mode, + ); + + // Highest height seen on the network, generally useful for a fast test on + // whether some sync is needed + let mut highest_height = 0; + + // Main syncing loop + while !self.stop.load(Ordering::Relaxed) { + thread::sleep(time::Duration::from_millis(10)); + + // check whether syncing is generally needed, when we compare our state with others + let (syncing, most_work_height) = self.needs_syncing(); + + if most_work_height > 0 { + // we can occasionally get a most work height of 0 if read locks fail + highest_height = most_work_height; + } + + // quick short-circuit (and a decent sleep) if no syncing is needed + if !syncing { + self.sync_state.update(SyncStatus::NoSync); + thread::sleep(time::Duration::from_secs(10)); + continue; + } + + // if syncing is needed + let head = self.chain.head().unwrap(); + let header_head = self.chain.header_head().unwrap(); + + // run each sync stage, each of them deciding whether they're needed + // except for body sync that only runs if state sync is off or done + header_sync.check_run(&header_head, highest_height); + if !state_sync.check_run(&header_head, &head, highest_height) { + body_sync.check_run(&head, highest_height); + } + } + } + + /// Whether we're currently syncing the chain or we're fully caught up and + /// just receiving blocks through gossip. + fn needs_syncing(&self) -> (bool, u64) { + let local_diff = self.chain.head().unwrap().total_difficulty; + let peer = self.peers.most_work_peer(); + let is_syncing = self.sync_state.is_syncing(); + let mut most_work_height = 0; + + // if we're already syncing, we're caught up if no peer has a higher + // difficulty than us + if is_syncing { + if let Some(peer) = peer { + most_work_height = peer.info.height(); + if peer.info.total_difficulty() <= local_diff { + let ch = self.chain.head().unwrap(); + info!( + "synchronized at {} @ {} [{}]", + local_diff.to_num(), + ch.height, + ch.last_block_h + ); + + let _ = self.chain.reset_head(); + return (false, most_work_height); + } + } else { + warn!("sync: no peers available, disabling sync"); + return (false, 0); } } else { - warn!("sync: no peers available, disabling sync"); - return (false, 0); - } - } else { - if let Some(peer) = peer { - most_work_height = peer.info.height(); + if let Some(peer) = peer { + most_work_height = peer.info.height(); - // sum the last 5 difficulties to give us the threshold - let threshold = chain - .difficulty_iter() - .map(|x| x.difficulty) - .take(5) - .fold(Difficulty::zero(), |sum, val| sum + val); + // sum the last 5 difficulties to give us the threshold + let threshold = self + .chain + .difficulty_iter() + .map(|x| x.difficulty) + .take(5) + .fold(Difficulty::zero(), |sum, val| sum + val); - let peer_diff = peer.info.total_difficulty(); - if peer_diff > local_diff.clone() + threshold.clone() { - info!( - "sync: total_difficulty {}, peer_difficulty {}, threshold {} (last 5 blocks), enabling sync", - local_diff, - peer_diff, - threshold, - ); - return (true, most_work_height); + let peer_diff = peer.info.total_difficulty(); + if peer_diff > local_diff.clone() + threshold.clone() { + info!( + "sync: total_difficulty {}, peer_difficulty {}, threshold {} (last 5 blocks), enabling sync", + local_diff, + peer_diff, + threshold, + ); + return (true, most_work_height); + } } } + (is_syncing, most_work_height) } - (is_syncing, most_work_height) } diff --git a/src/bin/tui/status.rs b/src/bin/tui/status.rs index 0b0db2420..1a9fc093b 100644 --- a/src/bin/tui/status.rs +++ b/src/bin/tui/status.rs @@ -87,91 +87,88 @@ impl TUIStatusListener for TUIStatusView { fn update(c: &mut Cursive, stats: &ServerStats) { //find and update here as needed let basic_status = { - if stats.awaiting_peers { - "Waiting for peers".to_string() - } else { - match stats.sync_status { - SyncStatus::Initial => "Initializing".to_string(), - SyncStatus::NoSync => "Running".to_string(), - SyncStatus::HeaderSync { - current_height, - highest_height, - } => { - let percent = if highest_height == 0 { - 0 + match stats.sync_status { + SyncStatus::Initial => "Initializing".to_string(), + SyncStatus::NoSync => "Running".to_string(), + SyncStatus::AwaitingPeers(_) => "Waiting for peers".to_string(), + SyncStatus::HeaderSync { + current_height, + highest_height, + } => { + let percent = if highest_height == 0 { + 0 + } else { + current_height * 100 / highest_height + }; + format!("Downloading headers: {}%, step 1/4", percent) + } + SyncStatus::TxHashsetDownload { + start_time, + downloaded_size, + total_size, + } => { + if total_size > 0 { + let percent = if total_size > 0 { + downloaded_size * 100 / total_size } else { - current_height * 100 / highest_height + 0 }; - format!("Downloading headers: {}%, step 1/4", percent) - } - SyncStatus::TxHashsetDownload { - start_time, - downloaded_size, - total_size, - } => { - if total_size > 0 { - let percent = if total_size > 0 { - downloaded_size * 100 / total_size - } else { - 0 - }; - let start = start_time.timestamp_nanos(); - let fin = Utc::now().timestamp_nanos(); - let dur_ms = (fin - start) as f64 * NANO_TO_MILLIS; + let start = start_time.timestamp_nanos(); + let fin = Utc::now().timestamp_nanos(); + let dur_ms = (fin - start) as f64 * NANO_TO_MILLIS; - format!("Downloading {}(MB) chain state for fast sync: {}% at {:.1?}(kB/s), step 2/4", - total_size / 1_000_000, - percent, - if dur_ms > 1.0f64 { downloaded_size as f64 / dur_ms as f64 } else { 0f64 }, - ) - } else { - let start = start_time.timestamp_millis(); - let fin = Utc::now().timestamp_millis(); - let dur_secs = (fin - start) / 1000; + format!("Downloading {}(MB) chain state for fast sync: {}% at {:.1?}(kB/s), step 2/4", + total_size / 1_000_000, + percent, + if dur_ms > 1.0f64 { downloaded_size as f64 / dur_ms as f64 } else { 0f64 }, + ) + } else { + let start = start_time.timestamp_millis(); + let fin = Utc::now().timestamp_millis(); + let dur_secs = (fin - start) / 1000; - format!("Downloading chain state for fast sync. Waiting remote peer to start: {}s, step 2/4", - dur_secs, - ) - } - } - SyncStatus::TxHashsetSetup => { - "Preparing chain state for validation, step 3/4".to_string() - } - SyncStatus::TxHashsetValidation { - kernels, - kernel_total, - rproofs, - rproof_total, - } => { - // 10% of overall progress is attributed to kernel validation - // 90% to range proofs (which are much longer) - let mut percent = if kernel_total > 0 { - kernels * 10 / kernel_total - } else { - 0 - }; - percent += if rproof_total > 0 { - rproofs * 90 / rproof_total - } else { - 0 - }; - format!("Validating chain state: {}%, step 3/4", percent) - } - SyncStatus::TxHashsetSave => { - "Finalizing chain state for fast sync, step 3/4".to_string() - } - SyncStatus::BodySync { - current_height, - highest_height, - } => { - let percent = if highest_height == 0 { - 0 - } else { - current_height * 100 / highest_height - }; - format!("Downloading blocks: {}%, step 4/4", percent) + format!("Downloading chain state for fast sync. Waiting remote peer to start: {}s, step 2/4", + dur_secs, + ) } } + SyncStatus::TxHashsetSetup => { + "Preparing chain state for validation, step 3/4".to_string() + } + SyncStatus::TxHashsetValidation { + kernels, + kernel_total, + rproofs, + rproof_total, + } => { + // 10% of overall progress is attributed to kernel validation + // 90% to range proofs (which are much longer) + let mut percent = if kernel_total > 0 { + kernels * 10 / kernel_total + } else { + 0 + }; + percent += if rproof_total > 0 { + rproofs * 90 / rproof_total + } else { + 0 + }; + format!("Validating chain state: {}%, step 3/4", percent) + } + SyncStatus::TxHashsetSave => { + "Finalizing chain state for fast sync, step 3/4".to_string() + } + SyncStatus::BodySync { + current_height, + highest_height, + } => { + let percent = if highest_height == 0 { + 0 + } else { + current_height * 100 / highest_height + }; + format!("Downloading blocks: {}%, step 4/4", percent) + } } }; /*let basic_mining_config_status = {