From 5630cf2e10ef788fe055f7b7151c3af5593b527b Mon Sep 17 00:00:00 2001 From: Yeastplume Date: Thu, 17 Feb 2022 14:10:56 +0000 Subject: [PATCH] [PIBD_IMPL] PIBD Stats + Retry on validation errors (#3694) * start to add stats and reset chain state after errors detected * add functions to reset prune list when resetting chain pibd state * debug statement * remove test function --- api/src/handlers/chain_api.rs | 2 +- chain/src/chain.rs | 48 ++++++++++++++++++++++++----- chain/src/txhashset/desegmenter.rs | 31 +++++++++++++------ chain/src/txhashset/txhashset.rs | 6 ++++ chain/src/types.rs | 23 ++++++++++++++ core/src/core/pmmr/backend.rs | 3 ++ core/src/core/pmmr/pmmr.rs | 5 +++ core/src/core/pmmr/vec_backend.rs | 4 +++ servers/src/common/adapters.rs | 2 +- servers/src/grin/sync/state_sync.rs | 39 +++++++++++++++++++++-- src/bin/tui/status.rs | 17 ++++++++-- store/src/pmmr.rs | 8 +++++ 12 files changed, 164 insertions(+), 24 deletions(-) diff --git a/api/src/handlers/chain_api.rs b/api/src/handlers/chain_api.rs index 65b5edf96..edcf3c38c 100644 --- a/api/src/handlers/chain_api.rs +++ b/api/src/handlers/chain_api.rs @@ -81,7 +81,7 @@ impl ChainResetHandler { pub fn reset_chain_head(&self, hash: Hash) -> Result<(), Error> { let chain = w(&self.chain)?; let header = chain.get_block_header(&hash)?; - chain.reset_chain_head(&header)?; + chain.reset_chain_head(&header, true)?; // Reset the sync status and clear out any sync error. w(&self.sync_state)?.reset(); diff --git a/chain/src/chain.rs b/chain/src/chain.rs index 0be58f106..96d46ce92 100644 --- a/chain/src/chain.rs +++ b/chain/src/chain.rs @@ -227,7 +227,11 @@ impl Chain { /// Reset both head and header_head to the provided header. /// Handles simple rewind and more complex fork scenarios. /// Used by the reset_chain_head owner api endpoint. - pub fn reset_chain_head>(&self, head: T) -> Result<(), Error> { + pub fn reset_chain_head>( + &self, + head: T, + rewind_headers: bool, + ) -> Result<(), Error> { let head = head.into(); let mut header_pmmr = self.header_pmmr.write(); @@ -248,19 +252,42 @@ impl Chain { }, )?; - // If the rewind of full blocks was successful then we can rewind the header MMR. - // Rewind and reapply headers to reset the header MMR. - txhashset::header_extending(&mut header_pmmr, &mut batch, |ext, batch| { - self.rewind_and_apply_header_fork(&header, ext, batch)?; - batch.save_header_head(&head)?; - Ok(()) - })?; + if rewind_headers { + // If the rewind of full blocks was successful then we can rewind the header MMR. + // Rewind and reapply headers to reset the header MMR. + txhashset::header_extending(&mut header_pmmr, &mut batch, |ext, batch| { + self.rewind_and_apply_header_fork(&header, ext, batch)?; + batch.save_header_head(&head)?; + Ok(()) + })?; + } batch.commit()?; Ok(()) } + /// Reset prune lists (when PIBD resets) + pub fn reset_prune_lists(&self) -> Result<(), Error> { + let mut header_pmmr = self.header_pmmr.write(); + let mut txhashset = self.txhashset.write(); + let mut batch = self.store.batch()?; + + txhashset::extending(&mut header_pmmr, &mut txhashset, &mut batch, |ext, _| { + let extension = &mut ext.extension; + extension.reset_prune_lists(); + Ok(()) + })?; + Ok(()) + } + + /// Reset PIBD head + pub fn reset_pibd_head(&self) -> Result<(), Error> { + let batch = self.store.batch()?; + batch.save_pibd_head(&self.genesis().into())?; + Ok(()) + } + /// Are we running with archive_mode enabled? pub fn archive_mode(&self) -> bool { self.archive_mode @@ -276,6 +303,11 @@ impl Chain { self.txhashset.clone() } + /// return genesis header + pub fn genesis(&self) -> BlockHeader { + self.genesis.clone() + } + /// Shared store instance. pub fn store(&self) -> Arc { self.store.clone() diff --git a/chain/src/txhashset/desegmenter.rs b/chain/src/txhashset/desegmenter.rs index e758bce06..43454ed86 100644 --- a/chain/src/txhashset/desegmenter.rs +++ b/chain/src/txhashset/desegmenter.rs @@ -110,6 +110,20 @@ impl Desegmenter { retval } + /// Reset all state + pub fn reset(&mut self) { + self.all_segments_complete = false; + self.bitmap_segment_cache = vec![]; + self.output_segment_cache = vec![]; + self.rangeproof_segment_cache = vec![]; + self.kernel_segment_cache = vec![]; + self.bitmap_mmr_leaf_count = 0; + self.bitmap_mmr_size = 0; + self.bitmap_cache = None; + self.bitmap_accumulator = BitmapAccumulator::new(); + self.calc_bitmap_mmr_sizes(); + } + /// Return reference to the header used for validation pub fn header(&self) -> &BlockHeader { &self.archive_header @@ -225,6 +239,12 @@ impl Desegmenter { let batch = store.batch().unwrap(); batch.save_pibd_head(&tip).unwrap(); batch.commit().unwrap(); + status.update_pibd_progress( + false, + false, + latest_block_height, + header_head.height, + ); if h == header_head { // get out of this loop and move on to validation break; @@ -240,10 +260,10 @@ impl Desegmenter { header_pmmr, &header_head, genesis, - status, + status.clone(), ) { error!("Error validating pibd hashset: {}", e); - // TODO: Set state appropriately, state sync can rewind and start again, etc + status.update_pibd_progress(false, true, latest_block_height, header_head.height); } stop_state.stop(); } @@ -267,13 +287,6 @@ impl Desegmenter { txhashset.roots().validate(header_head)?; } - //debug!("desegmenter validation: compacting"); - /*{ - let mut txhashset = txhashset.write(); - let batch = store.batch()?; - txhashset.compact(header_head, &batch)?; - }*/ - status.on_setup(); // Validate kernel history diff --git a/chain/src/txhashset/txhashset.rs b/chain/src/txhashset/txhashset.rs index c097d5b02..a9ad4a630 100644 --- a/chain/src/txhashset/txhashset.rs +++ b/chain/src/txhashset/txhashset.rs @@ -1201,6 +1201,12 @@ impl<'a> Extension<'a> { self.rproof_pmmr.readonly_pmmr() } + /// Reset prune lists + pub fn reset_prune_lists(&mut self) { + self.output_pmmr.reset_prune_list(); + self.rproof_pmmr.reset_prune_list(); + } + /// Apply a new block to the current txhashet extension (output, rangeproof, kernel MMRs). /// Returns a vec of commit_pos representing the pos and height of the outputs spent /// by this block. diff --git a/chain/src/types.rs b/chain/src/types.rs index e829e79d8..88b954db9 100644 --- a/chain/src/types.rs +++ b/chain/src/types.rs @@ -63,6 +63,13 @@ pub enum SyncStatus { /// Whether the syncer has determined there's not enough /// data to continue via PIBD aborted: bool, + /// whether we got an error anywhere (in which case restart the process) + errored: bool, + /// 'height', i.e. last 'block' for which there is complete + /// pmmr data + completed_to_height: u64, + /// Total 'height' needed + required_height: u64, }, /// Downloading the various txhashsets TxHashsetDownload(TxHashsetDownloadStats), @@ -217,6 +224,22 @@ impl SyncState { *self.current.write() = SyncStatus::TxHashsetDownload(stats); } + /// Update PIBD progress + pub fn update_pibd_progress( + &self, + aborted: bool, + errored: bool, + completed_to_height: u64, + required_height: u64, + ) { + *self.current.write() = SyncStatus::TxHashsetPibd { + aborted, + errored, + completed_to_height, + required_height, + }; + } + /// Update PIBD segment list pub fn add_pibd_segment(&self, id: &SegmentTypeIdentifier) { self.requested_pibd_segments.write().push(id.clone()); diff --git a/core/src/core/pmmr/backend.rs b/core/src/core/pmmr/backend.rs index 2d413a897..871ce4950 100644 --- a/core/src/core/pmmr/backend.rs +++ b/core/src/core/pmmr/backend.rs @@ -84,6 +84,9 @@ pub trait Backend { /// Release underlying datafiles and locks fn release_files(&mut self); + /// Reset prune list, used when PIBD is reset + fn reset_prune_list(&mut self); + /// Saves a snapshot of the rewound utxo file with the block hash as /// filename suffix. We need this when sending a txhashset zip file to a /// node for fast sync. diff --git a/core/src/core/pmmr/pmmr.rs b/core/src/core/pmmr/pmmr.rs index bba2721d9..1f486bfbf 100644 --- a/core/src/core/pmmr/pmmr.rs +++ b/core/src/core/pmmr/pmmr.rs @@ -275,6 +275,11 @@ where Ok(()) } + /// Reset prune list + pub fn reset_prune_list(&mut self) { + self.backend.reset_prune_list(); + } + /// Remove the specified position from the leaf set pub fn remove_from_leaf_set(&mut self, pos0: u64) { self.backend.remove_from_leaf_set(pos0); diff --git a/core/src/core/pmmr/vec_backend.rs b/core/src/core/pmmr/vec_backend.rs index 6accb767c..658169742 100644 --- a/core/src/core/pmmr/vec_backend.rs +++ b/core/src/core/pmmr/vec_backend.rs @@ -119,6 +119,10 @@ impl Backend for VecBackend { unimplemented!() } + fn reset_prune_list(&mut self) { + unimplemented!() + } + fn rewind(&mut self, position: u64, _rewind_rm_pos: &Bitmap) -> Result<(), String> { if let Some(data) = &mut self.data { let idx = pmmr::n_leaves(position); diff --git a/servers/src/common/adapters.rs b/servers/src/common/adapters.rs index 775d9a46a..dcf48aa3f 100644 --- a/servers/src/common/adapters.rs +++ b/servers/src/common/adapters.rs @@ -595,7 +595,7 @@ where { let res = d.add_bitmap_segment(segment, output_root); if let Err(e) = res { - debug!( + error!( "Validation of incoming bitmap segment failed: {:?}, reason: {}", identifier, e ); diff --git a/servers/src/grin/sync/state_sync.rs b/servers/src/grin/sync/state_sync.rs index fab8fb26a..75c0a74bf 100644 --- a/servers/src/grin/sync/state_sync.rs +++ b/servers/src/grin/sync/state_sync.rs @@ -89,6 +89,34 @@ impl StateSync { } }; + // Check whether we've errored and should restart pibd + if using_pibd { + if let SyncStatus::TxHashsetPibd { errored: true, .. } = self.sync_state.status() { + let archive_header = self.chain.txhashset_archive_header_header_only().unwrap(); + error!("PIBD Reported Failure - Restarting Sync"); + // reset desegmenter state + let desegmenter = self + .chain + .desegmenter(&archive_header, self.sync_state.clone()) + .unwrap(); + + if let Some(d) = desegmenter.write().as_mut() { + d.reset(); + }; + if let Err(e) = self.chain.reset_chain_head(self.chain.genesis(), false) { + error!("pibd_sync restart: chain reset error = {}", e); + } + if let Err(e) = self.chain.reset_pibd_head() { + error!("pibd_sync restart: reset pibd_head error = {}", e); + } + if let Err(e) = self.chain.reset_prune_lists() { + error!("pibd_sync restart: reset prune lists error = {}", e); + } + self.sync_state.update_pibd_progress(false, false, 1, 1); + sync_need_restart = true; + } + } + // check peer connection status of this sync if !using_pibd { if let Some(ref peer) = self.state_sync_peer { @@ -129,11 +157,14 @@ impl StateSync { // run fast sync if applicable, normally only run one-time, except restart in error if sync_need_restart || header_head.height == highest_height { if using_pibd { + if sync_need_restart { + return true; + } let (launch, _download_timeout) = self.state_sync_due(); if launch { - self.sync_state - .update(SyncStatus::TxHashsetPibd { aborted: false }); let archive_header = self.chain.txhashset_archive_header_header_only().unwrap(); + self.sync_state + .update_pibd_progress(false, false, 1, archive_header.height); let desegmenter = self .chain .desegmenter(&archive_header, self.sync_state.clone()) @@ -195,7 +226,9 @@ impl StateSync { if let Some(d) = de.as_mut() { let res = d.apply_next_segments(); if let Err(e) = res { - debug!("error applying segment, continuing: {}", e); + debug!("error applying segment: {}", e); + self.sync_state.update_pibd_progress(false, true, 1, 1); + return false; } } } diff --git a/src/bin/tui/status.rs b/src/bin/tui/status.rs index e096220cd..75022667f 100644 --- a/src/bin/tui/status.rs +++ b/src/bin/tui/status.rs @@ -50,8 +50,21 @@ impl TUIStatusView { }; Cow::Owned(format!("Sync step 1/7: Downloading headers: {}%", percent)) } - SyncStatus::TxHashsetPibd { .. } => { - Cow::Borrowed("Sync step 2/7: Performing PIBD Body Sync (experimental)") + SyncStatus::TxHashsetPibd { + aborted: _, + errored: _, + completed_to_height, + required_height, + } => { + let percent = if required_height == 0 { + 0 + } else { + completed_to_height * 100 / required_height + }; + Cow::Owned(format!( + "Sync step 2/7: Downloading chain state - {} / {} Blocks - {}%", + completed_to_height, required_height, percent + )) } SyncStatus::TxHashsetDownload(stat) => { if stat.total_size > 0 { diff --git a/store/src/pmmr.rs b/store/src/pmmr.rs index 101754d28..d69e9bb48 100644 --- a/store/src/pmmr.rs +++ b/store/src/pmmr.rs @@ -229,6 +229,14 @@ impl Backend for PMMRBackend { Ok(()) } + fn reset_prune_list(&mut self) { + let bitmap = Bitmap::create(); + self.prune_list = PruneList::new(Some(self.data_dir.join(PMMR_PRUN_FILE)), bitmap); + if let Err(e) = self.prune_list.flush() { + error!("Flushing reset prune list: {}", e); + } + } + /// Remove by insertion position. fn remove(&mut self, pos0: u64) -> Result<(), String> { assert!(self.prunable, "Remove on non-prunable MMR");