Fix for body sync to only run when state sync is off or done

This commit is contained in:
Ignotus Peverell 2018-10-05 04:34:47 +00:00
parent d7d56733e5
commit 8925d1e48d
No known key found for this signature in database
GPG key ID: 99CD25F39F8F8211
4 changed files with 73 additions and 35 deletions

View file

@ -14,8 +14,8 @@
use chrono::prelude::{DateTime, Utc};
use chrono::Duration;
use std::sync::Arc;
use std::cmp;
use std::sync::Arc;
use chain;
use common::types::{SyncState, SyncStatus};
@ -38,7 +38,11 @@ pub struct BodySync {
}
impl BodySync {
pub fn new(sync_state: Arc<SyncState>, peers: Arc<p2p::Peers>, chain: Arc<chain::Chain>) -> BodySync {
pub fn new(
sync_state: Arc<SyncState>,
peers: Arc<p2p::Peers>,
chain: Arc<chain::Chain>,
) -> BodySync {
BodySync {
sync_state,
peers,
@ -186,7 +190,8 @@ impl BodySync {
match self.prev_body_received {
Some(prev_ts) => {
if tip.last_block_h == self.prev_tip.last_block_h
&& self.chain.orphans_len() + self.chain.orphans_evicted_len() == self.prev_orphans_len
&& self.chain.orphans_len() + self.chain.orphans_evicted_len()
== self.prev_orphans_len
&& Utc::now() - prev_ts > Duration::milliseconds(200)
{
let hashes_not_get = self
@ -226,4 +231,3 @@ impl BodySync {
return false;
}
}

View file

@ -32,7 +32,11 @@ pub struct HeaderSync {
}
impl HeaderSync {
pub fn new(sync_state: Arc<SyncState>, peers: Arc<p2p::Peers>, chain: Arc<chain::Chain>) -> HeaderSync {
pub fn new(
sync_state: Arc<SyncState>,
peers: Arc<p2p::Peers>,
chain: Arc<chain::Chain>,
) -> HeaderSync {
HeaderSync {
sync_state,
peers,
@ -130,14 +134,13 @@ impl HeaderSync {
}
}
/// Request some block headers from a peer to advance us.
fn request_headers(&mut self, peer: &Peer) {
if let Ok(locator) = self.get_locator() {
debug!(
LOGGER,
"sync: request_headers: asking {} for headers, {:?}", peer.info.addr, locator,
);
);
let _ = peer.send_header_request(locator);
}
@ -169,10 +172,10 @@ impl HeaderSync {
new_heights.push(header.height);
if self.history_locators.len() > 0
&& tip.height - header.height + 1 >= p2p::MAX_BLOCK_HEADERS as u64 - 1
{
this_height = header.height;
break;
}
{
this_height = header.height;
break;
}
}
current = self.chain.get_block_header(&header.previous);
}
@ -197,7 +200,8 @@ impl HeaderSync {
let this_height_index = heights.iter().position(|&r| r == this_height).unwrap();
let next_height = heights[this_height_index + 1];
let reuse_index = self.history_locators
let reuse_index = self
.history_locators
.iter()
.position(|&r| r.0 >= next_height)
.unwrap();
@ -221,7 +225,11 @@ impl HeaderSync {
// push height 0 if it's not there
if new_heights[new_heights.len() - 1] != 0 {
locator.push(self.history_locators[self.history_locators.len() - 1].1.clone());
locator.push(
self.history_locators[self.history_locators.len() - 1]
.1
.clone(),
);
new_heights.push(0);
}
}
@ -232,7 +240,8 @@ impl HeaderSync {
if heights.len() > 1 {
let shrink_height = heights[heights.len() - 2];
let mut shrunk_size = 0;
let shrink_index = self.history_locators
let shrink_index = self
.history_locators
.iter()
.position(|&r| r.0 > shrink_height)
.unwrap();
@ -249,7 +258,7 @@ impl HeaderSync {
"sync: history locators: len={}, shrunk={}",
self.history_locators.len(),
shrunk_size
);
);
}
debug!(LOGGER, "sync: locator: {:?}", locator);

View file

@ -23,7 +23,6 @@ use core::global;
use p2p::{self, Peer};
use util::LOGGER;
/// Fast sync has 3 "states":
/// * syncing headers
/// * once all headers are sync'd, requesting the txhashset state
@ -41,7 +40,12 @@ pub struct StateSync {
}
impl StateSync {
pub fn new(sync_state: Arc<SyncState>, peers: Arc<p2p::Peers>, chain: Arc<chain::Chain>, archive_mode: bool) -> StateSync {
pub fn new(
sync_state: Arc<SyncState>,
peers: Arc<p2p::Peers>,
chain: Arc<chain::Chain>,
archive_mode: bool,
) -> StateSync {
StateSync {
sync_state,
peers,
@ -52,7 +56,15 @@ impl StateSync {
}
}
pub fn check_run(&mut self, header_head: &chain::Tip, head: &chain::Tip, highest_height: u64) -> bool {
/// Check whether state sync should run and triggers a state download when
/// it's time (we have all headers). Returns true as long as state sync
/// needs monitoring, false when it's either done or turned off.
pub fn check_run(
&mut self,
header_head: &chain::Tip,
head: &chain::Tip,
highest_height: u64,
) -> bool {
let need_state_sync = !self.archive_mode
&& highest_height.saturating_sub(head.height) > global::cut_through_horizon() as u64;
if !need_state_sync {
@ -68,7 +80,7 @@ impl StateSync {
error!(
LOGGER,
"fast_sync: error = {:?}. restart fast sync", sync_error
);
);
sync_need_restart = true;
}
drop(clone);
@ -82,7 +94,7 @@ impl StateSync {
info!(
LOGGER,
"fast_sync: peer connection lost: {:?}. restart", p.info.addr,
);
);
}
}
}
@ -100,8 +112,9 @@ impl StateSync {
error!(
LOGGER,
"fast_sync: TxHashsetDownload status timeout in 10 minutes!"
);
self.sync_state.set_sync_error(Error::P2P(p2p::Error::Timeout));
);
self.sync_state
.set_sync_error(Error::P2P(p2p::Error::Timeout));
}
if go {
@ -114,10 +127,9 @@ impl StateSync {
}
self.sync_state.update(SyncStatus::TxHashsetDownload);
return true;
}
}
false
true
}
fn request_state(&self, header_head: &chain::Tip) -> Result<Arc<RwLock<Peer>>, p2p::Error> {
@ -127,9 +139,15 @@ impl StateSync {
if let Ok(p) = peer.try_read() {
// ask for txhashset at 90% of horizon, this still leaves time for download
// and validation to happen and stay within horizon
let mut txhashset_head = self.chain.get_block_header(&header_head.prev_block_h).unwrap();
let mut txhashset_head = self
.chain
.get_block_header(&header_head.prev_block_h)
.unwrap();
for _ in 0..(horizon - horizon / 10) {
txhashset_head = self.chain.get_block_header(&txhashset_head.previous).unwrap();
txhashset_head = self
.chain
.get_block_header(&txhashset_head.previous)
.unwrap();
}
let bhash = txhashset_head.hash();
debug!(

View file

@ -14,15 +14,15 @@
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time;
use std::thread;
use std::time;
use grin::sync::body_sync::BodySync;
use grin::sync::header_sync::HeaderSync;
use grin::sync::state_sync::StateSync;
use chain;
use common::types::{SyncState, SyncStatus};
use core::pow::Difficulty;
use grin::sync::body_sync::BodySync;
use grin::sync::header_sync::HeaderSync;
use grin::sync::state_sync::StateSync;
use p2p::{self, Peers};
use util::LOGGER;
@ -34,7 +34,7 @@ pub fn run_sync(
skip_sync_wait: bool,
archive_mode: bool,
stop: Arc<AtomicBool>,
) {
) {
let _ = thread::Builder::new()
.name("sync".to_string())
.spawn(move || {
@ -46,7 +46,7 @@ pub fn run_sync(
skip_sync_wait,
archive_mode,
stop,
)
)
});
}
@ -100,7 +100,12 @@ fn sync_loop(
// Our 3 main sync stages
let mut header_sync = HeaderSync::new(sync_state.clone(), peers.clone(), chain.clone());
let mut body_sync = BodySync::new(sync_state.clone(), peers.clone(), chain.clone());
let mut state_sync = StateSync::new(sync_state.clone(), peers.clone(), chain.clone(), archive_mode);
let mut state_sync = StateSync::new(
sync_state.clone(),
peers.clone(),
chain.clone(),
archive_mode,
);
// Highest height seen on the network, generally useful for a fast test on
// whether some sync is needed
@ -130,9 +135,11 @@ fn sync_loop(
let header_head = chain.get_header_head().unwrap();
// run each sync stage, each of them deciding whether they're needed
// except for body sync that only runs if state sync is off or done
header_sync.check_run(&header_head, highest_height);
state_sync.check_run(&header_head, &head, highest_height);
body_sync.check_run(&head, highest_height);
if !state_sync.check_run(&header_head, &head, highest_height) {
body_sync.check_run(&head, highest_height);
}
}
}