From 21b1ac50d702960d26686f18a2f178ad5591b40b Mon Sep 17 00:00:00 2001 From: Yeastplume Date: Tue, 1 Mar 2022 13:52:16 +0000 Subject: [PATCH] [PIBD_IMPL] Thread simplification + More TUI Updates + Stop State Propagation (#3698) * change pibd stat display to show progress as a percentage of downloaded leaves * attempt some inline rp validation * propagate shutdown state through kernel validation * change validation loop timing * simplify validator threading * add more detailed tracking of kernel history validation to tui, allow stop state during * adding more stop state + tui progress indication * remove progressive validate * test fix --- chain/src/chain.rs | 34 ++-- chain/src/txhashset/desegmenter.rs | 274 +++++++++++++--------------- chain/src/txhashset/txhashset.rs | 99 ++++++++-- chain/src/types.rs | 92 ++++++++-- chain/tests/test_pibd_copy.rs | 5 +- core/src/core/pmmr/backend.rs | 3 + core/src/core/pmmr/pmmr.rs | 7 + core/src/core/pmmr/readonly_pmmr.rs | 4 + core/src/core/pmmr/vec_backend.rs | 4 + servers/src/common/adapters.rs | 28 +-- servers/src/grin/sync/state_sync.rs | 61 ++++--- servers/src/grin/sync/syncer.rs | 10 +- src/bin/tui/status.rs | 41 ++++- store/src/leaf_set.rs | 5 + store/src/pmmr.rs | 8 + 15 files changed, 428 insertions(+), 247 deletions(-) diff --git a/chain/src/chain.rs b/chain/src/chain.rs index 96d46ce92..ff8bbaa13 100644 --- a/chain/src/chain.rs +++ b/chain/src/chain.rs @@ -37,7 +37,6 @@ use crate::{ core::core::hash::{Hash, Hashed}, store::Batch, txhashset::{ExtensionPair, HeaderExtension}, - SyncState, }; use grin_store::Error::NotFoundErr; use std::collections::HashMap; @@ -698,8 +697,15 @@ impl Chain { // ensure the view is consistent. txhashset::extending_readonly(&mut header_pmmr, &mut txhashset, |ext, batch| { self.rewind_and_apply_fork(&header, ext, batch)?; - ext.extension - .validate(&self.genesis, fast_validation, &NoStatus, &header)?; + ext.extension.validate( + &self.genesis, + fast_validation, + &NoStatus, + None, + None, + &header, + None, + )?; Ok(()) }) } @@ -903,7 +909,6 @@ impl Chain { pub fn desegmenter( &self, archive_header: &BlockHeader, - sync_state: Arc, ) -> Result>>, Error> { // Use our cached desegmenter if we have one and the associated header matches. if let Some(d) = self.pibd_desegmenter.write().as_ref() { @@ -911,14 +916,10 @@ impl Chain { return Ok(self.pibd_desegmenter.clone()); } } - // If no desegmenter or headers don't match init - // Stop previous thread if running - if let Some(d) = self.pibd_desegmenter.read().as_ref() { - d.stop_validation_thread(); - } + // TODO: (Check whether we can do this.. we *should* be able to modify this as the desegmenter // is in flight and we cross a horizon boundary, but needs more thinking) - let desegmenter = self.init_desegmenter(archive_header, sync_state)?; + let desegmenter = self.init_desegmenter(archive_header)?; let mut cache = self.pibd_desegmenter.write(); *cache = Some(desegmenter.clone()); @@ -928,11 +929,7 @@ impl Chain { /// initialize a desegmenter, which is capable of extending the hashset by appending /// PIBD segments of the three PMMR trees + Bitmap PMMR /// header should be the same header as selected for the txhashset.zip archive - fn init_desegmenter( - &self, - header: &BlockHeader, - sync_state: Arc, - ) -> Result { + fn init_desegmenter(&self, header: &BlockHeader) -> Result { debug!( "init_desegmenter: initializing new desegmenter for {} at {}", header.hash(), @@ -945,7 +942,6 @@ impl Chain { header.clone(), self.genesis.clone(), self.store.clone(), - sync_state, )) } @@ -1086,7 +1082,7 @@ impl Chain { txhashset_data: File, status: &dyn TxHashsetWriteStatus, ) -> Result { - status.on_setup(); + status.on_setup(None, None, None, None); // Initial check whether this txhashset is needed or not let fork_point = self.fork_point()?; @@ -1126,7 +1122,7 @@ impl Chain { let header_pmmr = self.header_pmmr.read(); let batch = self.store.batch()?; - txhashset.verify_kernel_pos_index(&self.genesis, &header_pmmr, &batch)?; + txhashset.verify_kernel_pos_index(&self.genesis, &header_pmmr, &batch, None, None)?; } // all good, prepare a new batch and update all the required records @@ -1145,7 +1141,7 @@ impl Chain { // Validate the extension, generating the utxo_sum and kernel_sum. // Full validation, including rangeproofs and kernel signature verification. let (utxo_sum, kernel_sum) = - extension.validate(&self.genesis, false, status, &header)?; + extension.validate(&self.genesis, false, status, None, None, &header, None)?; // Save the block_sums (utxo_sum, kernel_sum) to the db for use later. batch.save_block_sums( diff --git a/chain/src/txhashset/desegmenter.rs b/chain/src/txhashset/desegmenter.rs index 837170103..9a4b5dfd4 100644 --- a/chain/src/txhashset/desegmenter.rs +++ b/chain/src/txhashset/desegmenter.rs @@ -16,8 +16,6 @@ //! segmenter use std::sync::Arc; -use std::thread; -use std::time::Duration; use crate::core::core::hash::{Hash, Hashed}; use crate::core::core::{pmmr, pmmr::ReadablePMMR}; @@ -44,12 +42,9 @@ pub struct Desegmenter { header_pmmr: Arc>>, archive_header: BlockHeader, store: Arc, - sync_state: Arc, genesis: BlockHeader, - validator_stop_state: Arc, - default_bitmap_segment_height: u8, default_output_segment_height: u8, default_rangeproof_segment_height: u8, @@ -78,7 +73,6 @@ impl Desegmenter { archive_header: BlockHeader, genesis: BlockHeader, store: Arc, - sync_state: Arc, ) -> Desegmenter { trace!("Creating new desegmenter"); let mut retval = Desegmenter { @@ -86,9 +80,7 @@ impl Desegmenter { header_pmmr, archive_header, store, - sync_state, genesis, - validator_stop_state: Arc::new(StopState::new()), bitmap_accumulator: BitmapAccumulator::new(), default_bitmap_segment_height: 9, default_output_segment_height: 11, @@ -139,133 +131,94 @@ impl Desegmenter { self.all_segments_complete } - /// Launch a separate validation thread, which will update and validate the body head - /// as we go - pub fn launch_validation_thread(&self) { - let stop_state = self.validator_stop_state.clone(); - let txhashset = self.txhashset.clone(); - let header_pmmr = self.header_pmmr.clone(); - let store = self.store.clone(); - let genesis = self.genesis.clone(); - let status = self.sync_state.clone(); - let desegmenter = Arc::new(RwLock::new(self.clone())); - let _ = thread::Builder::new() - .name("pibd-validation".to_string()) - .spawn(move || { - Desegmenter::validation_loop( - stop_state, - txhashset, - store, - desegmenter, - header_pmmr, - genesis, - status, - ); - }); - } - - /// Stop the validation loop - pub fn stop_validation_thread(&self) { - self.validator_stop_state.stop(); - } - - /// Validation loop - fn validation_loop( - stop_state: Arc, - txhashset: Arc>, - store: Arc, - desegmenter: Arc>, - header_pmmr: Arc>>, - genesis: BlockHeader, - status: Arc, - ) { + /// Check progress, update status if needed, returns true if all required + /// segments are in place + pub fn check_progress(&self, status: Arc) -> bool { let mut latest_block_height = 0; - let header_head = { desegmenter.read().header().clone() }; - loop { - if stop_state.is_stopped() { - break; - } - thread::sleep(Duration::from_millis(5000)); - trace!("In Desegmenter Validation Loop"); - let local_output_mmr_size; - let local_kernel_mmr_size; - let local_rangeproof_mmr_size; + let local_output_mmr_size; + let local_kernel_mmr_size; + let local_rangeproof_mmr_size; + { + let txhashset = self.txhashset.read(); + local_output_mmr_size = txhashset.output_mmr_size(); + local_kernel_mmr_size = txhashset.kernel_mmr_size(); + local_rangeproof_mmr_size = txhashset.rangeproof_mmr_size(); + } + + // going to try presenting PIBD progress as total leaves downloaded + // total segments probably doesn't make much sense since the segment + // sizes will be able to change over time, and representative block height + // can be too lopsided if one pmmr completes faster, so perhaps just + // use total leaves downloaded and display as a percentage + let completed_leaves = pmmr::n_leaves(local_output_mmr_size) + + pmmr::n_leaves(local_rangeproof_mmr_size) + + pmmr::n_leaves(local_kernel_mmr_size); + + // Find latest 'complete' header. + // First take lesser of rangeproof and output mmr sizes + let latest_output_size = std::cmp::min(local_output_mmr_size, local_rangeproof_mmr_size); + + // Find first header in which 'output_mmr_size' and 'kernel_mmr_size' are greater than + // given sizes + + let res = { + let header_pmmr = self.header_pmmr.read(); + header_pmmr.get_first_header_with( + latest_output_size, + local_kernel_mmr_size, + latest_block_height, + self.store.clone(), + ) + }; + + if let Some(h) = res { + latest_block_height = h.height; + + // TODO: Unwraps + let tip = Tip::from_header(&h); + let batch = self.store.batch().unwrap(); + batch.save_pibd_head(&tip).unwrap(); + batch.commit().unwrap(); + + status.update_pibd_progress( + false, + false, + completed_leaves, + latest_block_height, + &self.archive_header, + ); + if local_kernel_mmr_size == self.archive_header.kernel_mmr_size + && local_output_mmr_size == self.archive_header.output_mmr_size + && local_rangeproof_mmr_size == self.archive_header.output_mmr_size { - let txhashset = txhashset.read(); - local_output_mmr_size = txhashset.output_mmr_size(); - local_kernel_mmr_size = txhashset.kernel_mmr_size(); - local_rangeproof_mmr_size = txhashset.rangeproof_mmr_size(); - } - - trace!( - "Desegmenter Validation: Output MMR Size: {}", - local_output_mmr_size - ); - trace!( - "Desegmenter Validation: Rangeproof MMR Size: {}", - local_rangeproof_mmr_size - ); - trace!( - "Desegmenter Validation: Kernel MMR Size: {}", - local_kernel_mmr_size - ); - - // Find latest 'complete' header. - // First take lesser of rangeproof and output mmr sizes - let latest_output_size = - std::cmp::min(local_output_mmr_size, local_rangeproof_mmr_size); - // Find first header in which 'output_mmr_size' and 'kernel_mmr_size' are greater than - // given sizes - - { - let header_pmmr = header_pmmr.read(); - let res = header_pmmr.get_first_header_with( - latest_output_size, - local_kernel_mmr_size, - latest_block_height, - store.clone(), - ); - if let Some(h) = res { - latest_block_height = h.height; - debug!( - "PIBD Desegmenter Validation Loop: PMMRs complete up to block {}: {:?}", - h.height, h - ); - // TODO: 'In-flight' validation. At the moment the entire tree - // will be presented for validation after all segments are downloaded - // TODO: Unwraps - let tip = Tip::from_header(&h); - let batch = store.batch().unwrap(); - batch.save_pibd_head(&tip).unwrap(); - batch.commit().unwrap(); - status.update_pibd_progress( - false, - false, - latest_block_height, - header_head.height, - ); - if h == header_head { - // get out of this loop and move on to validation - break; - } - } + // All is complete + return true; } } - // If all done, kick off validation, setting error state if necessary - if let Err(e) = Desegmenter::validate_complete_state( + false + + /*if let Err(e) = Desegmenter::validate_complete_state( txhashset, store, header_pmmr, &header_head, genesis, + last_validated_rangeproof_pos, status.clone(), + stop_state.clone(), ) { error!("Error validating pibd hashset: {}", e); - status.update_pibd_progress(false, true, latest_block_height, header_head.height); + status.update_pibd_progress( + false, + true, + completed_leaves, + latest_block_height, + &header_head, + ); } - stop_state.stop(); + stop_state.stop();*/ } /// TODO: This is largely copied from chain.rs txhashset_write and related functions, @@ -273,34 +226,39 @@ impl Desegmenter { /// segments are still being downloaded and applied. Current validation logic is all tied up /// around unzipping, so re-developing this logic separate from the txhashset version /// will to allow this to happen more cleanly - fn validate_complete_state( - txhashset: Arc>, - store: Arc, - header_pmmr: Arc>>, - header_head: &BlockHeader, - genesis: BlockHeader, + pub fn validate_complete_state( + &self, status: Arc, + stop_state: Arc, ) -> Result<(), Error> { // Quick root check first: { - let txhashset = txhashset.read(); - txhashset.roots().validate(header_head)?; + let txhashset = self.txhashset.read(); + txhashset.roots().validate(&self.archive_header)?; } - status.on_setup(); + // TODO: Keep track of this in the DB so we can pick up where we left off if needed + let last_rangeproof_validation_pos = 0; // Validate kernel history { debug!("desegmenter validation: rewinding and validating kernel history (readonly)"); - let txhashset = txhashset.read(); + let txhashset = self.txhashset.read(); let mut count = 0; - let mut current = header_head.clone(); + let mut current = self.archive_header.clone(); + let total = current.height; txhashset::rewindable_kernel_view(&txhashset, |view, batch| { while current.height > 0 { view.rewind(¤t)?; view.validate_root()?; current = batch.get_previous_header(¤t)?; count += 1; + if current.height % 100000 == 0 || current.height == total { + status.on_setup(Some(total - current.height), Some(total), None, None); + } + if stop_state.is_stopped() { + return Ok(()); + } } Ok(()) })?; @@ -310,38 +268,64 @@ impl Desegmenter { ); } + if stop_state.is_stopped() { + return Ok(()); + } + // Check kernel MMR root for every block header. // Check NRD relative height rules for full kernel history. { - let txhashset = txhashset.read(); - let header_pmmr = header_pmmr.read(); - let batch = store.batch()?; - txhashset.verify_kernel_pos_index(&genesis, &header_pmmr, &batch)?; + let txhashset = self.txhashset.read(); + let header_pmmr = self.header_pmmr.read(); + let batch = self.store.batch()?; + txhashset.verify_kernel_pos_index( + &self.genesis, + &header_pmmr, + &batch, + Some(status.clone()), + Some(stop_state.clone()), + )?; } + if stop_state.is_stopped() { + return Ok(()); + } + + status.on_setup(None, None, None, None); // Prepare a new batch and update all the required records { debug!("desegmenter validation: rewinding a 2nd time (writeable)"); - let mut txhashset = txhashset.write(); - let mut header_pmmr = header_pmmr.write(); - let mut batch = store.batch()?; + let mut txhashset = self.txhashset.write(); + let mut header_pmmr = self.header_pmmr.write(); + let mut batch = self.store.batch()?; txhashset::extending( &mut header_pmmr, &mut txhashset, &mut batch, |ext, batch| { let extension = &mut ext.extension; - extension.rewind(&header_head, batch)?; + extension.rewind(&self.archive_header, batch)?; // Validate the extension, generating the utxo_sum and kernel_sum. // Full validation, including rangeproofs and kernel signature verification. - let (utxo_sum, kernel_sum) = - extension.validate(&genesis, false, &*status, &header_head)?; + let (utxo_sum, kernel_sum) = extension.validate( + &self.genesis, + false, + &*status, + Some(last_rangeproof_validation_pos), + None, + &self.archive_header, + Some(stop_state.clone()), + )?; + + if stop_state.is_stopped() { + return Ok(()); + } // Save the block_sums (utxo_sum, kernel_sum) to the db for use later. batch.save_block_sums( - &header_head.hash(), + &self.archive_header.hash(), BlockSums { utxo_sum, kernel_sum, @@ -352,12 +336,16 @@ impl Desegmenter { }, )?; + if stop_state.is_stopped() { + return Ok(()); + } + debug!("desegmenter_validation: finished validating and rebuilding"); status.on_save(); { // Save the new head to the db and rebuild the header by height index. - let tip = Tip::from_header(&header_head); + let tip = Tip::from_header(&self.archive_header); // TODO: Throw error batch.save_body_head(&tip)?; diff --git a/chain/src/txhashset/txhashset.rs b/chain/src/txhashset/txhashset.rs index a9ad4a630..00316ddcb 100644 --- a/chain/src/txhashset/txhashset.rs +++ b/chain/src/txhashset/txhashset.rs @@ -34,7 +34,8 @@ use crate::txhashset::bitmap_accumulator::{BitmapAccumulator, BitmapChunk}; use crate::txhashset::{RewindableKernelView, UTXOView}; use crate::types::{CommitPos, OutputRoots, Tip, TxHashSetRoots, TxHashsetWriteStatus}; use crate::util::secp::pedersen::{Commitment, RangeProof}; -use crate::util::{file, secp_static, zip}; +use crate::util::{file, secp_static, zip, StopState}; +use crate::SyncState; use croaring::Bitmap; use grin_store::pmmr::{clean_files_by_prefix, PMMRBackend}; use std::cmp::Ordering; @@ -541,7 +542,7 @@ impl TxHashSet { let cutoff = head.height.saturating_sub(WEEK_HEIGHT * 2); let cutoff_hash = header_pmmr.get_header_hash_by_height(cutoff)?; let cutoff_header = batch.get_block_header(&cutoff_hash)?; - self.verify_kernel_pos_index(&cutoff_header, header_pmmr, batch) + self.verify_kernel_pos_index(&cutoff_header, header_pmmr, batch, None, None) } /// Verify and (re)build the NRD kernel_pos index from the provided header onwards. @@ -550,6 +551,8 @@ impl TxHashSet { from_header: &BlockHeader, header_pmmr: &PMMRHandle, batch: &Batch<'_>, + status: Option>, + stop_state: Option>, ) -> Result<(), Error> { if !global::is_nrd_enabled() { return Ok(()); @@ -578,6 +581,8 @@ impl TxHashSet { let mut current_pos = prev_size + 1; let mut current_header = from_header.clone(); let mut count = 0; + let total = pmmr::n_leaves(self.kernel_pmmr_h.size); + let mut applied = 0; while current_pos <= self.kernel_pmmr_h.size { if pmmr::is_leaf(current_pos - 1) { if let Some(kernel) = kernel_pmmr.get_data(current_pos - 1) { @@ -598,7 +603,19 @@ impl TxHashSet { _ => {} } } + applied += 1; + if let Some(ref s) = status { + if total % applied == 10000 { + s.on_setup(None, None, Some(applied), Some(total)); + } + } } + if let Some(ref s) = stop_state { + if s.is_stopped() { + return Ok(()); + } + } + current_pos += 1; } @@ -1799,7 +1816,10 @@ impl<'a> Extension<'a> { genesis: &BlockHeader, fast_validation: bool, status: &dyn TxHashsetWriteStatus, + output_start_pos: Option, + _kernel_start_pos: Option, header: &BlockHeader, + stop_state: Option>, ) -> Result<(Commitment, Commitment), Error> { self.validate_mmrs()?; self.validate_roots(header)?; @@ -1817,10 +1837,26 @@ impl<'a> Extension<'a> { // These are expensive verification step (skipped for "fast validation"). if !fast_validation { // Verify the rangeproof associated with each unspent output. - self.verify_rangeproofs(status)?; + self.verify_rangeproofs( + Some(status), + output_start_pos, + None, + false, + stop_state.clone(), + )?; + if let Some(ref s) = stop_state { + if s.is_stopped() { + return Err(ErrorKind::Stopped.into()); + } + } // Verify all the kernel signatures. - self.verify_kernel_signatures(status)?; + self.verify_kernel_signatures(status, stop_state.clone())?; + if let Some(ref s) = stop_state { + if s.is_stopped() { + return Err(ErrorKind::Stopped.into()); + } + } } Ok((output_sum, kernel_sum)) @@ -1863,7 +1899,11 @@ impl<'a> Extension<'a> { ) } - fn verify_kernel_signatures(&self, status: &dyn TxHashsetWriteStatus) -> Result<(), Error> { + fn verify_kernel_signatures( + &self, + status: &dyn TxHashsetWriteStatus, + stop_state: Option>, + ) -> Result<(), Error> { let now = Instant::now(); const KERNEL_BATCH_SIZE: usize = 5_000; @@ -1884,6 +1924,11 @@ impl<'a> Extension<'a> { kern_count += tx_kernels.len() as u64; tx_kernels.clear(); status.on_validation_kernels(kern_count, total_kernels); + if let Some(ref s) = stop_state { + if s.is_stopped() { + return Ok(()); + } + } debug!( "txhashset: verify_kernel_signatures: verified {} signatures", kern_count, @@ -1901,16 +1946,36 @@ impl<'a> Extension<'a> { Ok(()) } - fn verify_rangeproofs(&self, status: &dyn TxHashsetWriteStatus) -> Result<(), Error> { + fn verify_rangeproofs( + &self, + status: Option<&dyn TxHashsetWriteStatus>, + start_pos: Option, + batch_size: Option, + single_iter: bool, + stop_state: Option>, + ) -> Result { let now = Instant::now(); - let mut commits: Vec = Vec::with_capacity(1_000); - let mut proofs: Vec = Vec::with_capacity(1_000); + let batch_size = batch_size.unwrap_or(1_000); + + let mut commits: Vec = Vec::with_capacity(batch_size); + let mut proofs: Vec = Vec::with_capacity(batch_size); let mut proof_count = 0; + if let Some(s) = start_pos { + if let Some(i) = pmmr::pmmr_leaf_to_insertion_index(s) { + proof_count = self.output_pmmr.n_unpruned_leaves_to_index(i) as usize; + } + } + let total_rproofs = self.output_pmmr.n_unpruned_leaves(); for pos0 in self.output_pmmr.leaf_pos_iter() { + if let Some(p) = start_pos { + if pos0 < p { + continue; + } + } let output = self.output_pmmr.get_data(pos0); let proof = self.rproof_pmmr.get_data(pos0); @@ -1927,7 +1992,7 @@ impl<'a> Extension<'a> { proof_count += 1; - if proofs.len() >= 1_000 { + if proofs.len() >= batch_size { Output::batch_verify_proofs(&commits, &proofs)?; commits.clear(); proofs.clear(); @@ -1935,13 +2000,21 @@ impl<'a> Extension<'a> { "txhashset: verify_rangeproofs: verified {} rangeproofs", proof_count, ); - if proof_count % 1_000 == 0 { - status.on_validation_rproofs(proof_count, total_rproofs); + if let Some(s) = status { + s.on_validation_rproofs(proof_count as u64, total_rproofs); + } + if let Some(ref s) = stop_state { + if s.is_stopped() { + return Ok(pos0); + } + } + if single_iter { + return Ok(pos0); } } } - // remaining part which not full of 1000 range proofs + // remaining part which not full of batch_size range proofs if !proofs.is_empty() { Output::batch_verify_proofs(&commits, &proofs)?; commits.clear(); @@ -1958,7 +2031,7 @@ impl<'a> Extension<'a> { self.rproof_pmmr.unpruned_size(), now.elapsed().as_secs(), ); - Ok(()) + Ok(0) } } diff --git a/chain/src/types.rs b/chain/src/types.rs index 88b954db9..0a56b04bc 100644 --- a/chain/src/types.rs +++ b/chain/src/types.rs @@ -17,7 +17,7 @@ use chrono::prelude::{DateTime, Utc}; use crate::core::core::hash::{Hash, Hashed, ZERO_HASH}; -use crate::core::core::{Block, BlockHeader, HeaderVersion, SegmentTypeIdentifier}; +use crate::core::core::{pmmr, Block, BlockHeader, HeaderVersion, SegmentTypeIdentifier}; use crate::core::pow::Difficulty; use crate::core::ser::{self, PMMRIndexHashable, Readable, Reader, Writeable, Writer}; use crate::error::{Error, ErrorKind}; @@ -65,6 +65,10 @@ pub enum SyncStatus { aborted: bool, /// whether we got an error anywhere (in which case restart the process) errored: bool, + /// total number of leaves applied + completed_leaves: u64, + /// total number of leaves required by archive header + leaves_required: u64, /// 'height', i.e. last 'block' for which there is complete /// pmmr data completed_to_height: u64, @@ -74,7 +78,16 @@ pub enum SyncStatus { /// Downloading the various txhashsets TxHashsetDownload(TxHashsetDownloadStats), /// Setting up before validation - TxHashsetSetup, + TxHashsetSetup { + /// number of 'headers' for which kernels have been checked + headers: Option, + /// headers total + headers_total: Option, + /// kernel position portion + kernel_pos: Option, + /// total kernel position + kernel_pos_total: Option, + }, /// Validating the kernels TxHashsetKernelsValidation { /// kernels validated @@ -134,6 +147,25 @@ impl Default for TxHashsetDownloadStats { } } +/// Container for entry in requested PIBD segments +#[derive(Debug, Clone, Eq, PartialEq)] +pub struct PIBDSegmentContainer { + /// Segment+Type Identifier + pub identifier: SegmentTypeIdentifier, + /// Time at which this request was made + pub request_time: DateTime, +} + +impl PIBDSegmentContainer { + /// Return container with timestamp + pub fn new(identifier: SegmentTypeIdentifier) -> Self { + Self { + identifier, + request_time: Utc::now(), + } + } +} + /// Current sync state. Encapsulates the current SyncStatus. pub struct SyncState { current: RwLock, @@ -145,7 +177,7 @@ pub struct SyncState { /// available where it will be needed (both in the adapter /// and the sync loop) /// TODO: Better struct for this, perhaps hash identifiers - requested_pibd_segments: RwLock>, + requested_pibd_segments: RwLock>, } impl SyncState { @@ -229,30 +261,49 @@ impl SyncState { &self, aborted: bool, errored: bool, + completed_leaves: u64, completed_to_height: u64, - required_height: u64, + archive_header: &BlockHeader, ) { + let leaves_required = pmmr::n_leaves(archive_header.output_mmr_size) * 2 + + pmmr::n_leaves(archive_header.kernel_mmr_size); *self.current.write() = SyncStatus::TxHashsetPibd { aborted, errored, + completed_leaves, + leaves_required, completed_to_height, - required_height, + required_height: archive_header.height, }; } /// Update PIBD segment list pub fn add_pibd_segment(&self, id: &SegmentTypeIdentifier) { - self.requested_pibd_segments.write().push(id.clone()); + self.requested_pibd_segments + .write() + .push(PIBDSegmentContainer::new(id.clone())); } /// Remove segment from list pub fn remove_pibd_segment(&self, id: &SegmentTypeIdentifier) { - self.requested_pibd_segments.write().retain(|i| i != id); + self.requested_pibd_segments + .write() + .retain(|i| &i.identifier != id); + } + + /// Remove segments with request timestamps less cutoff time + pub fn remove_stale_pibd_requests(&self, cutoff_time: DateTime) { + self.requested_pibd_segments + .write() + .retain(|i| i.request_time < cutoff_time); } /// Check whether segment is in request list pub fn contains_pibd_segment(&self, id: &SegmentTypeIdentifier) -> bool { - self.requested_pibd_segments.read().contains(id) + self.requested_pibd_segments + .read() + .iter() + .any(|i| &i.identifier == id) } /// Communicate sync error @@ -272,8 +323,19 @@ impl SyncState { } impl TxHashsetWriteStatus for SyncState { - fn on_setup(&self) { - self.update(SyncStatus::TxHashsetSetup); + fn on_setup( + &self, + headers: Option, + headers_total: Option, + kernel_pos: Option, + kernel_pos_total: Option, + ) { + self.update(SyncStatus::TxHashsetSetup { + headers, + headers_total, + kernel_pos, + kernel_pos_total, + }); } fn on_validation_kernels(&self, kernels: u64, kernels_total: u64) { @@ -500,7 +562,13 @@ pub trait ChainAdapter { /// those values as the processing progresses. pub trait TxHashsetWriteStatus { /// First setup of the txhashset - fn on_setup(&self); + fn on_setup( + &self, + headers: Option, + header_total: Option, + kernel_pos: Option, + kernel_pos_total: Option, + ); /// Starting kernel validation fn on_validation_kernels(&self, kernels: u64, kernel_total: u64); /// Starting rproof validation @@ -515,7 +583,7 @@ pub trait TxHashsetWriteStatus { pub struct NoStatus; impl TxHashsetWriteStatus for NoStatus { - fn on_setup(&self) {} + fn on_setup(&self, _hs: Option, _ht: Option, _kp: Option, _kpt: Option) {} fn on_validation_kernels(&self, _ks: u64, _kts: u64) {} fn on_validation_rproofs(&self, _rs: u64, _rt: u64) {} fn on_save(&self) {} diff --git a/chain/tests/test_pibd_copy.rs b/chain/tests/test_pibd_copy.rs index ee7893646..255f56cdd 100644 --- a/chain/tests/test_pibd_copy.rs +++ b/chain/tests/test_pibd_copy.rs @@ -24,7 +24,7 @@ use std::sync::Arc; use std::{fs, io}; use crate::chain::txhashset::BitmapChunk; -use crate::chain::types::{NoopAdapter, Options, SyncState}; +use crate::chain::types::{NoopAdapter, Options}; use crate::core::core::{ hash::{Hash, Hashed}, pmmr::segment::{Segment, SegmentIdentifier, SegmentType}, @@ -177,9 +177,8 @@ impl DesegmenterRequestor { // Emulate `continue_pibd` function, which would be called from state sync // return whether is complete pub fn continue_pibd(&mut self) -> bool { - let state = Arc::new(SyncState::new()); let archive_header = self.chain.txhashset_archive_header_header_only().unwrap(); - let desegmenter = self.chain.desegmenter(&archive_header, state).unwrap(); + let desegmenter = self.chain.desegmenter(&archive_header).unwrap(); // Apply segments... TODO: figure out how this should be called, might // need to be a separate thread. diff --git a/core/src/core/pmmr/backend.rs b/core/src/core/pmmr/backend.rs index 871ce4950..0383c2709 100644 --- a/core/src/core/pmmr/backend.rs +++ b/core/src/core/pmmr/backend.rs @@ -68,6 +68,9 @@ pub trait Backend { /// Number of leaves fn n_unpruned_leaves(&self) -> u64; + /// Number of leaves up to the given leaf index + fn n_unpruned_leaves_to_index(&self, to_index: u64) -> u64; + /// Iterator over current (unpruned, unremoved) leaf insertion index. /// Note: This differs from underlying MMR pos - [0, 1, 2, 3, 4] vs. [1, 2, 4, 5, 8]. fn leaf_idx_iter(&self, from_idx: u64) -> Box + '_>; diff --git a/core/src/core/pmmr/pmmr.rs b/core/src/core/pmmr/pmmr.rs index 1f486bfbf..4d503bcb2 100644 --- a/core/src/core/pmmr/pmmr.rs +++ b/core/src/core/pmmr/pmmr.rs @@ -58,6 +58,9 @@ pub trait ReadablePMMR { /// Number of leaves in the MMR fn n_unpruned_leaves(&self) -> u64; + /// Number of leaves in the MMR up to index + fn n_unpruned_leaves_to_index(&self, to_index: u64) -> u64; + /// Is the MMR empty? fn is_empty(&self) -> bool { self.unpruned_size() == 0 @@ -485,6 +488,10 @@ where fn n_unpruned_leaves(&self) -> u64 { self.backend.n_unpruned_leaves() } + + fn n_unpruned_leaves_to_index(&self, to_index: u64) -> u64 { + self.backend.n_unpruned_leaves_to_index(to_index) + } } /// 64 bits all ones: 0b11111111...1 diff --git a/core/src/core/pmmr/readonly_pmmr.rs b/core/src/core/pmmr/readonly_pmmr.rs index 9ecc92186..0696813b7 100644 --- a/core/src/core/pmmr/readonly_pmmr.rs +++ b/core/src/core/pmmr/readonly_pmmr.rs @@ -175,4 +175,8 @@ where fn n_unpruned_leaves(&self) -> u64 { self.backend.n_unpruned_leaves() } + + fn n_unpruned_leaves_to_index(&self, to_index: u64) -> u64 { + self.backend.n_unpruned_leaves_to_index(to_index) + } } diff --git a/core/src/core/pmmr/vec_backend.rs b/core/src/core/pmmr/vec_backend.rs index 658169742..ee4629d0a 100644 --- a/core/src/core/pmmr/vec_backend.rs +++ b/core/src/core/pmmr/vec_backend.rs @@ -90,6 +90,10 @@ impl Backend for VecBackend { unimplemented!() } + fn n_unpruned_leaves_to_index(&self, _to_index: u64) -> u64 { + unimplemented!() + } + fn leaf_pos_iter(&self) -> Box + '_> { Box::new( self.hashes diff --git a/servers/src/common/adapters.rs b/servers/src/common/adapters.rs index 34d98eb2e..72378d62b 100644 --- a/servers/src/common/adapters.rs +++ b/servers/src/common/adapters.rs @@ -582,12 +582,7 @@ where let archive_header = self.chain().txhashset_archive_header_header_only()?; let identifier = segment.identifier().clone(); let mut retval = Ok(true); - if let Some(d) = self - .chain() - .desegmenter(&archive_header, self.sync_state.clone())? - .write() - .as_mut() - { + if let Some(d) = self.chain().desegmenter(&archive_header)?.write().as_mut() { let res = d.add_bitmap_segment(segment, output_root); if let Err(e) = res { error!( @@ -620,12 +615,7 @@ where let archive_header = self.chain().txhashset_archive_header_header_only()?; let identifier = segment.identifier().clone(); let mut retval = Ok(true); - if let Some(d) = self - .chain() - .desegmenter(&archive_header, self.sync_state.clone())? - .write() - .as_mut() - { + if let Some(d) = self.chain().desegmenter(&archive_header)?.write().as_mut() { let res = d.add_output_segment(segment, Some(bitmap_root)); if let Err(e) = res { error!( @@ -656,12 +646,7 @@ where let archive_header = self.chain().txhashset_archive_header_header_only()?; let identifier = segment.identifier().clone(); let mut retval = Ok(true); - if let Some(d) = self - .chain() - .desegmenter(&archive_header, self.sync_state.clone())? - .write() - .as_mut() - { + if let Some(d) = self.chain().desegmenter(&archive_header)?.write().as_mut() { let res = d.add_rangeproof_segment(segment); if let Err(e) = res { error!( @@ -692,12 +677,7 @@ where let archive_header = self.chain().txhashset_archive_header_header_only()?; let identifier = segment.identifier().clone(); let mut retval = Ok(true); - if let Some(d) = self - .chain() - .desegmenter(&archive_header, self.sync_state.clone())? - .write() - .as_mut() - { + if let Some(d) = self.chain().desegmenter(&archive_header)?.write().as_mut() { let res = d.add_kernel_segment(segment); if let Err(e) = res { error!( diff --git a/servers/src/grin/sync/state_sync.rs b/servers/src/grin/sync/state_sync.rs index 73e3cd905..424272e3d 100644 --- a/servers/src/grin/sync/state_sync.rs +++ b/servers/src/grin/sync/state_sync.rs @@ -21,6 +21,7 @@ use crate::core::core::{hash::Hashed, pmmr::segment::SegmentType}; use crate::core::global; use crate::core::pow::Difficulty; use crate::p2p::{self, Capabilities, Peer}; +use crate::util::StopState; /// Fast sync has 3 "states": /// * syncing headers @@ -61,6 +62,7 @@ impl StateSync { head: &chain::Tip, tail: &chain::Tip, highest_height: u64, + stop_state: Arc, ) -> bool { trace!("state_sync: head.height: {}, tail.height: {}. header_head.height: {}, highest_height: {}", head.height, tail.height, header_head.height, highest_height, @@ -95,10 +97,7 @@ impl StateSync { let archive_header = self.chain.txhashset_archive_header_header_only().unwrap(); error!("PIBD Reported Failure - Restarting Sync"); // reset desegmenter state - let desegmenter = self - .chain - .desegmenter(&archive_header, self.sync_state.clone()) - .unwrap(); + let desegmenter = self.chain.desegmenter(&archive_header).unwrap(); if let Some(d) = desegmenter.write().as_mut() { d.reset(); @@ -112,7 +111,8 @@ impl StateSync { if let Err(e) = self.chain.reset_prune_lists() { error!("pibd_sync restart: reset prune lists error = {}", e); } - self.sync_state.update_pibd_progress(false, false, 1, 1); + self.sync_state + .update_pibd_progress(false, false, 0, 1, &archive_header); sync_need_restart = true; } } @@ -161,22 +161,34 @@ impl StateSync { return true; } let (launch, _download_timeout) = self.state_sync_due(); + let archive_header = { self.chain.txhashset_archive_header_header_only().unwrap() }; if launch { - let archive_header = self.chain.txhashset_archive_header_header_only().unwrap(); self.sync_state - .update_pibd_progress(false, false, 1, archive_header.height); - let desegmenter = self - .chain - .desegmenter(&archive_header, self.sync_state.clone()) - .unwrap(); - - if let Some(d) = desegmenter.read().as_ref() { - d.launch_validation_thread() - }; + .update_pibd_progress(false, false, 0, 1, &archive_header); } // Continue our PIBD process (which returns true if all segments are in) if self.continue_pibd() { - return false; + let desegmenter = self.chain.desegmenter(&archive_header).unwrap(); + // All segments in, validate + if let Some(d) = desegmenter.read().as_ref() { + if d.check_progress(self.sync_state.clone()) { + if let Err(e) = d.validate_complete_state( + self.sync_state.clone(), + stop_state.clone(), + ) { + error!("error validating PIBD state: {}", e); + self.sync_state.update_pibd_progress( + false, + true, + 0, + 1, + &archive_header, + ); + return false; + } + return true; + } + }; } } else { let (go, download_timeout) = self.state_sync_due(); @@ -215,10 +227,12 @@ impl StateSync { fn continue_pibd(&mut self) -> bool { // Check the state of our chain to figure out what we should be requesting next let archive_header = self.chain.txhashset_archive_header_header_only().unwrap(); - let desegmenter = self - .chain - .desegmenter(&archive_header, self.sync_state.clone()) - .unwrap(); + let desegmenter = self.chain.desegmenter(&archive_header).unwrap(); + + // Remove stale requests + // TODO: verify timing + let timeout_time = Utc::now() + Duration::seconds(15); + self.sync_state.remove_stale_pibd_requests(timeout_time); // Apply segments... TODO: figure out how this should be called, might // need to be a separate thread. @@ -226,8 +240,9 @@ impl StateSync { if let Some(d) = de.as_mut() { let res = d.apply_next_segments(); if let Err(e) = res { - debug!("error applying segment: {}", e); - self.sync_state.update_pibd_progress(false, true, 1, 1); + error!("error applying segment: {}", e); + self.sync_state + .update_pibd_progress(false, true, 0, 1, &archive_header); return false; } } @@ -237,7 +252,7 @@ impl StateSync { // requests we want to send to peers let mut next_segment_ids = vec![]; if let Some(d) = desegmenter.write().as_mut() { - if d.is_complete() { + if d.check_progress(self.sync_state.clone()) { return true; } // Figure out the next segments we need diff --git a/servers/src/grin/sync/syncer.rs b/servers/src/grin/sync/syncer.rs index 395255943..d3642970c 100644 --- a/servers/src/grin/sync/syncer.rs +++ b/servers/src/grin/sync/syncer.rs @@ -209,7 +209,7 @@ impl SyncRunner { match self.sync_state.status() { SyncStatus::TxHashsetPibd { .. } | SyncStatus::TxHashsetDownload { .. } - | SyncStatus::TxHashsetSetup + | SyncStatus::TxHashsetSetup { .. } | SyncStatus::TxHashsetRangeProofsValidation { .. } | SyncStatus::TxHashsetKernelsValidation { .. } | SyncStatus::TxHashsetSave @@ -229,7 +229,13 @@ impl SyncRunner { } if check_state_sync { - state_sync.check_run(&header_head, &head, &tail, highest_height); + state_sync.check_run( + &header_head, + &head, + &tail, + highest_height, + self.stop_state.clone(), + ); } } } diff --git a/src/bin/tui/status.rs b/src/bin/tui/status.rs index 75022667f..992905da3 100644 --- a/src/bin/tui/status.rs +++ b/src/bin/tui/status.rs @@ -53,17 +53,19 @@ impl TUIStatusView { SyncStatus::TxHashsetPibd { aborted: _, errored: _, - completed_to_height, - required_height, + completed_leaves, + leaves_required, + completed_to_height: _, + required_height: _, } => { - let percent = if required_height == 0 { + let percent = if completed_leaves == 0 { 0 } else { - completed_to_height * 100 / required_height + completed_leaves * 100 / leaves_required }; Cow::Owned(format!( - "Sync step 2/7: Downloading chain state - {} / {} Blocks - {}%", - completed_to_height, required_height, percent + "Sync step 2/7: Downloading Tx state (PIBD) - {} / {} entries - {}%", + completed_leaves, leaves_required, percent )) } SyncStatus::TxHashsetDownload(stat) => { @@ -88,8 +90,31 @@ impl TUIStatusView { )) } } - SyncStatus::TxHashsetSetup => { - Cow::Borrowed("Sync step 3/7: Preparing chain state for validation") + SyncStatus::TxHashsetSetup { + headers, + headers_total, + kernel_pos, + kernel_pos_total, + } => { + if headers.is_some() && headers_total.is_some() { + let h = headers.unwrap(); + let ht = headers_total.unwrap(); + let percent = h * 100 / ht; + Cow::Owned(format!( + "Sync step 3/7: Preparing for validation (kernel history) - {}/{} - {}%", + h, ht, percent + )) + } else if kernel_pos.is_some() && kernel_pos_total.is_some() { + let k = kernel_pos.unwrap(); + let kt = kernel_pos_total.unwrap(); + let percent = k * 100 / kt; + Cow::Owned(format!( + "Sync step 3/7: Preparing for validation (kernel position) - {}/{} - {}%", + k, kt, percent + )) + } else { + Cow::Borrowed("Sync step 3/7: Preparing chain state for validation") + } } SyncStatus::TxHashsetRangeProofsValidation { rproofs, diff --git a/store/src/leaf_set.rs b/store/src/leaf_set.rs index 77c140c03..ce839a5f0 100644 --- a/store/src/leaf_set.rs +++ b/store/src/leaf_set.rs @@ -196,6 +196,11 @@ impl LeafSet { self.bitmap.cardinality() as usize } + /// Number of positions up to index n in the leaf set + pub fn n_unpruned_leaves_to_index(&self, to_index: u64) -> u64 { + self.bitmap.range_cardinality(0..to_index) + } + /// Is the leaf_set empty. pub fn is_empty(&self) -> bool { self.len() == 0 diff --git a/store/src/pmmr.rs b/store/src/pmmr.rs index d69e9bb48..790ee1a43 100644 --- a/store/src/pmmr.rs +++ b/store/src/pmmr.rs @@ -180,6 +180,14 @@ impl Backend for PMMRBackend { } } + fn n_unpruned_leaves_to_index(&self, to_index: u64) -> u64 { + if self.prunable { + self.leaf_set.n_unpruned_leaves_to_index(to_index) + } else { + pmmr::n_leaves(pmmr::insertion_to_pmmr_index(to_index)) + } + } + /// Returns an iterator over all the leaf insertion indices (0-indexed). /// If our pos are [1,2,4,5,8] (first 5 leaf pos) then our insertion indices are [0,1,2,3,4] fn leaf_idx_iter(&self, from_idx: u64) -> Box + '_> {