[PIBD_IMPL] Thread simplification + More TUI Updates + Stop State Propagation (#3698)

* change pibd stat display to show progress as a percentage of downloaded leaves

* attempt some inline rp validation

* propagate shutdown state through kernel validation

* change validation loop timing

* simplify validator threading

* add more detailed tracking of kernel history validation to tui, allow stop state during

* adding more stop state + tui progress indication

* remove progressive validate

* test fix
This commit is contained in:
Yeastplume 2022-03-01 13:52:16 +00:00 committed by GitHub
parent bf48e52374
commit 21b1ac50d7
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
15 changed files with 428 additions and 247 deletions

View file

@ -37,7 +37,6 @@ use crate::{
core::core::hash::{Hash, Hashed},
store::Batch,
txhashset::{ExtensionPair, HeaderExtension},
SyncState,
};
use grin_store::Error::NotFoundErr;
use std::collections::HashMap;
@ -698,8 +697,15 @@ impl Chain {
// ensure the view is consistent.
txhashset::extending_readonly(&mut header_pmmr, &mut txhashset, |ext, batch| {
self.rewind_and_apply_fork(&header, ext, batch)?;
ext.extension
.validate(&self.genesis, fast_validation, &NoStatus, &header)?;
ext.extension.validate(
&self.genesis,
fast_validation,
&NoStatus,
None,
None,
&header,
None,
)?;
Ok(())
})
}
@ -903,7 +909,6 @@ impl Chain {
pub fn desegmenter(
&self,
archive_header: &BlockHeader,
sync_state: Arc<SyncState>,
) -> Result<Arc<RwLock<Option<Desegmenter>>>, Error> {
// Use our cached desegmenter if we have one and the associated header matches.
if let Some(d) = self.pibd_desegmenter.write().as_ref() {
@ -911,14 +916,10 @@ impl Chain {
return Ok(self.pibd_desegmenter.clone());
}
}
// If no desegmenter or headers don't match init
// Stop previous thread if running
if let Some(d) = self.pibd_desegmenter.read().as_ref() {
d.stop_validation_thread();
}
// TODO: (Check whether we can do this.. we *should* be able to modify this as the desegmenter
// is in flight and we cross a horizon boundary, but needs more thinking)
let desegmenter = self.init_desegmenter(archive_header, sync_state)?;
let desegmenter = self.init_desegmenter(archive_header)?;
let mut cache = self.pibd_desegmenter.write();
*cache = Some(desegmenter.clone());
@ -928,11 +929,7 @@ impl Chain {
/// initialize a desegmenter, which is capable of extending the hashset by appending
/// PIBD segments of the three PMMR trees + Bitmap PMMR
/// header should be the same header as selected for the txhashset.zip archive
fn init_desegmenter(
&self,
header: &BlockHeader,
sync_state: Arc<SyncState>,
) -> Result<Desegmenter, Error> {
fn init_desegmenter(&self, header: &BlockHeader) -> Result<Desegmenter, Error> {
debug!(
"init_desegmenter: initializing new desegmenter for {} at {}",
header.hash(),
@ -945,7 +942,6 @@ impl Chain {
header.clone(),
self.genesis.clone(),
self.store.clone(),
sync_state,
))
}
@ -1086,7 +1082,7 @@ impl Chain {
txhashset_data: File,
status: &dyn TxHashsetWriteStatus,
) -> Result<bool, Error> {
status.on_setup();
status.on_setup(None, None, None, None);
// Initial check whether this txhashset is needed or not
let fork_point = self.fork_point()?;
@ -1126,7 +1122,7 @@ impl Chain {
let header_pmmr = self.header_pmmr.read();
let batch = self.store.batch()?;
txhashset.verify_kernel_pos_index(&self.genesis, &header_pmmr, &batch)?;
txhashset.verify_kernel_pos_index(&self.genesis, &header_pmmr, &batch, None, None)?;
}
// all good, prepare a new batch and update all the required records
@ -1145,7 +1141,7 @@ impl Chain {
// Validate the extension, generating the utxo_sum and kernel_sum.
// Full validation, including rangeproofs and kernel signature verification.
let (utxo_sum, kernel_sum) =
extension.validate(&self.genesis, false, status, &header)?;
extension.validate(&self.genesis, false, status, None, None, &header, None)?;
// Save the block_sums (utxo_sum, kernel_sum) to the db for use later.
batch.save_block_sums(

View file

@ -16,8 +16,6 @@
//! segmenter
use std::sync::Arc;
use std::thread;
use std::time::Duration;
use crate::core::core::hash::{Hash, Hashed};
use crate::core::core::{pmmr, pmmr::ReadablePMMR};
@ -44,12 +42,9 @@ pub struct Desegmenter {
header_pmmr: Arc<RwLock<txhashset::PMMRHandle<BlockHeader>>>,
archive_header: BlockHeader,
store: Arc<store::ChainStore>,
sync_state: Arc<SyncState>,
genesis: BlockHeader,
validator_stop_state: Arc<StopState>,
default_bitmap_segment_height: u8,
default_output_segment_height: u8,
default_rangeproof_segment_height: u8,
@ -78,7 +73,6 @@ impl Desegmenter {
archive_header: BlockHeader,
genesis: BlockHeader,
store: Arc<store::ChainStore>,
sync_state: Arc<SyncState>,
) -> Desegmenter {
trace!("Creating new desegmenter");
let mut retval = Desegmenter {
@ -86,9 +80,7 @@ impl Desegmenter {
header_pmmr,
archive_header,
store,
sync_state,
genesis,
validator_stop_state: Arc::new(StopState::new()),
bitmap_accumulator: BitmapAccumulator::new(),
default_bitmap_segment_height: 9,
default_output_segment_height: 11,
@ -139,133 +131,94 @@ impl Desegmenter {
self.all_segments_complete
}
/// Launch a separate validation thread, which will update and validate the body head
/// as we go
pub fn launch_validation_thread(&self) {
let stop_state = self.validator_stop_state.clone();
let txhashset = self.txhashset.clone();
let header_pmmr = self.header_pmmr.clone();
let store = self.store.clone();
let genesis = self.genesis.clone();
let status = self.sync_state.clone();
let desegmenter = Arc::new(RwLock::new(self.clone()));
let _ = thread::Builder::new()
.name("pibd-validation".to_string())
.spawn(move || {
Desegmenter::validation_loop(
stop_state,
txhashset,
store,
desegmenter,
header_pmmr,
genesis,
status,
);
});
}
/// Stop the validation loop
pub fn stop_validation_thread(&self) {
self.validator_stop_state.stop();
}
/// Validation loop
fn validation_loop(
stop_state: Arc<StopState>,
txhashset: Arc<RwLock<TxHashSet>>,
store: Arc<store::ChainStore>,
desegmenter: Arc<RwLock<Desegmenter>>,
header_pmmr: Arc<RwLock<txhashset::PMMRHandle<BlockHeader>>>,
genesis: BlockHeader,
status: Arc<SyncState>,
) {
/// Check progress, update status if needed, returns true if all required
/// segments are in place
pub fn check_progress(&self, status: Arc<SyncState>) -> bool {
let mut latest_block_height = 0;
let header_head = { desegmenter.read().header().clone() };
loop {
if stop_state.is_stopped() {
break;
}
thread::sleep(Duration::from_millis(5000));
trace!("In Desegmenter Validation Loop");
let local_output_mmr_size;
let local_kernel_mmr_size;
let local_rangeproof_mmr_size;
let local_output_mmr_size;
let local_kernel_mmr_size;
let local_rangeproof_mmr_size;
{
let txhashset = self.txhashset.read();
local_output_mmr_size = txhashset.output_mmr_size();
local_kernel_mmr_size = txhashset.kernel_mmr_size();
local_rangeproof_mmr_size = txhashset.rangeproof_mmr_size();
}
// going to try presenting PIBD progress as total leaves downloaded
// total segments probably doesn't make much sense since the segment
// sizes will be able to change over time, and representative block height
// can be too lopsided if one pmmr completes faster, so perhaps just
// use total leaves downloaded and display as a percentage
let completed_leaves = pmmr::n_leaves(local_output_mmr_size)
+ pmmr::n_leaves(local_rangeproof_mmr_size)
+ pmmr::n_leaves(local_kernel_mmr_size);
// Find latest 'complete' header.
// First take lesser of rangeproof and output mmr sizes
let latest_output_size = std::cmp::min(local_output_mmr_size, local_rangeproof_mmr_size);
// Find first header in which 'output_mmr_size' and 'kernel_mmr_size' are greater than
// given sizes
let res = {
let header_pmmr = self.header_pmmr.read();
header_pmmr.get_first_header_with(
latest_output_size,
local_kernel_mmr_size,
latest_block_height,
self.store.clone(),
)
};
if let Some(h) = res {
latest_block_height = h.height;
// TODO: Unwraps
let tip = Tip::from_header(&h);
let batch = self.store.batch().unwrap();
batch.save_pibd_head(&tip).unwrap();
batch.commit().unwrap();
status.update_pibd_progress(
false,
false,
completed_leaves,
latest_block_height,
&self.archive_header,
);
if local_kernel_mmr_size == self.archive_header.kernel_mmr_size
&& local_output_mmr_size == self.archive_header.output_mmr_size
&& local_rangeproof_mmr_size == self.archive_header.output_mmr_size
{
let txhashset = txhashset.read();
local_output_mmr_size = txhashset.output_mmr_size();
local_kernel_mmr_size = txhashset.kernel_mmr_size();
local_rangeproof_mmr_size = txhashset.rangeproof_mmr_size();
}
trace!(
"Desegmenter Validation: Output MMR Size: {}",
local_output_mmr_size
);
trace!(
"Desegmenter Validation: Rangeproof MMR Size: {}",
local_rangeproof_mmr_size
);
trace!(
"Desegmenter Validation: Kernel MMR Size: {}",
local_kernel_mmr_size
);
// Find latest 'complete' header.
// First take lesser of rangeproof and output mmr sizes
let latest_output_size =
std::cmp::min(local_output_mmr_size, local_rangeproof_mmr_size);
// Find first header in which 'output_mmr_size' and 'kernel_mmr_size' are greater than
// given sizes
{
let header_pmmr = header_pmmr.read();
let res = header_pmmr.get_first_header_with(
latest_output_size,
local_kernel_mmr_size,
latest_block_height,
store.clone(),
);
if let Some(h) = res {
latest_block_height = h.height;
debug!(
"PIBD Desegmenter Validation Loop: PMMRs complete up to block {}: {:?}",
h.height, h
);
// TODO: 'In-flight' validation. At the moment the entire tree
// will be presented for validation after all segments are downloaded
// TODO: Unwraps
let tip = Tip::from_header(&h);
let batch = store.batch().unwrap();
batch.save_pibd_head(&tip).unwrap();
batch.commit().unwrap();
status.update_pibd_progress(
false,
false,
latest_block_height,
header_head.height,
);
if h == header_head {
// get out of this loop and move on to validation
break;
}
}
// All is complete
return true;
}
}
// If all done, kick off validation, setting error state if necessary
if let Err(e) = Desegmenter::validate_complete_state(
false
/*if let Err(e) = Desegmenter::validate_complete_state(
txhashset,
store,
header_pmmr,
&header_head,
genesis,
last_validated_rangeproof_pos,
status.clone(),
stop_state.clone(),
) {
error!("Error validating pibd hashset: {}", e);
status.update_pibd_progress(false, true, latest_block_height, header_head.height);
status.update_pibd_progress(
false,
true,
completed_leaves,
latest_block_height,
&header_head,
);
}
stop_state.stop();
stop_state.stop();*/
}
/// TODO: This is largely copied from chain.rs txhashset_write and related functions,
@ -273,34 +226,39 @@ impl Desegmenter {
/// segments are still being downloaded and applied. Current validation logic is all tied up
/// around unzipping, so re-developing this logic separate from the txhashset version
/// will to allow this to happen more cleanly
fn validate_complete_state(
txhashset: Arc<RwLock<TxHashSet>>,
store: Arc<store::ChainStore>,
header_pmmr: Arc<RwLock<txhashset::PMMRHandle<BlockHeader>>>,
header_head: &BlockHeader,
genesis: BlockHeader,
pub fn validate_complete_state(
&self,
status: Arc<SyncState>,
stop_state: Arc<StopState>,
) -> Result<(), Error> {
// Quick root check first:
{
let txhashset = txhashset.read();
txhashset.roots().validate(header_head)?;
let txhashset = self.txhashset.read();
txhashset.roots().validate(&self.archive_header)?;
}
status.on_setup();
// TODO: Keep track of this in the DB so we can pick up where we left off if needed
let last_rangeproof_validation_pos = 0;
// Validate kernel history
{
debug!("desegmenter validation: rewinding and validating kernel history (readonly)");
let txhashset = txhashset.read();
let txhashset = self.txhashset.read();
let mut count = 0;
let mut current = header_head.clone();
let mut current = self.archive_header.clone();
let total = current.height;
txhashset::rewindable_kernel_view(&txhashset, |view, batch| {
while current.height > 0 {
view.rewind(&current)?;
view.validate_root()?;
current = batch.get_previous_header(&current)?;
count += 1;
if current.height % 100000 == 0 || current.height == total {
status.on_setup(Some(total - current.height), Some(total), None, None);
}
if stop_state.is_stopped() {
return Ok(());
}
}
Ok(())
})?;
@ -310,38 +268,64 @@ impl Desegmenter {
);
}
if stop_state.is_stopped() {
return Ok(());
}
// Check kernel MMR root for every block header.
// Check NRD relative height rules for full kernel history.
{
let txhashset = txhashset.read();
let header_pmmr = header_pmmr.read();
let batch = store.batch()?;
txhashset.verify_kernel_pos_index(&genesis, &header_pmmr, &batch)?;
let txhashset = self.txhashset.read();
let header_pmmr = self.header_pmmr.read();
let batch = self.store.batch()?;
txhashset.verify_kernel_pos_index(
&self.genesis,
&header_pmmr,
&batch,
Some(status.clone()),
Some(stop_state.clone()),
)?;
}
if stop_state.is_stopped() {
return Ok(());
}
status.on_setup(None, None, None, None);
// Prepare a new batch and update all the required records
{
debug!("desegmenter validation: rewinding a 2nd time (writeable)");
let mut txhashset = txhashset.write();
let mut header_pmmr = header_pmmr.write();
let mut batch = store.batch()?;
let mut txhashset = self.txhashset.write();
let mut header_pmmr = self.header_pmmr.write();
let mut batch = self.store.batch()?;
txhashset::extending(
&mut header_pmmr,
&mut txhashset,
&mut batch,
|ext, batch| {
let extension = &mut ext.extension;
extension.rewind(&header_head, batch)?;
extension.rewind(&self.archive_header, batch)?;
// Validate the extension, generating the utxo_sum and kernel_sum.
// Full validation, including rangeproofs and kernel signature verification.
let (utxo_sum, kernel_sum) =
extension.validate(&genesis, false, &*status, &header_head)?;
let (utxo_sum, kernel_sum) = extension.validate(
&self.genesis,
false,
&*status,
Some(last_rangeproof_validation_pos),
None,
&self.archive_header,
Some(stop_state.clone()),
)?;
if stop_state.is_stopped() {
return Ok(());
}
// Save the block_sums (utxo_sum, kernel_sum) to the db for use later.
batch.save_block_sums(
&header_head.hash(),
&self.archive_header.hash(),
BlockSums {
utxo_sum,
kernel_sum,
@ -352,12 +336,16 @@ impl Desegmenter {
},
)?;
if stop_state.is_stopped() {
return Ok(());
}
debug!("desegmenter_validation: finished validating and rebuilding");
status.on_save();
{
// Save the new head to the db and rebuild the header by height index.
let tip = Tip::from_header(&header_head);
let tip = Tip::from_header(&self.archive_header);
// TODO: Throw error
batch.save_body_head(&tip)?;

View file

@ -34,7 +34,8 @@ use crate::txhashset::bitmap_accumulator::{BitmapAccumulator, BitmapChunk};
use crate::txhashset::{RewindableKernelView, UTXOView};
use crate::types::{CommitPos, OutputRoots, Tip, TxHashSetRoots, TxHashsetWriteStatus};
use crate::util::secp::pedersen::{Commitment, RangeProof};
use crate::util::{file, secp_static, zip};
use crate::util::{file, secp_static, zip, StopState};
use crate::SyncState;
use croaring::Bitmap;
use grin_store::pmmr::{clean_files_by_prefix, PMMRBackend};
use std::cmp::Ordering;
@ -541,7 +542,7 @@ impl TxHashSet {
let cutoff = head.height.saturating_sub(WEEK_HEIGHT * 2);
let cutoff_hash = header_pmmr.get_header_hash_by_height(cutoff)?;
let cutoff_header = batch.get_block_header(&cutoff_hash)?;
self.verify_kernel_pos_index(&cutoff_header, header_pmmr, batch)
self.verify_kernel_pos_index(&cutoff_header, header_pmmr, batch, None, None)
}
/// Verify and (re)build the NRD kernel_pos index from the provided header onwards.
@ -550,6 +551,8 @@ impl TxHashSet {
from_header: &BlockHeader,
header_pmmr: &PMMRHandle<BlockHeader>,
batch: &Batch<'_>,
status: Option<Arc<SyncState>>,
stop_state: Option<Arc<StopState>>,
) -> Result<(), Error> {
if !global::is_nrd_enabled() {
return Ok(());
@ -578,6 +581,8 @@ impl TxHashSet {
let mut current_pos = prev_size + 1;
let mut current_header = from_header.clone();
let mut count = 0;
let total = pmmr::n_leaves(self.kernel_pmmr_h.size);
let mut applied = 0;
while current_pos <= self.kernel_pmmr_h.size {
if pmmr::is_leaf(current_pos - 1) {
if let Some(kernel) = kernel_pmmr.get_data(current_pos - 1) {
@ -598,7 +603,19 @@ impl TxHashSet {
_ => {}
}
}
applied += 1;
if let Some(ref s) = status {
if total % applied == 10000 {
s.on_setup(None, None, Some(applied), Some(total));
}
}
}
if let Some(ref s) = stop_state {
if s.is_stopped() {
return Ok(());
}
}
current_pos += 1;
}
@ -1799,7 +1816,10 @@ impl<'a> Extension<'a> {
genesis: &BlockHeader,
fast_validation: bool,
status: &dyn TxHashsetWriteStatus,
output_start_pos: Option<u64>,
_kernel_start_pos: Option<u64>,
header: &BlockHeader,
stop_state: Option<Arc<StopState>>,
) -> Result<(Commitment, Commitment), Error> {
self.validate_mmrs()?;
self.validate_roots(header)?;
@ -1817,10 +1837,26 @@ impl<'a> Extension<'a> {
// These are expensive verification step (skipped for "fast validation").
if !fast_validation {
// Verify the rangeproof associated with each unspent output.
self.verify_rangeproofs(status)?;
self.verify_rangeproofs(
Some(status),
output_start_pos,
None,
false,
stop_state.clone(),
)?;
if let Some(ref s) = stop_state {
if s.is_stopped() {
return Err(ErrorKind::Stopped.into());
}
}
// Verify all the kernel signatures.
self.verify_kernel_signatures(status)?;
self.verify_kernel_signatures(status, stop_state.clone())?;
if let Some(ref s) = stop_state {
if s.is_stopped() {
return Err(ErrorKind::Stopped.into());
}
}
}
Ok((output_sum, kernel_sum))
@ -1863,7 +1899,11 @@ impl<'a> Extension<'a> {
)
}
fn verify_kernel_signatures(&self, status: &dyn TxHashsetWriteStatus) -> Result<(), Error> {
fn verify_kernel_signatures(
&self,
status: &dyn TxHashsetWriteStatus,
stop_state: Option<Arc<StopState>>,
) -> Result<(), Error> {
let now = Instant::now();
const KERNEL_BATCH_SIZE: usize = 5_000;
@ -1884,6 +1924,11 @@ impl<'a> Extension<'a> {
kern_count += tx_kernels.len() as u64;
tx_kernels.clear();
status.on_validation_kernels(kern_count, total_kernels);
if let Some(ref s) = stop_state {
if s.is_stopped() {
return Ok(());
}
}
debug!(
"txhashset: verify_kernel_signatures: verified {} signatures",
kern_count,
@ -1901,16 +1946,36 @@ impl<'a> Extension<'a> {
Ok(())
}
fn verify_rangeproofs(&self, status: &dyn TxHashsetWriteStatus) -> Result<(), Error> {
fn verify_rangeproofs(
&self,
status: Option<&dyn TxHashsetWriteStatus>,
start_pos: Option<u64>,
batch_size: Option<usize>,
single_iter: bool,
stop_state: Option<Arc<StopState>>,
) -> Result<u64, Error> {
let now = Instant::now();
let mut commits: Vec<Commitment> = Vec::with_capacity(1_000);
let mut proofs: Vec<RangeProof> = Vec::with_capacity(1_000);
let batch_size = batch_size.unwrap_or(1_000);
let mut commits: Vec<Commitment> = Vec::with_capacity(batch_size);
let mut proofs: Vec<RangeProof> = Vec::with_capacity(batch_size);
let mut proof_count = 0;
if let Some(s) = start_pos {
if let Some(i) = pmmr::pmmr_leaf_to_insertion_index(s) {
proof_count = self.output_pmmr.n_unpruned_leaves_to_index(i) as usize;
}
}
let total_rproofs = self.output_pmmr.n_unpruned_leaves();
for pos0 in self.output_pmmr.leaf_pos_iter() {
if let Some(p) = start_pos {
if pos0 < p {
continue;
}
}
let output = self.output_pmmr.get_data(pos0);
let proof = self.rproof_pmmr.get_data(pos0);
@ -1927,7 +1992,7 @@ impl<'a> Extension<'a> {
proof_count += 1;
if proofs.len() >= 1_000 {
if proofs.len() >= batch_size {
Output::batch_verify_proofs(&commits, &proofs)?;
commits.clear();
proofs.clear();
@ -1935,13 +2000,21 @@ impl<'a> Extension<'a> {
"txhashset: verify_rangeproofs: verified {} rangeproofs",
proof_count,
);
if proof_count % 1_000 == 0 {
status.on_validation_rproofs(proof_count, total_rproofs);
if let Some(s) = status {
s.on_validation_rproofs(proof_count as u64, total_rproofs);
}
if let Some(ref s) = stop_state {
if s.is_stopped() {
return Ok(pos0);
}
}
if single_iter {
return Ok(pos0);
}
}
}
// remaining part which not full of 1000 range proofs
// remaining part which not full of batch_size range proofs
if !proofs.is_empty() {
Output::batch_verify_proofs(&commits, &proofs)?;
commits.clear();
@ -1958,7 +2031,7 @@ impl<'a> Extension<'a> {
self.rproof_pmmr.unpruned_size(),
now.elapsed().as_secs(),
);
Ok(())
Ok(0)
}
}

View file

@ -17,7 +17,7 @@
use chrono::prelude::{DateTime, Utc};
use crate::core::core::hash::{Hash, Hashed, ZERO_HASH};
use crate::core::core::{Block, BlockHeader, HeaderVersion, SegmentTypeIdentifier};
use crate::core::core::{pmmr, Block, BlockHeader, HeaderVersion, SegmentTypeIdentifier};
use crate::core::pow::Difficulty;
use crate::core::ser::{self, PMMRIndexHashable, Readable, Reader, Writeable, Writer};
use crate::error::{Error, ErrorKind};
@ -65,6 +65,10 @@ pub enum SyncStatus {
aborted: bool,
/// whether we got an error anywhere (in which case restart the process)
errored: bool,
/// total number of leaves applied
completed_leaves: u64,
/// total number of leaves required by archive header
leaves_required: u64,
/// 'height', i.e. last 'block' for which there is complete
/// pmmr data
completed_to_height: u64,
@ -74,7 +78,16 @@ pub enum SyncStatus {
/// Downloading the various txhashsets
TxHashsetDownload(TxHashsetDownloadStats),
/// Setting up before validation
TxHashsetSetup,
TxHashsetSetup {
/// number of 'headers' for which kernels have been checked
headers: Option<u64>,
/// headers total
headers_total: Option<u64>,
/// kernel position portion
kernel_pos: Option<u64>,
/// total kernel position
kernel_pos_total: Option<u64>,
},
/// Validating the kernels
TxHashsetKernelsValidation {
/// kernels validated
@ -134,6 +147,25 @@ impl Default for TxHashsetDownloadStats {
}
}
/// Container for entry in requested PIBD segments
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct PIBDSegmentContainer {
/// Segment+Type Identifier
pub identifier: SegmentTypeIdentifier,
/// Time at which this request was made
pub request_time: DateTime<Utc>,
}
impl PIBDSegmentContainer {
/// Return container with timestamp
pub fn new(identifier: SegmentTypeIdentifier) -> Self {
Self {
identifier,
request_time: Utc::now(),
}
}
}
/// Current sync state. Encapsulates the current SyncStatus.
pub struct SyncState {
current: RwLock<SyncStatus>,
@ -145,7 +177,7 @@ pub struct SyncState {
/// available where it will be needed (both in the adapter
/// and the sync loop)
/// TODO: Better struct for this, perhaps hash identifiers
requested_pibd_segments: RwLock<Vec<SegmentTypeIdentifier>>,
requested_pibd_segments: RwLock<Vec<PIBDSegmentContainer>>,
}
impl SyncState {
@ -229,30 +261,49 @@ impl SyncState {
&self,
aborted: bool,
errored: bool,
completed_leaves: u64,
completed_to_height: u64,
required_height: u64,
archive_header: &BlockHeader,
) {
let leaves_required = pmmr::n_leaves(archive_header.output_mmr_size) * 2
+ pmmr::n_leaves(archive_header.kernel_mmr_size);
*self.current.write() = SyncStatus::TxHashsetPibd {
aborted,
errored,
completed_leaves,
leaves_required,
completed_to_height,
required_height,
required_height: archive_header.height,
};
}
/// Update PIBD segment list
pub fn add_pibd_segment(&self, id: &SegmentTypeIdentifier) {
self.requested_pibd_segments.write().push(id.clone());
self.requested_pibd_segments
.write()
.push(PIBDSegmentContainer::new(id.clone()));
}
/// Remove segment from list
pub fn remove_pibd_segment(&self, id: &SegmentTypeIdentifier) {
self.requested_pibd_segments.write().retain(|i| i != id);
self.requested_pibd_segments
.write()
.retain(|i| &i.identifier != id);
}
/// Remove segments with request timestamps less cutoff time
pub fn remove_stale_pibd_requests(&self, cutoff_time: DateTime<Utc>) {
self.requested_pibd_segments
.write()
.retain(|i| i.request_time < cutoff_time);
}
/// Check whether segment is in request list
pub fn contains_pibd_segment(&self, id: &SegmentTypeIdentifier) -> bool {
self.requested_pibd_segments.read().contains(id)
self.requested_pibd_segments
.read()
.iter()
.any(|i| &i.identifier == id)
}
/// Communicate sync error
@ -272,8 +323,19 @@ impl SyncState {
}
impl TxHashsetWriteStatus for SyncState {
fn on_setup(&self) {
self.update(SyncStatus::TxHashsetSetup);
fn on_setup(
&self,
headers: Option<u64>,
headers_total: Option<u64>,
kernel_pos: Option<u64>,
kernel_pos_total: Option<u64>,
) {
self.update(SyncStatus::TxHashsetSetup {
headers,
headers_total,
kernel_pos,
kernel_pos_total,
});
}
fn on_validation_kernels(&self, kernels: u64, kernels_total: u64) {
@ -500,7 +562,13 @@ pub trait ChainAdapter {
/// those values as the processing progresses.
pub trait TxHashsetWriteStatus {
/// First setup of the txhashset
fn on_setup(&self);
fn on_setup(
&self,
headers: Option<u64>,
header_total: Option<u64>,
kernel_pos: Option<u64>,
kernel_pos_total: Option<u64>,
);
/// Starting kernel validation
fn on_validation_kernels(&self, kernels: u64, kernel_total: u64);
/// Starting rproof validation
@ -515,7 +583,7 @@ pub trait TxHashsetWriteStatus {
pub struct NoStatus;
impl TxHashsetWriteStatus for NoStatus {
fn on_setup(&self) {}
fn on_setup(&self, _hs: Option<u64>, _ht: Option<u64>, _kp: Option<u64>, _kpt: Option<u64>) {}
fn on_validation_kernels(&self, _ks: u64, _kts: u64) {}
fn on_validation_rproofs(&self, _rs: u64, _rt: u64) {}
fn on_save(&self) {}

View file

@ -24,7 +24,7 @@ use std::sync::Arc;
use std::{fs, io};
use crate::chain::txhashset::BitmapChunk;
use crate::chain::types::{NoopAdapter, Options, SyncState};
use crate::chain::types::{NoopAdapter, Options};
use crate::core::core::{
hash::{Hash, Hashed},
pmmr::segment::{Segment, SegmentIdentifier, SegmentType},
@ -177,9 +177,8 @@ impl DesegmenterRequestor {
// Emulate `continue_pibd` function, which would be called from state sync
// return whether is complete
pub fn continue_pibd(&mut self) -> bool {
let state = Arc::new(SyncState::new());
let archive_header = self.chain.txhashset_archive_header_header_only().unwrap();
let desegmenter = self.chain.desegmenter(&archive_header, state).unwrap();
let desegmenter = self.chain.desegmenter(&archive_header).unwrap();
// Apply segments... TODO: figure out how this should be called, might
// need to be a separate thread.

View file

@ -68,6 +68,9 @@ pub trait Backend<T: PMMRable> {
/// Number of leaves
fn n_unpruned_leaves(&self) -> u64;
/// Number of leaves up to the given leaf index
fn n_unpruned_leaves_to_index(&self, to_index: u64) -> u64;
/// Iterator over current (unpruned, unremoved) leaf insertion index.
/// Note: This differs from underlying MMR pos - [0, 1, 2, 3, 4] vs. [1, 2, 4, 5, 8].
fn leaf_idx_iter(&self, from_idx: u64) -> Box<dyn Iterator<Item = u64> + '_>;

View file

@ -58,6 +58,9 @@ pub trait ReadablePMMR {
/// Number of leaves in the MMR
fn n_unpruned_leaves(&self) -> u64;
/// Number of leaves in the MMR up to index
fn n_unpruned_leaves_to_index(&self, to_index: u64) -> u64;
/// Is the MMR empty?
fn is_empty(&self) -> bool {
self.unpruned_size() == 0
@ -485,6 +488,10 @@ where
fn n_unpruned_leaves(&self) -> u64 {
self.backend.n_unpruned_leaves()
}
fn n_unpruned_leaves_to_index(&self, to_index: u64) -> u64 {
self.backend.n_unpruned_leaves_to_index(to_index)
}
}
/// 64 bits all ones: 0b11111111...1

View file

@ -175,4 +175,8 @@ where
fn n_unpruned_leaves(&self) -> u64 {
self.backend.n_unpruned_leaves()
}
fn n_unpruned_leaves_to_index(&self, to_index: u64) -> u64 {
self.backend.n_unpruned_leaves_to_index(to_index)
}
}

View file

@ -90,6 +90,10 @@ impl<T: PMMRable> Backend<T> for VecBackend<T> {
unimplemented!()
}
fn n_unpruned_leaves_to_index(&self, _to_index: u64) -> u64 {
unimplemented!()
}
fn leaf_pos_iter(&self) -> Box<dyn Iterator<Item = u64> + '_> {
Box::new(
self.hashes

View file

@ -582,12 +582,7 @@ where
let archive_header = self.chain().txhashset_archive_header_header_only()?;
let identifier = segment.identifier().clone();
let mut retval = Ok(true);
if let Some(d) = self
.chain()
.desegmenter(&archive_header, self.sync_state.clone())?
.write()
.as_mut()
{
if let Some(d) = self.chain().desegmenter(&archive_header)?.write().as_mut() {
let res = d.add_bitmap_segment(segment, output_root);
if let Err(e) = res {
error!(
@ -620,12 +615,7 @@ where
let archive_header = self.chain().txhashset_archive_header_header_only()?;
let identifier = segment.identifier().clone();
let mut retval = Ok(true);
if let Some(d) = self
.chain()
.desegmenter(&archive_header, self.sync_state.clone())?
.write()
.as_mut()
{
if let Some(d) = self.chain().desegmenter(&archive_header)?.write().as_mut() {
let res = d.add_output_segment(segment, Some(bitmap_root));
if let Err(e) = res {
error!(
@ -656,12 +646,7 @@ where
let archive_header = self.chain().txhashset_archive_header_header_only()?;
let identifier = segment.identifier().clone();
let mut retval = Ok(true);
if let Some(d) = self
.chain()
.desegmenter(&archive_header, self.sync_state.clone())?
.write()
.as_mut()
{
if let Some(d) = self.chain().desegmenter(&archive_header)?.write().as_mut() {
let res = d.add_rangeproof_segment(segment);
if let Err(e) = res {
error!(
@ -692,12 +677,7 @@ where
let archive_header = self.chain().txhashset_archive_header_header_only()?;
let identifier = segment.identifier().clone();
let mut retval = Ok(true);
if let Some(d) = self
.chain()
.desegmenter(&archive_header, self.sync_state.clone())?
.write()
.as_mut()
{
if let Some(d) = self.chain().desegmenter(&archive_header)?.write().as_mut() {
let res = d.add_kernel_segment(segment);
if let Err(e) = res {
error!(

View file

@ -21,6 +21,7 @@ use crate::core::core::{hash::Hashed, pmmr::segment::SegmentType};
use crate::core::global;
use crate::core::pow::Difficulty;
use crate::p2p::{self, Capabilities, Peer};
use crate::util::StopState;
/// Fast sync has 3 "states":
/// * syncing headers
@ -61,6 +62,7 @@ impl StateSync {
head: &chain::Tip,
tail: &chain::Tip,
highest_height: u64,
stop_state: Arc<StopState>,
) -> bool {
trace!("state_sync: head.height: {}, tail.height: {}. header_head.height: {}, highest_height: {}",
head.height, tail.height, header_head.height, highest_height,
@ -95,10 +97,7 @@ impl StateSync {
let archive_header = self.chain.txhashset_archive_header_header_only().unwrap();
error!("PIBD Reported Failure - Restarting Sync");
// reset desegmenter state
let desegmenter = self
.chain
.desegmenter(&archive_header, self.sync_state.clone())
.unwrap();
let desegmenter = self.chain.desegmenter(&archive_header).unwrap();
if let Some(d) = desegmenter.write().as_mut() {
d.reset();
@ -112,7 +111,8 @@ impl StateSync {
if let Err(e) = self.chain.reset_prune_lists() {
error!("pibd_sync restart: reset prune lists error = {}", e);
}
self.sync_state.update_pibd_progress(false, false, 1, 1);
self.sync_state
.update_pibd_progress(false, false, 0, 1, &archive_header);
sync_need_restart = true;
}
}
@ -161,22 +161,34 @@ impl StateSync {
return true;
}
let (launch, _download_timeout) = self.state_sync_due();
let archive_header = { self.chain.txhashset_archive_header_header_only().unwrap() };
if launch {
let archive_header = self.chain.txhashset_archive_header_header_only().unwrap();
self.sync_state
.update_pibd_progress(false, false, 1, archive_header.height);
let desegmenter = self
.chain
.desegmenter(&archive_header, self.sync_state.clone())
.unwrap();
if let Some(d) = desegmenter.read().as_ref() {
d.launch_validation_thread()
};
.update_pibd_progress(false, false, 0, 1, &archive_header);
}
// Continue our PIBD process (which returns true if all segments are in)
if self.continue_pibd() {
return false;
let desegmenter = self.chain.desegmenter(&archive_header).unwrap();
// All segments in, validate
if let Some(d) = desegmenter.read().as_ref() {
if d.check_progress(self.sync_state.clone()) {
if let Err(e) = d.validate_complete_state(
self.sync_state.clone(),
stop_state.clone(),
) {
error!("error validating PIBD state: {}", e);
self.sync_state.update_pibd_progress(
false,
true,
0,
1,
&archive_header,
);
return false;
}
return true;
}
};
}
} else {
let (go, download_timeout) = self.state_sync_due();
@ -215,10 +227,12 @@ impl StateSync {
fn continue_pibd(&mut self) -> bool {
// Check the state of our chain to figure out what we should be requesting next
let archive_header = self.chain.txhashset_archive_header_header_only().unwrap();
let desegmenter = self
.chain
.desegmenter(&archive_header, self.sync_state.clone())
.unwrap();
let desegmenter = self.chain.desegmenter(&archive_header).unwrap();
// Remove stale requests
// TODO: verify timing
let timeout_time = Utc::now() + Duration::seconds(15);
self.sync_state.remove_stale_pibd_requests(timeout_time);
// Apply segments... TODO: figure out how this should be called, might
// need to be a separate thread.
@ -226,8 +240,9 @@ impl StateSync {
if let Some(d) = de.as_mut() {
let res = d.apply_next_segments();
if let Err(e) = res {
debug!("error applying segment: {}", e);
self.sync_state.update_pibd_progress(false, true, 1, 1);
error!("error applying segment: {}", e);
self.sync_state
.update_pibd_progress(false, true, 0, 1, &archive_header);
return false;
}
}
@ -237,7 +252,7 @@ impl StateSync {
// requests we want to send to peers
let mut next_segment_ids = vec![];
if let Some(d) = desegmenter.write().as_mut() {
if d.is_complete() {
if d.check_progress(self.sync_state.clone()) {
return true;
}
// Figure out the next segments we need

View file

@ -209,7 +209,7 @@ impl SyncRunner {
match self.sync_state.status() {
SyncStatus::TxHashsetPibd { .. }
| SyncStatus::TxHashsetDownload { .. }
| SyncStatus::TxHashsetSetup
| SyncStatus::TxHashsetSetup { .. }
| SyncStatus::TxHashsetRangeProofsValidation { .. }
| SyncStatus::TxHashsetKernelsValidation { .. }
| SyncStatus::TxHashsetSave
@ -229,7 +229,13 @@ impl SyncRunner {
}
if check_state_sync {
state_sync.check_run(&header_head, &head, &tail, highest_height);
state_sync.check_run(
&header_head,
&head,
&tail,
highest_height,
self.stop_state.clone(),
);
}
}
}

View file

@ -53,17 +53,19 @@ impl TUIStatusView {
SyncStatus::TxHashsetPibd {
aborted: _,
errored: _,
completed_to_height,
required_height,
completed_leaves,
leaves_required,
completed_to_height: _,
required_height: _,
} => {
let percent = if required_height == 0 {
let percent = if completed_leaves == 0 {
0
} else {
completed_to_height * 100 / required_height
completed_leaves * 100 / leaves_required
};
Cow::Owned(format!(
"Sync step 2/7: Downloading chain state - {} / {} Blocks - {}%",
completed_to_height, required_height, percent
"Sync step 2/7: Downloading Tx state (PIBD) - {} / {} entries - {}%",
completed_leaves, leaves_required, percent
))
}
SyncStatus::TxHashsetDownload(stat) => {
@ -88,8 +90,31 @@ impl TUIStatusView {
))
}
}
SyncStatus::TxHashsetSetup => {
Cow::Borrowed("Sync step 3/7: Preparing chain state for validation")
SyncStatus::TxHashsetSetup {
headers,
headers_total,
kernel_pos,
kernel_pos_total,
} => {
if headers.is_some() && headers_total.is_some() {
let h = headers.unwrap();
let ht = headers_total.unwrap();
let percent = h * 100 / ht;
Cow::Owned(format!(
"Sync step 3/7: Preparing for validation (kernel history) - {}/{} - {}%",
h, ht, percent
))
} else if kernel_pos.is_some() && kernel_pos_total.is_some() {
let k = kernel_pos.unwrap();
let kt = kernel_pos_total.unwrap();
let percent = k * 100 / kt;
Cow::Owned(format!(
"Sync step 3/7: Preparing for validation (kernel position) - {}/{} - {}%",
k, kt, percent
))
} else {
Cow::Borrowed("Sync step 3/7: Preparing chain state for validation")
}
}
SyncStatus::TxHashsetRangeProofsValidation {
rproofs,

View file

@ -196,6 +196,11 @@ impl LeafSet {
self.bitmap.cardinality() as usize
}
/// Number of positions up to index n in the leaf set
pub fn n_unpruned_leaves_to_index(&self, to_index: u64) -> u64 {
self.bitmap.range_cardinality(0..to_index)
}
/// Is the leaf_set empty.
pub fn is_empty(&self) -> bool {
self.len() == 0

View file

@ -180,6 +180,14 @@ impl<T: PMMRable> Backend<T> for PMMRBackend<T> {
}
}
fn n_unpruned_leaves_to_index(&self, to_index: u64) -> u64 {
if self.prunable {
self.leaf_set.n_unpruned_leaves_to_index(to_index)
} else {
pmmr::n_leaves(pmmr::insertion_to_pmmr_index(to_index))
}
}
/// Returns an iterator over all the leaf insertion indices (0-indexed).
/// If our pos are [1,2,4,5,8] (first 5 leaf pos) then our insertion indices are [0,1,2,3,4]
fn leaf_idx_iter(&self, from_idx: u64) -> Box<dyn Iterator<Item = u64> + '_> {