[PIBD_IMPL] PIBD Stats + Retry on validation errors (#3694)

* start adding stats and reset chain state when errors are detected

* add functions to reset the prune list when resetting chain PIBD state

* debug statement

* remove test function
Yeastplume 2022-02-17 14:10:56 +00:00 committed by GitHub
parent 3ea233d5eb
commit 5630cf2e10
12 changed files with 164 additions and 24 deletions

View file

@@ -81,7 +81,7 @@ impl ChainResetHandler {
 	pub fn reset_chain_head(&self, hash: Hash) -> Result<(), Error> {
 		let chain = w(&self.chain)?;
 		let header = chain.get_block_header(&hash)?;
-		chain.reset_chain_head(&header)?;
+		chain.reset_chain_head(&header, true)?;
 		// Reset the sync status and clear out any sync error.
 		w(&self.sync_state)?.reset();
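The new boolean selects whether the header MMR is rewound together with the chain head. A minimal sketch of the two call patterns this commit introduces (both calls appear elsewhere in this diff; the surrounding setup is assumed):

    // Owner API reset: rewind full blocks and the header chain together.
    chain.reset_chain_head(&header, true)?;

    // PIBD restart (see the state sync hunks below): fall back to genesis but
    // keep the already-synced header chain, so headers need not be re-downloaded.
    chain.reset_chain_head(chain.genesis(), false)?;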

View file

@@ -227,7 +227,11 @@ impl Chain {
 	/// Reset both head and header_head to the provided header.
 	/// Handles simple rewind and more complex fork scenarios.
 	/// Used by the reset_chain_head owner api endpoint.
-	pub fn reset_chain_head<T: Into<Tip>>(&self, head: T) -> Result<(), Error> {
+	pub fn reset_chain_head<T: Into<Tip>>(
+		&self,
+		head: T,
+		rewind_headers: bool,
+	) -> Result<(), Error> {
 		let head = head.into();
 		let mut header_pmmr = self.header_pmmr.write();
@@ -248,19 +252,42 @@ impl Chain {
 			},
 		)?;

-		// If the rewind of full blocks was successful then we can rewind the header MMR.
-		// Rewind and reapply headers to reset the header MMR.
-		txhashset::header_extending(&mut header_pmmr, &mut batch, |ext, batch| {
-			self.rewind_and_apply_header_fork(&header, ext, batch)?;
-			batch.save_header_head(&head)?;
-			Ok(())
-		})?;
+		if rewind_headers {
+			// If the rewind of full blocks was successful then we can rewind the header MMR.
+			// Rewind and reapply headers to reset the header MMR.
+			txhashset::header_extending(&mut header_pmmr, &mut batch, |ext, batch| {
+				self.rewind_and_apply_header_fork(&header, ext, batch)?;
+				batch.save_header_head(&head)?;
+				Ok(())
+			})?;
+		}
 		batch.commit()?;
 		Ok(())
 	}

+	/// Reset prune lists (when PIBD resets)
+	pub fn reset_prune_lists(&self) -> Result<(), Error> {
+		let mut header_pmmr = self.header_pmmr.write();
+		let mut txhashset = self.txhashset.write();
+		let mut batch = self.store.batch()?;
+
+		txhashset::extending(&mut header_pmmr, &mut txhashset, &mut batch, |ext, _| {
+			let extension = &mut ext.extension;
+			extension.reset_prune_lists();
+			Ok(())
+		})?;
+		Ok(())
+	}
+
+	/// Reset PIBD head
+	pub fn reset_pibd_head(&self) -> Result<(), Error> {
+		let batch = self.store.batch()?;
+		batch.save_pibd_head(&self.genesis().into())?;
+		Ok(())
+	}
+
 	/// Are we running with archive_mode enabled?
 	pub fn archive_mode(&self) -> bool {
 		self.archive_mode
@@ -276,6 +303,11 @@ impl Chain {
 		self.txhashset.clone()
 	}

+	/// return genesis header
+	pub fn genesis(&self) -> BlockHeader {
+		self.genesis.clone()
+	}
+
 	/// Shared store instance.
 	pub fn store(&self) -> Arc<store::ChainStore> {
 		self.store.clone()
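Taken together, `reset_chain_head(.., false)`, `reset_pibd_head` and `reset_prune_lists` let the sync layer roll PIBD back to a clean slate without discarding headers. A sketch of the reset sequence, mirroring the state sync hunk later in this diff (error handling elided):

    // Rewind the chain head to genesis, keeping the header MMR intact.
    chain.reset_chain_head(chain.genesis(), false)?;
    // Point the PIBD head back at genesis so segment requests start over.
    chain.reset_pibd_head()?;
    // Drop prune-list state accumulated from previously applied segments.
    chain.reset_prune_lists()?;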

View file

@@ -110,6 +110,20 @@ impl Desegmenter {
 		retval
 	}

+	/// Reset all state
+	pub fn reset(&mut self) {
+		self.all_segments_complete = false;
+		self.bitmap_segment_cache = vec![];
+		self.output_segment_cache = vec![];
+		self.rangeproof_segment_cache = vec![];
+		self.kernel_segment_cache = vec![];
+		self.bitmap_mmr_leaf_count = 0;
+		self.bitmap_mmr_size = 0;
+		self.bitmap_cache = None;
+		self.bitmap_accumulator = BitmapAccumulator::new();
+		self.calc_bitmap_mmr_sizes();
+	}
+
 	/// Return reference to the header used for validation
 	pub fn header(&self) -> &BlockHeader {
 		&self.archive_header
@@ -225,6 +239,12 @@ impl Desegmenter {
 					let batch = store.batch().unwrap();
 					batch.save_pibd_head(&tip).unwrap();
 					batch.commit().unwrap();
+					status.update_pibd_progress(
+						false,
+						false,
+						latest_block_height,
+						header_head.height,
+					);
 					if h == header_head {
 						// get out of this loop and move on to validation
 						break;
@@ -240,10 +260,10 @@ impl Desegmenter {
 				header_pmmr,
 				&header_head,
 				genesis,
-				status,
+				status.clone(),
 			) {
 				error!("Error validating pibd hashset: {}", e);
-				// TODO: Set state appropriately, state sync can rewind and start again, etc
+				status.update_pibd_progress(false, true, latest_block_height, header_head.height);
 			}
 			stop_state.stop();
 		}
@@ -267,13 +287,6 @@ impl Desegmenter {
 			txhashset.roots().validate(header_head)?;
 		}

-		//debug!("desegmenter validation: compacting");
-		/*{
-			let mut txhashset = txhashset.write();
-			let batch = store.batch()?;
-			txhashset.compact(header_head, &batch)?;
-		}*/
-
 		status.on_setup();

 		// Validate kernel history
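The `errored: true` written above via `update_pibd_progress` is the signal the outer sync loop polls to decide on a retry. A sketch of the consuming side, assuming a `sync_state: Arc<SyncState>` handle (the match shape comes from the SyncStatus hunk below):

    if let SyncStatus::TxHashsetPibd { errored: true, .. } = sync_state.status() {
        // Hashset validation failed: reset the desegmenter and chain state,
        // then let state sync launch PIBD again from scratch.
    }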

View file

@@ -1201,6 +1201,12 @@ impl<'a> Extension<'a> {
 		self.rproof_pmmr.readonly_pmmr()
 	}

+	/// Reset prune lists
+	pub fn reset_prune_lists(&mut self) {
+		self.output_pmmr.reset_prune_list();
+		self.rproof_pmmr.reset_prune_list();
+	}
+
 	/// Apply a new block to the current txhashet extension (output, rangeproof, kernel MMRs).
 	/// Returns a vec of commit_pos representing the pos and height of the outputs spent
 	/// by this block.
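This is one link in the `reset_prune_list` plumbing added across the commit; a comment sketch of the full call chain (all names appear in this diff):

    // Chain::reset_prune_lists()            chain.rs (above)
    //   -> Extension::reset_prune_lists()   this hunk
    //     -> PMMR::reset_prune_list()       pmmr.rs (below)
    //       -> Backend::reset_prune_list()  backend trait + PMMRBackend (below)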

View file

@@ -63,6 +63,13 @@ pub enum SyncStatus {
 		/// Whether the syncer has determined there's not enough
 		/// data to continue via PIBD
 		aborted: bool,
+		/// whether we got an error anywhere (in which case restart the process)
+		errored: bool,
+		/// 'height', i.e. last 'block' for which there is complete
+		/// pmmr data
+		completed_to_height: u64,
+		/// Total 'height' needed
+		required_height: u64,
 	},
 	/// Downloading the various txhashsets
 	TxHashsetDownload(TxHashsetDownloadStats),
@@ -217,6 +224,22 @@ impl SyncState {
 		*self.current.write() = SyncStatus::TxHashsetDownload(stats);
 	}

+	/// Update PIBD progress
+	pub fn update_pibd_progress(
+		&self,
+		aborted: bool,
+		errored: bool,
+		completed_to_height: u64,
+		required_height: u64,
+	) {
+		*self.current.write() = SyncStatus::TxHashsetPibd {
+			aborted,
+			errored,
+			completed_to_height,
+			required_height,
+		};
+	}
+
 	/// Update PIBD segment list
 	pub fn add_pibd_segment(&self, id: &SegmentTypeIdentifier) {
 		self.requested_pibd_segments.write().push(id.clone());
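A self-contained sketch of how the extended variant is produced and consumed; the variant shape mirrors this hunk, while `std::sync::RwLock` stands in for the crate's internal lock (an assumption for illustration only):

    use std::sync::RwLock;

    enum SyncStatus {
        TxHashsetPibd {
            aborted: bool,
            errored: bool,
            completed_to_height: u64,
            required_height: u64,
        },
    }

    fn main() {
        // update_pibd_progress(..) boils down to overwriting the current status.
        let current = RwLock::new(SyncStatus::TxHashsetPibd {
            aborted: false,
            errored: false,
            completed_to_height: 500,
            required_height: 2000,
        });
        // Readers (the TUI, the sync loop) bind only the fields they need.
        if let SyncStatus::TxHashsetPibd {
            errored,
            completed_to_height,
            ..
        } = *current.read().unwrap()
        {
            println!("errored={}, completed_to={}", errored, completed_to_height);
        }
    }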

View file

@@ -84,6 +84,9 @@ pub trait Backend<T: PMMRable> {
 	/// Release underlying datafiles and locks
 	fn release_files(&mut self);

+	/// Reset prune list, used when PIBD is reset
+	fn reset_prune_list(&mut self);
+
 	/// Saves a snapshot of the rewound utxo file with the block hash as
 	/// filename suffix. We need this when sending a txhashset zip file to a
 	/// node for fast sync.
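Because `reset_prune_list` is added to the trait without a default body, every `Backend` implementor must now provide it: the in-memory `VecBackend` below simply calls `unimplemented!()`, while the disk-backed `PMMRBackend` at the bottom of this diff recreates an empty prune list on disk. A hypothetical prune-free test backend could satisfy the trait with a no-op:

    fn reset_prune_list(&mut self) {
        // Nothing to reset for a backend that never prunes.
    }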

View file

@@ -275,6 +275,11 @@ where
 		Ok(())
 	}

+	/// Reset prune list
+	pub fn reset_prune_list(&mut self) {
+		self.backend.reset_prune_list();
+	}
+
 	/// Remove the specified position from the leaf set
 	pub fn remove_from_leaf_set(&mut self, pos0: u64) {
 		self.backend.remove_from_leaf_set(pos0);

View file

@@ -119,6 +119,10 @@ impl<T: PMMRable> Backend<T> for VecBackend<T> {
 		unimplemented!()
 	}

+	fn reset_prune_list(&mut self) {
+		unimplemented!()
+	}
+
 	fn rewind(&mut self, position: u64, _rewind_rm_pos: &Bitmap) -> Result<(), String> {
 		if let Some(data) = &mut self.data {
 			let idx = pmmr::n_leaves(position);

View file

@@ -595,7 +595,7 @@ where
 		{
 			let res = d.add_bitmap_segment(segment, output_root);
 			if let Err(e) = res {
-				debug!(
+				error!(
 					"Validation of incoming bitmap segment failed: {:?}, reason: {}",
 					identifier, e
 				);

View file

@@ -89,6 +89,34 @@ impl StateSync {
 			}
 		};

+		// Check whether we've errored and should restart pibd
+		if using_pibd {
+			if let SyncStatus::TxHashsetPibd { errored: true, .. } = self.sync_state.status() {
+				let archive_header = self.chain.txhashset_archive_header_header_only().unwrap();
+				error!("PIBD Reported Failure - Restarting Sync");
+				// reset desegmenter state
+				let desegmenter = self
+					.chain
+					.desegmenter(&archive_header, self.sync_state.clone())
+					.unwrap();
+				if let Some(d) = desegmenter.write().as_mut() {
+					d.reset();
+				};
+				if let Err(e) = self.chain.reset_chain_head(self.chain.genesis(), false) {
+					error!("pibd_sync restart: chain reset error = {}", e);
+				}
+				if let Err(e) = self.chain.reset_pibd_head() {
+					error!("pibd_sync restart: reset pibd_head error = {}", e);
+				}
+				if let Err(e) = self.chain.reset_prune_lists() {
+					error!("pibd_sync restart: reset prune lists error = {}", e);
+				}
+				self.sync_state.update_pibd_progress(false, false, 1, 1);
+				sync_need_restart = true;
+			}
+		}
+
 		// check peer connection status of this sync
 		if !using_pibd {
 			if let Some(ref peer) = self.state_sync_peer {
@@ -129,11 +157,14 @@ impl StateSync {
 		// run fast sync if applicable, normally only run one-time, except restart in error
 		if sync_need_restart || header_head.height == highest_height {
 			if using_pibd {
+				if sync_need_restart {
+					return true;
+				}
 				let (launch, _download_timeout) = self.state_sync_due();
 				if launch {
-					self.sync_state
-						.update(SyncStatus::TxHashsetPibd { aborted: false });
 					let archive_header = self.chain.txhashset_archive_header_header_only().unwrap();
+					self.sync_state
+						.update_pibd_progress(false, false, 1, archive_header.height);
 					let desegmenter = self
 						.chain
 						.desegmenter(&archive_header, self.sync_state.clone())
@@ -195,7 +226,9 @@ impl StateSync {
 			if let Some(d) = de.as_mut() {
 				let res = d.apply_next_segments();
 				if let Err(e) = res {
-					debug!("error applying segment, continuing: {}", e);
+					debug!("error applying segment: {}", e);
+					self.sync_state.update_pibd_progress(false, true, 1, 1);
+					return false;
 				}
 			}
 		}
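A toy model of the retry flow added in this file: an errored status flips one pass of the loop into a reset-and-restart pass. All grin types are replaced by stand-ins here; only the control flow is taken from the diff:

    struct PibdStatus {
        errored: bool,
        completed_to_height: u64,
        required_height: u64,
    }

    // One pass of the (highly simplified) state-sync loop.
    fn sync_pass(status: &mut PibdStatus) -> bool {
        if status.errored {
            // ...reset desegmenter, chain head, pibd head and prune lists...
            // Clear the flag, as update_pibd_progress(false, false, 1, 1) does.
            *status = PibdStatus {
                errored: false,
                completed_to_height: 1,
                required_height: 1,
            };
            return true; // sync_need_restart: bail out, retry on the next pass
        }
        // ...otherwise apply segments; a failure sets `errored` for the next pass.
        false
    }

    fn main() {
        let mut status = PibdStatus {
            errored: true,
            completed_to_height: 40,
            required_height: 100,
        };
        assert!(sync_pass(&mut status)); // first pass performs the reset
        assert!(!sync_pass(&mut status)); // next pass resumes downloading
    }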

View file

@@ -50,8 +50,21 @@ impl TUIStatusView {
 				};
 				Cow::Owned(format!("Sync step 1/7: Downloading headers: {}%", percent))
 			}
-			SyncStatus::TxHashsetPibd { .. } => {
-				Cow::Borrowed("Sync step 2/7: Performing PIBD Body Sync (experimental)")
+			SyncStatus::TxHashsetPibd {
+				aborted: _,
+				errored: _,
+				completed_to_height,
+				required_height,
+			} => {
+				let percent = if required_height == 0 {
+					0
+				} else {
+					completed_to_height * 100 / required_height
+				};
+				Cow::Owned(format!(
+					"Sync step 2/7: Downloading chain state - {} / {} Blocks - {}%",
+					completed_to_height, required_height, percent
+				))
 			}
 			SyncStatus::TxHashsetDownload(stat) => {
 				if stat.total_size > 0 {
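The `required_height == 0` branch guards the integer division before any target height has been reported. A standalone restatement of the arm's arithmetic (the function name is illustrative, not from the codebase):

    fn pibd_progress_line(completed_to_height: u64, required_height: u64) -> String {
        // Guard the integer division until a real target height is known.
        let percent = if required_height == 0 {
            0
        } else {
            completed_to_height * 100 / required_height
        };
        format!(
            "Sync step 2/7: Downloading chain state - {} / {} Blocks - {}%",
            completed_to_height, required_height, percent
        )
    }

    fn main() {
        // Prints: Sync step 2/7: Downloading chain state - 500 / 2000 Blocks - 25%
        println!("{}", pibd_progress_line(500, 2000));
    }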

View file

@@ -229,6 +229,14 @@ impl<T: PMMRable> Backend<T> for PMMRBackend<T> {
 		Ok(())
 	}

+	fn reset_prune_list(&mut self) {
+		let bitmap = Bitmap::create();
+		self.prune_list = PruneList::new(Some(self.data_dir.join(PMMR_PRUN_FILE)), bitmap);
+		if let Err(e) = self.prune_list.flush() {
+			error!("Flushing reset prune list: {}", e);
+		}
+	}
+
 	/// Remove by insertion position.
 	fn remove(&mut self, pos0: u64) -> Result<(), String> {
 		assert!(self.prunable, "Remove on non-prunable MMR");
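Since the trait method returns `()`, a failed flush of the freshly emptied prune list can only be logged, not propagated. If the `Backend` trait ever grew a fallible signature, the same body would become (hypothetical signature, not part of this diff):

    fn reset_prune_list(&mut self) -> Result<(), String> {
        // Replace the in-memory prune list with an empty one and persist it,
        // so the backend behaves as if nothing had ever been pruned.
        self.prune_list = PruneList::new(Some(self.data_dir.join(PMMR_PRUN_FILE)), Bitmap::create());
        self.prune_list
            .flush()
            .map_err(|e| format!("flushing reset prune list: {}", e))
    }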