Enable block archival sync (#3579)

* wip - body sync for full archive

* allow chain compaction during sync

* placeholder for logic to ensure archive nodes sync from archive nodes

* body sync from archival peers

* allow chain compaction during sync

* placeholder for logic to ensure archive nodes sync from archive nodes
This commit is contained in:
Antioch Peverell 2021-03-16 12:04:09 +00:00 committed by GitHub
parent 846b8f82e8
commit 6690b25f05
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 51 additions and 26 deletions

View file

@ -226,6 +226,11 @@ impl Chain {
Ok(chain) Ok(chain)
} }
/// Are we running with archive_mode enabled?
pub fn archive_mode(&self) -> bool {
self.archive_mode
}
/// Return our shared header MMR handle. /// Return our shared header MMR handle.
pub fn header_pmmr(&self) -> Arc<RwLock<PMMRHandle<BlockHeader>>> { pub fn header_pmmr(&self) -> Arc<RwLock<PMMRHandle<BlockHeader>>> {
self.header_pmmr.clone() self.header_pmmr.clone()
@ -889,6 +894,11 @@ impl Chain {
/// If beyond the horizon then we cannot sync via recent full blocks /// If beyond the horizon then we cannot sync via recent full blocks
/// and we need a state (txhashset) sync. /// and we need a state (txhashset) sync.
pub fn check_txhashset_needed(&self, fork_point: &BlockHeader) -> Result<bool, Error> { pub fn check_txhashset_needed(&self, fork_point: &BlockHeader) -> Result<bool, Error> {
if self.archive_mode() {
debug!("check_txhashset_needed: we are running with archive_mode=true, not needed");
return Ok(false);
}
let header_head = self.header_head()?; let header_head = self.header_head()?;
let horizon = global::cut_through_horizon() as u64; let horizon = global::cut_through_horizon() as u64;
Ok(fork_point.height < header_head.height.saturating_sub(horizon)) Ok(fork_point.height < header_head.height.saturating_sub(horizon))
@ -1075,7 +1085,7 @@ impl Chain {
header_pmmr: &txhashset::PMMRHandle<BlockHeader>, header_pmmr: &txhashset::PMMRHandle<BlockHeader>,
batch: &store::Batch<'_>, batch: &store::Batch<'_>,
) -> Result<(), Error> { ) -> Result<(), Error> {
if self.archive_mode { if self.archive_mode() {
return Ok(()); return Ok(());
} }
@ -1161,13 +1171,14 @@ impl Chain {
} }
// If we are not in archival mode remove historical blocks from the db. // If we are not in archival mode remove historical blocks from the db.
if !self.archive_mode { if !self.archive_mode() {
self.remove_historical_blocks(&header_pmmr, &batch)?; self.remove_historical_blocks(&header_pmmr, &batch)?;
} }
// Make sure our output_pos index is consistent with the UTXO set. // Make sure our output_pos index is consistent with the UTXO set.
txhashset.init_output_pos_index(&header_pmmr, &batch)?; txhashset.init_output_pos_index(&header_pmmr, &batch)?;
// TODO - Why is this part of chain compaction?
// Rebuild our NRD kernel_pos index based on recent kernel history. // Rebuild our NRD kernel_pos index based on recent kernel history.
txhashset.init_recent_kernel_pos_index(&header_pmmr, &batch)?; txhashset.init_recent_kernel_pos_index(&header_pmmr, &batch)?;

View file

@ -704,11 +704,6 @@ where
} }
fn check_compact(&self) { fn check_compact(&self) {
// Skip compaction if we are syncing.
if self.sync_state.is_syncing() {
return;
}
// Roll the dice to trigger compaction at 1/COMPACTION_CHECK chance per block, // Roll the dice to trigger compaction at 1/COMPACTION_CHECK chance per block,
// uses a different thread to avoid blocking the caller thread (likely a peer) // uses a different thread to avoid blocking the caller thread (likely a peer)
let mut rng = thread_rng(); let mut rng = thread_rng();

View file

@ -14,6 +14,7 @@
use chrono::prelude::{DateTime, Utc}; use chrono::prelude::{DateTime, Utc};
use chrono::Duration; use chrono::Duration;
use p2p::Capabilities;
use rand::prelude::*; use rand::prelude::*;
use std::cmp; use std::cmp;
use std::sync::Arc; use std::sync::Arc;
@ -71,6 +72,11 @@ impl BodySync {
Ok(false) Ok(false)
} }
/// Is our local node running in archive_mode?
fn archive_mode(&self) -> bool {
self.chain.archive_mode()
}
/// Return true if txhashset download is needed (when requested block is under the horizon). /// Return true if txhashset download is needed (when requested block is under the horizon).
/// Otherwise go request some missing blocks and return false. /// Otherwise go request some missing blocks and return false.
fn body_sync(&mut self) -> Result<bool, chain::Error> { fn body_sync(&mut self) -> Result<bool, chain::Error> {
@ -85,10 +91,20 @@ impl BodySync {
return Ok(true); return Ok(true);
} }
let peers = {
// Find connected peers with strictly greater difficulty than us. // Find connected peers with strictly greater difficulty than us.
let peers_iter = || { let peers_iter = || {
// If we are running with archive mode enabled we only want to sync
// from other archive nodes.
let cap = if self.archive_mode() {
Capabilities::BLOCK_HIST
} else {
Capabilities::UNKNOWN
};
self.peers self.peers
.iter() .iter()
.with_capabilities(cap)
.with_difficulty(|x| x > head.total_difficulty) .with_difficulty(|x| x > head.total_difficulty)
.connected() .connected()
}; };
@ -106,6 +122,9 @@ impl BodySync {
return Ok(false); return Ok(false);
} }
peers
};
// if we have 5 peers to sync from then ask for 50 blocks total (peer_count * // if we have 5 peers to sync from then ask for 50 blocks total (peer_count *
// 10) max will be 80 if all 8 peers are advertising more work // 10) max will be 80 if all 8 peers are advertising more work
// also if the chain is already saturated with orphans, throttle // also if the chain is already saturated with orphans, throttle