Cleanup sync loop (#1811)

* Add a struct to encapsulate common references and avoid passing
them around to every function.
* Consolidate `skip_sync_wait` and `awaiting_peers` into an
additional sync status.
* The new awaiting-peers status is treated as an initial status too
(see the sketch below).
Ignotus Peverell 2018-10-22 22:50:13 -07:00 committed by GitHub
parent 7f60e2076d
commit a4a4c5610f
6 changed files with 243 additions and 246 deletions
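
A minimal sketch of the consolidation described above (a trimmed-down stand-in enum, not the full grin SyncStatus): the new `AwaitingPeers(bool)` variant replaces both the `awaiting_peers` atomic flag and the standalone `skip_sync_wait` boolean, so a single status value answers "are we waiting for peers?" and "should we wait at all?".

    // Illustrative only: trimmed-down stand-in for grin's SyncStatus.
    #[allow(dead_code)]
    #[derive(Clone, Copy, Debug, PartialEq)]
    enum SyncStatus {
        Initial,
        NoSync,
        /// Not enough peers yet; true = wait for peers, false = start ASAP.
        AwaitingPeers(bool),
    }

    fn main() {
        let skip_sync_wait = false; // would come from config
        let status = SyncStatus::AwaitingPeers(!skip_sync_wait);

        // One value now answers both of the old questions:
        let awaiting_peers = match status {
            SyncStatus::AwaitingPeers(_) => true, // replaces the old AtomicBool
            _ => false,
        };
        let should_wait = status == SyncStatus::AwaitingPeers(true); // replaces skip_sync_wait
        println!("awaiting_peers={}, should_wait={}", awaiting_peers, should_wait);
    }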


@@ -15,7 +15,6 @@
 //! Server stat collection types, to be used by tests, logging or GUI/TUI
 //! to collect information about server status
-use std::sync::atomic::AtomicBool;
 use std::sync::Arc;
 use std::time::SystemTime;
 use util::RwLock;
@@ -32,8 +31,6 @@ use p2p;
 /// and populated when required
 #[derive(Clone)]
 pub struct ServerStateInfo {
-    /// whether we're in a state of waiting for peers at startup
-    pub awaiting_peers: Arc<AtomicBool>,
     /// Stratum stats
     pub stratum_stats: Arc<RwLock<StratumStats>>,
 }
@@ -41,7 +38,6 @@ pub struct ServerStateInfo {
 impl Default for ServerStateInfo {
     fn default() -> ServerStateInfo {
         ServerStateInfo {
-            awaiting_peers: Arc::new(AtomicBool::new(false)),
             stratum_stats: Arc::new(RwLock::new(StratumStats::default())),
         }
     }
@@ -58,8 +54,6 @@ pub struct ServerStats {
     pub header_head: chain::Tip,
     /// Whether we're currently syncing
    pub sync_status: SyncStatus,
-    /// Whether we're awaiting peers
-    pub awaiting_peers: bool,
     /// Handle to current stratum server stats
     pub stratum_stats: StratumStats,
     /// Peer stats


@@ -250,6 +250,9 @@ pub enum SyncStatus {
     Initial,
     /// Not syncing
     NoSync,
+    /// Not enough peers to do anything yet, boolean indicates whether
+    /// we should wait at all or ignore and start ASAP
+    AwaitingPeers(bool),
     /// Downloading block headers
     HeaderSync {
         current_height: u64,


@@ -28,7 +28,7 @@ use common::adapters::{
     ChainToPoolAndNetAdapter, NetToChainAdapter, PoolToChainAdapter, PoolToNetAdapter,
 };
 use common::stats::{DiffBlock, DiffStats, PeerStats, ServerStateInfo, ServerStats};
-use common::types::{Error, ServerConfig, StratumServerConfig, SyncState};
+use common::types::{Error, ServerConfig, StratumServerConfig, SyncState, SyncStatus};
 use core::core::hash::Hashed;
 use core::core::verifier_cache::{LruVerifierCache, VerifierCache};
 use core::{consensus, genesis, global, pow};
@@ -170,8 +170,6 @@ impl Server {
         pool_adapter.set_chain(shared_chain.clone());

-        let awaiting_peers = Arc::new(AtomicBool::new(false));
-
         let net_adapter = Arc::new(NetToChainAdapter::new(
             sync_state.clone(),
             archive_mode,
@@ -231,17 +229,13 @@ impl Server {
         // Defaults to None (optional) in config file.
         // This translates to false here so we do not skip by default.
-        let skip_sync_wait = match config.skip_sync_wait {
-            None => false,
-            Some(b) => b,
-        };
+        let skip_sync_wait = config.skip_sync_wait.unwrap_or(false);
+        sync_state.update(SyncStatus::AwaitingPeers(!skip_sync_wait));

         sync::run_sync(
             sync_state.clone(),
-            awaiting_peers.clone(),
             p2p_server.peers.clone(),
             shared_chain.clone(),
-            skip_sync_wait,
             archive_mode,
             stop.clone(),
         );
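
The old `match` on the optional config value and the new `unwrap_or` call behave identically; a quick standalone equivalence check (plain Rust, no grin types):

    fn main() {
        for skip_sync_wait in [None, Some(true), Some(false)] {
            // old form
            let old = match skip_sync_wait {
                None => false,
                Some(b) => b,
            };
            // new form
            let new = skip_sync_wait.unwrap_or(false);
            assert_eq!(old, new);
        }
    }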
@@ -279,7 +273,6 @@ impl Server {
             verifier_cache,
             sync_state,
             state_info: ServerStateInfo {
-                awaiting_peers: awaiting_peers,
                 ..Default::default()
             },
             stop,
@@ -383,7 +376,6 @@ impl Server {
     /// consumers
     pub fn get_server_stats(&self) -> Result<ServerStats, Error> {
         let stratum_stats = self.state_info.stratum_stats.read().clone();
-        let awaiting_peers = self.state_info.awaiting_peers.load(Ordering::Relaxed);

         // Fill out stats on our current difficulty calculation
         // TODO: check the overhead of calculating this again isn't too much
@@ -443,7 +435,6 @@ impl Server {
             head: self.head(),
             header_head: self.header_head(),
             sync_status: self.sync_state.status(),
-            awaiting_peers: awaiting_peers,
             stratum_stats: stratum_stats,
             peer_stats: peer_stats,
             diff_stats: diff_stats,
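
With the `awaiting_peers` field dropped from the stats, any consumer that still wants a plain boolean can derive it from `sync_status` instead. A hedged sketch; `is_awaiting_peers` is a hypothetical helper, not part of grin, and the enum is the same trimmed-down stand-in as in the earlier sketch:

    #[allow(dead_code)]
    enum SyncStatus {
        Initial,
        NoSync,
        AwaitingPeers(bool),
    }

    // Hypothetical helper: recover the old boolean from the single status.
    fn is_awaiting_peers(status: &SyncStatus) -> bool {
        match status {
            SyncStatus::AwaitingPeers(_) => true,
            _ => false,
        }
    }

    fn main() {
        assert!(is_awaiting_peers(&SyncStatus::AwaitingPeers(true)));
        assert!(!is_awaiting_peers(&SyncStatus::NoSync));
    }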


@@ -54,7 +54,7 @@ impl HeaderSync {
         let enable_header_sync = match status {
             SyncStatus::BodySync { .. } | SyncStatus::HeaderSync { .. } => true,
-            SyncStatus::NoSync | SyncStatus::Initial => {
+            SyncStatus::NoSync | SyncStatus::Initial | SyncStatus::AwaitingPeers(_) => {
                 // Reset sync_head to header_head on transition to HeaderSync,
                 // but ONLY on initial transition to HeaderSync state.
                 let sync_head = self.chain.get_sync_head().unwrap();
@@ -102,7 +102,7 @@ impl HeaderSync {
         // always enable header sync on initial state transition from NoSync / Initial
         let force_sync = match self.sync_state.status() {
-            SyncStatus::NoSync | SyncStatus::Initial => true,
+            SyncStatus::NoSync | SyncStatus::Initial | SyncStatus::AwaitingPeers(_) => true,
             _ => false,
         };


@@ -23,56 +23,72 @@ use core::pow::Difficulty;
 use grin::sync::body_sync::BodySync;
 use grin::sync::header_sync::HeaderSync;
 use grin::sync::state_sync::StateSync;
-use p2p::{self, Peers};
+use p2p;

 pub fn run_sync(
     sync_state: Arc<SyncState>,
-    awaiting_peers: Arc<AtomicBool>,
     peers: Arc<p2p::Peers>,
     chain: Arc<chain::Chain>,
-    skip_sync_wait: bool,
     archive_mode: bool,
     stop: Arc<AtomicBool>,
 ) {
     let _ = thread::Builder::new()
         .name("sync".to_string())
         .spawn(move || {
-            sync_loop(
-                sync_state,
-                awaiting_peers,
-                peers,
-                chain,
-                skip_sync_wait,
-                archive_mode,
-                stop,
-            )
+            let runner = SyncRunner::new(sync_state, peers, chain, archive_mode, stop);
+            runner.sync_loop();
         });
 }

-fn wait_for_min_peers(
-    awaiting_peers: Arc<AtomicBool>,
-    peers: Arc<p2p::Peers>,
-    chain: Arc<chain::Chain>,
-    skip_sync_wait: bool,
-) {
+pub struct SyncRunner {
+    sync_state: Arc<SyncState>,
+    peers: Arc<p2p::Peers>,
+    chain: Arc<chain::Chain>,
+    archive_mode: bool,
+    stop: Arc<AtomicBool>,
+}
+
+impl SyncRunner {
+    fn new(
+        sync_state: Arc<SyncState>,
+        peers: Arc<p2p::Peers>,
+        chain: Arc<chain::Chain>,
+        archive_mode: bool,
+        stop: Arc<AtomicBool>,
+    ) -> SyncRunner {
+        SyncRunner {
+            sync_state,
+            peers,
+            chain,
+            archive_mode,
+            stop,
+        }
+    }
+
+    fn wait_for_min_peers(&self) {
         // Initial sleep to give us time to peer with some nodes.
-        // Note: Even if we have "skip_sync_wait" we need to wait a
+        // Note: Even if we have skip peer wait we need to wait a
         // short period of time for tests to do the right thing.
-        let wait_secs = if skip_sync_wait { 3 } else { 30 };
+        let wait_secs = if let SyncStatus::AwaitingPeers(true) = self.sync_state.status() {
+            30
+        } else {
+            3
+        };

-        let head = chain.head().unwrap();
-
-        awaiting_peers.store(true, Ordering::Relaxed);
+        let head = self.chain.head().unwrap();

         let mut n = 0;
         const MIN_PEERS: usize = 3;
         loop {
-            let wp = peers.more_work_peers();
+            let wp = self.peers.more_work_peers();
             // exit loop when:
             // * we have more than MIN_PEERS more_work peers
             // * we are synced already, e.g. grin was quickly restarted
             // * timeout
             if wp.len() > MIN_PEERS
-                || (wp.len() == 0 && peers.enough_peers() && head.total_difficulty > Difficulty::zero())
+                || (wp.len() == 0
+                    && self.peers.enough_peers()
+                    && head.total_difficulty > Difficulty::zero())
                 || n > wait_secs
             {
                 break;
@@ -80,30 +96,29 @@ fn wait_for_min_peers(
             thread::sleep(time::Duration::from_secs(1));
             n += 1;
         }
-        awaiting_peers.store(false, Ordering::Relaxed);
     }

     /// Starts the syncing loop, just spawns two threads that loop forever
-    fn sync_loop(
-        sync_state: Arc<SyncState>,
-        awaiting_peers: Arc<AtomicBool>,
-        peers: Arc<p2p::Peers>,
-        chain: Arc<chain::Chain>,
-        skip_sync_wait: bool,
-        archive_mode: bool,
-        stop: Arc<AtomicBool>,
-    ) {
+    fn sync_loop(&self) {
         // Wait for connections reach at least MIN_PEERS
-        wait_for_min_peers(awaiting_peers, peers.clone(), chain.clone(), skip_sync_wait);
+        self.wait_for_min_peers();

         // Our 3 main sync stages
-        let mut header_sync = HeaderSync::new(sync_state.clone(), peers.clone(), chain.clone());
-        let mut body_sync = BodySync::new(sync_state.clone(), peers.clone(), chain.clone());
+        let mut header_sync = HeaderSync::new(
+            self.sync_state.clone(),
+            self.peers.clone(),
+            self.chain.clone(),
+        );
+        let mut body_sync = BodySync::new(
+            self.sync_state.clone(),
+            self.peers.clone(),
+            self.chain.clone(),
+        );
         let mut state_sync = StateSync::new(
-            sync_state.clone(),
-            peers.clone(),
-            chain.clone(),
-            archive_mode,
+            self.sync_state.clone(),
+            self.peers.clone(),
+            self.chain.clone(),
+            self.archive_mode,
         );

         // Highest height seen on the network, generally useful for a fast test on
@@ -111,12 +126,11 @@ fn sync_loop(
         let mut highest_height = 0;

         // Main syncing loop
-        while !stop.load(Ordering::Relaxed) {
+        while !self.stop.load(Ordering::Relaxed) {
             thread::sleep(time::Duration::from_millis(10));

             // check whether syncing is generally needed, when we compare our state with others
-            let (syncing, most_work_height) =
-                needs_syncing(sync_state.as_ref(), peers.clone(), chain.clone());
+            let (syncing, most_work_height) = self.needs_syncing();

             if most_work_height > 0 {
                 // we can occasionally get a most work height of 0 if read locks fail
@@ -125,14 +139,14 @@ fn sync_loop(
             // quick short-circuit (and a decent sleep) if no syncing is needed
             if !syncing {
-                sync_state.update(SyncStatus::NoSync);
+                self.sync_state.update(SyncStatus::NoSync);
                 thread::sleep(time::Duration::from_secs(10));
                 continue;
             }

             // if syncing is needed
-            let head = chain.head().unwrap();
-            let header_head = chain.header_head().unwrap();
+            let head = self.chain.head().unwrap();
+            let header_head = self.chain.header_head().unwrap();

             // run each sync stage, each of them deciding whether they're needed
             // except for body sync that only runs if state sync is off or done
@@ -145,14 +159,10 @@ fn sync_loop(
     /// Whether we're currently syncing the chain or we're fully caught up and
     /// just receiving blocks through gossip.
-    fn needs_syncing(
-        sync_state: &SyncState,
-        peers: Arc<Peers>,
-        chain: Arc<chain::Chain>,
-    ) -> (bool, u64) {
-        let local_diff = chain.head().unwrap().total_difficulty;
-        let peer = peers.most_work_peer();
-        let is_syncing = sync_state.is_syncing();
+    fn needs_syncing(&self) -> (bool, u64) {
+        let local_diff = self.chain.head().unwrap().total_difficulty;
+        let peer = self.peers.most_work_peer();
+        let is_syncing = self.sync_state.is_syncing();
         let mut most_work_height = 0;

         // if we're already syncing, we're caught up if no peer has a higher
@@ -161,7 +171,7 @@ fn needs_syncing(
         if let Some(peer) = peer {
             most_work_height = peer.info.height();
             if peer.info.total_difficulty() <= local_diff {
-                let ch = chain.head().unwrap();
+                let ch = self.chain.head().unwrap();
                 info!(
                     "synchronized at {} @ {} [{}]",
                     local_diff.to_num(),
@@ -169,7 +179,7 @@ fn needs_syncing(
                     ch.last_block_h
                 );

-                let _ = chain.reset_head();
+                let _ = self.chain.reset_head();
                 return (false, most_work_height);
             }
         } else {
@@ -181,7 +191,8 @@ fn needs_syncing(
             most_work_height = peer.info.height();

             // sum the last 5 difficulties to give us the threshold
-            let threshold = chain
+            let threshold = self
+                .chain
                 .difficulty_iter()
                 .map(|x| x.difficulty)
                 .take(5)
@@ -201,3 +212,4 @@ fn needs_syncing(
         }
         (is_syncing, most_work_height)
     }
+}
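
The refactor shape above is the classic context-struct pattern: clone the shared `Arc` handles once into a struct, then let the former free functions (`wait_for_min_peers`, `sync_loop`, `needs_syncing`) reach them through `&self`. A self-contained sketch under that assumption, with stand-in types rather than grin's real `Chain` and `Peers`, and a millisecond-scale wait so it terminates quickly:

    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::Arc;
    use std::thread;
    use std::time;

    // Stand-ins for the real chain and peer-set handles.
    struct Chain;
    struct Peers;
    impl Peers {
        fn more_work_peers(&self) -> Vec<()> {
            Vec::new() // pretend no peers ever show up
        }
    }

    // One struct owns the shared handles; methods replace free functions
    // that each took four or five Arc parameters.
    struct SyncRunner {
        peers: Arc<Peers>,
        chain: Arc<Chain>,
        stop: Arc<AtomicBool>,
    }

    impl SyncRunner {
        fn new(peers: Arc<Peers>, chain: Arc<Chain>, stop: Arc<AtomicBool>) -> SyncRunner {
            SyncRunner { peers, chain, stop }
        }

        // Simplified analogue of wait_for_min_peers: loop until enough
        // peers appear or the timeout elapses, whichever comes first.
        fn wait_for_min_peers(&self, min_peers: usize, max_ticks: u64) {
            let mut n = 0;
            while self.peers.more_work_peers().len() < min_peers && n < max_ticks {
                thread::sleep(time::Duration::from_millis(10));
                n += 1;
            }
        }

        fn sync_loop(&self) {
            self.wait_for_min_peers(3, 3);
            while !self.stop.load(Ordering::Relaxed) {
                // each sync stage would reach shared state through `self` here
                break; // keep the sketch terminating
            }
            let _ = &self.chain; // unused in this trimmed sketch
        }
    }

    fn main() {
        let stop = Arc::new(AtomicBool::new(false));
        let runner = SyncRunner::new(Arc::new(Peers), Arc::new(Chain), stop);
        runner.sync_loop();
    }

The payoff is visible in the diff itself: call sites shrink (`runner.sync_loop()` versus an eight-argument `sync_loop(...)`), and adding another shared handle means touching only the struct and its constructor rather than every function signature in the module.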


@@ -87,12 +87,10 @@ impl TUIStatusListener for TUIStatusView {
     fn update(c: &mut Cursive, stats: &ServerStats) {
         //find and update here as needed
         let basic_status = {
-            if stats.awaiting_peers {
-                "Waiting for peers".to_string()
-            } else {
             match stats.sync_status {
                 SyncStatus::Initial => "Initializing".to_string(),
                 SyncStatus::NoSync => "Running".to_string(),
+                SyncStatus::AwaitingPeers(_) => "Waiting for peers".to_string(),
                 SyncStatus::HeaderSync {
                     current_height,
                     highest_height,
@@ -172,7 +170,6 @@ impl TUIStatusListener for TUIStatusView {
                     format!("Downloading blocks: {}%, step 4/4", percent)
                 }
             }
-            }
         };

         /*let basic_mining_config_status = {
             if stats.mining_stats.is_enabled {