[PIBD_IMPL] PIBD Desegmenter State (#3688)

* add functions to desegmenter to report next desired segments, begin to add state to determine which segments have been requested

* add segmentidentifier type to id requested segments uniquely

* make a call on where to keep track of which PIBD segments have been requested

* move segmenttype definition, add functions to manipulate peer segment list

* remove desegmenter state enum

* change chain desegmenter function to provide rwlock

* trace, warning cleanup

* udpate to test compliation
This commit is contained in:
Yeastplume 2022-01-20 09:57:21 +00:00 committed by GitHub
parent 009a02eec1
commit 41a86b4cd9
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 233 additions and 60 deletions

View file

@ -867,11 +867,14 @@ impl Chain {
/// instantiate desegmenter (in same lazy fashion as segmenter, though this should not be as
/// expensive an operation)
pub fn desegmenter(&self, archive_header: &BlockHeader) -> Result<Desegmenter, Error> {
pub fn desegmenter(
&self,
archive_header: &BlockHeader,
) -> Result<Arc<RwLock<Option<Desegmenter>>>, Error> {
// Use our cached desegmenter if we have one and the associated header matches.
if let Some(d) = self.pibd_desegmenter.read().as_ref() {
if let Some(d) = self.pibd_desegmenter.write().as_ref() {
if d.header() == archive_header {
return Ok(d.clone());
return Ok(self.pibd_desegmenter.clone());
}
}
// If no desegmenter or headers don't match init
@ -881,7 +884,7 @@ impl Chain {
let mut cache = self.pibd_desegmenter.write();
*cache = Some(desegmenter.clone());
return Ok(desegmenter);
Ok(self.pibd_desegmenter.clone())
}
/// initialize a desegmenter, which is capable of extending the hashset by appending

View file

@ -18,8 +18,11 @@
use std::sync::Arc;
use crate::core::core::hash::Hash;
use crate::core::core::pmmr;
use crate::core::core::{BlockHeader, OutputIdentifier, Segment, TxKernel};
use crate::core::core::{pmmr, pmmr::ReadablePMMR};
use crate::core::core::{
BlockHeader, OutputIdentifier, Segment, SegmentIdentifier, SegmentType, SegmentTypeIdentifier,
TxKernel,
};
use crate::error::Error;
use crate::txhashset::{BitmapAccumulator, BitmapChunk, TxHashSet};
use crate::util::secp::pedersen::RangeProof;
@ -30,10 +33,6 @@ use crate::txhashset;
use croaring::Bitmap;
/// States that the desegmenter can be in, to keep track of what
/// parts are needed next in the proces
pub enum DesegmenterState {}
/// Desegmenter for rebuilding a txhashset from PIBD segments
#[derive(Clone)]
pub struct Desegmenter {
@ -42,8 +41,10 @@ pub struct Desegmenter {
archive_header: BlockHeader,
store: Arc<store::ChainStore>,
default_segment_height: u8,
bitmap_accumulator: BitmapAccumulator,
_bitmap_segments: Vec<Segment<BitmapChunk>>,
bitmap_segments: Vec<Segment<BitmapChunk>>,
_output_segments: Vec<Segment<OutputIdentifier>>,
_rangeproof_segments: Vec<Segment<RangeProof>>,
_kernel_segments: Vec<Segment<TxKernel>>,
@ -62,13 +63,15 @@ impl Desegmenter {
archive_header: BlockHeader,
store: Arc<store::ChainStore>,
) -> Desegmenter {
trace!("Creating new desegmenter");
let mut retval = Desegmenter {
txhashset,
header_pmmr,
archive_header,
store,
bitmap_accumulator: BitmapAccumulator::new(),
_bitmap_segments: vec![],
default_segment_height: 9,
bitmap_segments: vec![],
_output_segments: vec![],
_rangeproof_segments: vec![],
_kernel_segments: vec![],
@ -91,6 +94,42 @@ impl Desegmenter {
self.bitmap_mmr_size
}
/// Return list of the next preferred segments the desegmenter needs based on
/// the current real state of the underlying elements
pub fn next_desired_segments(&self, max_elements: usize) -> Vec<SegmentTypeIdentifier> {
let mut return_vec = vec![];
// First check for required bitmap elements
if self.bitmap_cache.is_none() {
trace!("Desegmenter needs bitmap segments");
// Get current size of bitmap MMR
let local_pmmr_size = self.bitmap_accumulator.readonly_pmmr().unpruned_size();
trace!("Local Bitmap PMMR Size is: {}", local_pmmr_size);
// Get iterator over expected bitmap elements
let mut identifier_iter = SegmentIdentifier::traversal_iter(
self.bitmap_mmr_size,
self.default_segment_height,
);
trace!("Expected bitmap MMR size is: {}", self.bitmap_mmr_size);
// Advance iterator to next expected segment
while let Some(id) = identifier_iter.next() {
trace!(
"ID segment pos range: {:?}",
id.segment_pos_range(self.bitmap_mmr_size)
);
if id.segment_pos_range(self.bitmap_mmr_size).1 > local_pmmr_size {
if !self.has_bitmap_segment_with_id(id) {
return_vec.push(SegmentTypeIdentifier::new(SegmentType::Bitmap, id));
if return_vec.len() >= max_elements {
return return_vec;
}
}
}
}
}
return_vec
}
/// 'Finalize' the bitmap accumulator, storing an in-memory copy of the bitmap for
/// use in further validation and setting the accumulator on the underlying txhashset
/// TODO: Could be called automatically when we have the calculated number of
@ -151,6 +190,26 @@ impl Desegmenter {
);
}
/// Cache a bitmap segment if we don't already have it
fn cache_bitmap_segment(&mut self, in_seg: Segment<BitmapChunk>) {
if self
.bitmap_segments
.iter()
.find(|i| i.identifier() == in_seg.identifier())
.is_none()
{
self.bitmap_segments.push(in_seg);
}
}
/// Whether our list already contains this bitmap segment
fn has_bitmap_segment_with_id(&self, seg_id: SegmentIdentifier) -> bool {
self.bitmap_segments
.iter()
.find(|i| i.identifier() == seg_id)
.is_some()
}
/// Adds and validates a bitmap chunk
/// TODO: Still experimenting, this expects chunks received to be in order
pub fn add_bitmap_segment(
@ -167,11 +226,15 @@ impl Desegmenter {
output_root_hash, // Other root
true,
)?;
debug!("pibd_desegmenter: adding segment to cache");
// All okay, add to our cached list of bitmap segments
self.cache_bitmap_segment(segment);
// All okay, add leaves to bitmap accumulator
let (_sid, _hash_pos, _hashes, _leaf_pos, leaf_data, _proof) = segment.parts();
/*let (_sid, _hash_pos, _hashes, _leaf_pos, leaf_data, _proof) = segment.parts();
for chunk in leaf_data.into_iter() {
self.bitmap_accumulator.append_chunk(chunk)?;
}
}*/
Ok(())
}

View file

@ -17,7 +17,7 @@
use chrono::prelude::{DateTime, Utc};
use crate::core::core::hash::{Hash, Hashed, ZERO_HASH};
use crate::core::core::{Block, BlockHeader, HeaderVersion};
use crate::core::core::{Block, BlockHeader, HeaderVersion, SegmentTypeIdentifier};
use crate::core::pow::Difficulty;
use crate::core::ser::{self, PMMRIndexHashable, Readable, Reader, Writeable, Writer};
use crate::error::{Error, ErrorKind};
@ -131,6 +131,14 @@ impl Default for TxHashsetDownloadStats {
pub struct SyncState {
current: RwLock<SyncStatus>,
sync_error: RwLock<Option<Error>>,
/// Something has to keep track of segments that have been
/// requested from other peers. TODO consider: This may not
/// be the best place to put code that's concerned with peers
/// but it's currently the only place that makes the info
/// available where it will be needed (both in the adapter
/// and the sync loop)
/// TODO: Better struct for this, perhaps hash identifiers
requested_pibd_segments: RwLock<Vec<SegmentTypeIdentifier>>,
}
impl SyncState {
@ -139,6 +147,7 @@ impl SyncState {
SyncState {
current: RwLock::new(SyncStatus::Initial),
sync_error: RwLock::new(None),
requested_pibd_segments: RwLock::new(vec![]),
}
}
@ -208,6 +217,21 @@ impl SyncState {
*self.current.write() = SyncStatus::TxHashsetDownload(stats);
}
/// Update PIBD segment list
pub fn add_pibd_segment(&self, id: &SegmentTypeIdentifier) {
self.requested_pibd_segments.write().push(id.clone());
}
/// Remove segment from list
pub fn remove_pibd_segment(&self, id: &SegmentTypeIdentifier) {
self.requested_pibd_segments.write().retain(|i| i != id);
}
/// Check whether segment is in request list
pub fn contains_pibd_segment(&self, id: &SegmentTypeIdentifier) -> bool {
self.requested_pibd_segments.read().contains(id)
}
/// Communicate sync error
pub fn set_sync_error(&self, error: Error) {
*self.sync_error.write() = Some(error);

View file

@ -115,7 +115,9 @@ fn test_pibd_copy_impl(is_test_chain: bool, src_root_dir: &str, dest_root_dir: &
// This is going to use the same block as horizon_header
let segmenter = src_chain.segmenter().unwrap();
// Init desegmenter
let mut desegmenter = dest_chain.desegmenter(&horizon_header).unwrap();
let desegmenter_lock = dest_chain.desegmenter(&horizon_header).unwrap();
let mut desegmenter_write = desegmenter_lock.write();
let desegmenter = desegmenter_write.as_mut().unwrap();
// And total size of the bitmap PMMR
let bitmap_mmr_size = desegmenter.expected_bitmap_mmr_size();

View file

@ -21,6 +21,39 @@ use croaring::Bitmap;
use std::cmp::min;
use std::fmt::{self, Debug};
#[derive(Clone, Debug, Eq, PartialEq)]
/// Possible segment types, according to this desegmenter
pub enum SegmentType {
/// Output Bitmap
Bitmap,
/// Output
Output,
/// RangeProof
RangeProof,
/// Kernel
Kernel,
}
/// Lumps possible types with segment ids to enable a unique identifier
/// for a segment with respect to a particular archive header
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct SegmentTypeIdentifier {
/// The type of this segment
pub segment_type: SegmentType,
/// The identfier itself
pub identifier: SegmentIdentifier,
}
impl SegmentTypeIdentifier {
/// Create
pub fn new(segment_type: SegmentType, identifier: SegmentIdentifier) -> Self {
Self {
segment_type,
identifier,
}
}
}
#[derive(Clone, Debug, PartialEq, Eq)]
/// Error related to segment creation or validation
pub enum SegmentError {
@ -90,6 +123,43 @@ impl SegmentIdentifier {
let d = 1 << segment_height;
((pmmr::n_leaves(target_mmr_size) + d - 1) / d) as usize
}
/// Maximum number of leaves in a segment, given by `2**height`
fn segment_capacity(&self) -> u64 {
1 << self.height
}
/// Offset (in leaf idx) of first leaf in the segment
fn leaf_offset(&self) -> u64 {
self.idx * self.segment_capacity()
}
// Number of leaves in this segment. Equal to capacity except for the final segment, which can be smaller
fn segment_unpruned_size(&self, mmr_size: u64) -> u64 {
min(
self.segment_capacity(),
pmmr::n_leaves(mmr_size).saturating_sub(self.leaf_offset()),
)
}
/// Inclusive (full) range of MMR positions for the segment that would be produced
/// by this Identifier
pub fn segment_pos_range(&self, mmr_size: u64) -> (u64, u64) {
let segment_size = self.segment_unpruned_size(mmr_size);
let leaf_offset = self.leaf_offset();
let first = pmmr::insertion_to_pmmr_index(leaf_offset);
let last = if self.full_segment(mmr_size) {
pmmr::insertion_to_pmmr_index(leaf_offset + segment_size - 1) + (self.height as u64)
} else {
mmr_size - 1
};
(first, last)
}
/// Whether the segment is full (segment size == capacity)
fn full_segment(&self, mmr_size: u64) -> bool {
self.segment_unpruned_size(mmr_size) == self.segment_capacity()
}
}
/// Segment of a PMMR: unpruned leaves and the necessary data to verify
@ -119,39 +189,27 @@ impl<T> Segment<T> {
/// Maximum number of leaves in a segment, given by `2**height`
fn segment_capacity(&self) -> u64 {
1 << self.identifier.height
self.identifier.segment_capacity()
}
/// Offset (in leaf idx) of first leaf in the segment
fn leaf_offset(&self) -> u64 {
self.identifier.idx * self.segment_capacity()
self.identifier.leaf_offset()
}
// Number of leaves in this segment. Equal to capacity except for the final segment, which can be smaller
fn segment_unpruned_size(&self, mmr_size: u64) -> u64 {
min(
self.segment_capacity(),
pmmr::n_leaves(mmr_size).saturating_sub(self.leaf_offset()),
)
self.identifier.segment_unpruned_size(mmr_size)
}
/// Whether the segment is full (segment size == capacity)
fn full_segment(&self, mmr_size: u64) -> bool {
self.segment_unpruned_size(mmr_size) == self.segment_capacity()
self.identifier.full_segment(mmr_size)
}
/// Inclusive range of MMR positions for this segment
pub fn segment_pos_range(&self, mmr_size: u64) -> (u64, u64) {
let segment_size = self.segment_unpruned_size(mmr_size);
let leaf_offset = self.leaf_offset();
let first = pmmr::insertion_to_pmmr_index(leaf_offset);
let last = if self.full_segment(mmr_size) {
pmmr::insertion_to_pmmr_index(leaf_offset + segment_size - 1)
+ (self.identifier.height as u64)
} else {
mmr_size - 1
};
(first, last)
self.identifier.segment_pos_range(mmr_size)
}
/// TODO - binary_search_by_key() here (can we assume these are sorted by pos?)

View file

@ -32,7 +32,7 @@ use crate::core::core::hash::{Hash, Hashed};
use crate::core::core::transaction::Transaction;
use crate::core::core::{
BlockHeader, BlockSums, CompactBlock, Inputs, OutputIdentifier, Segment, SegmentIdentifier,
TxKernel,
SegmentType, SegmentTypeIdentifier, TxKernel,
};
use crate::core::pow::Difficulty;
use crate::core::ser::ProtocolVersion;
@ -572,14 +572,31 @@ where
segment: Segment<BitmapChunk>,
) -> Result<bool, chain::Error> {
debug!(
"RECEIVED BITMAP SEGMENT FOR block_hash: {}, output_root: {}",
block_hash, output_root
"Received bitmap segment {} for block_hash: {}, output_root: {}",
segment.identifier().idx,
block_hash,
output_root
);
// Remove segment from outgoing list TODO: Where is the best place to
// do this?
self.sync_state.remove_pibd_segment(&SegmentTypeIdentifier {
segment_type: SegmentType::Bitmap,
identifier: segment.identifier(),
});
// TODO: Entire process needs to be restarted if the horizon block
// has changed (perhaps not here, NB this has to go somewhere)
let archive_header = self.chain().txhashset_archive_header_header_only()?;
let mut desegmenter = self.chain().desegmenter(&archive_header)?;
desegmenter.add_bitmap_segment(segment, output_root)?;
let identifier = segment.identifier().clone();
if let Some(d) = self.chain().desegmenter(&archive_header)?.write().as_mut() {
let res = d.add_bitmap_segment(segment, output_root);
if let Err(e) = res {
debug!(
"Validation of incoming bitmap segment failed: {:?}, reason: {}",
identifier, e
);
return Err(e);
}
}
Ok(true)
}
@ -591,7 +608,9 @@ where
) -> Result<bool, chain::Error> {
let archive_header = self.chain().txhashset_archive_header_header_only()?;
let desegmenter = self.chain().desegmenter(&archive_header)?;
desegmenter.add_output_segment(segment, Some(bitmap_root))?;
if let Some(d) = desegmenter.write().as_ref() {
d.add_output_segment(segment, Some(bitmap_root))?;
}
Ok(true)
}
@ -602,7 +621,9 @@ where
) -> Result<bool, chain::Error> {
let archive_header = self.chain().txhashset_archive_header_header_only()?;
let desegmenter = self.chain().desegmenter(&archive_header)?;
desegmenter.add_rangeproof_segment(segment)?;
if let Some(d) = desegmenter.write().as_ref() {
d.add_rangeproof_segment(segment)?;
}
Ok(true)
}
@ -613,7 +634,9 @@ where
) -> Result<bool, chain::Error> {
let archive_header = self.chain().txhashset_archive_header_header_only()?;
let desegmenter = self.chain().desegmenter(&archive_header)?;
desegmenter.add_kernel_segment(segment)?;
if let Some(d) = desegmenter.write().as_ref() {
d.add_kernel_segment(segment)?;
}
Ok(true)
}
}

View file

@ -35,8 +35,6 @@ pub struct StateSync {
prev_state_sync: Option<DateTime<Utc>>,
state_sync_peer: Option<Arc<Peer>>,
sent_test_pibd_message: bool,
}
impl StateSync {
@ -51,7 +49,6 @@ impl StateSync {
chain,
prev_state_sync: None,
state_sync_peer: None,
sent_test_pibd_message: false,
}
}
@ -172,19 +169,24 @@ impl StateSync {
fn continue_pibd(&mut self) {
// Check the state of our chain to figure out what we should be requesting next
// TODO: Just faking a single request for testing
if !self.sent_test_pibd_message {
debug!("Sending test PIBD message");
let archive_header = self.chain.txhashset_archive_header_header_only().unwrap();
let archive_header = self.chain.txhashset_archive_header_header_only().unwrap();
let desegmenter = self.chain.desegmenter(&archive_header).unwrap();
let target_segment_height = 11;
//let archive_header = self.chain.txhashset_archive_header().unwrap();
let desegmenter = self.chain.desegmenter(&archive_header).unwrap();
let bitmap_mmr_size = desegmenter.expected_bitmap_mmr_size();
let mut identifier_iter =
SegmentIdentifier::traversal_iter(bitmap_mmr_size, target_segment_height);
// TODO and consider: number here depends on how many simultaneous
// requests we want to send to peers
let mut next_segment_ids = vec![];
if let Some(d) = desegmenter.read().as_ref() {
next_segment_ids = d.next_desired_segments(10);
}
// For each segment, pick a desirable peer and send message
// (Provided we're not waiting for a response for this message from someone else)
for seg_id in next_segment_ids.iter() {
if self.sync_state.contains_pibd_segment(seg_id) {
trace!("Request list contains, continuing: {:?}", seg_id);
continue;
}
self.sent_test_pibd_message = true;
let peers_iter = || {
self.peers
.iter()
@ -202,11 +204,9 @@ impl StateSync {
});
debug!("Chosen peer is {:?}", peer);
if let Some(p) = peer {
p.send_bitmap_segment_request(
archive_header.hash(),
identifier_iter.next().unwrap(),
)
.unwrap();
p.send_bitmap_segment_request(archive_header.hash(), seg_id.identifier.clone())
.unwrap();
self.sync_state.add_pibd_segment(seg_id);
}
}
}