[PIBD_IMPL] Bitmap accumulator reconstruction + TxHashset set reconstruction (#3689)

* application of received bitmap segments to local accumulator

* add all required elements to send/receive output segment requests and responses

* testing of output sync

* add special cases to pmmr segment request
This commit is contained in:
Yeastplume 2022-01-25 10:38:13 +00:00 committed by GitHub
parent 41a86b4cd9
commit 436bacf17e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 376 additions and 66 deletions

View file

@ -41,12 +41,13 @@ pub struct Desegmenter {
archive_header: BlockHeader,
store: Arc<store::ChainStore>,
default_segment_height: u8,
default_bitmap_segment_height: u8,
default_output_segment_height: u8,
bitmap_accumulator: BitmapAccumulator,
bitmap_segments: Vec<Segment<BitmapChunk>>,
_output_segments: Vec<Segment<OutputIdentifier>>,
_rangeproof_segments: Vec<Segment<RangeProof>>,
bitmap_segment_cache: Vec<Segment<BitmapChunk>>,
output_segment_cache: Vec<Segment<OutputIdentifier>>,
_rangeproof_segment_cache: Vec<Segment<RangeProof>>,
_kernel_segments: Vec<Segment<TxKernel>>,
bitmap_mmr_leaf_count: u64,
@ -70,10 +71,11 @@ impl Desegmenter {
archive_header,
store,
bitmap_accumulator: BitmapAccumulator::new(),
default_segment_height: 9,
bitmap_segments: vec![],
_output_segments: vec![],
_rangeproof_segments: vec![],
default_bitmap_segment_height: 9,
default_output_segment_height: 11,
bitmap_segment_cache: vec![],
output_segment_cache: vec![],
_rangeproof_segment_cache: vec![],
_kernel_segments: vec![],
bitmap_mmr_leaf_count: 0,
@ -94,29 +96,58 @@ impl Desegmenter {
self.bitmap_mmr_size
}
/// Apply next set of segments that are ready to be appended to their respective trees,
/// and kick off any validations that can happen. TODO: figure out where and how
/// this should be called considering any thread blocking implications
pub fn apply_next_segments(&mut self) -> Result<(), Error> {
let next_bmp_idx = self.next_required_bitmap_segment_index();
if let Some(bmp_idx) = next_bmp_idx {
if let Some((idx, _seg)) = self
.bitmap_segment_cache
.iter()
.enumerate()
.find(|s| s.1.identifier().idx == bmp_idx)
{
self.apply_bitmap_segment(idx)?;
}
} else {
// Check if we need to finalize bitmap
if self.bitmap_cache == None {
// Should have all the pieces now, finalize the bitmap cache
self.finalize_bitmap()?;
}
// Check if we can apply the next output segment
if let Some(next_output_idx) = self.next_required_output_segment_index() {
debug!("Next output index to apply: {}", next_output_idx);
if let Some((idx, _seg)) = self
.output_segment_cache
.iter()
.enumerate()
.find(|s| s.1.identifier().idx == next_output_idx)
{
self.apply_output_segment(idx)?;
}
}
// TODO: Ditto RP, kernel
}
Ok(())
}
/// Return list of the next preferred segments the desegmenter needs based on
/// the current real state of the underlying elements
pub fn next_desired_segments(&self, max_elements: usize) -> Vec<SegmentTypeIdentifier> {
let mut return_vec = vec![];
// First check for required bitmap elements
if self.bitmap_cache.is_none() {
trace!("Desegmenter needs bitmap segments");
// Get current size of bitmap MMR
let local_pmmr_size = self.bitmap_accumulator.readonly_pmmr().unpruned_size();
trace!("Local Bitmap PMMR Size is: {}", local_pmmr_size);
// Get iterator over expected bitmap elements
let mut identifier_iter = SegmentIdentifier::traversal_iter(
self.bitmap_mmr_size,
self.default_segment_height,
self.default_bitmap_segment_height,
);
trace!("Expected bitmap MMR size is: {}", self.bitmap_mmr_size);
// Advance iterator to next expected segment
while let Some(id) = identifier_iter.next() {
trace!(
"ID segment pos range: {:?}",
id.segment_pos_range(self.bitmap_mmr_size)
);
if id.segment_pos_range(self.bitmap_mmr_size).1 > local_pmmr_size {
if !self.has_bitmap_segment_with_id(id) {
return_vec.push(SegmentTypeIdentifier::new(SegmentType::Bitmap, id));
@ -126,6 +157,47 @@ impl Desegmenter {
}
}
}
} else {
// We have all required bitmap segments and have recreated our local
// bitmap, now continue with other segments
// TODO: Outputs only for now, just for testing. we'll want to evenly spread
// requests among the 3 PMMRs
let local_output_mmr_size;
let mut _local_kernel_mmr_size;
let mut _local_rangeproof_mmr_size;
{
let txhashset = self.txhashset.read();
local_output_mmr_size = txhashset.output_mmr_size();
_local_kernel_mmr_size = txhashset.kernel_mmr_size();
_local_rangeproof_mmr_size = txhashset.rangeproof_mmr_size();
}
// TODO: Fix, alternative approach, this is very inefficient
let mut output_identifier_iter = SegmentIdentifier::traversal_iter(
self.archive_header.output_mmr_size,
self.default_output_segment_height,
);
debug!("local output mmr size is: {}", local_output_mmr_size);
while return_vec.len() < max_elements {
// Next segment from output PMMR
if let Some(id) = output_identifier_iter.next() {
if id.segment_pos_range(self.archive_header.output_mmr_size).1
> local_output_mmr_size
{
if !self.has_output_segment_with_id(id) {
return_vec.push(SegmentTypeIdentifier::new(SegmentType::Output, id));
if return_vec.len() >= max_elements {
break;
}
}
}
}
// TODO: likewise next segments from kernel and rangeproof pmmrs
// No more segments required
if return_vec.is_empty() {
break;
}
}
}
return_vec
}
@ -138,7 +210,7 @@ impl Desegmenter {
/// being shut down and restarted
pub fn finalize_bitmap(&mut self) -> Result<(), Error> {
debug!(
"pibd_desgmenter: caching bitmap - accumulator root: {}",
"pibd_desgmenter: finalizing and caching bitmap - accumulator root: {}",
self.bitmap_accumulator.root()
);
self.bitmap_cache = Some(self.bitmap_accumulator.as_bitmap()?);
@ -153,7 +225,6 @@ impl Desegmenter {
&mut batch,
|ext, _batch| {
let extension = &mut ext.extension;
// TODO: Unwrap
extension.set_bitmap_accumulator(self.bitmap_accumulator.clone());
Ok(())
},
@ -193,23 +264,41 @@ impl Desegmenter {
/// Cache a bitmap segment if we don't already have it
fn cache_bitmap_segment(&mut self, in_seg: Segment<BitmapChunk>) {
if self
.bitmap_segments
.bitmap_segment_cache
.iter()
.find(|i| i.identifier() == in_seg.identifier())
.is_none()
{
self.bitmap_segments.push(in_seg);
self.bitmap_segment_cache.push(in_seg);
}
}
/// Whether our list already contains this bitmap segment
fn has_bitmap_segment_with_id(&self, seg_id: SegmentIdentifier) -> bool {
self.bitmap_segments
self.bitmap_segment_cache
.iter()
.find(|i| i.identifier() == seg_id)
.is_some()
}
/// Return an identifier for the next segment we need for the bitmap pmmr
fn next_required_bitmap_segment_index(&self) -> Option<u64> {
let local_bitmap_pmmr_size = self.bitmap_accumulator.readonly_pmmr().unpruned_size();
let cur_segment_count = SegmentIdentifier::count_segments_required(
local_bitmap_pmmr_size,
self.default_bitmap_segment_height,
);
let total_segment_count = SegmentIdentifier::count_segments_required(
self.bitmap_mmr_size,
self.default_bitmap_segment_height,
);
if cur_segment_count == total_segment_count {
None
} else {
Some(cur_segment_count as u64)
}
}
/// Adds and validates a bitmap chunk
/// TODO: Still experimenting, this expects chunks received to be in order
pub fn add_bitmap_segment(
@ -229,32 +318,51 @@ impl Desegmenter {
debug!("pibd_desegmenter: adding segment to cache");
// All okay, add to our cached list of bitmap segments
self.cache_bitmap_segment(segment);
// All okay, add leaves to bitmap accumulator
/*let (_sid, _hash_pos, _hashes, _leaf_pos, leaf_data, _proof) = segment.parts();
for chunk in leaf_data.into_iter() {
self.bitmap_accumulator.append_chunk(chunk)?;
}*/
Ok(())
}
/// Adds a output segment
/// TODO: Still experimenting, expects chunks received to be in order
pub fn add_output_segment(
&self,
segment: Segment<OutputIdentifier>,
_bitmap_root: Option<Hash>,
) -> Result<(), Error> {
debug!("pibd_desegmenter: add output segment");
// TODO: check bitmap root matches what we already have
segment.validate_with(
self.archive_header.output_mmr_size, // Last MMR pos at the height being validated
self.bitmap_cache.as_ref(),
self.archive_header.output_root, // Output root we're checking for
self.archive_header.output_mmr_size,
self.bitmap_accumulator.root(), // Other root
false,
)?;
/// Apply a bitmap segment at the index cache
pub fn apply_bitmap_segment(&mut self, idx: usize) -> Result<(), Error> {
let segment = self.bitmap_segment_cache.remove(idx);
debug!(
"pibd_desegmenter: apply bitmap segment at segment idx {}",
segment.identifier().idx
);
// Add leaves to bitmap accumulator
let (_sid, _hash_pos, _hashes, _leaf_pos, leaf_data, _proof) = segment.parts();
for chunk in leaf_data.into_iter() {
self.bitmap_accumulator.append_chunk(chunk)?;
}
Ok(())
}
/// Whether our list already contains this bitmap segment
fn has_output_segment_with_id(&self, seg_id: SegmentIdentifier) -> bool {
self.output_segment_cache
.iter()
.find(|i| i.identifier() == seg_id)
.is_some()
}
/// Cache an output segment if we don't already have it
fn cache_output_segment(&mut self, in_seg: Segment<OutputIdentifier>) {
if self
.output_segment_cache
.iter()
.find(|i| i.identifier() == in_seg.identifier())
.is_none()
{
self.output_segment_cache.push(in_seg);
}
}
/// Apply an output segment at the index cache
pub fn apply_output_segment(&mut self, idx: usize) -> Result<(), Error> {
let segment = self.output_segment_cache.remove(idx);
debug!(
"pibd_desegmenter: applying output segment at segment idx {}",
segment.identifier().idx
);
let mut header_pmmr = self.header_pmmr.write();
let mut txhashset = self.txhashset.write();
let mut batch = self.store.batch()?;
@ -271,6 +379,66 @@ impl Desegmenter {
Ok(())
}
/// Return an identifier for the next segment we need for the output pmmr
fn next_required_output_segment_index(&self) -> Option<u64> {
let local_output_mmr_size;
{
let txhashset = self.txhashset.read();
local_output_mmr_size = txhashset.output_mmr_size();
}
// Special case here. If the mmr size is 1, this is a fresh chain
// with naught but a humble genesis block. We need segment 0, (and
// also need to skip the genesis block when applying the segment)
let cur_segment_count = if local_output_mmr_size == 1 {
0
} else {
SegmentIdentifier::count_segments_required(
local_output_mmr_size,
self.default_output_segment_height,
)
};
let total_segment_count = SegmentIdentifier::count_segments_required(
self.archive_header.output_mmr_size,
self.default_output_segment_height,
);
debug!(
"Next required output segment is {} of {}",
cur_segment_count, total_segment_count
);
if cur_segment_count == total_segment_count {
None
} else {
Some(cur_segment_count as u64)
}
}
/// Adds a output segment
pub fn add_output_segment(
&mut self,
segment: Segment<OutputIdentifier>,
bitmap_root: Option<Hash>,
) -> Result<(), Error> {
debug!("pibd_desegmenter: add output segment");
// TODO: This, something very wrong, probably need to reset entire body sync
// check bitmap root matches what we already have
/*if bitmap_root != Some(self.bitmap_accumulator.root()) {
}*/
segment.validate_with(
self.archive_header.output_mmr_size, // Last MMR pos at the height being validated
self.bitmap_cache.as_ref(),
self.archive_header.output_root, // Output root we're checking for
self.archive_header.output_mmr_size,
self.bitmap_accumulator.root(), // Other root
false,
)?;
self.cache_output_segment(segment);
Ok(())
}
/// Adds a Rangeproof segment
/// TODO: Still experimenting, expects chunks received to be in order
pub fn add_rangeproof_segment(&self, segment: Segment<RangeProof>) -> Result<(), Error> {

View file

@ -347,11 +347,6 @@ impl TxHashSet {
.elements_from_pmmr_index(start_index, max_count, max_index)
}
/// number of outputs
pub fn output_mmr_size(&self) -> u64 {
self.output_pmmr_h.size
}
/// As above, for rangeproofs
pub fn rangeproofs_by_pmmr_index(
&self,
@ -363,6 +358,21 @@ impl TxHashSet {
.elements_from_pmmr_index(start_index, max_count, max_index)
}
/// size of output mmr
pub fn output_mmr_size(&self) -> u64 {
self.output_pmmr_h.size
}
/// size of kernel mmr
pub fn kernel_mmr_size(&self) -> u64 {
self.kernel_pmmr_h.size
}
/// size of rangeproof mmr (can differ from output mmr size during PIBD sync)
pub fn rangeproof_mmr_size(&self) -> u64 {
self.rproof_pmmr_h.size
}
/// Find a kernel with a given excess. Work backwards from `max_index` to `min_index`
/// NOTE: this linear search over all kernel history can be VERY expensive
/// public API access to this method should be limited
@ -1248,8 +1258,13 @@ impl<'a> Extension<'a> {
&mut self,
segment: Segment<OutputIdentifier>,
) -> Result<(), Error> {
let (_sid, _hash_pos, _hashes, _leaf_pos, leaf_data, _proof) = segment.parts();
for output_identifier in leaf_data {
let (sid, _hash_pos, _hashes, _leaf_pos, leaf_data, _proof) = segment.parts();
for (index, output_identifier) in leaf_data.iter().enumerate() {
// Special case, if this is segment 0, skip the genesis block which should
// already be applied
if sid.idx == 0 && index == 0 {
continue;
}
self.output_pmmr
.push(&output_identifier)
.map_err(&ErrorKind::TxHashSetErr)?;

View file

@ -387,6 +387,48 @@ impl Peer {
)
}
pub fn send_output_segment_request(
&self,
h: Hash,
identifier: SegmentIdentifier,
) -> Result<(), Error> {
self.send(
&SegmentRequest {
block_hash: h,
identifier,
},
msg::Type::GetOutputSegment,
)
}
pub fn send_rangeproof_segment_request(
&self,
h: Hash,
identifier: SegmentIdentifier,
) -> Result<(), Error> {
self.send(
&SegmentRequest {
block_hash: h,
identifier,
},
msg::Type::GetRangeProofSegment,
)
}
pub fn send_kernel_segment_request(
&self,
h: Hash,
identifier: SegmentIdentifier,
) -> Result<(), Error> {
self.send(
&SegmentRequest {
block_hash: h,
identifier,
},
msg::Type::GetKernelSegment,
)
}
/// Stops the peer
pub fn stop(&self) {
debug!("Stopping peer {:?}", self.info.addr);

View file

@ -384,10 +384,40 @@ impl MessageHandler for Protocol {
adapter.receive_bitmap_segment(block_hash, output_root, segment.into())?;
Consumed::None
}
Message::OutputSegment(_)
| Message::RangeProofSegment(_)
| Message::KernelSegment(_) => Consumed::None,
Message::OutputSegment(req) => {
let OutputSegmentResponse {
response,
output_bitmap_root,
} = req;
debug!(
"Received Output Segment: bh, bitmap_root: {}, {}",
response.block_hash, output_bitmap_root
);
adapter.receive_output_segment(
response.block_hash,
output_bitmap_root,
response.segment.into(),
)?;
Consumed::None
}
Message::RangeProofSegment(req) => {
let SegmentResponse {
block_hash,
segment,
} = req;
debug!("Received Rangeproof Segment: bh: {}", block_hash);
adapter.receive_rangeproof_segment(block_hash, segment.into())?;
Consumed::None
}
Message::KernelSegment(req) => {
let SegmentResponse {
block_hash,
segment,
} = req;
debug!("Received Kernel Segment: bh: {}", block_hash);
adapter.receive_kernel_segment(block_hash, segment.into())?;
Consumed::None
}
Message::Unknown(_) => Consumed::None,
};
Ok(consumed)

View file

@ -602,14 +602,33 @@ where
fn receive_output_segment(
&self,
_block_hash: Hash,
block_hash: Hash,
bitmap_root: Hash,
segment: Segment<OutputIdentifier>,
) -> Result<bool, chain::Error> {
debug!(
"Received output segment {} for block_hash: {}, bitmap_root: {:?}",
segment.identifier().idx,
block_hash,
bitmap_root,
);
// Remove segment from outgoing list TODO: Where is the best place to
// do this?
self.sync_state.remove_pibd_segment(&SegmentTypeIdentifier {
segment_type: SegmentType::Output,
identifier: segment.identifier(),
});
let archive_header = self.chain().txhashset_archive_header_header_only()?;
let desegmenter = self.chain().desegmenter(&archive_header)?;
if let Some(d) = desegmenter.write().as_ref() {
d.add_output_segment(segment, Some(bitmap_root))?;
let identifier = segment.identifier().clone();
if let Some(d) = self.chain().desegmenter(&archive_header)?.write().as_mut() {
let res = d.add_output_segment(segment, Some(bitmap_root));
if let Err(e) = res {
debug!(
"Validation of incoming output segment failed: {:?}, reason: {}",
identifier, e
);
return Err(e);
}
}
Ok(true)
}

View file

@ -17,7 +17,7 @@ use chrono::Duration;
use std::sync::Arc;
use crate::chain::{self, SyncState, SyncStatus};
use crate::core::core::{hash::Hashed, pmmr::segment::SegmentIdentifier};
use crate::core::core::{hash::Hashed, pmmr::segment::SegmentType};
use crate::core::global;
use crate::core::pow::Difficulty;
use crate::p2p::{self, Capabilities, Peer};
@ -172,11 +172,22 @@ impl StateSync {
let archive_header = self.chain.txhashset_archive_header_header_only().unwrap();
let desegmenter = self.chain.desegmenter(&archive_header).unwrap();
// Apply segments... TODO: figure out how this should be called, might
// need to be a separate thread.
if let Some(mut de) = desegmenter.try_write() {
if let Some(d) = de.as_mut() {
d.apply_next_segments().unwrap();
}
}
// TODO and consider: number here depends on how many simultaneous
// requests we want to send to peers
let mut next_segment_ids = vec![];
if let Some(d) = desegmenter.read().as_ref() {
next_segment_ids = d.next_desired_segments(10);
if let Some(d) = desegmenter.write().as_mut() {
// Figure out the next segments we need
// (12 is divisible by 3, to try and evenly spread the requests among the 3
// main pmmrs. Bitmaps segments will always be requested first)
next_segment_ids = d.next_desired_segments(12);
}
// For each segment, pick a desirable peer and send message
@ -202,10 +213,35 @@ impl StateSync {
warn!("no suitable outbound peer for pibd message, considering inbound");
peers_iter().inbound().choose_random()
});
debug!("Chosen peer is {:?}", peer);
trace!("Chosen peer is {:?}", peer);
if let Some(p) = peer {
p.send_bitmap_segment_request(archive_header.hash(), seg_id.identifier.clone())
.unwrap();
match seg_id.segment_type {
SegmentType::Bitmap => p
.send_bitmap_segment_request(
archive_header.hash(),
seg_id.identifier.clone(),
)
.unwrap(),
SegmentType::Output => p
.send_output_segment_request(
archive_header.hash(),
seg_id.identifier.clone(),
)
.unwrap(),
SegmentType::RangeProof => p
.send_rangeproof_segment_request(
archive_header.hash(),
seg_id.identifier.clone(),
)
.unwrap(),
SegmentType::Kernel => p
.send_kernel_segment_request(
archive_header.hash(),
seg_id.identifier.clone(),
)
.unwrap(),
};
// add to list of segments that are being tracked
self.sync_state.add_pibd_segment(seg_id);
}
}