From 5ba0dbf38d94ace200aade9f6a8bb3f5a88df4a9 Mon Sep 17 00:00:00 2001 From: Ignotus Peverell Date: Fri, 30 Mar 2018 06:02:40 +0000 Subject: [PATCH] Fix and cleanup of fast sync triggering logic (#916) * Fix and cleanup of fast sync triggering logic * New txhashset on fast sync has to be applied, not rolled back * Do not block if peer send buffer is full, fixes #912 --- chain/src/chain.rs | 9 ++- chain/src/txhashset.rs | 11 +++- grin/src/server.rs | 2 +- grin/src/sync.rs | 125 ++++++++++++++++++++++------------------- p2p/src/conn.rs | 2 +- p2p/src/types.rs | 7 ++- 6 files changed, 90 insertions(+), 66 deletions(-) diff --git a/chain/src/chain.rs b/chain/src/chain.rs index 026544a91..9ddf20108 100644 --- a/chain/src/chain.rs +++ b/chain/src/chain.rs @@ -533,11 +533,18 @@ impl Chain { self.store .save_block_marker(&h, &(rewind_to_output, rewind_to_kernel))?; + debug!( + LOGGER, + "Going to validate new txhashset, might take some time..." + ); let mut txhashset = txhashset::TxHashSet::open(self.db_root.clone(), self.store.clone(), None)?; txhashset::extending(&mut txhashset, |extension| { extension.validate(&header, false)?; - // TODO validate kernels and their sums with Outputs + + // validate rewinds and rollbacks, in this specific case we want to + // apply the rewind + extension.cancel_rollback(); extension.rebuild_index()?; Ok(()) })?; diff --git a/chain/src/txhashset.rs b/chain/src/txhashset.rs index 62c7929ca..0c1202eea 100644 --- a/chain/src/txhashset.rs +++ b/chain/src/txhashset.rs @@ -146,7 +146,7 @@ impl TxHashSet { let output_pmmr: PMMR = PMMR::at(&mut self.output_pmmr_h.backend, self.output_pmmr_h.last_pos); if let Some(hash) = output_pmmr.get_hash(pos) { - if hash == output_id.hash_with_index(pos-1) { + if hash == output_id.hash_with_index(pos - 1) { Ok(hash) } else { Err(Error::TxHashSetErr(format!("txhashset hash mismatch"))) @@ -386,7 +386,7 @@ impl<'a> Extension<'a> { let commit = input.commitment(); let pos_res = self.get_output_pos(&commit); if let Ok(pos) = pos_res { - let output_id_hash = OutputIdentifier::from_input(input).hash_with_index(pos-1); + let output_id_hash = OutputIdentifier::from_input(input).hash_with_index(pos - 1); if let Some(read_hash) = self.output_pmmr.get_hash(pos) { // check hash from pmmr matches hash from input (or corresponding output) // if not then the input is not being honest about @@ -398,7 +398,7 @@ impl<'a> Extension<'a> { || output_id_hash != read_elem .expect("no output at position") - .hash_with_index(pos-1) + .hash_with_index(pos - 1) { return Err(Error::TxHashSetErr(format!("output pmmr hash mismatch"))); } @@ -650,6 +650,11 @@ impl<'a> Extension<'a> { self.rollback = true; } + /// Cancel a previous rollback, to apply this extension + pub fn cancel_rollback(&mut self) { + self.rollback = false; + } + /// Dumps the output MMR. /// We use this after compacting for visual confirmation that it worked. pub fn dump_output_pmmr(&self) { diff --git a/grin/src/server.rs b/grin/src/server.rs index 765fdf722..c3e5a91d2 100644 --- a/grin/src/server.rs +++ b/grin/src/server.rs @@ -178,7 +178,7 @@ impl Server { p2p_server.peers.clone(), shared_chain.clone(), skip_sync_wait, - !archive_mode, + archive_mode, stop.clone(), ); diff --git a/grin/src/sync.rs b/grin/src/sync.rs index 0041e8f53..8a5fe636c 100644 --- a/grin/src/sync.rs +++ b/grin/src/sync.rs @@ -33,7 +33,7 @@ pub fn run_sync( peers: Arc, chain: Arc, skip_sync_wait: bool, - fast_sync: bool, + archive_mode: bool, stop: Arc, ) { let chain = chain.clone(); @@ -42,7 +42,8 @@ pub fn run_sync( .spawn(move || { let mut prev_body_sync = time::now_utc(); let mut prev_header_sync = prev_body_sync.clone(); - let mut prev_state_sync = prev_body_sync.clone() - time::Duration::seconds(5 * 60); + let mut prev_fast_sync = prev_body_sync.clone() - time::Duration::seconds(5 * 60); + let mut highest_height = 0; // initial sleep to give us time to peer with some nodes if !skip_sync_wait { @@ -64,16 +65,25 @@ pub fn run_sync( let head = chain.head().unwrap(); let header_head = chain.get_header_head().unwrap(); - // in archival nodes (no fast sync) we just consider we have the whole - // state already - let have_txhashset = - !fast_sync || header_head.height.saturating_sub(head.height) <= horizon; + // is syncing generally needed when we compare our state with others + let (syncing, most_work_height) = + needs_syncing(currently_syncing.as_ref(), peers.clone(), chain.clone()); - let mut syncing = needs_syncing( - currently_syncing.as_ref(), - peers.clone(), - chain.clone(), - !have_txhashset, + if most_work_height > 0 { + // we can occasionally get a most work height of 0 if read locks fail + highest_height = most_work_height; + } + + // in archival nodes (no fast sync) we just consider we have the whole + // state already, then fast sync triggers if other peers are much + // further ahead + let fast_sync_enabled = + !archive_mode && highest_height.saturating_sub(head.height) > horizon; + + debug!(LOGGER, "syncing: {}, fast: {}", syncing, fast_sync_enabled); + debug!( + LOGGER, + "heights: {}, vs local {}", highest_height, header_head.height ); let current_time = time::now_utc(); @@ -85,42 +95,20 @@ pub fn run_sync( } // run the body_sync every 5s - if have_txhashset && current_time - prev_body_sync > time::Duration::seconds(5) + if !fast_sync_enabled + && current_time - prev_body_sync > time::Duration::seconds(5) { body_sync(peers.clone(), chain.clone()); prev_body_sync = current_time; } - } else if !have_txhashset && header_head.height > 0 { - if current_time - prev_state_sync > time::Duration::seconds(5 * 60) { - if let Some(peer) = peers.most_work_peer() { - if let Ok(p) = peer.try_read() { - // just to handle corner case of a too early start - if header_head.height > horizon { - debug!( - LOGGER, - "Header head before txhashset request: {} / {}", - header_head.height, - header_head.last_block_h - ); - // ask for txhashset at horizon - let mut txhashset_head = - chain.get_block_header(&header_head.prev_block_h).unwrap(); - for _ in 0..horizon - 2 { - txhashset_head = chain - .get_block_header(&txhashset_head.previous) - .unwrap(); - } - p.send_txhashset_request( - txhashset_head.height, - txhashset_head.hash(), - ).unwrap(); - prev_state_sync = current_time; - } - } + // run fast sync if applicable, every 5 min + if fast_sync_enabled && header_head.height == highest_height { + if current_time - prev_fast_sync > time::Duration::seconds(5 * 60) { + fast_sync(peers.clone(), chain.clone(), &header_head); + prev_fast_sync = current_time; } } - syncing = true; } currently_syncing.store(syncing, Ordering::Relaxed); @@ -198,14 +186,16 @@ fn body_sync(peers: Arc, chain: Arc) { let peer = peers.more_work_peer(); if let Some(peer) = peer { if let Ok(peer) = peer.try_read() { - let _ = peer.send_block_request(hash); + if let Err(e) = peer.send_block_request(hash) { + debug!(LOGGER, "Skipped request to {}: {:?}", peer.info.addr, e); + } } } } } } -pub fn header_sync(peers: Arc, chain: Arc) { +fn header_sync(peers: Arc, chain: Arc) { if let Ok(header_head) = chain.get_header_head() { let difficulty = header_head.total_difficulty; @@ -220,6 +210,29 @@ pub fn header_sync(peers: Arc, chain: Arc) { } } +fn fast_sync(peers: Arc, chain: Arc, header_head: &chain::Tip) { + let horizon = global::cut_through_horizon() as u64; + + if let Some(peer) = peers.most_work_peer() { + if let Ok(p) = peer.try_read() { + debug!( + LOGGER, + "Header head before txhashset request: {} / {}", + header_head.height, + header_head.last_block_h + ); + + // ask for txhashset at horizon + let mut txhashset_head = chain.get_block_header(&header_head.prev_block_h).unwrap(); + for _ in 0..horizon.saturating_sub(20) { + txhashset_head = chain.get_block_header(&txhashset_head.previous).unwrap(); + } + p.send_txhashset_request(txhashset_head.height, txhashset_head.hash()) + .unwrap(); + } + } +} + /// Request some block headers from a peer to advance us. fn request_headers(peer: Arc>, chain: Arc) -> Result<(), Error> { let locator = get_locator(chain)?; @@ -245,15 +258,11 @@ fn needs_syncing( currently_syncing: &AtomicBool, peers: Arc, chain: Arc, - header_only: bool, -) -> bool { - let local_diff = if header_only { - chain.total_header_difficulty().unwrap() - } else { - chain.total_difficulty() - }; +) -> (bool, u64) { + let local_diff = chain.total_difficulty(); let peer = peers.most_work_peer(); let is_syncing = currently_syncing.load(Ordering::Relaxed); + let mut most_work_height = 0; // if we're already syncing, we're caught up if no peer has a higher // difficulty than us @@ -262,8 +271,9 @@ fn needs_syncing( if let Ok(peer) = peer.try_read() { debug!( LOGGER, - "needs_syncing {} {} {}", local_diff, peer.info.total_difficulty, header_only + "needs_syncing {} {}", local_diff, peer.info.total_difficulty ); + most_work_height = peer.info.height; if peer.info.total_difficulty <= local_diff { let ch = chain.head().unwrap(); @@ -274,19 +284,20 @@ fn needs_syncing( ch.height, ch.last_block_h ); - if !header_only { - let _ = chain.reset_head(); - } - return false; + + let _ = chain.reset_head(); + return (false, 0); } } } else { warn!(LOGGER, "sync: no peers available, disabling sync"); - return false; + return (false, 0); } } else { if let Some(peer) = peer { if let Ok(peer) = peer.try_read() { + most_work_height = peer.info.height; + // sum the last 5 difficulties to give us the threshold let threshold = chain .difficulty_iter() @@ -302,12 +313,12 @@ fn needs_syncing( peer.info.total_difficulty, threshold, ); - return true; + return (true, most_work_height); } } } } - is_syncing + (is_syncing, most_work_height) } /// We build a locator based on sync_head. diff --git a/p2p/src/conn.rs b/p2p/src/conn.rs index a7ed47405..f1c99978c 100644 --- a/p2p/src/conn.rs +++ b/p2p/src/conn.rs @@ -155,7 +155,7 @@ impl Tracker { T: ser::Writeable, { let buf = write_to_buf(body, msg_type); - self.send_channel.send(buf)?; + self.send_channel.try_send(buf)?; Ok(()) } } diff --git a/p2p/src/types.rs b/p2p/src/types.rs index 78f0695ce..2a93ff6e1 100644 --- a/p2p/src/types.rs +++ b/p2p/src/types.rs @@ -65,6 +65,7 @@ pub enum Error { us: Hash, peer: Hash, }, + Send(String), } impl From for Error { @@ -82,9 +83,9 @@ impl From for Error { Error::Connection(e) } } -impl From> for Error { - fn from(_e: mpsc::SendError) -> Error { - Error::ConnectionClose +impl From> for Error { + fn from(e: mpsc::TrySendError) -> Error { + Error::Send(e.to_string()) } } // impl From for Error {