diff --git a/servers/src/grin/sync/body_sync.rs b/servers/src/grin/sync/body_sync.rs index 913b296c4..17b368d54 100644 --- a/servers/src/grin/sync/body_sync.rs +++ b/servers/src/grin/sync/body_sync.rs @@ -14,8 +14,8 @@ use chrono::prelude::{DateTime, Utc}; use chrono::Duration; -use std::sync::Arc; use std::cmp; +use std::sync::Arc; use chain; use common::types::{SyncState, SyncStatus}; @@ -38,7 +38,11 @@ pub struct BodySync { } impl BodySync { - pub fn new(sync_state: Arc, peers: Arc, chain: Arc) -> BodySync { + pub fn new( + sync_state: Arc, + peers: Arc, + chain: Arc, + ) -> BodySync { BodySync { sync_state, peers, @@ -186,7 +190,8 @@ impl BodySync { match self.prev_body_received { Some(prev_ts) => { if tip.last_block_h == self.prev_tip.last_block_h - && self.chain.orphans_len() + self.chain.orphans_evicted_len() == self.prev_orphans_len + && self.chain.orphans_len() + self.chain.orphans_evicted_len() + == self.prev_orphans_len && Utc::now() - prev_ts > Duration::milliseconds(200) { let hashes_not_get = self @@ -226,4 +231,3 @@ impl BodySync { return false; } } - diff --git a/servers/src/grin/sync/header_sync.rs b/servers/src/grin/sync/header_sync.rs index 6ce6db6fb..a9679e29e 100644 --- a/servers/src/grin/sync/header_sync.rs +++ b/servers/src/grin/sync/header_sync.rs @@ -32,7 +32,11 @@ pub struct HeaderSync { } impl HeaderSync { - pub fn new(sync_state: Arc, peers: Arc, chain: Arc) -> HeaderSync { + pub fn new( + sync_state: Arc, + peers: Arc, + chain: Arc, + ) -> HeaderSync { HeaderSync { sync_state, peers, @@ -130,14 +134,13 @@ impl HeaderSync { } } - /// Request some block headers from a peer to advance us. fn request_headers(&mut self, peer: &Peer) { if let Ok(locator) = self.get_locator() { debug!( LOGGER, "sync: request_headers: asking {} for headers, {:?}", peer.info.addr, locator, - ); + ); let _ = peer.send_header_request(locator); } @@ -169,10 +172,10 @@ impl HeaderSync { new_heights.push(header.height); if self.history_locators.len() > 0 && tip.height - header.height + 1 >= p2p::MAX_BLOCK_HEADERS as u64 - 1 - { - this_height = header.height; - break; - } + { + this_height = header.height; + break; + } } current = self.chain.get_block_header(&header.previous); } @@ -197,7 +200,8 @@ impl HeaderSync { let this_height_index = heights.iter().position(|&r| r == this_height).unwrap(); let next_height = heights[this_height_index + 1]; - let reuse_index = self.history_locators + let reuse_index = self + .history_locators .iter() .position(|&r| r.0 >= next_height) .unwrap(); @@ -221,7 +225,11 @@ impl HeaderSync { // push height 0 if it's not there if new_heights[new_heights.len() - 1] != 0 { - locator.push(self.history_locators[self.history_locators.len() - 1].1.clone()); + locator.push( + self.history_locators[self.history_locators.len() - 1] + .1 + .clone(), + ); new_heights.push(0); } } @@ -232,7 +240,8 @@ impl HeaderSync { if heights.len() > 1 { let shrink_height = heights[heights.len() - 2]; let mut shrunk_size = 0; - let shrink_index = self.history_locators + let shrink_index = self + .history_locators .iter() .position(|&r| r.0 > shrink_height) .unwrap(); @@ -249,7 +258,7 @@ impl HeaderSync { "sync: history locators: len={}, shrunk={}", self.history_locators.len(), shrunk_size - ); + ); } debug!(LOGGER, "sync: locator: {:?}", locator); diff --git a/servers/src/grin/sync/state_sync.rs b/servers/src/grin/sync/state_sync.rs index 57451b7ba..d9f8486ba 100644 --- a/servers/src/grin/sync/state_sync.rs +++ b/servers/src/grin/sync/state_sync.rs @@ -23,7 +23,6 @@ use core::global; use p2p::{self, Peer}; use util::LOGGER; - /// Fast sync has 3 "states": /// * syncing headers /// * once all headers are sync'd, requesting the txhashset state @@ -41,7 +40,12 @@ pub struct StateSync { } impl StateSync { - pub fn new(sync_state: Arc, peers: Arc, chain: Arc, archive_mode: bool) -> StateSync { + pub fn new( + sync_state: Arc, + peers: Arc, + chain: Arc, + archive_mode: bool, + ) -> StateSync { StateSync { sync_state, peers, @@ -52,7 +56,15 @@ impl StateSync { } } - pub fn check_run(&mut self, header_head: &chain::Tip, head: &chain::Tip, highest_height: u64) -> bool { + /// Check whether state sync should run and triggers a state download when + /// it's time (we have all headers). Returns true as long as state sync + /// needs monitoring, false when it's either done or turned off. + pub fn check_run( + &mut self, + header_head: &chain::Tip, + head: &chain::Tip, + highest_height: u64, + ) -> bool { let need_state_sync = !self.archive_mode && highest_height.saturating_sub(head.height) > global::cut_through_horizon() as u64; if !need_state_sync { @@ -68,7 +80,7 @@ impl StateSync { error!( LOGGER, "fast_sync: error = {:?}. restart fast sync", sync_error - ); + ); sync_need_restart = true; } drop(clone); @@ -82,7 +94,7 @@ impl StateSync { info!( LOGGER, "fast_sync: peer connection lost: {:?}. restart", p.info.addr, - ); + ); } } } @@ -100,8 +112,9 @@ impl StateSync { error!( LOGGER, "fast_sync: TxHashsetDownload status timeout in 10 minutes!" - ); - self.sync_state.set_sync_error(Error::P2P(p2p::Error::Timeout)); + ); + self.sync_state + .set_sync_error(Error::P2P(p2p::Error::Timeout)); } if go { @@ -114,10 +127,9 @@ impl StateSync { } self.sync_state.update(SyncStatus::TxHashsetDownload); - return true; } } - false + true } fn request_state(&self, header_head: &chain::Tip) -> Result>, p2p::Error> { @@ -127,9 +139,15 @@ impl StateSync { if let Ok(p) = peer.try_read() { // ask for txhashset at 90% of horizon, this still leaves time for download // and validation to happen and stay within horizon - let mut txhashset_head = self.chain.get_block_header(&header_head.prev_block_h).unwrap(); + let mut txhashset_head = self + .chain + .get_block_header(&header_head.prev_block_h) + .unwrap(); for _ in 0..(horizon - horizon / 10) { - txhashset_head = self.chain.get_block_header(&txhashset_head.previous).unwrap(); + txhashset_head = self + .chain + .get_block_header(&txhashset_head.previous) + .unwrap(); } let bhash = txhashset_head.hash(); debug!( diff --git a/servers/src/grin/sync/syncer.rs b/servers/src/grin/sync/syncer.rs index 15314c534..3f998fb9b 100644 --- a/servers/src/grin/sync/syncer.rs +++ b/servers/src/grin/sync/syncer.rs @@ -14,15 +14,15 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; -use std::time; use std::thread; +use std::time; -use grin::sync::body_sync::BodySync; -use grin::sync::header_sync::HeaderSync; -use grin::sync::state_sync::StateSync; use chain; use common::types::{SyncState, SyncStatus}; use core::pow::Difficulty; +use grin::sync::body_sync::BodySync; +use grin::sync::header_sync::HeaderSync; +use grin::sync::state_sync::StateSync; use p2p::{self, Peers}; use util::LOGGER; @@ -34,7 +34,7 @@ pub fn run_sync( skip_sync_wait: bool, archive_mode: bool, stop: Arc, - ) { +) { let _ = thread::Builder::new() .name("sync".to_string()) .spawn(move || { @@ -46,7 +46,7 @@ pub fn run_sync( skip_sync_wait, archive_mode, stop, - ) + ) }); } @@ -100,7 +100,12 @@ fn sync_loop( // Our 3 main sync stages let mut header_sync = HeaderSync::new(sync_state.clone(), peers.clone(), chain.clone()); let mut body_sync = BodySync::new(sync_state.clone(), peers.clone(), chain.clone()); - let mut state_sync = StateSync::new(sync_state.clone(), peers.clone(), chain.clone(), archive_mode); + let mut state_sync = StateSync::new( + sync_state.clone(), + peers.clone(), + chain.clone(), + archive_mode, + ); // Highest height seen on the network, generally useful for a fast test on // whether some sync is needed @@ -130,9 +135,11 @@ fn sync_loop( let header_head = chain.get_header_head().unwrap(); // run each sync stage, each of them deciding whether they're needed + // except for body sync that only runs if state sync is off or done header_sync.check_run(&header_head, highest_height); - state_sync.check_run(&header_head, &head, highest_height); - body_sync.check_run(&head, highest_height); + if !state_sync.check_run(&header_head, &head, highest_height) { + body_sync.check_run(&head, highest_height); + } } }