cleanup redundant sync_head and associated MMR (#3556)

This commit is contained in:
Antioch Peverell 2021-02-24 19:11:08 +00:00 committed by GitHub
parent 3583028781
commit 03b7518884
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 23 additions and 92 deletions

View file

@ -151,7 +151,6 @@ pub struct Chain {
orphans: Arc<OrphanBlockPool>, orphans: Arc<OrphanBlockPool>,
txhashset: Arc<RwLock<txhashset::TxHashSet>>, txhashset: Arc<RwLock<txhashset::TxHashSet>>,
header_pmmr: Arc<RwLock<txhashset::PMMRHandle<BlockHeader>>>, header_pmmr: Arc<RwLock<txhashset::PMMRHandle<BlockHeader>>>,
sync_pmmr: Arc<RwLock<txhashset::PMMRHandle<BlockHeader>>>,
verifier_cache: Arc<RwLock<dyn VerifierCache>>, verifier_cache: Arc<RwLock<dyn VerifierCache>>,
pibd_segmenter: Arc<RwLock<Option<Segmenter>>>, pibd_segmenter: Arc<RwLock<Option<Segmenter>>>,
// POW verification function // POW verification function
@ -187,20 +186,8 @@ impl Chain {
ProtocolVersion(1), ProtocolVersion(1),
None, None,
)?; )?;
let mut sync_pmmr = PMMRHandle::new(
Path::new(&db_root).join("header").join("sync_head"),
false,
ProtocolVersion(1),
None,
)?;
setup_head( setup_head(&genesis, &store, &mut header_pmmr, &mut txhashset)?;
&genesis,
&store,
&mut header_pmmr,
&mut sync_pmmr,
&mut txhashset,
)?;
// Initialize the output_pos index based on UTXO set // Initialize the output_pos index based on UTXO set
// and NRD kernel_pos index based recent kernel history. // and NRD kernel_pos index based recent kernel history.
@ -218,7 +205,6 @@ impl Chain {
orphans: Arc::new(OrphanBlockPool::new()), orphans: Arc::new(OrphanBlockPool::new()),
txhashset: Arc::new(RwLock::new(txhashset)), txhashset: Arc::new(RwLock::new(txhashset)),
header_pmmr: Arc::new(RwLock::new(header_pmmr)), header_pmmr: Arc::new(RwLock::new(header_pmmr)),
sync_pmmr: Arc::new(RwLock::new(sync_pmmr)),
pibd_segmenter: Arc::new(RwLock::new(None)), pibd_segmenter: Arc::new(RwLock::new(None)),
pow_verifier, pow_verifier,
verifier_cache, verifier_cache,
@ -273,7 +259,6 @@ impl Chain {
}; };
log_head("head", self.head()?); log_head("head", self.head()?);
log_head("header_head", self.header_head()?); log_head("header_head", self.header_head()?);
log_head("sync_head", self.get_sync_head()?);
Ok(()) Ok(())
} }
@ -497,23 +482,14 @@ impl Chain {
/// This is only ever used during sync and is based on sync_head. /// This is only ever used during sync and is based on sync_head.
/// We update header_head here if our total work increases. /// We update header_head here if our total work increases.
pub fn sync_block_headers(&self, headers: &[BlockHeader], opts: Options) -> Result<(), Error> { pub fn sync_block_headers(&self, headers: &[BlockHeader], opts: Options) -> Result<(), Error> {
let mut sync_pmmr = self.sync_pmmr.write();
let mut header_pmmr = self.header_pmmr.write(); let mut header_pmmr = self.header_pmmr.write();
let mut txhashset = self.txhashset.write(); let mut txhashset = self.txhashset.write();
// Sync the chunk of block headers, updating sync_head as necessary. // Sync the chunk of block headers, updating header_head if total work increases.
{ {
let batch = self.store.batch()?;
let mut ctx = self.new_ctx(opts, batch, &mut sync_pmmr, &mut txhashset)?;
pipe::sync_block_headers(headers, &mut ctx)?;
ctx.batch.commit()?;
}
// Now "process" the last block header, updating header_head to match sync_head.
if let Some(header) = headers.last() {
let batch = self.store.batch()?; let batch = self.store.batch()?;
let mut ctx = self.new_ctx(opts, batch, &mut header_pmmr, &mut txhashset)?; let mut ctx = self.new_ctx(opts, batch, &mut header_pmmr, &mut txhashset)?;
pipe::process_block_header(header, &mut ctx)?; pipe::process_block_headers(headers, &mut ctx)?;
ctx.batch.commit()?; ctx.batch.commit()?;
} }
@ -944,21 +920,6 @@ impl Chain {
Ok(()) Ok(())
} }
/// Rebuild the sync MMR based on current header_head.
/// We rebuild the sync MMR when first entering sync mode so ensure we
/// have an MMR we can safely rewind based on the headers received from a peer.
pub fn rebuild_sync_mmr(&self, head: &Tip) -> Result<(), Error> {
let mut sync_pmmr = self.sync_pmmr.write();
let mut batch = self.store.batch()?;
let header = batch.get_block_header(&head.hash())?;
txhashset::header_extending(&mut sync_pmmr, &mut batch, |ext, batch| {
pipe::rewind_and_apply_header_fork(&header, ext, batch)?;
Ok(())
})?;
batch.commit()?;
Ok(())
}
/// Finds the "fork point" where header chain diverges from full block chain. /// Finds the "fork point" where header chain diverges from full block chain.
/// If we are syncing this will correspond to the last full block where /// If we are syncing this will correspond to the last full block where
/// the next header is known but we do not yet have the full block. /// the next header is known but we do not yet have the full block.
@ -1572,18 +1533,9 @@ impl Chain {
} }
} }
/// Get the tip of the current "sync" header chain.
/// This may be significantly different to current header chain.
pub fn get_sync_head(&self) -> Result<Tip, Error> {
let hash = self.sync_pmmr.read().head_hash()?;
let header = self.store.get_block_header(&hash)?;
Ok(Tip::from_header(&header))
}
/// Gets multiple headers at the provided heights. /// Gets multiple headers at the provided heights.
/// Note: Uses the sync pmmr, not the header pmmr.
pub fn get_locator_hashes(&self, heights: &[u64]) -> Result<Vec<Hash>, Error> { pub fn get_locator_hashes(&self, heights: &[u64]) -> Result<Vec<Hash>, Error> {
let pmmr = self.sync_pmmr.read(); let pmmr = self.header_pmmr.read();
heights heights
.iter() .iter()
.map(|h| pmmr.get_header_hash_by_height(*h)) .map(|h| pmmr.get_header_hash_by_height(*h))
@ -1611,12 +1563,11 @@ fn setup_head(
genesis: &Block, genesis: &Block,
store: &store::ChainStore, store: &store::ChainStore,
header_pmmr: &mut txhashset::PMMRHandle<BlockHeader>, header_pmmr: &mut txhashset::PMMRHandle<BlockHeader>,
sync_pmmr: &mut txhashset::PMMRHandle<BlockHeader>,
txhashset: &mut txhashset::TxHashSet, txhashset: &mut txhashset::TxHashSet,
) -> Result<(), Error> { ) -> Result<(), Error> {
let mut batch = store.batch()?; let mut batch = store.batch()?;
// Apply the genesis header to header and sync MMRs. // Apply the genesis header to header MMR.
{ {
if batch.get_block_header(&genesis.hash()).is_err() { if batch.get_block_header(&genesis.hash()).is_err() {
batch.save_block_header(&genesis.header)?; batch.save_block_header(&genesis.header)?;
@ -1627,12 +1578,6 @@ fn setup_head(
ext.apply_header(&genesis.header) ext.apply_header(&genesis.header)
})?; })?;
} }
if sync_pmmr.last_pos == 0 {
txhashset::header_extending(sync_pmmr, &mut batch, |ext, _| {
ext.apply_header(&genesis.header)
})?;
}
} }
// Make sure our header PMMR is consistent with header_head from db if it exists. // Make sure our header PMMR is consistent with header_head from db if it exists.

