diff --git a/servers/src/grin/sync.rs b/servers/src/grin/sync.rs
index 9dedd1269..5499f3b9b 100644
--- a/servers/src/grin/sync.rs
+++ b/servers/src/grin/sync.rs
@@ -45,18 +45,56 @@ impl Syncer {
 		archive_mode: bool,
 		stop: Arc<AtomicBool>,
 	) {
-		sync::run_sync(
-			sync_state,
-			awaiting_peers,
-			peers,
-			chain,
-			skip_sync_wait,
-			archive_mode,
-			stop,
-		)
+		let _ = thread::Builder::new()
+			.name("sync".to_string())
+			.spawn(move || {
+				sync::run_sync(
+					sync_state,
+					awaiting_peers,
+					peers,
+					chain,
+					skip_sync_wait,
+					archive_mode,
+					stop,
+				)
+			});
 	}
 }
 
+fn wait_for_min_peers(
+	awaiting_peers: Arc<AtomicBool>,
+	peers: Arc<p2p::Peers>,
+	chain: Arc<chain::Chain>,
+	skip_sync_wait: bool,
+) {
+	// Initial sleep to give us time to peer with some nodes.
+	// Note: Even if we have "skip_sync_wait" we need to wait a
+	// short period of time for tests to do the right thing.
+	let wait_secs = if skip_sync_wait { 3 } else { 30 };
+
+	let head = chain.head().unwrap();
+
+	awaiting_peers.store(true, Ordering::Relaxed);
+	let mut n = 0;
+	const MIN_PEERS: usize = 3;
+	loop {
+		let wp = peers.more_work_peers();
+		// exit loop when:
+		// * we have more than MIN_PEERS more_work peers
+		// * we are synced already, e.g. grin was quickly restarted
+		// * timeout
+		if wp.len() > MIN_PEERS
+			|| (wp.len() == 0 && peers.enough_peers() && head.total_difficulty > Difficulty::zero())
+			|| n > wait_secs
+		{
+			break;
+		}
+		thread::sleep(time::Duration::from_secs(1));
+		n += 1;
+	}
+	awaiting_peers.store(false, Ordering::Relaxed);
+}
+
 /// Starts the syncing loop, just spawns two threads that loop forever
 pub fn run_sync(
 	sync_state: Arc<SyncState>,
@@ -67,193 +105,194 @@ pub fn run_sync(
 	archive_mode: bool,
 	stop: Arc<AtomicBool>,
 ) {
-	let chain = chain.clone();
-	let _ =
-		thread::Builder::new()
-			.name("sync".to_string())
-			.spawn(move || {
-				let mut si = SyncInfo::new();
-
-				{
-					// Initial sleep to give us time to peer with some nodes.
-					// Note: Even if we have "skip_sync_wait" we need to wait a
-					// short period of time for tests to do the right thing.
-					let wait_secs = if skip_sync_wait { 3 } else { 30 };
-
-					let head = chain.head().unwrap();
-
-					awaiting_peers.store(true, Ordering::Relaxed);
-					let mut n = 0;
-					const MIN_PEERS: usize = 3;
-					loop {
-						let wp = peers.more_work_peers();
-						// exit loop when:
-						// * we have more than MIN_PEERS more_work peers
-						// * we are synced already, e.g. grin was quickly restarted
-						// * timeout
-						if wp.len() > MIN_PEERS
-							|| (wp.len() == 0
-								&& peers.enough_peers()
-								&& head.total_difficulty > Difficulty::zero()) || n > wait_secs
-						{
-							break;
-						}
-						thread::sleep(time::Duration::from_secs(1));
-						n += 1;
-					}
-					awaiting_peers.store(false, Ordering::Relaxed);
-				}
-
-				// fast sync has 3 "states":
-				// * syncing headers
-				// * once all headers are sync'd, requesting the txhashset state
-				// * once we have the state, get blocks after that
-				//
-				// full sync gets rid of the middle step and just starts from
-				// the genesis state
-
-				let mut history_locators: Vec<(u64, Hash)> = vec![];
-				let mut body_sync_info = BodySyncInfo {
-					sync_start_ts: Utc::now(),
-					body_sync_hashes: vec![],
-					prev_body_received: None,
-					prev_tip: chain.head().unwrap(),
-					prev_orphans_len: 0,
-				};
-
-				loop {
-					let horizon = global::cut_through_horizon() as u64;
-					let head = chain.head().unwrap();
-					let header_head = chain.get_header_head().unwrap();
-
-					// is syncing generally needed when we compare our state with others
-					let (syncing, most_work_height) =
-						needs_syncing(sync_state.as_ref(), peers.clone(), chain.clone());
-
-					if most_work_height > 0 {
-						// we can occasionally get a most work height of 0 if read locks fail
-						si.highest_height = most_work_height;
-					}
-
-					if syncing {
-						let fast_sync_enabled = !archive_mode
-							&& si.highest_height.saturating_sub(head.height) > horizon;
-
-						// run the header sync every 10s
-						if si.header_sync_due(&header_head) {
-							let status = sync_state.status();
-							let update_sync_state = match status {
-								SyncStatus::TxHashsetDownload => false,
-								SyncStatus::NoSync | SyncStatus::Initial => {
-									// Reset sync_head to header_head on transition to HeaderSync,
-									// but ONLY on initial transition to HeaderSync state.
-									let sync_head = chain.get_sync_head().unwrap();
-									debug!(
-										LOGGER,
-										"sync: initial transition to HeaderSync. sync_head: {} at {}, reset to: {} at {}",
-										sync_head.hash(),
-										sync_head.height,
-										header_head.hash(),
-										header_head.height,
-									);
-									chain.init_sync_head(&header_head).unwrap();
-									history_locators.clear();
-									true
-								}
-								_ => true,
-							};
-							if update_sync_state {
-								sync_state.update(SyncStatus::HeaderSync {
-									current_height: header_head.height,
-									highest_height: si.highest_height,
-								});
-							}
-							header_sync(peers.clone(), chain.clone(), &mut history_locators);
-						}
-
-						if fast_sync_enabled {
-							{
-								let mut sync_need_restart = false;
-								// check sync error
-								{
-									let clone = sync_state.sync_error();
-									if let Some(ref sync_error) = *clone.read().unwrap() {
-										error!(
-											LOGGER,
-											"fast_sync: error = {:?}. restart fast sync",
-											sync_error
-										);
-										sync_need_restart = true;
-									}
-									drop(clone);
-								}
-
-								// check peer connection status of this sync
-								if let Some(ref peer) = si.fast_sync_peer {
-									if let Ok(p) = peer.try_read() {
-										if !p.is_connected()
-											&& SyncStatus::TxHashsetDownload
-												== sync_state.status()
-										{
-											sync_need_restart = true;
-											info!(
-												LOGGER,
-												"fast_sync: peer connection lost: {:?}. restart",
-												p.info.addr,
-											);
-										}
-									}
-								}
-
-								if sync_need_restart {
-									si.fast_sync_reset();
-									sync_state.clear_sync_error();
-								}
-							}
-
-							// run fast sync if applicable, normally only run one-time, except restart in error
-							if header_head.height == si.highest_height {
-								let (go, download_timeout) = si.fast_sync_due();
-
-								if go {
-									si.fast_sync_peer = None;
-									match fast_sync(peers.clone(), chain.clone(), &header_head) {
-										Ok(peer) => {
-											si.fast_sync_peer = Some(peer);
-										}
-										Err(e) => sync_state.set_sync_error(Error::P2P(e)),
-									}
-									sync_state.update(SyncStatus::TxHashsetDownload);
-								}
-
-								if SyncStatus::TxHashsetDownload == sync_state.status()
-									&& download_timeout
-								{
-									error!(LOGGER, "fast_sync: TxHashsetDownload status timeout in 10 minutes!");
-									sync_state.set_sync_error(Error::P2P(p2p::Error::Timeout));
-								}
-							}
-						} else {
-							// run the body_sync every 5s
-							if si.body_sync_due(&head, chain.clone(), &mut body_sync_info) {
-								body_sync(peers.clone(), chain.clone(), &mut body_sync_info);
-								sync_state.update(SyncStatus::BodySync {
-									current_height: head.height,
-									highest_height: si.highest_height,
-								});
-							}
-						}
-					} else {
-						sync_state.update(SyncStatus::NoSync);
-					}
-
-					thread::sleep(time::Duration::from_millis(10));
-
-					if stop.load(Ordering::Relaxed) {
-						break;
-					}
-				}
-			});
+	let mut si = SyncInfo::new();
+
+	// Wait for connections reach at least MIN_PEERS
+	wait_for_min_peers(awaiting_peers, peers.clone(), chain.clone(), skip_sync_wait);
+
+	// fast sync has 3 "states":
+	// * syncing headers
+	// * once all headers are sync'd, requesting the txhashset state
+	// * once we have the state, get blocks after that
+	//
+	// full sync gets rid of the middle step and just starts from
+	// the genesis state
+
+	let mut history_locators: Vec<(u64, Hash)> = vec![];
+	let mut body_sync_info = BodySyncInfo {
+		sync_start_ts: Utc::now(),
+		body_sync_hashes: vec![],
+		prev_body_received: None,
+		prev_tip: chain.head().unwrap(),
+		prev_orphans_len: 0,
+	};
+
+	// Main syncing loop
+	while !stop.load(Ordering::Relaxed) {
+		thread::sleep(time::Duration::from_millis(10));
+
+		// check whether syncing is generally needed, when we compare our state with others
+		let (syncing, most_work_height) =
+			needs_syncing(sync_state.as_ref(), peers.clone(), chain.clone());
+
+		if most_work_height > 0 {
+			// we can occasionally get a most work height of 0 if read locks fail
+			si.highest_height = most_work_height;
+		}
+
+		// if no syncing is needed
+		if !syncing {
+			sync_state.update(SyncStatus::NoSync);
+			continue;
+		}
+
+		// if syncing is needed
+		let head = chain.head().unwrap();
+		let header_head = chain.get_header_head().unwrap();
+
+		// run the header sync in every 10s at least
+		if si.header_sync_due(&header_head) {
+			do_header_sync(
+				sync_state.as_ref(),
+				header_head.clone(),
+				peers.clone(),
+				chain.clone(),
+				&si,
+				&mut history_locators,
+			);
+		}
+
+		// if fast_sync is enabled and needed
+		let need_fast_sync = !archive_mode
+			&& si.highest_height.saturating_sub(head.height) > global::cut_through_horizon() as u64;
+		if need_fast_sync {
+			do_fast_sync(
+				sync_state.as_ref(),
+				header_head,
+				peers.clone(),
+				chain.clone(),
+				&mut si,
+			);
+
+			continue;
+		}
+
+		// if fast_sync disabled or not needed, run the body_sync every 5s
+		if si.body_sync_due(&head, chain.clone(), &mut body_sync_info) {
+			body_sync(peers.clone(), chain.clone(), &mut body_sync_info);
+
+			sync_state.update(SyncStatus::BodySync {
+				current_height: head.height,
+				highest_height: si.highest_height,
+			});
+		}
+	}
+}
+
+fn do_header_sync(
+	sync_state: &SyncState,
+	header_head: chain::Tip,
+	peers: Arc<p2p::Peers>,
+	chain: Arc<chain::Chain>,
+	si: &SyncInfo,
+	history_locators: &mut Vec<(u64, Hash)>,
+) {
+	let status = sync_state.status();
+
+	let update_sync_state = match status {
+		SyncStatus::TxHashsetDownload => false,
+		SyncStatus::NoSync | SyncStatus::Initial => {
+			// Reset sync_head to header_head on transition to HeaderSync,
+			// but ONLY on initial transition to HeaderSync state.
+			let sync_head = chain.get_sync_head().unwrap();
+			debug!(
+				LOGGER,
+				"sync: initial transition to HeaderSync. sync_head: {} at {}, reset to: {} at {}",
+				sync_head.hash(),
+				sync_head.height,
+				header_head.hash(),
+				header_head.height,
+			);
+			chain.init_sync_head(&header_head).unwrap();
+			history_locators.clear();
+			true
+		}
+		_ => true,
+	};
+
+	if update_sync_state {
+		sync_state.update(SyncStatus::HeaderSync {
+			current_height: header_head.height,
+			highest_height: si.highest_height,
+		});
+	}
+
+	header_sync(peers, chain, history_locators);
+}
+
+fn do_fast_sync(
+	sync_state: &SyncState,
+	header_head: chain::Tip,
+	peers: Arc<p2p::Peers>,
+	chain: Arc<chain::Chain>,
+	si: &mut SyncInfo,
+) {
+	let mut sync_need_restart = false;
+
+	// check sync error
+	{
+		let clone = sync_state.sync_error();
+		if let Some(ref sync_error) = *clone.read().unwrap() {
+			error!(
+				LOGGER,
+				"fast_sync: error = {:?}. restart fast sync", sync_error
+			);
+			sync_need_restart = true;
+		}
+		drop(clone);
+	}
+
+	// check peer connection status of this sync
+	if let Some(ref peer) = si.fast_sync_peer {
+		if let Ok(p) = peer.try_read() {
+			if !p.is_connected() && SyncStatus::TxHashsetDownload == sync_state.status() {
+				sync_need_restart = true;
+				info!(
+					LOGGER,
+					"fast_sync: peer connection lost: {:?}. restart", p.info.addr,
+				);
+			}
+		}
+	}
+
+	if sync_need_restart {
+		si.fast_sync_reset();
+		sync_state.clear_sync_error();
+	}
+
+	// run fast sync if applicable, normally only run one-time, except restart in error
+	if header_head.height == si.highest_height {
+		let (go, download_timeout) = si.fast_sync_due();
+
+		if go {
+			si.fast_sync_peer = None;
+			match fast_sync(peers, chain, &header_head) {
+				Ok(peer) => {
+					si.fast_sync_peer = Some(peer);
+				}
+				Err(e) => sync_state.set_sync_error(Error::P2P(e)),
+			}
+			sync_state.update(SyncStatus::TxHashsetDownload);
+		}
+
+		if download_timeout && SyncStatus::TxHashsetDownload == sync_state.status() {
+			error!(
+				LOGGER,
+				"fast_sync: TxHashsetDownload status timeout in 10 minutes!"
+			);
+			sync_state.set_sync_error(Error::P2P(p2p::Error::Timeout));
+		}
+	}
+}
 
 struct BodySyncInfo {
@@ -376,8 +415,7 @@ fn body_sync(peers: Arc<p2p::Peers>, chain: Arc<chain::Chain>, body_sync_info: &mut BodySyncInfo) {
 			// only ask for blocks that we have not yet processed
 			// either successfully stored or in our orphan list
 			!chain.get_block(x).is_ok() && !chain.is_orphan(x)
-		})
-		.take(block_count)
+		}).take(block_count)
 		.collect::<Vec<_>>();
 
 	if hashes_to_get.len() > 0 {
@@ -788,9 +826,7 @@ mod test {
 		// headers
 		assert_eq!(
 			get_locator_heights(10000),
-			vec![
-				10000, 9998, 9994, 9986, 9970, 9938, 9874, 9746, 9490, 8978, 7954, 5906, 1810, 0,
-			]
+			vec![10000, 9998, 9994, 9986, 9970, 9938, 9874, 9746, 9490, 8978, 7954, 5906, 1810, 0,]
 		);
 	}
 }