[PIBD_IMPL] Introduce PIBD state into sync workflow (#3685)

* experimental addition of pibd download state for testnet only

* fixes to bitmap number of segments calculation + conversion of bitmap accumulator to bitmap

* attempt to call a test message

* add p2p methods for receiving bitmap segment and applying to desegmenter associated with chain

* fixes to state sync
This commit is contained in:
Yeastplume 2022-01-12 13:02:59 +00:00 committed by GitHub
parent 78c9794d30
commit 89730b7d6d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
13 changed files with 228 additions and 39 deletions

View file

@ -923,6 +923,17 @@ impl Chain {
self.get_header_by_height(txhashset_height)
}
/// Fetch the block header located at the txhashset horizon, where the
/// horizon is worked out from the header PMMR contents alone (i.e. from
/// the current header head).
pub fn txhashset_archive_header_header_only(&self) -> Result<BlockHeader, Error> {
	let head = self.header_head()?;
	let sync_threshold = global::state_sync_threshold() as u64;
	let interval = global::txhashset_archive_interval();

	// Step back from the header head by the state sync threshold, then
	// round down to the nearest multiple of the archive interval.
	let below_threshold = head.height.saturating_sub(sync_threshold);
	let horizon_height = below_threshold.saturating_sub(below_threshold % interval);

	self.get_header_by_height(horizon_height)
}
// Special handling to make sure the whole kernel set matches each of its
// roots in each block header, without truncation. We go back header by
// header, rewind and check each root. This fixes a potential weakness in

View file

@ -194,10 +194,11 @@ impl BitmapAccumulator {
/// Return a raw in-memory bitmap of this accumulator
pub fn as_bitmap(&self) -> Result<Bitmap, Error> {
let mut bitmap = Bitmap::create();
for (chunk_count, chunk_index) in self.backend.leaf_idx_iter(0).enumerate() {
for (chunk_index, chunk_pos) in self.backend.leaf_pos_iter().enumerate() {
//TODO: Unwrap
let chunk = self.backend.get_data(chunk_index).unwrap();
bitmap.add_many(&chunk.set_iter(chunk_count * 1024).collect::<Vec<u32>>());
let chunk = self.backend.get_data(chunk_pos as u64).unwrap();
let additive = chunk.set_iter(chunk_index * 1024).collect::<Vec<u32>>();
bitmap.add_many(&additive);
}
Ok(bitmap)
}

View file

@ -30,6 +30,10 @@ use crate::txhashset;
use croaring::Bitmap;
/// States that the desegmenter can be in, to keep track of what
/// parts are needed next in the process.
///
/// No variants are defined yet, so this is currently a placeholder type.
pub enum DesegmenterState {}
/// Desegmenter for rebuilding a txhashset from PIBD segments
#[derive(Clone)]
pub struct Desegmenter {
@ -129,10 +133,18 @@ impl Desegmenter {
self.bitmap_mmr_leaf_count
);
// Total size of Bitmap PMMR
self.bitmap_mmr_size = pmmr::peaks(self.bitmap_mmr_leaf_count)
.last()
.unwrap_or(&pmmr::insertion_to_pmmr_index(self.bitmap_mmr_leaf_count))
.clone();
self.bitmap_mmr_size =
1 + pmmr::peaks(pmmr::insertion_to_pmmr_index(self.bitmap_mmr_leaf_count))
.last()
.unwrap_or(
&(pmmr::peaks(pmmr::insertion_to_pmmr_index(
self.bitmap_mmr_leaf_count - 1,
))
.last()
.unwrap()),
)
.clone();
debug!(
"pibd_desgmenter - expected size of bitmap MMR: {}",
self.bitmap_mmr_size

View file

@ -56,6 +56,14 @@ pub enum SyncStatus {
/// diff of the most advanced peer
highest_diff: Difficulty,
},
/// Performing PIBD reconstruction of txhashset
/// If PIBD syncer determines there's not enough
/// PIBD peers to continue, then move on to TxHashsetDownload state
TxHashsetPibd {
/// Whether the syncer has determined there's not enough
/// data to continue via PIBD
aborted: bool,
},
/// Downloading the various txhashsets
TxHashsetDownload(TxHashsetDownloadStats),
/// Setting up before validation

View file

@ -31,7 +31,9 @@ use crate::core::pow::Difficulty;
use crate::core::ser::Writeable;
use crate::core::{core, global};
use crate::handshake::Handshake;
use crate::msg::{self, BanReason, GetPeerAddrs, Locator, Msg, Ping, TxHashSetRequest, Type};
use crate::msg::{
self, BanReason, GetPeerAddrs, Locator, Msg, Ping, SegmentRequest, TxHashSetRequest, Type,
};
use crate::protocol::Protocol;
use crate::types::{
Capabilities, ChainAdapter, Error, NetAdapter, P2PConfig, PeerAddr, PeerInfo, ReasonForBan,
@ -371,6 +373,20 @@ impl Peer {
)
}
/// Send a `GetOutputBitmapSegment` message to this peer, asking for the
/// segment named by `identifier` and carrying `h` as the request's block hash.
pub fn send_bitmap_segment_request(
	&self,
	h: Hash,
	identifier: SegmentIdentifier,
) -> Result<(), Error> {
	let request = SegmentRequest {
		block_hash: h,
		identifier,
	};
	self.send(&request, msg::Type::GetOutputBitmapSegment)
}
/// Stops the peer
pub fn stop(&self) {
debug!("Stopping peer {:?}", self.info.addr);
@ -586,6 +602,16 @@ impl ChainAdapter for TrackingAdapter {
) -> Result<Segment<RangeProof>, chain::Error> {
self.adapter.get_rangeproof_segment(hash, id)
}
/// Hand a received output bitmap segment straight to the wrapped adapter
/// and return whatever it returns.
fn receive_bitmap_segment(
	&self,
	block_hash: Hash,
	output_root: Hash,
	segment: Segment<BitmapChunk>,
) -> Result<bool, chain::Error> {
	let inner = &self.adapter;
	inner.receive_bitmap_segment(block_hash, output_root, segment)
}
}
impl NetAdapter for TrackingAdapter {

View file

@ -669,6 +669,16 @@ impl ChainAdapter for Peers {
) -> Result<Segment<RangeProof>, chain::Error> {
self.adapter.get_rangeproof_segment(hash, id)
}
/// Forward a received output bitmap segment to the underlying adapter,
/// passing its result back unchanged.
fn receive_bitmap_segment(
	&self,
	block_hash: Hash,
	output_root: Hash,
	segment: Segment<BitmapChunk>,
) -> Result<bool, chain::Error> {
	let delegate = &self.adapter;
	delegate.receive_bitmap_segment(block_hash, output_root, segment)
}
}
impl NetAdapter for Peers {

View file

@ -371,8 +371,20 @@ impl MessageHandler for Protocol {
Consumed::None
}
}
Message::OutputBitmapSegment(_)
| Message::OutputSegment(_)
Message::OutputBitmapSegment(req) => {
let OutputBitmapSegmentResponse {
block_hash,
segment,
output_root,
} = req;
debug!(
"Received Output Bitmap Segment: bh, output_root: {}, {}",
block_hash, output_root
);
adapter.receive_bitmap_segment(block_hash, output_root, segment.into())?;
Consumed::None
}
Message::OutputSegment(_)
| Message::RangeProofSegment(_)
| Message::KernelSegment(_) => Consumed::None,

View file

@ -410,6 +410,15 @@ impl ChainAdapter for DummyAdapter {
) -> Result<Segment<RangeProof>, chain::Error> {
unimplemented!()
}
/// Not supported by the dummy adapter: calling this panics via
/// `unimplemented!()`, and none of the arguments are inspected.
fn receive_bitmap_segment(
&self,
block_hash: Hash,
output_root: Hash,
segment: Segment<BitmapChunk>,
) -> Result<bool, chain::Error> {
unimplemented!()
}
}
impl NetAdapter for DummyAdapter {

View file

@ -667,6 +667,13 @@ pub trait ChainAdapter: Sync + Send {
hash: Hash,
id: SegmentIdentifier,
) -> Result<Segment<RangeProof>, chain::Error>;
/// Handle an output bitmap segment that has been received, together with
/// the block hash and output root that accompanied it.
///
/// NOTE(review): the meaning of the returned `bool` is not established by
/// this declaration - confirm against the implementations.
fn receive_bitmap_segment(
&self,
block_hash: Hash,
output_root: Hash,
segment: Segment<BitmapChunk>,
) -> Result<bool, chain::Error>;
}
/// Additional methods required by the protocol that don't need to be

View file

@ -564,6 +564,24 @@ where
}
segmenter.rangeproof_segment(id)
}
/// Apply a received output bitmap segment to the desegmenter associated
/// with the current archive header.
///
/// Returns `Ok(true)` once the segment has been added; any error from
/// the header lookup, the desegmenter lookup or the add itself is
/// propagated to the caller.
fn receive_bitmap_segment(
	&self,
	block_hash: Hash,
	output_root: Hash,
	segment: Segment<BitmapChunk>,
) -> Result<bool, chain::Error> {
	debug!(
		"RECEIVED BITMAP SEGMENT FOR block_hash: {}, output_root: {}",
		block_hash, output_root
	);
	// TODO: Entire process needs to be restarted if the horizon block
	// has changed (perhaps not here, NB for somewhere)
	let horizon_header = self.chain().txhashset_archive_header_header_only()?;
	let mut target = self.chain().desegmenter(&horizon_header)?;
	target.add_bitmap_segment(segment, output_root)?;
	Ok(true)
}
}
impl<B, P> NetToChainAdapter<B, P>

View file

@ -17,7 +17,7 @@ use chrono::Duration;
use std::sync::Arc;
use crate::chain::{self, SyncState, SyncStatus};
use crate::core::core::hash::Hashed;
use crate::core::core::{hash::Hashed, pmmr::segment::SegmentIdentifier};
use crate::core::global;
use crate::core::pow::Difficulty;
use crate::p2p::{self, Capabilities, Peer};
@ -35,6 +35,8 @@ pub struct StateSync {
prev_state_sync: Option<DateTime<Utc>>,
state_sync_peer: Option<Arc<Peer>>,
sent_test_pibd_message: bool,
}
impl StateSync {
@ -49,6 +51,7 @@ impl StateSync {
chain,
prev_state_sync: None,
state_sync_peer: None,
sent_test_pibd_message: false,
}
}
@ -74,15 +77,31 @@ impl StateSync {
sync_need_restart = true;
}
// Determine whether we're going to try using PIBD or whether we've already given up
// on it
let using_pibd =
if let SyncStatus::TxHashsetPibd { aborted: true, .. } = self.sync_state.status() {
false
} else {
// Only on testing chains for now
if global::get_chain_type() != global::ChainTypes::Mainnet {
true
} else {
false
}
};
// check peer connection status of this sync
if let Some(ref peer) = self.state_sync_peer {
if let SyncStatus::TxHashsetDownload { .. } = self.sync_state.status() {
if !peer.is_connected() {
sync_need_restart = true;
info!(
"state_sync: peer connection lost: {:?}. restart",
peer.info.addr,
);
if !using_pibd {
if let Some(ref peer) = self.state_sync_peer {
if let SyncStatus::TxHashsetDownload { .. } = self.sync_state.status() {
if !peer.is_connected() {
sync_need_restart = true;
info!(
"state_sync: peer connection lost: {:?}. restart",
peer.info.addr,
);
}
}
}
}
@ -111,35 +130,87 @@ impl StateSync {
// run fast sync if applicable, normally only run one-time, except restart in error
if sync_need_restart || header_head.height == highest_height {
let (go, download_timeout) = self.state_sync_due();
if let SyncStatus::TxHashsetDownload { .. } = self.sync_state.status() {
if download_timeout {
error!("state_sync: TxHashsetDownload status timeout in 10 minutes!");
self.sync_state.set_sync_error(
chain::ErrorKind::SyncError(format!("{:?}", p2p::Error::Timeout)).into(),
);
if using_pibd {
let (launch, _download_timeout) = self.state_sync_due();
if launch {
self.sync_state
.update(SyncStatus::TxHashsetPibd { aborted: false });
}
}
// Continue our PIBD process
self.continue_pibd();
} else {
let (go, download_timeout) = self.state_sync_due();
if go {
self.state_sync_peer = None;
match self.request_state(&header_head) {
Ok(peer) => {
self.state_sync_peer = Some(peer);
if let SyncStatus::TxHashsetDownload { .. } = self.sync_state.status() {
if download_timeout {
error!("state_sync: TxHashsetDownload status timeout in 10 minutes!");
self.sync_state.set_sync_error(
chain::ErrorKind::SyncError(format!("{:?}", p2p::Error::Timeout))
.into(),
);
}
Err(e) => self
.sync_state
.set_sync_error(chain::ErrorKind::SyncError(format!("{:?}", e)).into()),
}
self.sync_state
.update(SyncStatus::TxHashsetDownload(Default::default()));
if go {
self.state_sync_peer = None;
match self.request_state(&header_head) {
Ok(peer) => {
self.state_sync_peer = Some(peer);
}
Err(e) => self
.sync_state
.set_sync_error(chain::ErrorKind::SyncError(format!("{:?}", e)).into()),
}
self.sync_state
.update(SyncStatus::TxHashsetDownload(Default::default()));
}
}
}
true
}
fn continue_pibd(&mut self) {
// Check the state of our chain to figure out what we should be requesting next
// TODO: Just faking a single request for testing
if !self.sent_test_pibd_message {
debug!("Sending test PIBD message");
let archive_header = self.chain.txhashset_archive_header_header_only().unwrap();
let target_segment_height = 11;
//let archive_header = self.chain.txhashset_archive_header().unwrap();
let desegmenter = self.chain.desegmenter(&archive_header).unwrap();
let bitmap_mmr_size = desegmenter.expected_bitmap_mmr_size();
let mut identifier_iter =
SegmentIdentifier::traversal_iter(bitmap_mmr_size, target_segment_height);
self.sent_test_pibd_message = true;
let peers_iter = || {
self.peers
.iter()
.with_capabilities(Capabilities::PIBD_HIST)
.connected()
};
// Filter peers further based on max difficulty.
let max_diff = peers_iter().max_difficulty().unwrap_or(Difficulty::zero());
let peers_iter = || peers_iter().with_difficulty(|x| x >= max_diff);
// Choose a random "most work" peer, preferring outbound if at all possible.
let peer = peers_iter().outbound().choose_random().or_else(|| {
warn!("no suitable outbound peer for pibd message, considering inbound");
peers_iter().inbound().choose_random()
});
debug!("Chosen peer is {:?}", peer);
if let Some(p) = peer {
p.send_bitmap_segment_request(
archive_header.hash(),
identifier_iter.next().unwrap(),
)
.unwrap();
}
}
}
fn request_state(&self, header_head: &chain::Tip) -> Result<Arc<Peer>, p2p::Error> {
let threshold = global::state_sync_threshold() as u64;
let archive_interval = global::txhashset_archive_interval();

View file

@ -207,7 +207,8 @@ impl SyncRunner {
let mut check_state_sync = false;
match self.sync_state.status() {
SyncStatus::TxHashsetDownload { .. }
SyncStatus::TxHashsetPibd { .. }
| SyncStatus::TxHashsetDownload { .. }
| SyncStatus::TxHashsetSetup
| SyncStatus::TxHashsetRangeProofsValidation { .. }
| SyncStatus::TxHashsetKernelsValidation { .. }

View file

@ -50,6 +50,9 @@ impl TUIStatusView {
};
Cow::Owned(format!("Sync step 1/7: Downloading headers: {}%", percent))
}
SyncStatus::TxHashsetPibd { .. } => {
Cow::Borrowed("Sync step 2/7: Performing PIBD Body Sync (experimental)")
}
SyncStatus::TxHashsetDownload(stat) => {
if stat.total_size > 0 {
let percent = stat.downloaded_size * 100 / stat.total_size;