From 41a86b4cd99f332a2a2dfc54af88c1735952a070 Mon Sep 17 00:00:00 2001 From: Yeastplume Date: Thu, 20 Jan 2022 09:57:21 +0000 Subject: [PATCH] [PIBD_IMPL] PIBD Desegmenter State (#3688) * add functions to desegmenter to report next desired segments, begin to add state to determine which segments have been requested * add segmentidentifier type to id requested segments uniquely * make a call on where to keep track of which PIBD segments have been requested * move segmenttype definition, add functions to manipulate peer segment list * remove desegmenter state enum * change chain desegmenter function to provide rwlock * trace, warning cleanup * udpate to test compliation --- chain/src/chain.rs | 11 ++- chain/src/txhashset/desegmenter.rs | 83 ++++++++++++++-- chain/src/types.rs | 26 ++++- chain/tests/test_data/chain_raw/lmdb/lock.mdb | Bin 8192 -> 8192 bytes chain/tests/test_pibd_copy.rs | 4 +- core/src/core/pmmr/segment.rs | 92 ++++++++++++++---- servers/src/common/adapters.rs | 39 ++++++-- servers/src/grin/sync/state_sync.rs | 38 ++++---- 8 files changed, 233 insertions(+), 60 deletions(-) diff --git a/chain/src/chain.rs b/chain/src/chain.rs index 7f61f8c98..ef5c54f4a 100644 --- a/chain/src/chain.rs +++ b/chain/src/chain.rs @@ -867,11 +867,14 @@ impl Chain { /// instantiate desegmenter (in same lazy fashion as segmenter, though this should not be as /// expensive an operation) - pub fn desegmenter(&self, archive_header: &BlockHeader) -> Result { + pub fn desegmenter( + &self, + archive_header: &BlockHeader, + ) -> Result>>, Error> { // Use our cached desegmenter if we have one and the associated header matches. - if let Some(d) = self.pibd_desegmenter.read().as_ref() { + if let Some(d) = self.pibd_desegmenter.write().as_ref() { if d.header() == archive_header { - return Ok(d.clone()); + return Ok(self.pibd_desegmenter.clone()); } } // If no desegmenter or headers don't match init @@ -881,7 +884,7 @@ impl Chain { let mut cache = self.pibd_desegmenter.write(); *cache = Some(desegmenter.clone()); - return Ok(desegmenter); + Ok(self.pibd_desegmenter.clone()) } /// initialize a desegmenter, which is capable of extending the hashset by appending diff --git a/chain/src/txhashset/desegmenter.rs b/chain/src/txhashset/desegmenter.rs index ee3748cdb..56cb8c7d9 100644 --- a/chain/src/txhashset/desegmenter.rs +++ b/chain/src/txhashset/desegmenter.rs @@ -18,8 +18,11 @@ use std::sync::Arc; use crate::core::core::hash::Hash; -use crate::core::core::pmmr; -use crate::core::core::{BlockHeader, OutputIdentifier, Segment, TxKernel}; +use crate::core::core::{pmmr, pmmr::ReadablePMMR}; +use crate::core::core::{ + BlockHeader, OutputIdentifier, Segment, SegmentIdentifier, SegmentType, SegmentTypeIdentifier, + TxKernel, +}; use crate::error::Error; use crate::txhashset::{BitmapAccumulator, BitmapChunk, TxHashSet}; use crate::util::secp::pedersen::RangeProof; @@ -30,10 +33,6 @@ use crate::txhashset; use croaring::Bitmap; -/// States that the desegmenter can be in, to keep track of what -/// parts are needed next in the proces -pub enum DesegmenterState {} - /// Desegmenter for rebuilding a txhashset from PIBD segments #[derive(Clone)] pub struct Desegmenter { @@ -42,8 +41,10 @@ pub struct Desegmenter { archive_header: BlockHeader, store: Arc, + default_segment_height: u8, + bitmap_accumulator: BitmapAccumulator, - _bitmap_segments: Vec>, + bitmap_segments: Vec>, _output_segments: Vec>, _rangeproof_segments: Vec>, _kernel_segments: Vec>, @@ -62,13 +63,15 @@ impl Desegmenter { archive_header: BlockHeader, store: Arc, ) -> Desegmenter { + trace!("Creating new desegmenter"); let mut retval = Desegmenter { txhashset, header_pmmr, archive_header, store, bitmap_accumulator: BitmapAccumulator::new(), - _bitmap_segments: vec![], + default_segment_height: 9, + bitmap_segments: vec![], _output_segments: vec![], _rangeproof_segments: vec![], _kernel_segments: vec![], @@ -91,6 +94,42 @@ impl Desegmenter { self.bitmap_mmr_size } + /// Return list of the next preferred segments the desegmenter needs based on + /// the current real state of the underlying elements + pub fn next_desired_segments(&self, max_elements: usize) -> Vec { + let mut return_vec = vec![]; + + // First check for required bitmap elements + if self.bitmap_cache.is_none() { + trace!("Desegmenter needs bitmap segments"); + // Get current size of bitmap MMR + let local_pmmr_size = self.bitmap_accumulator.readonly_pmmr().unpruned_size(); + trace!("Local Bitmap PMMR Size is: {}", local_pmmr_size); + // Get iterator over expected bitmap elements + let mut identifier_iter = SegmentIdentifier::traversal_iter( + self.bitmap_mmr_size, + self.default_segment_height, + ); + trace!("Expected bitmap MMR size is: {}", self.bitmap_mmr_size); + // Advance iterator to next expected segment + while let Some(id) = identifier_iter.next() { + trace!( + "ID segment pos range: {:?}", + id.segment_pos_range(self.bitmap_mmr_size) + ); + if id.segment_pos_range(self.bitmap_mmr_size).1 > local_pmmr_size { + if !self.has_bitmap_segment_with_id(id) { + return_vec.push(SegmentTypeIdentifier::new(SegmentType::Bitmap, id)); + if return_vec.len() >= max_elements { + return return_vec; + } + } + } + } + } + return_vec + } + /// 'Finalize' the bitmap accumulator, storing an in-memory copy of the bitmap for /// use in further validation and setting the accumulator on the underlying txhashset /// TODO: Could be called automatically when we have the calculated number of @@ -151,6 +190,26 @@ impl Desegmenter { ); } + /// Cache a bitmap segment if we don't already have it + fn cache_bitmap_segment(&mut self, in_seg: Segment) { + if self + .bitmap_segments + .iter() + .find(|i| i.identifier() == in_seg.identifier()) + .is_none() + { + self.bitmap_segments.push(in_seg); + } + } + + /// Whether our list already contains this bitmap segment + fn has_bitmap_segment_with_id(&self, seg_id: SegmentIdentifier) -> bool { + self.bitmap_segments + .iter() + .find(|i| i.identifier() == seg_id) + .is_some() + } + /// Adds and validates a bitmap chunk /// TODO: Still experimenting, this expects chunks received to be in order pub fn add_bitmap_segment( @@ -167,11 +226,15 @@ impl Desegmenter { output_root_hash, // Other root true, )?; + debug!("pibd_desegmenter: adding segment to cache"); + // All okay, add to our cached list of bitmap segments + self.cache_bitmap_segment(segment); + // All okay, add leaves to bitmap accumulator - let (_sid, _hash_pos, _hashes, _leaf_pos, leaf_data, _proof) = segment.parts(); + /*let (_sid, _hash_pos, _hashes, _leaf_pos, leaf_data, _proof) = segment.parts(); for chunk in leaf_data.into_iter() { self.bitmap_accumulator.append_chunk(chunk)?; - } + }*/ Ok(()) } diff --git a/chain/src/types.rs b/chain/src/types.rs index e206ea3c9..e829e79d8 100644 --- a/chain/src/types.rs +++ b/chain/src/types.rs @@ -17,7 +17,7 @@ use chrono::prelude::{DateTime, Utc}; use crate::core::core::hash::{Hash, Hashed, ZERO_HASH}; -use crate::core::core::{Block, BlockHeader, HeaderVersion}; +use crate::core::core::{Block, BlockHeader, HeaderVersion, SegmentTypeIdentifier}; use crate::core::pow::Difficulty; use crate::core::ser::{self, PMMRIndexHashable, Readable, Reader, Writeable, Writer}; use crate::error::{Error, ErrorKind}; @@ -131,6 +131,14 @@ impl Default for TxHashsetDownloadStats { pub struct SyncState { current: RwLock, sync_error: RwLock>, + /// Something has to keep track of segments that have been + /// requested from other peers. TODO consider: This may not + /// be the best place to put code that's concerned with peers + /// but it's currently the only place that makes the info + /// available where it will be needed (both in the adapter + /// and the sync loop) + /// TODO: Better struct for this, perhaps hash identifiers + requested_pibd_segments: RwLock>, } impl SyncState { @@ -139,6 +147,7 @@ impl SyncState { SyncState { current: RwLock::new(SyncStatus::Initial), sync_error: RwLock::new(None), + requested_pibd_segments: RwLock::new(vec![]), } } @@ -208,6 +217,21 @@ impl SyncState { *self.current.write() = SyncStatus::TxHashsetDownload(stats); } + /// Update PIBD segment list + pub fn add_pibd_segment(&self, id: &SegmentTypeIdentifier) { + self.requested_pibd_segments.write().push(id.clone()); + } + + /// Remove segment from list + pub fn remove_pibd_segment(&self, id: &SegmentTypeIdentifier) { + self.requested_pibd_segments.write().retain(|i| i != id); + } + + /// Check whether segment is in request list + pub fn contains_pibd_segment(&self, id: &SegmentTypeIdentifier) -> bool { + self.requested_pibd_segments.read().contains(id) + } + /// Communicate sync error pub fn set_sync_error(&self, error: Error) { *self.sync_error.write() = Some(error); diff --git a/chain/tests/test_data/chain_raw/lmdb/lock.mdb b/chain/tests/test_data/chain_raw/lmdb/lock.mdb index d594bd14502a4551c6038dde57fc3efc68a089db..da13f9f45cf59e5ba389ce9724f05b7e2c54e40a 100644 GIT binary patch delta 19 bcmZp0XmFS?k@Xu>-~9TG3*+S{CU64)Q(Xub delta 19 acmZp0XmFS?ku@yhV_E&ih4Jze6Sx6U{0IjC diff --git a/chain/tests/test_pibd_copy.rs b/chain/tests/test_pibd_copy.rs index d56052635..411898bf7 100644 --- a/chain/tests/test_pibd_copy.rs +++ b/chain/tests/test_pibd_copy.rs @@ -115,7 +115,9 @@ fn test_pibd_copy_impl(is_test_chain: bool, src_root_dir: &str, dest_root_dir: & // This is going to use the same block as horizon_header let segmenter = src_chain.segmenter().unwrap(); // Init desegmenter - let mut desegmenter = dest_chain.desegmenter(&horizon_header).unwrap(); + let desegmenter_lock = dest_chain.desegmenter(&horizon_header).unwrap(); + let mut desegmenter_write = desegmenter_lock.write(); + let desegmenter = desegmenter_write.as_mut().unwrap(); // And total size of the bitmap PMMR let bitmap_mmr_size = desegmenter.expected_bitmap_mmr_size(); diff --git a/core/src/core/pmmr/segment.rs b/core/src/core/pmmr/segment.rs index 5458d907a..1bf8458f3 100644 --- a/core/src/core/pmmr/segment.rs +++ b/core/src/core/pmmr/segment.rs @@ -21,6 +21,39 @@ use croaring::Bitmap; use std::cmp::min; use std::fmt::{self, Debug}; +#[derive(Clone, Debug, Eq, PartialEq)] +/// Possible segment types, according to this desegmenter +pub enum SegmentType { + /// Output Bitmap + Bitmap, + /// Output + Output, + /// RangeProof + RangeProof, + /// Kernel + Kernel, +} + +/// Lumps possible types with segment ids to enable a unique identifier +/// for a segment with respect to a particular archive header +#[derive(Clone, Debug, Eq, PartialEq)] +pub struct SegmentTypeIdentifier { + /// The type of this segment + pub segment_type: SegmentType, + /// The identfier itself + pub identifier: SegmentIdentifier, +} + +impl SegmentTypeIdentifier { + /// Create + pub fn new(segment_type: SegmentType, identifier: SegmentIdentifier) -> Self { + Self { + segment_type, + identifier, + } + } +} + #[derive(Clone, Debug, PartialEq, Eq)] /// Error related to segment creation or validation pub enum SegmentError { @@ -90,6 +123,43 @@ impl SegmentIdentifier { let d = 1 << segment_height; ((pmmr::n_leaves(target_mmr_size) + d - 1) / d) as usize } + + /// Maximum number of leaves in a segment, given by `2**height` + fn segment_capacity(&self) -> u64 { + 1 << self.height + } + + /// Offset (in leaf idx) of first leaf in the segment + fn leaf_offset(&self) -> u64 { + self.idx * self.segment_capacity() + } + + // Number of leaves in this segment. Equal to capacity except for the final segment, which can be smaller + fn segment_unpruned_size(&self, mmr_size: u64) -> u64 { + min( + self.segment_capacity(), + pmmr::n_leaves(mmr_size).saturating_sub(self.leaf_offset()), + ) + } + + /// Inclusive (full) range of MMR positions for the segment that would be produced + /// by this Identifier + pub fn segment_pos_range(&self, mmr_size: u64) -> (u64, u64) { + let segment_size = self.segment_unpruned_size(mmr_size); + let leaf_offset = self.leaf_offset(); + let first = pmmr::insertion_to_pmmr_index(leaf_offset); + let last = if self.full_segment(mmr_size) { + pmmr::insertion_to_pmmr_index(leaf_offset + segment_size - 1) + (self.height as u64) + } else { + mmr_size - 1 + }; + (first, last) + } + + /// Whether the segment is full (segment size == capacity) + fn full_segment(&self, mmr_size: u64) -> bool { + self.segment_unpruned_size(mmr_size) == self.segment_capacity() + } } /// Segment of a PMMR: unpruned leaves and the necessary data to verify @@ -119,39 +189,27 @@ impl Segment { /// Maximum number of leaves in a segment, given by `2**height` fn segment_capacity(&self) -> u64 { - 1 << self.identifier.height + self.identifier.segment_capacity() } /// Offset (in leaf idx) of first leaf in the segment fn leaf_offset(&self) -> u64 { - self.identifier.idx * self.segment_capacity() + self.identifier.leaf_offset() } // Number of leaves in this segment. Equal to capacity except for the final segment, which can be smaller fn segment_unpruned_size(&self, mmr_size: u64) -> u64 { - min( - self.segment_capacity(), - pmmr::n_leaves(mmr_size).saturating_sub(self.leaf_offset()), - ) + self.identifier.segment_unpruned_size(mmr_size) } /// Whether the segment is full (segment size == capacity) fn full_segment(&self, mmr_size: u64) -> bool { - self.segment_unpruned_size(mmr_size) == self.segment_capacity() + self.identifier.full_segment(mmr_size) } /// Inclusive range of MMR positions for this segment pub fn segment_pos_range(&self, mmr_size: u64) -> (u64, u64) { - let segment_size = self.segment_unpruned_size(mmr_size); - let leaf_offset = self.leaf_offset(); - let first = pmmr::insertion_to_pmmr_index(leaf_offset); - let last = if self.full_segment(mmr_size) { - pmmr::insertion_to_pmmr_index(leaf_offset + segment_size - 1) - + (self.identifier.height as u64) - } else { - mmr_size - 1 - }; - (first, last) + self.identifier.segment_pos_range(mmr_size) } /// TODO - binary_search_by_key() here (can we assume these are sorted by pos?) diff --git a/servers/src/common/adapters.rs b/servers/src/common/adapters.rs index 621e75e9a..ef4b1afa4 100644 --- a/servers/src/common/adapters.rs +++ b/servers/src/common/adapters.rs @@ -32,7 +32,7 @@ use crate::core::core::hash::{Hash, Hashed}; use crate::core::core::transaction::Transaction; use crate::core::core::{ BlockHeader, BlockSums, CompactBlock, Inputs, OutputIdentifier, Segment, SegmentIdentifier, - TxKernel, + SegmentType, SegmentTypeIdentifier, TxKernel, }; use crate::core::pow::Difficulty; use crate::core::ser::ProtocolVersion; @@ -572,14 +572,31 @@ where segment: Segment, ) -> Result { debug!( - "RECEIVED BITMAP SEGMENT FOR block_hash: {}, output_root: {}", - block_hash, output_root + "Received bitmap segment {} for block_hash: {}, output_root: {}", + segment.identifier().idx, + block_hash, + output_root ); + // Remove segment from outgoing list TODO: Where is the best place to + // do this? + self.sync_state.remove_pibd_segment(&SegmentTypeIdentifier { + segment_type: SegmentType::Bitmap, + identifier: segment.identifier(), + }); // TODO: Entire process needs to be restarted if the horizon block // has changed (perhaps not here, NB this has to go somewhere) let archive_header = self.chain().txhashset_archive_header_header_only()?; - let mut desegmenter = self.chain().desegmenter(&archive_header)?; - desegmenter.add_bitmap_segment(segment, output_root)?; + let identifier = segment.identifier().clone(); + if let Some(d) = self.chain().desegmenter(&archive_header)?.write().as_mut() { + let res = d.add_bitmap_segment(segment, output_root); + if let Err(e) = res { + debug!( + "Validation of incoming bitmap segment failed: {:?}, reason: {}", + identifier, e + ); + return Err(e); + } + } Ok(true) } @@ -591,7 +608,9 @@ where ) -> Result { let archive_header = self.chain().txhashset_archive_header_header_only()?; let desegmenter = self.chain().desegmenter(&archive_header)?; - desegmenter.add_output_segment(segment, Some(bitmap_root))?; + if let Some(d) = desegmenter.write().as_ref() { + d.add_output_segment(segment, Some(bitmap_root))?; + } Ok(true) } @@ -602,7 +621,9 @@ where ) -> Result { let archive_header = self.chain().txhashset_archive_header_header_only()?; let desegmenter = self.chain().desegmenter(&archive_header)?; - desegmenter.add_rangeproof_segment(segment)?; + if let Some(d) = desegmenter.write().as_ref() { + d.add_rangeproof_segment(segment)?; + } Ok(true) } @@ -613,7 +634,9 @@ where ) -> Result { let archive_header = self.chain().txhashset_archive_header_header_only()?; let desegmenter = self.chain().desegmenter(&archive_header)?; - desegmenter.add_kernel_segment(segment)?; + if let Some(d) = desegmenter.write().as_ref() { + d.add_kernel_segment(segment)?; + } Ok(true) } } diff --git a/servers/src/grin/sync/state_sync.rs b/servers/src/grin/sync/state_sync.rs index 157e99d8b..f53310e7b 100644 --- a/servers/src/grin/sync/state_sync.rs +++ b/servers/src/grin/sync/state_sync.rs @@ -35,8 +35,6 @@ pub struct StateSync { prev_state_sync: Option>, state_sync_peer: Option>, - - sent_test_pibd_message: bool, } impl StateSync { @@ -51,7 +49,6 @@ impl StateSync { chain, prev_state_sync: None, state_sync_peer: None, - sent_test_pibd_message: false, } } @@ -172,19 +169,24 @@ impl StateSync { fn continue_pibd(&mut self) { // Check the state of our chain to figure out what we should be requesting next - // TODO: Just faking a single request for testing - if !self.sent_test_pibd_message { - debug!("Sending test PIBD message"); - let archive_header = self.chain.txhashset_archive_header_header_only().unwrap(); + let archive_header = self.chain.txhashset_archive_header_header_only().unwrap(); + let desegmenter = self.chain.desegmenter(&archive_header).unwrap(); - let target_segment_height = 11; - //let archive_header = self.chain.txhashset_archive_header().unwrap(); - let desegmenter = self.chain.desegmenter(&archive_header).unwrap(); - let bitmap_mmr_size = desegmenter.expected_bitmap_mmr_size(); - let mut identifier_iter = - SegmentIdentifier::traversal_iter(bitmap_mmr_size, target_segment_height); + // TODO and consider: number here depends on how many simultaneous + // requests we want to send to peers + let mut next_segment_ids = vec![]; + if let Some(d) = desegmenter.read().as_ref() { + next_segment_ids = d.next_desired_segments(10); + } + + // For each segment, pick a desirable peer and send message + // (Provided we're not waiting for a response for this message from someone else) + for seg_id in next_segment_ids.iter() { + if self.sync_state.contains_pibd_segment(seg_id) { + trace!("Request list contains, continuing: {:?}", seg_id); + continue; + } - self.sent_test_pibd_message = true; let peers_iter = || { self.peers .iter() @@ -202,11 +204,9 @@ impl StateSync { }); debug!("Chosen peer is {:?}", peer); if let Some(p) = peer { - p.send_bitmap_segment_request( - archive_header.hash(), - identifier_iter.next().unwrap(), - ) - .unwrap(); + p.send_bitmap_segment_request(archive_header.hash(), seg_id.identifier.clone()) + .unwrap(); + self.sync_state.add_pibd_segment(seg_id); } } }