From 436bacf17ef8613f50d68d442c23a779019fdeb0 Mon Sep 17 00:00:00 2001
From: Yeastplume
Date: Tue, 25 Jan 2022 10:38:13 +0000
Subject: [PATCH] [PIBD_IMPL] Bitmap accumulator reconstruction + TxHashset set
 reconstruction (#3689)

* application of received bitmap segments to local accumulator

* add all required elements to send/receive output segment requests and
responses

* testing of output sync

* add special cases to pmmr segment request
---
 chain/src/txhashset/desegmenter.rs  | 258 +++++++++++++++++++++++-----
 chain/src/txhashset/txhashset.rs    |  29 +++-
 p2p/src/peer.rs                     |  42 +++++
 p2p/src/protocol.rs                 |  38 +++-
 servers/src/common/adapters.rs      |  27 ++-
 servers/src/grin/sync/state_sync.rs |  48 +++++-
 6 files changed, 376 insertions(+), 66 deletions(-)

diff --git a/chain/src/txhashset/desegmenter.rs b/chain/src/txhashset/desegmenter.rs
index 56cb8c7d9..e231e2521 100644
--- a/chain/src/txhashset/desegmenter.rs
+++ b/chain/src/txhashset/desegmenter.rs
@@ -41,12 +41,13 @@ pub struct Desegmenter {
 	archive_header: BlockHeader,
 	store: Arc<store::ChainStore>,
 
-	default_segment_height: u8,
+	default_bitmap_segment_height: u8,
+	default_output_segment_height: u8,
 
 	bitmap_accumulator: BitmapAccumulator,
-	bitmap_segments: Vec<Segment<BitmapChunk>>,
-	_output_segments: Vec<Segment<OutputIdentifier>>,
-	_rangeproof_segments: Vec<Segment<RangeProof>>,
+	bitmap_segment_cache: Vec<Segment<BitmapChunk>>,
+	output_segment_cache: Vec<Segment<OutputIdentifier>>,
+	_rangeproof_segment_cache: Vec<Segment<RangeProof>>,
 	_kernel_segments: Vec<Segment<TxKernel>>,
 
 	bitmap_mmr_leaf_count: u64,
@@ -70,10 +71,11 @@ impl Desegmenter {
 			archive_header,
 			store,
 			bitmap_accumulator: BitmapAccumulator::new(),
-			default_segment_height: 9,
-			bitmap_segments: vec![],
-			_output_segments: vec![],
-			_rangeproof_segments: vec![],
+			default_bitmap_segment_height: 9,
+			default_output_segment_height: 11,
+			bitmap_segment_cache: vec![],
+			output_segment_cache: vec![],
+			_rangeproof_segment_cache: vec![],
 			_kernel_segments: vec![],
 
 			bitmap_mmr_leaf_count: 0,
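Note on the two default heights introduced above: a PMMR segment of height h spans 2^h leaves, so height 9 puts 512 bitmap chunks in each bitmap segment while height 11 puts 2048 outputs in each output segment. A back-of-envelope sketch (assuming grin's 1024-bit bitmap chunks; the helper below is illustrative, not code from this patch):

    // Leaves covered by one PMMR segment of the given height.
    fn leaves_per_segment(height: u8) -> u64 {
        1u64 << height
    }

    fn main() {
        assert_eq!(leaves_per_segment(9), 512); // bitmap chunks per bitmap segment
        assert_eq!(leaves_per_segment(11), 2048); // outputs per output segment
        // Outputs whose spent/unspent status one bitmap segment covers,
        // assuming each BitmapChunk is 1024 bits:
        assert_eq!(leaves_per_segment(9) * 1024, 524_288);
    }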
@@ -94,29 +96,58 @@ impl Desegmenter {
 		self.bitmap_mmr_size
 	}
 
+	/// Apply next set of segments that are ready to be appended to their respective trees,
+	/// and kick off any validations that can happen. TODO: figure out where and how
+	/// this should be called considering any thread-blocking implications
+	pub fn apply_next_segments(&mut self) -> Result<(), Error> {
+		let next_bmp_idx = self.next_required_bitmap_segment_index();
+		if let Some(bmp_idx) = next_bmp_idx {
+			if let Some((idx, _seg)) = self
+				.bitmap_segment_cache
+				.iter()
+				.enumerate()
+				.find(|s| s.1.identifier().idx == bmp_idx)
+			{
+				self.apply_bitmap_segment(idx)?;
+			}
+		} else {
+			// Check if we need to finalize bitmap
+			if self.bitmap_cache == None {
+				// Should have all the pieces now, finalize the bitmap cache
+				self.finalize_bitmap()?;
+			}
+			// Check if we can apply the next output segment
+			if let Some(next_output_idx) = self.next_required_output_segment_index() {
+				debug!("Next output index to apply: {}", next_output_idx);
+				if let Some((idx, _seg)) = self
+					.output_segment_cache
+					.iter()
+					.enumerate()
+					.find(|s| s.1.identifier().idx == next_output_idx)
+				{
+					self.apply_output_segment(idx)?;
+				}
+			}
+			// TODO: Ditto RP, kernel
+		}
+		Ok(())
+	}
+
 	/// Return list of the next preferred segments the desegmenter needs based on
 	/// the current real state of the underlying elements
 	pub fn next_desired_segments(&self, max_elements: usize) -> Vec<SegmentTypeIdentifier> {
 		let mut return_vec = vec![];
-		// First check for required bitmap elements
 		if self.bitmap_cache.is_none() {
-			trace!("Desegmenter needs bitmap segments");
 			// Get current size of bitmap MMR
 			let local_pmmr_size = self.bitmap_accumulator.readonly_pmmr().unpruned_size();
-			trace!("Local Bitmap PMMR Size is: {}", local_pmmr_size);
 			// Get iterator over expected bitmap elements
 			let mut identifier_iter = SegmentIdentifier::traversal_iter(
 				self.bitmap_mmr_size,
-				self.default_segment_height,
+				self.default_bitmap_segment_height,
 			);
-			trace!("Expected bitmap MMR size is: {}", self.bitmap_mmr_size);
 			// Advance iterator to next expected segment
 			while let Some(id) = identifier_iter.next() {
-				trace!(
-					"ID segment pos range: {:?}",
-					id.segment_pos_range(self.bitmap_mmr_size)
-				);
 				if id.segment_pos_range(self.bitmap_mmr_size).1 > local_pmmr_size {
 					if !self.has_bitmap_segment_with_id(id) {
 						return_vec.push(SegmentTypeIdentifier::new(SegmentType::Bitmap, id));
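Since peers answer segment requests in any order, apply_next_segments drains the caches rather than assuming ordered arrival: it looks up the one cached segment whose index fills the next gap in the local PMMR and applies only that. A self-contained sketch of the drain step (Seg is a stand-in for the real Segment type):

    // Stand-in segment; the real code stores Segment<T> and matches on
    // identifier().idx.
    struct Seg {
        idx: u64,
    }

    // Remove and return the cached segment that fills the next gap, if present.
    fn take_next(cache: &mut Vec<Seg>, next_needed: u64) -> Option<Seg> {
        let pos = cache.iter().position(|s| s.idx == next_needed)?;
        Some(cache.remove(pos))
    }

    fn main() {
        let mut cache = vec![Seg { idx: 3 }, Seg { idx: 1 }];
        assert!(take_next(&mut cache, 0).is_none()); // segment 0 not arrived yet
        assert_eq!(take_next(&mut cache, 1).map(|s| s.idx), Some(1));
    }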
@@ -126,6 +157,47 @@ impl Desegmenter {
 					}
 				}
 			}
+		} else {
+			// We have all required bitmap segments and have recreated our local
+			// bitmap, now continue with other segments
+			// TODO: Outputs only for now, just for testing. We'll want to evenly
+			// spread requests among the 3 PMMRs
+			let local_output_mmr_size;
+			let mut _local_kernel_mmr_size;
+			let mut _local_rangeproof_mmr_size;
+			{
+				let txhashset = self.txhashset.read();
+				local_output_mmr_size = txhashset.output_mmr_size();
+				_local_kernel_mmr_size = txhashset.kernel_mmr_size();
+				_local_rangeproof_mmr_size = txhashset.rangeproof_mmr_size();
+			}
+			// TODO: Fix, alternative approach, this is very inefficient
+			let mut output_identifier_iter = SegmentIdentifier::traversal_iter(
+				self.archive_header.output_mmr_size,
+				self.default_output_segment_height,
+			);
+			debug!("local output mmr size is: {}", local_output_mmr_size);
+			while return_vec.len() < max_elements {
+				// Next segment from output PMMR
+				if let Some(id) = output_identifier_iter.next() {
+					if id.segment_pos_range(self.archive_header.output_mmr_size).1
+						> local_output_mmr_size
+					{
+						if !self.has_output_segment_with_id(id) {
+							return_vec.push(SegmentTypeIdentifier::new(SegmentType::Output, id));
+							if return_vec.len() >= max_elements {
+								break;
+							}
+						}
+					}
+				}
+				// TODO: likewise next segments from kernel and rangeproof pmmrs

+				// No more segments required
+				if return_vec.is_empty() {
+					break;
+				}
+			}
 		}
 		return_vec
 	}
@@ -138,7 +210,7 @@ impl Desegmenter {
 	/// being shut down and restarted
 	pub fn finalize_bitmap(&mut self) -> Result<(), Error> {
 		debug!(
-			"pibd_desgmenter: caching bitmap - accumulator root: {}",
+			"pibd_desegmenter: finalizing and caching bitmap - accumulator root: {}",
 			self.bitmap_accumulator.root()
 		);
 		self.bitmap_cache = Some(self.bitmap_accumulator.as_bitmap()?);
@@ -153,7 +225,6 @@ impl Desegmenter {
 			&mut batch,
 			|ext, _batch| {
 				let extension = &mut ext.extension;
-				// TODO: Unwrap
 				extension.set_bitmap_accumulator(self.bitmap_accumulator.clone());
 				Ok(())
 			},
@@ -193,23 +264,41 @@ impl Desegmenter {
 	/// Cache a bitmap segment if we don't already have it
 	fn cache_bitmap_segment(&mut self, in_seg: Segment<BitmapChunk>) {
 		if self
-			.bitmap_segments
+			.bitmap_segment_cache
 			.iter()
 			.find(|i| i.identifier() == in_seg.identifier())
 			.is_none()
 		{
-			self.bitmap_segments.push(in_seg);
+			self.bitmap_segment_cache.push(in_seg);
 		}
 	}
 
 	/// Whether our list already contains this bitmap segment
 	fn has_bitmap_segment_with_id(&self, seg_id: SegmentIdentifier) -> bool {
-		self.bitmap_segments
+		self.bitmap_segment_cache
 			.iter()
 			.find(|i| i.identifier() == seg_id)
 			.is_some()
 	}
 
+	/// Return an identifier for the next segment we need for the bitmap pmmr
+	fn next_required_bitmap_segment_index(&self) -> Option<u64> {
+		let local_bitmap_pmmr_size = self.bitmap_accumulator.readonly_pmmr().unpruned_size();
+		let cur_segment_count = SegmentIdentifier::count_segments_required(
+			local_bitmap_pmmr_size,
+			self.default_bitmap_segment_height,
+		);
+		let total_segment_count = SegmentIdentifier::count_segments_required(
+			self.bitmap_mmr_size,
+			self.default_bitmap_segment_height,
+		);
+		if cur_segment_count == total_segment_count {
+			None
+		} else {
+			Some(cur_segment_count as u64)
+		}
+	}
+
 	/// Adds and validates a bitmap chunk
 	/// TODO: Still experimenting, this expects chunks received to be in order
 	pub fn add_bitmap_segment(
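The index arithmetic in next_required_bitmap_segment_index leans on a simple invariant: segments are applied whole and in order, so the number of segments covering the local PMMR doubles as the zero-based index of the next one to fetch. A standalone illustration over leaf counts (the real count_segments_required works on PMMR sizes, so this helper is a deliberate simplification):

    // Segments needed to cover `leaves` leaves at the given segment height.
    fn segments_required(leaves: u64, height: u8) -> u64 {
        let per_seg = 1u64 << height;
        (leaves + per_seg - 1) / per_seg // ceiling division
    }

    fn next_segment_idx(local_leaves: u64, total_leaves: u64, height: u8) -> Option<u64> {
        let have = segments_required(local_leaves, height);
        let need = segments_required(total_leaves, height);
        if have == need {
            None // fully caught up
        } else {
            // Segments are applied whole and in order, so the count of
            // segments covering the local leaves is the next zero-based index.
            Some(have)
        }
    }

    fn main() {
        // 2048 of 5000 leaves at height 9 (512 leaves per segment):
        // segments 0..=3 are complete, so segment 4 is requested next.
        assert_eq!(next_segment_idx(2048, 5000, 9), Some(4));
        // Once all leaves are present, nothing more is required.
        assert_eq!(next_segment_idx(5000, 5000, 9), None);
    }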
@@ -229,32 +318,51 @@ impl Desegmenter {
 		debug!("pibd_desegmenter: adding segment to cache");
 		// All okay, add to our cached list of bitmap segments
 		self.cache_bitmap_segment(segment);
-
-		// All okay, add leaves to bitmap accumulator
-		/*let (_sid, _hash_pos, _hashes, _leaf_pos, leaf_data, _proof) = segment.parts();
-		for chunk in leaf_data.into_iter() {
-			self.bitmap_accumulator.append_chunk(chunk)?;
-		}*/
 		Ok(())
 	}
 
-	/// Adds a output segment
-	/// TODO: Still experimenting, expects chunks received to be in order
-	pub fn add_output_segment(
-		&self,
-		segment: Segment<OutputIdentifier>,
-		_bitmap_root: Option<Hash>,
-	) -> Result<(), Error> {
-		debug!("pibd_desegmenter: add output segment");
-		// TODO: check bitmap root matches what we already have
-		segment.validate_with(
-			self.archive_header.output_mmr_size, // Last MMR pos at the height being validated
-			self.bitmap_cache.as_ref(),
-			self.archive_header.output_root, // Output root we're checking for
-			self.archive_header.output_mmr_size,
-			self.bitmap_accumulator.root(), // Other root
-			false,
-		)?;
+	/// Apply the bitmap segment at the given index in the cache
+	pub fn apply_bitmap_segment(&mut self, idx: usize) -> Result<(), Error> {
+		let segment = self.bitmap_segment_cache.remove(idx);
+		debug!(
+			"pibd_desegmenter: apply bitmap segment at segment idx {}",
+			segment.identifier().idx
+		);
+		// Add leaves to bitmap accumulator
+		let (_sid, _hash_pos, _hashes, _leaf_pos, leaf_data, _proof) = segment.parts();
+		for chunk in leaf_data.into_iter() {
+			self.bitmap_accumulator.append_chunk(chunk)?;
+		}
+		Ok(())
+	}
+
+	/// Whether our list already contains this output segment
+	fn has_output_segment_with_id(&self, seg_id: SegmentIdentifier) -> bool {
+		self.output_segment_cache
+			.iter()
+			.find(|i| i.identifier() == seg_id)
+			.is_some()
+	}
+
+	/// Cache an output segment if we don't already have it
+	fn cache_output_segment(&mut self, in_seg: Segment<OutputIdentifier>) {
+		if self
+			.output_segment_cache
+			.iter()
+			.find(|i| i.identifier() == in_seg.identifier())
+			.is_none()
+		{
+			self.output_segment_cache.push(in_seg);
+		}
+	}
+
+	/// Apply the output segment at the given index in the cache
+	pub fn apply_output_segment(&mut self, idx: usize) -> Result<(), Error> {
+		let segment = self.output_segment_cache.remove(idx);
+		debug!(
+			"pibd_desegmenter: applying output segment at segment idx {}",
+			segment.identifier().idx
+		);
 		let mut header_pmmr = self.header_pmmr.write();
 		let mut txhashset = self.txhashset.write();
 		let mut batch = self.store.batch()?;
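Together with apply_next_segments earlier in this file, the hunk above fixes the apply order: all bitmap segments are applied and the bitmap finalized before any output segment, since validating an output segment against the output root needs the completed bitmap to know which leaves are pruned. A compact sketch of that implied phasing (the rangeproof and kernel phases are still TODO in this patch, so their placement here is an assumption; the real desegmenter infers the phase from its caches rather than storing one):

    #[derive(Debug, PartialEq)]
    enum Phase {
        Bitmap,
        Output,
        RangeProofAndKernel, // not yet wired up in this patch
        Done,
    }

    fn advance(p: Phase, all_applied: bool) -> Phase {
        if !all_applied {
            return p; // stay in the current phase until its segments are done
        }
        match p {
            Phase::Bitmap => Phase::Output,
            Phase::Output => Phase::RangeProofAndKernel,
            Phase::RangeProofAndKernel => Phase::Done,
            Phase::Done => Phase::Done,
        }
    }

    fn main() {
        assert_eq!(advance(Phase::Bitmap, true), Phase::Output);
        assert_eq!(advance(Phase::Output, false), Phase::Output);
    }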
@@ -271,6 +379,66 @@ impl Desegmenter {
 		Ok(())
 	}
 
+	/// Return an identifier for the next segment we need for the output pmmr
+	fn next_required_output_segment_index(&self) -> Option<u64> {
+		let local_output_mmr_size;
+		{
+			let txhashset = self.txhashset.read();
+			local_output_mmr_size = txhashset.output_mmr_size();
+		}
+
+		// Special case here. If the mmr size is 1, this is a fresh chain
+		// with naught but a humble genesis block. We need segment 0 (and
+		// also need to skip the genesis block when applying the segment)
+		let cur_segment_count = if local_output_mmr_size == 1 {
+			0
+		} else {
+			SegmentIdentifier::count_segments_required(
+				local_output_mmr_size,
+				self.default_output_segment_height,
+			)
+		};
+
+		let total_segment_count = SegmentIdentifier::count_segments_required(
+			self.archive_header.output_mmr_size,
+			self.default_output_segment_height,
+		);
+		debug!(
+			"Next required output segment is {} of {}",
+			cur_segment_count, total_segment_count
+		);
+		if cur_segment_count == total_segment_count {
+			None
+		} else {
+			Some(cur_segment_count as u64)
+		}
+	}
+
+	/// Adds an output segment
+	pub fn add_output_segment(
+		&mut self,
+		segment: Segment<OutputIdentifier>,
+		bitmap_root: Option<Hash>,
+	) -> Result<(), Error> {
+		debug!("pibd_desegmenter: add output segment");
+		// TODO: check the bitmap root matches what we already have; a mismatch
+		// means something is very wrong and probably requires a full body sync reset
+		/*if bitmap_root != Some(self.bitmap_accumulator.root()) {
+
+		}*/
+		segment.validate_with(
+			self.archive_header.output_mmr_size, // Last MMR pos at the height being validated
+			self.bitmap_cache.as_ref(),
+			self.archive_header.output_root, // Output root we're checking for
+			self.archive_header.output_mmr_size,
+			self.bitmap_accumulator.root(), // Other root
+			false,
+		)?;
+		self.cache_output_segment(segment);
+		Ok(())
+	}
+
 	/// Adds a Rangeproof segment
 	/// TODO: Still experimenting, expects chunks received to be in order
 	pub fn add_rangeproof_segment(&self, segment: Segment<RangeProof>) -> Result<(), Error> {
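A note on the size values compared above and returned by the getters added below: a PMMR's "size" counts internal nodes as well as leaves. An MMR over n leaves stores 2n - popcount(n) nodes, which is why a fresh chain whose output PMMR holds only the genesis output reports size 1. A standalone sketch of that arithmetic (standard MMR accounting, not code from this patch):

    // Total nodes in an MMR with `n` leaves: summing each perfect subtree
    // (2^(k+1) - 1 nodes for 2^k leaves) gives 2n - popcount(n).
    fn mmr_size_from_leaves(n: u64) -> u64 {
        2 * n - u64::from(n.count_ones())
    }

    fn main() {
        assert_eq!(mmr_size_from_leaves(1), 1); // genesis-only output PMMR
        assert_eq!(mmr_size_from_leaves(2), 3); // two leaves plus one parent
        assert_eq!(mmr_size_from_leaves(4), 7); // perfect tree of height 2
        assert_eq!(mmr_size_from_leaves(5), 8); // plus one lone leaf peak
    }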
diff --git a/chain/src/txhashset/txhashset.rs b/chain/src/txhashset/txhashset.rs
index 8263b8e3d..6ecb84768 100644
--- a/chain/src/txhashset/txhashset.rs
+++ b/chain/src/txhashset/txhashset.rs
@@ -347,11 +347,6 @@ impl TxHashSet {
 			.elements_from_pmmr_index(start_index, max_count, max_index)
 	}
 
-	/// number of outputs
-	pub fn output_mmr_size(&self) -> u64 {
-		self.output_pmmr_h.size
-	}
-
 	/// As above, for rangeproofs
 	pub fn rangeproofs_by_pmmr_index(
 		&self,
@@ -363,6 +358,21 @@ impl TxHashSet {
 			.elements_from_pmmr_index(start_index, max_count, max_index)
 	}
 
+	/// size of output mmr
+	pub fn output_mmr_size(&self) -> u64 {
+		self.output_pmmr_h.size
+	}
+
+	/// size of kernel mmr
+	pub fn kernel_mmr_size(&self) -> u64 {
+		self.kernel_pmmr_h.size
+	}
+
+	/// size of rangeproof mmr (can differ from output mmr size during PIBD sync)
+	pub fn rangeproof_mmr_size(&self) -> u64 {
+		self.rproof_pmmr_h.size
+	}
+
 	/// Find a kernel with a given excess. Work backwards from `max_index` to `min_index`
 	/// NOTE: this linear search over all kernel history can be VERY expensive
 	/// public API access to this method should be limited
@@ -1248,8 +1258,13 @@ impl<'a> Extension<'a> {
 		&mut self,
 		segment: Segment<OutputIdentifier>,
 	) -> Result<(), Error> {
-		let (_sid, _hash_pos, _hashes, _leaf_pos, leaf_data, _proof) = segment.parts();
-		for output_identifier in leaf_data {
+		let (sid, _hash_pos, _hashes, _leaf_pos, leaf_data, _proof) = segment.parts();
+		for (index, output_identifier) in leaf_data.iter().enumerate() {
+			// Special case: if this is segment 0, skip the genesis block, which
+			// should already be applied
+			if sid.idx == 0 && index == 0 {
+				continue;
+			}
 			self.output_pmmr
 				.push(&output_identifier)
 				.map_err(&ErrorKind::TxHashSetErr)?;
diff --git a/p2p/src/peer.rs b/p2p/src/peer.rs
index 6ec566bd4..a0433d25e 100644
--- a/p2p/src/peer.rs
+++ b/p2p/src/peer.rs
@@ -387,6 +387,48 @@ impl Peer {
 		)
 	}
 
+	pub fn send_output_segment_request(
+		&self,
+		h: Hash,
+		identifier: SegmentIdentifier,
+	) -> Result<(), Error> {
+		self.send(
+			&SegmentRequest {
+				block_hash: h,
+				identifier,
+			},
+			msg::Type::GetOutputSegment,
+		)
+	}
+
+	pub fn send_rangeproof_segment_request(
+		&self,
+		h: Hash,
+		identifier: SegmentIdentifier,
+	) -> Result<(), Error> {
+		self.send(
+			&SegmentRequest {
+				block_hash: h,
+				identifier,
+			},
+			msg::Type::GetRangeProofSegment,
+		)
+	}
+
+	pub fn send_kernel_segment_request(
+		&self,
+		h: Hash,
+		identifier: SegmentIdentifier,
+	) -> Result<(), Error> {
+		self.send(
+			&SegmentRequest {
+				block_hash: h,
+				identifier,
+			},
+			msg::Type::GetKernelSegment,
+		)
+	}
+
 	/// Stops the peer
 	pub fn stop(&self) {
 		debug!("Stopping peer {:?}", self.info.addr);
diff --git a/p2p/src/protocol.rs b/p2p/src/protocol.rs
index f61d4a9c0..3fdfe3e5b 100644
--- a/p2p/src/protocol.rs
+++ b/p2p/src/protocol.rs
@@ -384,10 +384,40 @@ impl MessageHandler for Protocol {
 				adapter.receive_bitmap_segment(block_hash, output_root, segment.into())?;
 				Consumed::None
 			}
-			Message::OutputSegment(_)
-			| Message::RangeProofSegment(_)
-			| Message::KernelSegment(_) => Consumed::None,
-
+			Message::OutputSegment(req) => {
+				let OutputSegmentResponse {
+					response,
+					output_bitmap_root,
+				} = req;
+				debug!(
+					"Received Output Segment: bh, bitmap_root: {}, {}",
+					response.block_hash, output_bitmap_root
+				);
+				adapter.receive_output_segment(
+					response.block_hash,
+					output_bitmap_root,
+					response.segment.into(),
+				)?;
+				Consumed::None
+			}
+			Message::RangeProofSegment(req) => {
+				let SegmentResponse {
+					block_hash,
+					segment,
+				} = req;
+				debug!("Received Rangeproof Segment: bh: {}", block_hash);
+				adapter.receive_rangeproof_segment(block_hash, segment.into())?;
+				Consumed::None
+			}
+			Message::KernelSegment(req) => {
+				let SegmentResponse {
+					block_hash,
+					segment,
+				} = req;
+				debug!("Received Kernel Segment: bh: {}", block_hash);
+				adapter.receive_kernel_segment(block_hash, segment.into())?;
+				Consumed::None
+			}
 			Message::Unknown(_) => Consumed::None,
 		};
 		Ok(consumed)
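The destructuring above reflects the two response shapes on the wire: rangeproof and kernel segments reuse the generic SegmentResponse, while output segments carry an extra field so the requester can compare the sender's bitmap root with its own before applying. A reduced sketch of the shapes (stand-in types; the real definitions live in p2p's message module):

    // Stand-in types so the sketch compiles on its own.
    type Hash = [u8; 32];
    struct Segment; // payload elided

    // Generic response, as used for rangeproof and kernel segments.
    struct SegmentResponse {
        block_hash: Hash,
        segment: Segment,
    }

    // Output segments additionally carry the serving node's bitmap root, so
    // the receiver can detect a bitmap mismatch before applying the segment.
    struct OutputSegmentResponse {
        response: SegmentResponse,
        output_bitmap_root: Hash,
    }

    fn main() {
        let resp = OutputSegmentResponse {
            response: SegmentResponse {
                block_hash: [0u8; 32],
                segment: Segment,
            },
            output_bitmap_root: [0u8; 32],
        };
        let _ = resp.output_bitmap_root;
    }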
diff --git a/servers/src/common/adapters.rs b/servers/src/common/adapters.rs
index ef4b1afa4..0b62c6e76 100644
--- a/servers/src/common/adapters.rs
+++ b/servers/src/common/adapters.rs
@@ -602,14 +602,33 @@ where
 
 	fn receive_output_segment(
 		&self,
-		_block_hash: Hash,
+		block_hash: Hash,
 		bitmap_root: Hash,
 		segment: Segment<OutputIdentifier>,
 	) -> Result<bool, chain::Error> {
+		debug!(
+			"Received output segment {} for block_hash: {}, bitmap_root: {:?}",
+			segment.identifier().idx,
+			block_hash,
+			bitmap_root,
+		);
+		// Remove segment from outgoing list. TODO: where is the best place
+		// to do this?
+		self.sync_state.remove_pibd_segment(&SegmentTypeIdentifier {
+			segment_type: SegmentType::Output,
+			identifier: segment.identifier(),
+		});
 		let archive_header = self.chain().txhashset_archive_header_header_only()?;
-		let desegmenter = self.chain().desegmenter(&archive_header)?;
-		if let Some(d) = desegmenter.write().as_ref() {
-			d.add_output_segment(segment, Some(bitmap_root))?;
+		let identifier = segment.identifier().clone();
+		if let Some(d) = self.chain().desegmenter(&archive_header)?.write().as_mut() {
+			let res = d.add_output_segment(segment, Some(bitmap_root));
+			if let Err(e) = res {
+				debug!(
+					"Validation of incoming output segment failed: {:?}, reason: {}",
+					identifier, e
+				);
+				return Err(e);
+			}
 		}
 		Ok(true)
 	}
diff --git a/servers/src/grin/sync/state_sync.rs b/servers/src/grin/sync/state_sync.rs
index f53310e7b..53ac07e35 100644
--- a/servers/src/grin/sync/state_sync.rs
+++ b/servers/src/grin/sync/state_sync.rs
@@ -17,7 +17,7 @@ use chrono::Duration;
 use std::sync::Arc;
 
 use crate::chain::{self, SyncState, SyncStatus};
-use crate::core::core::{hash::Hashed, pmmr::segment::SegmentIdentifier};
+use crate::core::core::{hash::Hashed, pmmr::segment::SegmentType};
 use crate::core::global;
 use crate::core::pow::Difficulty;
 use crate::p2p::{self, Capabilities, Peer};
@@ -172,11 +172,22 @@ impl StateSync {
 		let archive_header = self.chain.txhashset_archive_header_header_only().unwrap();
 		let desegmenter = self.chain.desegmenter(&archive_header).unwrap();
 
+		// Apply segments... TODO: figure out how this should be called; it might
+		// need to be a separate thread.
+		if let Some(mut de) = desegmenter.try_write() {
+			if let Some(d) = de.as_mut() {
+				d.apply_next_segments().unwrap();
+			}
+		}
+
 		// TODO and consider: number here depends on how many simultaneous
 		// requests we want to send to peers
 		let mut next_segment_ids = vec![];
-		if let Some(d) = desegmenter.read().as_ref() {
-			next_segment_ids = d.next_desired_segments(10);
+		if let Some(d) = desegmenter.write().as_mut() {
+			// Figure out the next segments we need
+			// (12 is divisible by 3, to try and evenly spread the requests among the 3
+			// main pmmrs. Bitmap segments will always be requested first)
+			next_segment_ids = d.next_desired_segments(12);
 		}
 
 		// For each segment, pick a desirable peer and send message
@@ -202,10 +213,35 @@ impl StateSync {
 				warn!("no suitable outbound peer for pibd message, considering inbound");
 				peers_iter().inbound().choose_random()
 			});
-			debug!("Chosen peer is {:?}", peer);
+			trace!("Chosen peer is {:?}", peer);
 
 			if let Some(p) = peer {
-				p.send_bitmap_segment_request(archive_header.hash(), seg_id.identifier.clone())
-					.unwrap();
+				match seg_id.segment_type {
+					SegmentType::Bitmap => p
+						.send_bitmap_segment_request(
+							archive_header.hash(),
+							seg_id.identifier.clone(),
+						)
+						.unwrap(),
+					SegmentType::Output => p
+						.send_output_segment_request(
+							archive_header.hash(),
+							seg_id.identifier.clone(),
+						)
+						.unwrap(),
+					SegmentType::RangeProof => p
+						.send_rangeproof_segment_request(
+							archive_header.hash(),
+							seg_id.identifier.clone(),
+						)
+						.unwrap(),
+					SegmentType::Kernel => p
+						.send_kernel_segment_request(
+							archive_header.hash(),
+							seg_id.identifier.clone(),
+						)
+						.unwrap(),
+				};
+				// add to list of segments that are being tracked
 				self.sync_state.add_pibd_segment(seg_id);
 			}
 		}
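Closing the request loop: a segment identifier enters the sync state when its request is sent (add_pibd_segment, above) and leaves it when a valid response arrives (remove_pibd_segment, in adapters.rs). A minimal sketch of that bookkeeping, assuming a simple set-based tracker (hypothetical; the real SyncState adds locking and richer status):

    use std::collections::HashSet;

    // Hypothetical stand-in for (SegmentType, SegmentIdentifier) pairs.
    #[derive(Clone, Hash, PartialEq, Eq)]
    struct SegKey {
        seg_type: &'static str,
        idx: u64,
    }

    #[derive(Default)]
    struct PibdTracker {
        in_flight: HashSet<SegKey>,
    }

    impl PibdTracker {
        // Called when a request is sent to a peer.
        fn add_pibd_segment(&mut self, k: SegKey) {
            self.in_flight.insert(k);
        }
        // Called when a valid response arrives.
        fn remove_pibd_segment(&mut self, k: &SegKey) {
            self.in_flight.remove(k);
        }
        // A likely use: avoid re-requesting segments still in flight.
        fn is_pending(&self, k: &SegKey) -> bool {
            self.in_flight.contains(k)
        }
    }

    fn main() {
        let mut t = PibdTracker::default();
        let k = SegKey { seg_type: "output", idx: 4 };
        t.add_pibd_segment(k.clone());
        assert!(t.is_pending(&k));
        t.remove_pibd_segment(&k);
        assert!(!t.is_pending(&k));
    }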