View file

@ -179,9 +179,9 @@ pub fn process_block(
} }
} }
/// Sync a chunk of block headers. /// Process a batch of sequential block headers.
/// This is only used during header sync. /// This is only used during header sync.
pub fn sync_block_headers( pub fn process_block_headers(
headers: &[BlockHeader], headers: &[BlockHeader],
ctx: &mut BlockContext<'_>, ctx: &mut BlockContext<'_>,
) -> Result<(), Error> { ) -> Result<(), Error> {
@ -193,14 +193,14 @@ pub fn sync_block_headers(
// Check if we know about all these headers. If so we can accept them quickly. // Check if we know about all these headers. If so we can accept them quickly.
// If they *do not* increase total work on the sync chain we are done. // If they *do not* increase total work on the sync chain we are done.
// If they *do* increase total work then we should process them to update sync_head. // If they *do* increase total work then we should process them to update sync_head.
let sync_head = { let head = {
let hash = ctx.header_pmmr.head_hash()?; let hash = ctx.header_pmmr.head_hash()?;
let header = ctx.batch.get_block_header(&hash)?; let header = ctx.batch.get_block_header(&hash)?;
Tip::from_header(&header) Tip::from_header(&header)
}; };
if let Ok(existing) = ctx.batch.get_block_header(&last_header.hash()) { if let Ok(existing) = ctx.batch.get_block_header(&last_header.hash()) {
if !has_more_work(&existing, &sync_head) { if !has_more_work(&existing, &head) {
return Ok(()); return Ok(());
} }
} }
@ -216,7 +216,13 @@ pub fn sync_block_headers(
txhashset::header_extending(&mut ctx.header_pmmr, &mut ctx.batch, |ext, batch| { txhashset::header_extending(&mut ctx.header_pmmr, &mut ctx.batch, |ext, batch| {
rewind_and_apply_header_fork(&last_header, ext, batch)?; rewind_and_apply_header_fork(&last_header, ext, batch)?;
Ok(()) Ok(())
}) })?;
if has_more_work(last_header, &head) {
update_header_head(&Tip::from_header(last_header), &mut ctx.batch)?;
}
Ok(())
} }
/// Process a block header. Update the header MMR and corresponding header_head if this header /// Process a block header. Update the header MMR and corresponding header_head if this header

View file

@ -18,7 +18,7 @@ use std::sync::Arc;
use crate::chain::{self, SyncState, SyncStatus}; use crate::chain::{self, SyncState, SyncStatus};
use crate::common::types::Error; use crate::common::types::Error;
use crate::core::core::hash::{Hash, Hashed}; use crate::core::core::hash::Hash;
use crate::core::pow::Difficulty; use crate::core::pow::Difficulty;
use crate::p2p::{self, types::ReasonForBan, Capabilities, Peer}; use crate::p2p::{self, types::ReasonForBan, Capabilities, Peer};
@ -59,28 +59,10 @@ impl HeaderSync {
let enable_header_sync = match self.sync_state.status() { let enable_header_sync = match self.sync_state.status() {
SyncStatus::BodySync { .. } SyncStatus::BodySync { .. }
| SyncStatus::HeaderSync { .. } | SyncStatus::HeaderSync { .. }
| SyncStatus::TxHashsetDone => true, | SyncStatus::TxHashsetDone
SyncStatus::NoSync | SyncStatus::Initial | SyncStatus::AwaitingPeers(_) => { | SyncStatus::NoSync
let sync_head = self.chain.get_sync_head()?; | SyncStatus::Initial
debug!( | SyncStatus::AwaitingPeers(_) => true,
"sync: initial transition to HeaderSync. sync_head: {} at {}, resetting to: {} at {}",
sync_head.hash(),
sync_head.height,
header_head.hash(),
header_head.height,
);
// Reset sync_head to header_head on transition to HeaderSync,
// but ONLY on initial transition to HeaderSync state.
//
// The header_head and sync_head may diverge here in the presence of a fork
// in the header chain. Ensure we track the new advertised header chain here
// correctly, so reset any previous (and potentially stale) sync_head to match
// our last known "good" header_head.
//
self.chain.rebuild_sync_mmr(&header_head)?;
true
}
_ => false, _ => false,
}; };
@ -211,11 +193,9 @@ impl HeaderSync {
return None; return None;
} }
/// We build a locator based on sync_head. /// Build a locator based on header_head.
/// Even if sync_head is significantly out of date we will "reset" it once we
/// start getting headers back from a peer.
fn get_locator(&mut self) -> Result<Vec<Hash>, Error> { fn get_locator(&mut self) -> Result<Vec<Hash>, Error> {
let tip = self.chain.get_sync_head()?; let tip = self.chain.header_head()?;
let heights = get_locator_heights(tip.height); let heights = get_locator_heights(tip.height);
let locator = self.chain.get_locator_hashes(&heights)?; let locator = self.chain.get_locator_hashes(&heights)?;
Ok(locator) Ok(locator)