Refactor: run_sync() ballooning, extract 3 utility functions (#1562)

Gary Yu authored on 2018-09-21 01:00:09 +08:00, committed by Ignotus Peverell
parent 0d7fa06fe0
commit 3adddfba76

@@ -45,18 +45,56 @@ impl Syncer {
     archive_mode: bool,
     stop: Arc<AtomicBool>,
 ) {
-    sync::run_sync(
-        sync_state,
-        awaiting_peers,
-        peers,
-        chain,
-        skip_sync_wait,
-        archive_mode,
-        stop,
-    )
+    let _ = thread::Builder::new()
+        .name("sync".to_string())
+        .spawn(move || {
+            sync::run_sync(
+                sync_state,
+                awaiting_peers,
+                peers,
+                chain,
+                skip_sync_wait,
+                archive_mode,
+                stop,
+            )
+        });
 }
 }
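A note on the new call site: thread::Builder::spawn returns io::Result<JoinHandle<T>>, and the `let _ =` above deliberately discards it, so a failed spawn would pass silently. A minimal sketch of a variant that surfaces the error instead (illustrative only, not part of this commit; eprintln! stands in for the crate's LOGGER):

    use std::thread;

    // Hypothetical wrapper: log the spawn failure instead of discarding it.
    fn spawn_sync_thread<F: FnOnce() + Send + 'static>(body: F) {
        if let Err(e) = thread::Builder::new().name("sync".to_string()).spawn(body) {
            eprintln!("failed to spawn sync thread: {}", e); // stand-in for error!(LOGGER, ...)
        }
    }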
+fn wait_for_min_peers(
+    awaiting_peers: Arc<AtomicBool>,
+    peers: Arc<p2p::Peers>,
+    chain: Arc<chain::Chain>,
+    skip_sync_wait: bool,
+) {
+    // Initial sleep to give us time to peer with some nodes.
+    // Note: Even if we have "skip_sync_wait" we need to wait a
+    // short period of time for tests to do the right thing.
+    let wait_secs = if skip_sync_wait { 3 } else { 30 };
+
+    let head = chain.head().unwrap();
+
+    awaiting_peers.store(true, Ordering::Relaxed);
+    let mut n = 0;
+    const MIN_PEERS: usize = 3;
+    loop {
+        let wp = peers.more_work_peers();
+        // exit loop when:
+        // * we have more than MIN_PEERS more_work peers
+        // * we are synced already, e.g. grin was quickly restarted
+        // * timeout
+        if wp.len() > MIN_PEERS
+            || (wp.len() == 0 && peers.enough_peers() && head.total_difficulty > Difficulty::zero())
+            || n > wait_secs
+        {
+            break;
+        }
+        thread::sleep(time::Duration::from_secs(1));
+        n += 1;
+    }
+    awaiting_peers.store(false, Ordering::Relaxed);
+}
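The extracted wait loop exits on any of three conditions, which can be read in isolation as a small pure function. This is a sketch with illustrative names, not a helper from the commit:

    // Mirrors the loop's exit test in wait_for_min_peers.
    fn should_stop_waiting(
        more_work_peers: usize,   // peers.more_work_peers().len()
        enough_peers: bool,       // peers.enough_peers()
        nonzero_difficulty: bool, // head.total_difficulty > Difficulty::zero()
        elapsed_secs: u64,
        wait_secs: u64,
    ) -> bool {
        const MIN_PEERS: usize = 3;
        more_work_peers > MIN_PEERS
            // already synced, e.g. grin was quickly restarted
            || (more_work_peers == 0 && enough_peers && nonzero_difficulty)
            // timeout
            || elapsed_secs > wait_secs
    }

    // e.g. should_stop_waiting(0, true, true, 5, 30) == true   (quick-restart case)
    //      should_stop_waiting(1, true, true, 31, 30) == true  (timeout)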
 /// Starts the syncing loop, just spawns two threads that loop forever
 pub fn run_sync(
     sync_state: Arc<SyncState>,
@@ -67,193 +105,194 @@ pub fn run_sync(
     archive_mode: bool,
     stop: Arc<AtomicBool>,
 ) {
-    let chain = chain.clone();
-    let _ =
-        thread::Builder::new()
-            .name("sync".to_string())
-            .spawn(move || {
-                let mut si = SyncInfo::new();
+    let mut si = SyncInfo::new();
+
+    // Wait for connections reach at least MIN_PEERS
+    wait_for_min_peers(awaiting_peers, peers.clone(), chain.clone(), skip_sync_wait);
-
-                {
-                    // Initial sleep to give us time to peer with some nodes.
-                    // Note: Even if we have "skip_sync_wait" we need to wait a
-                    // short period of time for tests to do the right thing.
-                    let wait_secs = if skip_sync_wait { 3 } else { 30 };
-
-                    let head = chain.head().unwrap();
-
-                    awaiting_peers.store(true, Ordering::Relaxed);
-                    let mut n = 0;
-                    const MIN_PEERS: usize = 3;
-                    loop {
-                        let wp = peers.more_work_peers();
-                        // exit loop when:
-                        // * we have more than MIN_PEERS more_work peers
-                        // * we are synced already, e.g. grin was quickly restarted
-                        // * timeout
-                        if wp.len() > MIN_PEERS
-                            || (wp.len() == 0
-                                && peers.enough_peers()
-                                && head.total_difficulty > Difficulty::zero()) || n > wait_secs
-                        {
-                            break;
-                        }
-                        thread::sleep(time::Duration::from_secs(1));
-                        n += 1;
-                    }
-                    awaiting_peers.store(false, Ordering::Relaxed);
-                }
-
-                // fast sync has 3 "states":
-                // * syncing headers
-                // * once all headers are sync'd, requesting the txhashset state
-                // * once we have the state, get blocks after that
-                //
-                // full sync gets rid of the middle step and just starts from
-                // the genesis state
-
-                let mut history_locators: Vec<(u64, Hash)> = vec![];
-                let mut body_sync_info = BodySyncInfo {
-                    sync_start_ts: Utc::now(),
-                    body_sync_hashes: vec![],
-                    prev_body_received: None,
-                    prev_tip: chain.head().unwrap(),
-                    prev_orphans_len: 0,
-                };
+    // fast sync has 3 "states":
+    // * syncing headers
+    // * once all headers are sync'd, requesting the txhashset state
+    // * once we have the state, get blocks after that
+    //
+    // full sync gets rid of the middle step and just starts from
+    // the genesis state
+
+    let mut history_locators: Vec<(u64, Hash)> = vec![];
+    let mut body_sync_info = BodySyncInfo {
+        sync_start_ts: Utc::now(),
+        body_sync_hashes: vec![],
+        prev_body_received: None,
+        prev_tip: chain.head().unwrap(),
+        prev_orphans_len: 0,
+    };
+
+    // Main syncing loop
+    while !stop.load(Ordering::Relaxed) {
+        thread::sleep(time::Duration::from_millis(10));
+
+        // check whether syncing is generally needed, when we compare our state with others
+        let (syncing, most_work_height) =
+            needs_syncing(sync_state.as_ref(), peers.clone(), chain.clone());
-
-                loop {
-                    let horizon = global::cut_through_horizon() as u64;
-                    let head = chain.head().unwrap();
-                    let header_head = chain.get_header_head().unwrap();
-
-                    // is syncing generally needed when we compare our state with others
-                    let (syncing, most_work_height) =
-                        needs_syncing(sync_state.as_ref(), peers.clone(), chain.clone());
-
-                    if most_work_height > 0 {
-                        // we can occasionally get a most work height of 0 if read locks fail
-                        si.highest_height = most_work_height;
-                    }
+        if most_work_height > 0 {
+            // we can occasionally get a most work height of 0 if read locks fail
+            si.highest_height = most_work_height;
+        }
+
+        // if no syncing is needed
+        if !syncing {
+            sync_state.update(SyncStatus::NoSync);
+            continue;
+        }
+
+        // if syncing is needed
+        let head = chain.head().unwrap();
+        let header_head = chain.get_header_head().unwrap();
-
-                    if syncing {
-                        let fast_sync_enabled = !archive_mode
-                            && si.highest_height.saturating_sub(head.height) > horizon;
-
-                        // run the header sync every 10s
-                        if si.header_sync_due(&header_head) {
-                            let status = sync_state.status();
-                            let update_sync_state = match status {
-                                SyncStatus::TxHashsetDownload => false,
-                                SyncStatus::NoSync | SyncStatus::Initial => {
-                                    // Reset sync_head to header_head on transition to HeaderSync,
-                                    // but ONLY on initial transition to HeaderSync state.
-                                    let sync_head = chain.get_sync_head().unwrap();
-                                    debug!(
-                                        LOGGER,
-                                        "sync: initial transition to HeaderSync. sync_head: {} at {}, reset to: {} at {}",
-                                        sync_head.hash(),
-                                        sync_head.height,
-                                        header_head.hash(),
-                                        header_head.height,
-                                    );
-                                    chain.init_sync_head(&header_head).unwrap();
-                                    history_locators.clear();
-                                    true
-                                }
-                                _ => true,
-                            };
-                            if update_sync_state {
-                                sync_state.update(SyncStatus::HeaderSync {
-                                    current_height: header_head.height,
-                                    highest_height: si.highest_height,
-                                });
-                            }
-                            header_sync(peers.clone(), chain.clone(), &mut history_locators);
-                        }
+        // run the header sync in every 10s at least
+        if si.header_sync_due(&header_head) {
+            do_header_sync(
+                sync_state.as_ref(),
+                header_head.clone(),
+                peers.clone(),
+                chain.clone(),
+                &si,
+                &mut history_locators,
+            );
+        }
-
-                        if fast_sync_enabled {
-                            {
-                                let mut sync_need_restart = false;
-                                // check sync error
-                                {
-                                    let clone = sync_state.sync_error();
-                                    if let Some(ref sync_error) = *clone.read().unwrap() {
-                                        error!(
-                                            LOGGER,
-                                            "fast_sync: error = {:?}. restart fast sync",
-                                            sync_error
-                                        );
-                                        sync_need_restart = true;
-                                    }
-                                    drop(clone);
-                                }
-
-                                // check peer connection status of this sync
-                                if let Some(ref peer) = si.fast_sync_peer {
-                                    if let Ok(p) = peer.try_read() {
-                                        if !p.is_connected()
-                                            && SyncStatus::TxHashsetDownload
-                                                == sync_state.status()
-                                        {
-                                            sync_need_restart = true;
-                                            info!(
-                                                LOGGER,
-                                                "fast_sync: peer connection lost: {:?}. restart",
-                                                p.info.addr,
-                                            );
-                                        }
-                                    }
-                                }
-
-                                if sync_need_restart {
-                                    si.fast_sync_reset();
-                                    sync_state.clear_sync_error();
-                                }
-                            }
-
-                            // run fast sync if applicable, normally only run one-time, except restart in error
-                            if header_head.height == si.highest_height {
-                                let (go, download_timeout) = si.fast_sync_due();
-
-                                if go {
-                                    si.fast_sync_peer = None;
-                                    match fast_sync(peers.clone(), chain.clone(), &header_head) {
-                                        Ok(peer) => {
-                                            si.fast_sync_peer = Some(peer);
-                                        }
-                                        Err(e) => sync_state.set_sync_error(Error::P2P(e)),
-                                    }
-                                    sync_state.update(SyncStatus::TxHashsetDownload);
-                                }
-
-                                if SyncStatus::TxHashsetDownload == sync_state.status()
-                                    && download_timeout
-                                {
-                                    error!(LOGGER, "fast_sync: TxHashsetDownload status timeout in 10 minutes!");
-                                    sync_state.set_sync_error(Error::P2P(p2p::Error::Timeout));
-                                }
-                            }
-                        } else {
-                            // run the body_sync every 5s
-                            if si.body_sync_due(&head, chain.clone(), &mut body_sync_info) {
-                                body_sync(peers.clone(), chain.clone(), &mut body_sync_info);
-
-                                sync_state.update(SyncStatus::BodySync {
-                                    current_height: head.height,
-                                    highest_height: si.highest_height,
-                                });
-                            }
-                        }
-                    } else {
-                        sync_state.update(SyncStatus::NoSync);
-                    }
-
-                    thread::sleep(time::Duration::from_millis(10));
-                    if stop.load(Ordering::Relaxed) {
-                        break;
-                    }
-                }
-            });
+        // if fast_sync is enabled and needed
+        let need_fast_sync = !archive_mode
+            && si.highest_height.saturating_sub(head.height) > global::cut_through_horizon() as u64;
+        if need_fast_sync {
+            do_fast_sync(
+                sync_state.as_ref(),
+                header_head,
+                peers.clone(),
+                chain.clone(),
+                &mut si,
+            );
+
+            continue;
+        }
+
+        // if fast_sync disabled or not needed, run the body_sync every 5s
+        if si.body_sync_due(&head, chain.clone(), &mut body_sync_info) {
+            body_sync(peers.clone(), chain.clone(), &mut body_sync_info);
+
+            sync_state.update(SyncStatus::BodySync {
+                current_height: head.height,
+                highest_height: si.highest_height,
+            });
+        }
+    }
 }
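For orientation, the "3 states" in the comment above correspond to the SyncStatus values this loop drives: HeaderSync while headers catch up, TxHashsetDownload once the header chain reaches the network's height, then BodySync; archive (full) nodes skip the middle step. A toy model of that happy-path progression (a sketch only, not the crate's SyncState machinery):

    #[derive(Clone, Copy, Debug, PartialEq)]
    enum Phase {
        HeaderSync,
        TxHashsetDownload, // skipped entirely when archive_mode is set
        BodySync,
    }

    // Happy path only; the real loop also handles NoSync, restarts and timeouts.
    fn next_phase(p: Phase, headers_done: bool, state_done: bool, archive_mode: bool) -> Phase {
        match p {
            Phase::HeaderSync if headers_done && archive_mode => Phase::BodySync,
            Phase::HeaderSync if headers_done => Phase::TxHashsetDownload,
            Phase::TxHashsetDownload if state_done => Phase::BodySync,
            other => other,
        }
    }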
+fn do_header_sync(
+    sync_state: &SyncState,
+    header_head: chain::Tip,
+    peers: Arc<p2p::Peers>,
+    chain: Arc<chain::Chain>,
+    si: &SyncInfo,
+    history_locators: &mut Vec<(u64, Hash)>,
+) {
+    let status = sync_state.status();
+
+    let update_sync_state = match status {
+        SyncStatus::TxHashsetDownload => false,
+        SyncStatus::NoSync | SyncStatus::Initial => {
+            // Reset sync_head to header_head on transition to HeaderSync,
+            // but ONLY on initial transition to HeaderSync state.
+            let sync_head = chain.get_sync_head().unwrap();
+            debug!(
+                LOGGER,
+                "sync: initial transition to HeaderSync. sync_head: {} at {}, reset to: {} at {}",
+                sync_head.hash(),
+                sync_head.height,
+                header_head.hash(),
+                header_head.height,
+            );
+            chain.init_sync_head(&header_head).unwrap();
+            history_locators.clear();
+            true
+        }
+        _ => true,
+    };
+
+    if update_sync_state {
+        sync_state.update(SyncStatus::HeaderSync {
+            current_height: header_head.height,
+            highest_height: si.highest_height,
+        });
+    }
+
+    header_sync(peers, chain, history_locators);
+}
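The match in do_header_sync encodes two decisions at once: whether to reset the sync head (only on the first transition into header sync) and whether to overwrite the status (always, except while a txhashset download is in flight). Condensed into a sketch, with Status standing in for SyncStatus:

    enum Status { TxHashsetDownload, NoSync, Initial, Other }

    // Returns (reset_sync_head, update_status_to_header_sync).
    fn header_sync_transition(s: Status) -> (bool, bool) {
        match s {
            Status::TxHashsetDownload => (false, false),
            Status::NoSync | Status::Initial => (true, true),
            Status::Other => (false, true),
        }
    }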
+fn do_fast_sync(
+    sync_state: &SyncState,
+    header_head: chain::Tip,
+    peers: Arc<p2p::Peers>,
+    chain: Arc<chain::Chain>,
+    si: &mut SyncInfo,
+) {
+    let mut sync_need_restart = false;
+
+    // check sync error
+    {
+        let clone = sync_state.sync_error();
+        if let Some(ref sync_error) = *clone.read().unwrap() {
+            error!(
+                LOGGER,
+                "fast_sync: error = {:?}. restart fast sync", sync_error
+            );
+            sync_need_restart = true;
+        }
+        drop(clone);
+    }
+
+    // check peer connection status of this sync
+    if let Some(ref peer) = si.fast_sync_peer {
+        if let Ok(p) = peer.try_read() {
+            if !p.is_connected() && SyncStatus::TxHashsetDownload == sync_state.status() {
+                sync_need_restart = true;
+                info!(
+                    LOGGER,
+                    "fast_sync: peer connection lost: {:?}. restart", p.info.addr,
+                );
+            }
+        }
+    }
+
+    if sync_need_restart {
+        si.fast_sync_reset();
+        sync_state.clear_sync_error();
+    }
+
+    // run fast sync if applicable, normally only run one-time, except restart in error
+    if header_head.height == si.highest_height {
+        let (go, download_timeout) = si.fast_sync_due();
+
+        if go {
+            si.fast_sync_peer = None;
+            match fast_sync(peers, chain, &header_head) {
+                Ok(peer) => {
+                    si.fast_sync_peer = Some(peer);
+                }
+                Err(e) => sync_state.set_sync_error(Error::P2P(e)),
+            }
+            sync_state.update(SyncStatus::TxHashsetDownload);
+        }
+
+        if download_timeout && SyncStatus::TxHashsetDownload == sync_state.status() {
+            error!(
+                LOGGER,
+                "fast_sync: TxHashsetDownload status timeout in 10 minutes!"
+            );
+            sync_state.set_sync_error(Error::P2P(p2p::Error::Timeout));
+        }
+    }
+}
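One detail worth noting in do_fast_sync: the peer check uses try_read rather than read, so a contended peer lock never blocks the sync loop; the check is simply skipped until the next pass. A stripped-down illustration with a plain std RwLock (the crate's peer type is more involved):

    use std::sync::RwLock;

    // The bool stands in for peer.is_connected().
    fn connection_lost(peer: &RwLock<bool>) -> bool {
        match peer.try_read() {
            Ok(is_connected) => !*is_connected,
            Err(_) => false, // lock busy or poisoned: assume fine, re-check next tick
        }
    }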
 struct BodySyncInfo {
@@ -376,8 +415,7 @@ fn body_sync(peers: Arc<Peers>, chain: Arc<chain::Chain>, body_sync_info: &mut B
             // only ask for blocks that we have not yet processed
             // either successfully stored or in our orphan list
             !chain.get_block(x).is_ok() && !chain.is_orphan(x)
-        })
-        .take(block_count)
+        }).take(block_count)
         .collect::<Vec<_>>();

     if hashes_to_get.len() > 0 {
@@ -788,9 +826,7 @@ mod test {
         // headers
         assert_eq!(
             get_locator_heights(10000),
-            vec![
-                10000, 9998, 9994, 9986, 9970, 9938, 9874, 9746, 9490, 8978, 7954, 5906, 1810, 0,
-            ]
+            vec![10000, 9998, 9994, 9986, 9970, 9938, 9874, 9746, 9490, 8978, 7954, 5906, 1810, 0,]
         );
     }
 }
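The expected vector in the test encodes an exponential back-off: each locator steps back from the tip twice as far as the previous one (2, 4, 8, ...), ending at genesis. The following self-contained sketch reproduces the test vector; it is inferred from the expected output above, not the actual get_locator_heights implementation:

    // Doubling-step locator heights, derived from the test's expected output.
    fn locator_heights(height: u64) -> Vec<u64> {
        let mut heights = vec![];
        let mut current = height;
        let mut step = 2u64;
        loop {
            heights.push(current);
            if current == 0 {
                break;
            }
            current = current.saturating_sub(step);
            step *= 2;
        }
        heights
    }

    fn main() {
        assert_eq!(
            locator_heights(10000),
            vec![10000, 9998, 9994, 9986, 9970, 9938, 9874, 9746, 9490, 8978, 7954, 5906, 1810, 0]
        );
    }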