diff --git a/api/src/handlers/server_api.rs b/api/src/handlers/server_api.rs index 6e47f1c30..73770caad 100644 --- a/api/src/handlers/server_api.rs +++ b/api/src/handlers/server_api.rs @@ -79,11 +79,12 @@ fn sync_status_to_api(sync_status: SyncStatus) -> (String, Option ("no_sync".to_string(), None), SyncStatus::AwaitingPeers(_) => ("awaiting_peers".to_string(), None), SyncStatus::HeaderSync { - current_height, + sync_head, highest_height, + .. } => ( "header_sync".to_string(), - Some(json!({ "current_height": current_height, "highest_height": highest_height })), + Some(json!({ "current_height": sync_head.height, "highest_height": highest_height })), ), SyncStatus::TxHashsetDownload(stats) => ( "txhashset_download".to_string(), diff --git a/chain/src/chain.rs b/chain/src/chain.rs index 1d44ec9b0..627fedc9f 100644 --- a/chain/src/chain.rs +++ b/chain/src/chain.rs @@ -404,19 +404,23 @@ impl Chain { /// Attempt to add new headers to the header chain (or fork). /// This is only ever used during sync and is based on sync_head. /// We update header_head here if our total work increases. - pub fn sync_block_headers(&self, headers: &[BlockHeader], opts: Options) -> Result<(), Error> { + /// Returns the new sync_head (may temporarily diverge from header_head when syncing a long fork). + pub fn sync_block_headers( + &self, + headers: &[BlockHeader], + sync_head: Tip, + opts: Options, + ) -> Result, Error> { let mut header_pmmr = self.header_pmmr.write(); let mut txhashset = self.txhashset.write(); + let batch = self.store.batch()?; // Sync the chunk of block headers, updating header_head if total work increases. - { - let batch = self.store.batch()?; - let mut ctx = self.new_ctx(opts, batch, &mut header_pmmr, &mut txhashset)?; - pipe::process_block_headers(headers, &mut ctx)?; - ctx.batch.commit()?; - } + let mut ctx = self.new_ctx(opts, batch, &mut header_pmmr, &mut txhashset)?; + let sync_head = pipe::process_block_headers(headers, sync_head, &mut ctx)?; + ctx.batch.commit()?; - Ok(()) + Ok(sync_head) } /// Build a new block processing context. @@ -1410,12 +1414,20 @@ impl Chain { } /// Gets multiple headers at the provided heights. - pub fn get_locator_hashes(&self, heights: &[u64]) -> Result, Error> { - let pmmr = self.header_pmmr.read(); - heights - .iter() - .map(|h| pmmr.get_header_hash_by_height(*h)) - .collect() + /// Note: This is based on the provided sync_head to support syncing against a fork. + pub fn get_locator_hashes(&self, sync_head: Tip, heights: &[u64]) -> Result, Error> { + let mut header_pmmr = self.header_pmmr.write(); + txhashset::header_extending_readonly(&mut header_pmmr, &self.store(), |ext, batch| { + let header = batch.get_block_header(&sync_head.hash())?; + pipe::rewind_and_apply_header_fork(&header, ext, batch)?; + + let hashes = heights + .iter() + .filter_map(|h| ext.get_header_hash_by_height(*h)) + .collect(); + + Ok(hashes) + }) } /// Builds an iterator on blocks starting from the current chain head and diff --git a/chain/src/pipe.rs b/chain/src/pipe.rs index b87a2783d..689e4104b 100644 --- a/chain/src/pipe.rs +++ b/chain/src/pipe.rs @@ -176,48 +176,52 @@ pub fn process_block( /// Process a batch of sequential block headers. /// This is only used during header sync. +/// Will update header_head locally if this batch of headers increases total work. +/// Returns the updated sync_head, which may be on a fork. pub fn process_block_headers( headers: &[BlockHeader], + sync_head: Tip, ctx: &mut BlockContext<'_>, -) -> Result<(), Error> { +) -> Result, Error> { if headers.is_empty() { - return Ok(()); + return Ok(None); } let last_header = headers.last().expect("last header"); - // Check if we know about all these headers. If so we can accept them quickly. - // If they *do not* increase total work on the sync chain we are done. - // If they *do* increase total work then we should process them to update sync_head. - let head = { - let hash = ctx.header_pmmr.head_hash()?; - let header = ctx.batch.get_block_header(&hash)?; - Tip::from_header(&header) - }; - - if let Ok(existing) = ctx.batch.get_block_header(&last_header.hash()) { - if !has_more_work(&existing, &head) { - return Ok(()); - } - } + let head = ctx.batch.header_head()?; // Validate each header in the chunk and add to our db. // Note: This batch may be rolled back later if the MMR does not validate successfully. + // Note: This batch may later be committed even if the MMR itself is rollbacked. for header in headers { validate_header(header, ctx)?; add_block_header(header, &ctx.batch)?; } - // Now apply this entire chunk of headers to the sync MMR (ctx is sync MMR specific). + // Now apply this entire chunk of headers to the header MMR. txhashset::header_extending(&mut ctx.header_pmmr, &mut ctx.batch, |ext, batch| { rewind_and_apply_header_fork(&last_header, ext, batch)?; - Ok(()) - })?; - if has_more_work(last_header, &head) { - update_header_head(&Tip::from_header(last_header), &mut ctx.batch)?; - } + // If previous sync_head is not on the "current" chain then + // these headers are on an alternative fork to sync_head. + let alt_fork = !ext.is_on_current_chain(sync_head, batch)?; - Ok(()) + // Update our "header_head" if this batch results in an increase in total work. + // Otherwise rollback this header extension. + // Note the outer batch may still be committed to db assuming no errors occur in the extension. + if has_more_work(last_header, &head) { + let header_head = last_header.into(); + update_header_head(&header_head, &batch)?; + } else { + ext.force_rollback(); + }; + + if alt_fork || has_more_work(last_header, &sync_head) { + Ok(Some(last_header.into())) + } else { + Ok(None) + } + }) } /// Process a block header. Update the header MMR and corresponding header_head if this header @@ -500,7 +504,7 @@ fn add_block_header(bh: &BlockHeader, batch: &store::Batch<'_>) -> Result<(), Er Ok(()) } -fn update_header_head(head: &Tip, batch: &mut store::Batch<'_>) -> Result<(), Error> { +fn update_header_head(head: &Tip, batch: &store::Batch<'_>) -> Result<(), Error> { batch .save_header_head(&head) .map_err(|e| ErrorKind::StoreErr(e, "pipe save header head".to_owned()))?; @@ -513,7 +517,7 @@ fn update_header_head(head: &Tip, batch: &mut store::Batch<'_>) -> Result<(), Er Ok(()) } -fn update_head(head: &Tip, batch: &mut store::Batch<'_>) -> Result<(), Error> { +fn update_head(head: &Tip, batch: &store::Batch<'_>) -> Result<(), Error> { batch .save_body_head(&head) .map_err(|e| ErrorKind::StoreErr(e, "pipe save body".to_owned()))?; @@ -536,7 +540,7 @@ pub fn rewind_and_apply_header_fork( ) -> Result<(), Error> { let mut fork_hashes = vec![]; let mut current = header.clone(); - while current.height > 0 && ext.is_on_current_chain(¤t, batch).is_err() { + while current.height > 0 && !ext.is_on_current_chain(¤t, batch)? { fork_hashes.push(current.hash()); current = batch.get_previous_header(¤t)?; } @@ -577,11 +581,7 @@ pub fn rewind_and_apply_fork( // Rewind the txhashset extension back to common ancestor based on header MMR. let mut current = batch.head_header()?; - while current.height > 0 - && header_extension - .is_on_current_chain(¤t, batch) - .is_err() - { + while current.height > 0 && !header_extension.is_on_current_chain(¤t, batch)? { current = batch.get_previous_header(¤t)?; } let fork_point = current; diff --git a/chain/src/txhashset/txhashset.rs b/chain/src/txhashset/txhashset.rs index f4893ef8c..3358a8dc8 100644 --- a/chain/src/txhashset/txhashset.rs +++ b/chain/src/txhashset/txhashset.rs @@ -915,6 +915,13 @@ impl<'a> HeaderExtension<'a> { self.head.clone() } + /// Get header hash by height. + /// Based on current header MMR. + pub fn get_header_hash_by_height(&self, height: u64) -> Option { + let pos = pmmr::insertion_to_pmmr_index(height + 1); + self.get_header_hash(pos) + } + /// Get the header at the specified height based on the current state of the header extension. /// Derives the MMR pos from the height (insertion index) and retrieves the header hash. /// Looks the header up in the db by hash. @@ -923,8 +930,7 @@ impl<'a> HeaderExtension<'a> { height: u64, batch: &Batch<'_>, ) -> Result { - let pos = pmmr::insertion_to_pmmr_index(height + 1); - if let Some(hash) = self.get_header_hash(pos) { + if let Some(hash) = self.get_header_hash_by_height(height) { Ok(batch.get_block_header(&hash)?) } else { Err(ErrorKind::Other("get header by height".to_string()).into()) @@ -933,20 +939,17 @@ impl<'a> HeaderExtension<'a> { /// Compares the provided header to the header in the header MMR at that height. /// If these match we know the header is on the current chain. - pub fn is_on_current_chain( + pub fn is_on_current_chain>( &self, - header: &BlockHeader, + t: T, batch: &Batch<'_>, - ) -> Result<(), Error> { - if header.height > self.head.height { - return Err(ErrorKind::Other("not on current chain, out beyond".to_string()).into()); - } - let chain_header = self.get_header_by_height(header.height, batch)?; - if chain_header.hash() == header.hash() { - Ok(()) - } else { - Err(ErrorKind::Other("not on current chain".to_string()).into()) + ) -> Result { + let t = t.into(); + if t.height > self.head.height { + return Ok(false); } + let chain_header = self.get_header_by_height(t.height, batch)?; + Ok(chain_header.hash() == t.hash()) } /// Force the rollback of this extension, no matter the result. diff --git a/chain/src/types.rs b/chain/src/types.rs index 4dc4900a2..1c0331dc3 100644 --- a/chain/src/types.rs +++ b/chain/src/types.rs @@ -38,7 +38,7 @@ bitflags! { } /// Various status sync can be in, whether it's fast sync or archival. -#[derive(Debug, Clone, Copy, Eq, PartialEq, Deserialize, Serialize)] +#[derive(Debug, Clone, Copy, PartialEq, Deserialize, Serialize)] pub enum SyncStatus { /// Initial State (we do not yet know if we are/should be syncing) Initial, @@ -49,10 +49,12 @@ pub enum SyncStatus { AwaitingPeers(bool), /// Downloading block headers HeaderSync { - /// current node height - current_height: u64, + /// current sync head + sync_head: Tip, /// height of the most advanced peer highest_height: u64, + /// diff of the most advanced peer + highest_diff: Difficulty, }, /// Downloading the various txhashsets TxHashsetDownload(TxHashsetDownloadStats), @@ -176,6 +178,17 @@ impl SyncState { } } + /// Update sync_head if state is currently HeaderSync. + pub fn update_header_sync(&self, new_sync_head: Tip) { + let status: &mut SyncStatus = &mut self.current.write(); + match status { + SyncStatus::HeaderSync { sync_head, .. } => { + *sync_head = new_sync_head; + } + _ => (), + } + } + /// Update txhashset downloading progress pub fn update_txhashset_download(&self, stats: TxHashsetDownloadStats) { *self.current.write() = SyncStatus::TxHashsetDownload(stats); @@ -346,12 +359,7 @@ pub struct Tip { impl Tip { /// Creates a new tip based on provided header. pub fn from_header(header: &BlockHeader) -> Tip { - Tip { - height: header.height, - last_block_h: header.hash(), - prev_block_h: header.prev_hash, - total_difficulty: header.total_difficulty(), - } + header.into() } } @@ -372,6 +380,16 @@ impl Default for Tip { } } } +impl From<&BlockHeader> for Tip { + fn from(header: &BlockHeader) -> Tip { + Tip { + height: header.height, + last_block_h: header.hash(), + prev_block_h: header.prev_hash, + total_difficulty: header.total_difficulty(), + } + } +} /// Serialization of a tip, required to save to datastore. impl ser::Writeable for Tip { diff --git a/servers/src/common/adapters.rs b/servers/src/common/adapters.rs index d0b008a9f..7e749be8f 100644 --- a/servers/src/common/adapters.rs +++ b/servers/src/common/adapters.rs @@ -321,9 +321,28 @@ where return Ok(false); } - // try to add headers to our header chain - match self.chain().sync_block_headers(bhs, chain::Options::SYNC) { - Ok(_) => Ok(true), + // Read our sync_head if we are in header_sync. + // If not then we can ignore this batch of headers. + let sync_head = match self.sync_state.status() { + SyncStatus::HeaderSync { sync_head, .. } => sync_head, + _ => { + debug!("headers_received: ignoring as not in header_sync"); + return Ok(true); + } + }; + + match self + .chain() + .sync_block_headers(bhs, sync_head, chain::Options::SYNC) + { + Ok(sync_head) => { + // If we have an updated sync_head after processing this batch of headers + // then update our sync_state so we can request relevant headers in the next batch. + if let Some(sync_head) = sync_head { + self.sync_state.update_header_sync(sync_head); + } + Ok(true) + } Err(e) => { debug!("Block headers refused by chain: {:?}", e); if e.is_bad_data() { diff --git a/servers/src/grin/sync/header_sync.rs b/servers/src/grin/sync/header_sync.rs index ad27dffae..ed2cc1ffc 100644 --- a/servers/src/grin/sync/header_sync.rs +++ b/servers/src/grin/sync/header_sync.rs @@ -47,16 +47,9 @@ impl HeaderSync { } } - pub fn check_run( - &mut self, - header_head: &chain::Tip, - highest_height: u64, - ) -> Result { - if !self.header_sync_due(header_head) { - return Ok(false); - } - - let enable_header_sync = match self.sync_state.status() { + pub fn check_run(&mut self, sync_head: chain::Tip) -> Result { + // We only want to run header_sync for some sync states. + let do_run = match self.sync_state.status() { SyncStatus::BodySync { .. } | SyncStatus::HeaderSync { .. } | SyncStatus::TxHashsetDone @@ -66,19 +59,41 @@ impl HeaderSync { _ => false, }; - if enable_header_sync { + if !do_run { + return Ok(false); + } + + // TODO - can we safely reuse the peer here across multiple runs? + let sync_peer = self.choose_sync_peer(); + + if let Some(sync_peer) = sync_peer { + let (peer_height, peer_diff) = { + let info = sync_peer.info.live_info.read(); + (info.height, info.total_difficulty) + }; + + // Quick check - nothing to sync if we are caught up with the peer. + if peer_diff <= sync_head.total_difficulty { + return Ok(false); + } + + if !self.header_sync_due(sync_head) { + return Ok(false); + } + self.sync_state.update(SyncStatus::HeaderSync { - current_height: header_head.height, - highest_height: highest_height, + sync_head, + highest_height: peer_height, + highest_diff: peer_diff, }); - self.syncing_peer = self.header_sync(); - return Ok(true); + self.header_sync(sync_head, sync_peer.clone()); + self.syncing_peer = Some(sync_peer.clone()); } - Ok(false) + Ok(true) } - fn header_sync_due(&mut self, header_head: &chain::Tip) -> bool { + fn header_sync_due(&mut self, header_head: chain::Tip) -> bool { let now = Utc::now(); let (timeout, latest_height, prev_height) = self.prev_header_sync; @@ -151,53 +166,47 @@ impl HeaderSync { } } - fn header_sync(&mut self) -> Option> { - if let Ok(header_head) = self.chain.header_head() { - let peers_iter = || { - self.peers - .iter() - .with_capabilities(Capabilities::HEADER_HIST) - .connected() - }; + fn choose_sync_peer(&self) -> Option> { + let peers_iter = || { + self.peers + .iter() + .with_capabilities(Capabilities::HEADER_HIST) + .connected() + }; - // Filter peers further based on max difficulty. - let max_diff = peers_iter().max_difficulty().unwrap_or(Difficulty::zero()); - let peers_iter = || peers_iter().with_difficulty(|x| x >= max_diff); + // Filter peers further based on max difficulty. + let max_diff = peers_iter().max_difficulty().unwrap_or(Difficulty::zero()); + let peers_iter = || peers_iter().with_difficulty(|x| x >= max_diff); - // Choose a random "most work" peer, preferring outbound if at all possible. - let peer = peers_iter().outbound().choose_random().or_else(|| { - warn!("no suitable outbound peer for header sync, considering inbound"); - peers_iter().inbound().choose_random() - }); + // Choose a random "most work" peer, preferring outbound if at all possible. + peers_iter().outbound().choose_random().or_else(|| { + warn!("no suitable outbound peer for header sync, considering inbound"); + peers_iter().inbound().choose_random() + }) + } - if let Some(peer) = peer { - if peer.info.total_difficulty() > header_head.total_difficulty { - return self.request_headers(peer); - } - } + fn header_sync(&self, sync_head: chain::Tip, peer: Arc) { + if peer.info.total_difficulty() > sync_head.total_difficulty { + self.request_headers(sync_head, peer); } - return None; } /// Request some block headers from a peer to advance us. - fn request_headers(&mut self, peer: Arc) -> Option> { - if let Ok(locator) = self.get_locator() { + fn request_headers(&self, sync_head: chain::Tip, peer: Arc) { + if let Ok(locator) = self.get_locator(sync_head) { debug!( "sync: request_headers: asking {} for headers, {:?}", peer.info.addr, locator, ); let _ = peer.send_header_request(locator); - return Some(peer); } - return None; } /// Build a locator based on header_head. - fn get_locator(&mut self) -> Result, Error> { - let tip = self.chain.header_head()?; - let heights = get_locator_heights(tip.height); - let locator = self.chain.get_locator_hashes(&heights)?; + fn get_locator(&self, sync_head: chain::Tip) -> Result, Error> { + let heights = get_locator_heights(sync_head.height); + let locator = self.chain.get_locator_hashes(sync_head, &heights)?; Ok(locator) } } diff --git a/servers/src/grin/sync/state_sync.rs b/servers/src/grin/sync/state_sync.rs index 9883cb51c..3511d2a26 100644 --- a/servers/src/grin/sync/state_sync.rs +++ b/servers/src/grin/sync/state_sync.rs @@ -133,19 +133,6 @@ impl StateSync { .set_sync_error(chain::ErrorKind::SyncError(format!("{:?}", e)).into()), } - // to avoid the confusing log, - // update the final HeaderSync state mainly for 'current_height' - self.sync_state.update_if( - SyncStatus::HeaderSync { - current_height: header_head.height, - highest_height, - }, - |s| match s { - SyncStatus::HeaderSync { .. } => true, - _ => false, - }, - ); - self.sync_state .update(SyncStatus::TxHashsetDownload(Default::default())); } diff --git a/servers/src/grin/sync/syncer.rs b/servers/src/grin/sync/syncer.rs index 19c797e57..a351297ce 100644 --- a/servers/src/grin/sync/syncer.rs +++ b/servers/src/grin/sync/syncer.rs @@ -192,9 +192,18 @@ impl SyncRunner { let tail = self.chain.tail().unwrap_or_else(|_| head.clone()); let header_head = unwrap_or_restart_loop!(self.chain.header_head()); + // "sync_head" allows us to sync against a large fork on the header chain + // we track this during an extended header sync + let sync_status = self.sync_state.status(); + + let sync_head = match sync_status { + SyncStatus::HeaderSync { sync_head, .. } => sync_head, + _ => header_head, + }; + // run each sync stage, each of them deciding whether they're needed // except for state sync that only runs if body sync return true (means txhashset is needed) - unwrap_or_restart_loop!(header_sync.check_run(&header_head, highest_height)); + unwrap_or_restart_loop!(header_sync.check_run(sync_head)); let mut check_state_sync = false; match self.sync_state.status() { @@ -206,7 +215,7 @@ impl SyncRunner { | SyncStatus::TxHashsetDone => check_state_sync = true, _ => { // skip body sync if header chain is not synced. - if header_head.height < highest_height { + if sync_head.height < highest_height { continue; } diff --git a/src/bin/tui/status.rs b/src/bin/tui/status.rs index 4d18cf9e0..0dead0731 100644 --- a/src/bin/tui/status.rs +++ b/src/bin/tui/status.rs @@ -39,13 +39,14 @@ impl TUIStatusView { SyncStatus::NoSync => Cow::Borrowed("Running"), SyncStatus::AwaitingPeers(_) => Cow::Borrowed("Waiting for peers"), SyncStatus::HeaderSync { - current_height, + sync_head, highest_height, + .. } => { let percent = if highest_height == 0 { 0 } else { - current_height * 100 / highest_height + sync_head.height * 100 / highest_height }; Cow::Owned(format!("Sync step 1/7: Downloading headers: {}%", percent)) }