track sync_head on header_sync sync status (#3626)

* track sync_head on header_sync sync status
follow header fork as appropriate

* track highest_diff alongside highest_height on HeaderSync status

* cleanup

* not not
This commit is contained in:
Antioch Peverell 2021-04-06 11:16:20 +01:00 committed by GitHub
parent df91bff0f1
commit 34413c1cab
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 195 additions and 136 deletions

View file

@ -79,11 +79,12 @@ fn sync_status_to_api(sync_status: SyncStatus) -> (String, Option<serde_json::Va
SyncStatus::NoSync => ("no_sync".to_string(), None),
SyncStatus::AwaitingPeers(_) => ("awaiting_peers".to_string(), None),
SyncStatus::HeaderSync {
current_height,
sync_head,
highest_height,
..
} => (
"header_sync".to_string(),
Some(json!({ "current_height": current_height, "highest_height": highest_height })),
Some(json!({ "current_height": sync_head.height, "highest_height": highest_height })),
),
SyncStatus::TxHashsetDownload(stats) => (
"txhashset_download".to_string(),

View file

@ -404,19 +404,23 @@ impl Chain {
/// Attempt to add new headers to the header chain (or fork).
/// This is only ever used during sync and is based on sync_head.
/// We update header_head here if our total work increases.
pub fn sync_block_headers(&self, headers: &[BlockHeader], opts: Options) -> Result<(), Error> {
/// Returns the new sync_head (may temporarily diverge from header_head when syncing a long fork).
pub fn sync_block_headers(
&self,
headers: &[BlockHeader],
sync_head: Tip,
opts: Options,
) -> Result<Option<Tip>, Error> {
let mut header_pmmr = self.header_pmmr.write();
let mut txhashset = self.txhashset.write();
let batch = self.store.batch()?;
// Sync the chunk of block headers, updating header_head if total work increases.
{
let batch = self.store.batch()?;
let mut ctx = self.new_ctx(opts, batch, &mut header_pmmr, &mut txhashset)?;
pipe::process_block_headers(headers, &mut ctx)?;
let sync_head = pipe::process_block_headers(headers, sync_head, &mut ctx)?;
ctx.batch.commit()?;
}
Ok(())
Ok(sync_head)
}
/// Build a new block processing context.
@ -1410,12 +1414,20 @@ impl Chain {
}
/// Gets multiple headers at the provided heights.
pub fn get_locator_hashes(&self, heights: &[u64]) -> Result<Vec<Hash>, Error> {
let pmmr = self.header_pmmr.read();
heights
/// Note: This is based on the provided sync_head to support syncing against a fork.
pub fn get_locator_hashes(&self, sync_head: Tip, heights: &[u64]) -> Result<Vec<Hash>, Error> {
let mut header_pmmr = self.header_pmmr.write();
txhashset::header_extending_readonly(&mut header_pmmr, &self.store(), |ext, batch| {
let header = batch.get_block_header(&sync_head.hash())?;
pipe::rewind_and_apply_header_fork(&header, ext, batch)?;
let hashes = heights
.iter()
.map(|h| pmmr.get_header_hash_by_height(*h))
.collect()
.filter_map(|h| ext.get_header_hash_by_height(*h))
.collect();
Ok(hashes)
})
}
/// Builds an iterator on blocks starting from the current chain head and

View file

@ -176,48 +176,52 @@ pub fn process_block(
/// Process a batch of sequential block headers.
/// This is only used during header sync.
/// Will update header_head locally if this batch of headers increases total work.
/// Returns the updated sync_head, which may be on a fork.
pub fn process_block_headers(
headers: &[BlockHeader],
sync_head: Tip,
ctx: &mut BlockContext<'_>,
) -> Result<(), Error> {
) -> Result<Option<Tip>, Error> {
if headers.is_empty() {
return Ok(());
return Ok(None);
}
let last_header = headers.last().expect("last header");
// Check if we know about all these headers. If so we can accept them quickly.
// If they *do not* increase total work on the sync chain we are done.
// If they *do* increase total work then we should process them to update sync_head.
let head = {
let hash = ctx.header_pmmr.head_hash()?;
let header = ctx.batch.get_block_header(&hash)?;
Tip::from_header(&header)
};
if let Ok(existing) = ctx.batch.get_block_header(&last_header.hash()) {
if !has_more_work(&existing, &head) {
return Ok(());
}
}
let head = ctx.batch.header_head()?;
// Validate each header in the chunk and add to our db.
// Note: This batch may be rolled back later if the MMR does not validate successfully.
// Note: This batch may later be committed even if the MMR itself is rollbacked.
for header in headers {
validate_header(header, ctx)?;
add_block_header(header, &ctx.batch)?;
}
// Now apply this entire chunk of headers to the sync MMR (ctx is sync MMR specific).
// Now apply this entire chunk of headers to the header MMR.
txhashset::header_extending(&mut ctx.header_pmmr, &mut ctx.batch, |ext, batch| {
rewind_and_apply_header_fork(&last_header, ext, batch)?;
Ok(())
})?;
// If previous sync_head is not on the "current" chain then
// these headers are on an alternative fork to sync_head.
let alt_fork = !ext.is_on_current_chain(sync_head, batch)?;
// Update our "header_head" if this batch results in an increase in total work.
// Otherwise rollback this header extension.
// Note the outer batch may still be committed to db assuming no errors occur in the extension.
if has_more_work(last_header, &head) {
update_header_head(&Tip::from_header(last_header), &mut ctx.batch)?;
}
let header_head = last_header.into();
update_header_head(&header_head, &batch)?;
} else {
ext.force_rollback();
};
Ok(())
if alt_fork || has_more_work(last_header, &sync_head) {
Ok(Some(last_header.into()))
} else {
Ok(None)
}
})
}
/// Process a block header. Update the header MMR and corresponding header_head if this header
@ -500,7 +504,7 @@ fn add_block_header(bh: &BlockHeader, batch: &store::Batch<'_>) -> Result<(), Er
Ok(())
}
fn update_header_head(head: &Tip, batch: &mut store::Batch<'_>) -> Result<(), Error> {
fn update_header_head(head: &Tip, batch: &store::Batch<'_>) -> Result<(), Error> {
batch
.save_header_head(&head)
.map_err(|e| ErrorKind::StoreErr(e, "pipe save header head".to_owned()))?;
@ -513,7 +517,7 @@ fn update_header_head(head: &Tip, batch: &mut store::Batch<'_>) -> Result<(), Er
Ok(())
}
fn update_head(head: &Tip, batch: &mut store::Batch<'_>) -> Result<(), Error> {
fn update_head(head: &Tip, batch: &store::Batch<'_>) -> Result<(), Error> {
batch
.save_body_head(&head)
.map_err(|e| ErrorKind::StoreErr(e, "pipe save body".to_owned()))?;
@ -536,7 +540,7 @@ pub fn rewind_and_apply_header_fork(
) -> Result<(), Error> {
let mut fork_hashes = vec![];
let mut current = header.clone();
while current.height > 0 && ext.is_on_current_chain(&current, batch).is_err() {
while current.height > 0 && !ext.is_on_current_chain(&current, batch)? {
fork_hashes.push(current.hash());
current = batch.get_previous_header(&current)?;
}
@ -577,11 +581,7 @@ pub fn rewind_and_apply_fork(
// Rewind the txhashset extension back to common ancestor based on header MMR.
let mut current = batch.head_header()?;
while current.height > 0
&& header_extension
.is_on_current_chain(&current, batch)
.is_err()
{
while current.height > 0 && !header_extension.is_on_current_chain(&current, batch)? {
current = batch.get_previous_header(&current)?;
}
let fork_point = current;

View file

@ -915,6 +915,13 @@ impl<'a> HeaderExtension<'a> {
self.head.clone()
}
/// Get header hash by height.
/// Based on current header MMR.
pub fn get_header_hash_by_height(&self, height: u64) -> Option<Hash> {
let pos = pmmr::insertion_to_pmmr_index(height + 1);
self.get_header_hash(pos)
}
/// Get the header at the specified height based on the current state of the header extension.
/// Derives the MMR pos from the height (insertion index) and retrieves the header hash.
/// Looks the header up in the db by hash.
@ -923,8 +930,7 @@ impl<'a> HeaderExtension<'a> {
height: u64,
batch: &Batch<'_>,
) -> Result<BlockHeader, Error> {
let pos = pmmr::insertion_to_pmmr_index(height + 1);
if let Some(hash) = self.get_header_hash(pos) {
if let Some(hash) = self.get_header_hash_by_height(height) {
Ok(batch.get_block_header(&hash)?)
} else {
Err(ErrorKind::Other("get header by height".to_string()).into())
@ -933,20 +939,17 @@ impl<'a> HeaderExtension<'a> {
/// Compares the provided header to the header in the header MMR at that height.
/// If these match we know the header is on the current chain.
pub fn is_on_current_chain(
pub fn is_on_current_chain<T: Into<Tip>>(
&self,
header: &BlockHeader,
t: T,
batch: &Batch<'_>,
) -> Result<(), Error> {
if header.height > self.head.height {
return Err(ErrorKind::Other("not on current chain, out beyond".to_string()).into());
}
let chain_header = self.get_header_by_height(header.height, batch)?;
if chain_header.hash() == header.hash() {
Ok(())
} else {
Err(ErrorKind::Other("not on current chain".to_string()).into())
) -> Result<bool, Error> {
let t = t.into();
if t.height > self.head.height {
return Ok(false);
}
let chain_header = self.get_header_by_height(t.height, batch)?;
Ok(chain_header.hash() == t.hash())
}
/// Force the rollback of this extension, no matter the result.

View file

@ -38,7 +38,7 @@ bitflags! {
}
/// Various status sync can be in, whether it's fast sync or archival.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Deserialize, Serialize)]
#[derive(Debug, Clone, Copy, PartialEq, Deserialize, Serialize)]
pub enum SyncStatus {
/// Initial State (we do not yet know if we are/should be syncing)
Initial,
@ -49,10 +49,12 @@ pub enum SyncStatus {
AwaitingPeers(bool),
/// Downloading block headers
HeaderSync {
/// current node height
current_height: u64,
/// current sync head
sync_head: Tip,
/// height of the most advanced peer
highest_height: u64,
/// diff of the most advanced peer
highest_diff: Difficulty,
},
/// Downloading the various txhashsets
TxHashsetDownload(TxHashsetDownloadStats),
@ -176,6 +178,17 @@ impl SyncState {
}
}
/// Update sync_head if state is currently HeaderSync.
pub fn update_header_sync(&self, new_sync_head: Tip) {
let status: &mut SyncStatus = &mut self.current.write();
match status {
SyncStatus::HeaderSync { sync_head, .. } => {
*sync_head = new_sync_head;
}
_ => (),
}
}
/// Update txhashset downloading progress
pub fn update_txhashset_download(&self, stats: TxHashsetDownloadStats) {
*self.current.write() = SyncStatus::TxHashsetDownload(stats);
@ -346,12 +359,7 @@ pub struct Tip {
impl Tip {
/// Creates a new tip based on provided header.
pub fn from_header(header: &BlockHeader) -> Tip {
Tip {
height: header.height,
last_block_h: header.hash(),
prev_block_h: header.prev_hash,
total_difficulty: header.total_difficulty(),
}
header.into()
}
}
@ -372,6 +380,16 @@ impl Default for Tip {
}
}
}
impl From<&BlockHeader> for Tip {
fn from(header: &BlockHeader) -> Tip {
Tip {
height: header.height,
last_block_h: header.hash(),
prev_block_h: header.prev_hash,
total_difficulty: header.total_difficulty(),
}
}
}
/// Serialization of a tip, required to save to datastore.
impl ser::Writeable for Tip {

View file

@ -321,9 +321,28 @@ where
return Ok(false);
}
// try to add headers to our header chain
match self.chain().sync_block_headers(bhs, chain::Options::SYNC) {
Ok(_) => Ok(true),
// Read our sync_head if we are in header_sync.
// If not then we can ignore this batch of headers.
let sync_head = match self.sync_state.status() {
SyncStatus::HeaderSync { sync_head, .. } => sync_head,
_ => {
debug!("headers_received: ignoring as not in header_sync");
return Ok(true);
}
};
match self
.chain()
.sync_block_headers(bhs, sync_head, chain::Options::SYNC)
{
Ok(sync_head) => {
// If we have an updated sync_head after processing this batch of headers
// then update our sync_state so we can request relevant headers in the next batch.
if let Some(sync_head) = sync_head {
self.sync_state.update_header_sync(sync_head);
}
Ok(true)
}
Err(e) => {
debug!("Block headers refused by chain: {:?}", e);
if e.is_bad_data() {

View file

@ -47,16 +47,9 @@ impl HeaderSync {
}
}
pub fn check_run(
&mut self,
header_head: &chain::Tip,
highest_height: u64,
) -> Result<bool, chain::Error> {
if !self.header_sync_due(header_head) {
return Ok(false);
}
let enable_header_sync = match self.sync_state.status() {
pub fn check_run(&mut self, sync_head: chain::Tip) -> Result<bool, chain::Error> {
// We only want to run header_sync for some sync states.
let do_run = match self.sync_state.status() {
SyncStatus::BodySync { .. }
| SyncStatus::HeaderSync { .. }
| SyncStatus::TxHashsetDone
@ -66,19 +59,41 @@ impl HeaderSync {
_ => false,
};
if enable_header_sync {
if !do_run {
return Ok(false);
}
// TODO - can we safely reuse the peer here across multiple runs?
let sync_peer = self.choose_sync_peer();
if let Some(sync_peer) = sync_peer {
let (peer_height, peer_diff) = {
let info = sync_peer.info.live_info.read();
(info.height, info.total_difficulty)
};
// Quick check - nothing to sync if we are caught up with the peer.
if peer_diff <= sync_head.total_difficulty {
return Ok(false);
}
if !self.header_sync_due(sync_head) {
return Ok(false);
}
self.sync_state.update(SyncStatus::HeaderSync {
current_height: header_head.height,
highest_height: highest_height,
sync_head,
highest_height: peer_height,
highest_diff: peer_diff,
});
self.syncing_peer = self.header_sync();
return Ok(true);
self.header_sync(sync_head, sync_peer.clone());
self.syncing_peer = Some(sync_peer.clone());
}
Ok(false)
Ok(true)
}
fn header_sync_due(&mut self, header_head: &chain::Tip) -> bool {
fn header_sync_due(&mut self, header_head: chain::Tip) -> bool {
let now = Utc::now();
let (timeout, latest_height, prev_height) = self.prev_header_sync;
@ -151,8 +166,7 @@ impl HeaderSync {
}
}
fn header_sync(&mut self) -> Option<Arc<Peer>> {
if let Ok(header_head) = self.chain.header_head() {
fn choose_sync_peer(&self) -> Option<Arc<Peer>> {
let peers_iter = || {
self.peers
.iter()
@ -165,39 +179,34 @@ impl HeaderSync {
let peers_iter = || peers_iter().with_difficulty(|x| x >= max_diff);
// Choose a random "most work" peer, preferring outbound if at all possible.
let peer = peers_iter().outbound().choose_random().or_else(|| {
peers_iter().outbound().choose_random().or_else(|| {
warn!("no suitable outbound peer for header sync, considering inbound");
peers_iter().inbound().choose_random()
});
})
}
if let Some(peer) = peer {
if peer.info.total_difficulty() > header_head.total_difficulty {
return self.request_headers(peer);
fn header_sync(&self, sync_head: chain::Tip, peer: Arc<Peer>) {
if peer.info.total_difficulty() > sync_head.total_difficulty {
self.request_headers(sync_head, peer);
}
}
}
return None;
}
/// Request some block headers from a peer to advance us.
fn request_headers(&mut self, peer: Arc<Peer>) -> Option<Arc<Peer>> {
if let Ok(locator) = self.get_locator() {
fn request_headers(&self, sync_head: chain::Tip, peer: Arc<Peer>) {
if let Ok(locator) = self.get_locator(sync_head) {
debug!(
"sync: request_headers: asking {} for headers, {:?}",
peer.info.addr, locator,
);
let _ = peer.send_header_request(locator);
return Some(peer);
}
return None;
}
/// Build a locator based on header_head.
fn get_locator(&mut self) -> Result<Vec<Hash>, Error> {
let tip = self.chain.header_head()?;
let heights = get_locator_heights(tip.height);
let locator = self.chain.get_locator_hashes(&heights)?;
fn get_locator(&self, sync_head: chain::Tip) -> Result<Vec<Hash>, Error> {
let heights = get_locator_heights(sync_head.height);
let locator = self.chain.get_locator_hashes(sync_head, &heights)?;
Ok(locator)
}
}

View file

@ -133,19 +133,6 @@ impl StateSync {
.set_sync_error(chain::ErrorKind::SyncError(format!("{:?}", e)).into()),
}
// to avoid the confusing log,
// update the final HeaderSync state mainly for 'current_height'
self.sync_state.update_if(
SyncStatus::HeaderSync {
current_height: header_head.height,
highest_height,
},
|s| match s {
SyncStatus::HeaderSync { .. } => true,
_ => false,
},
);
self.sync_state
.update(SyncStatus::TxHashsetDownload(Default::default()));
}

View file

@ -192,9 +192,18 @@ impl SyncRunner {
let tail = self.chain.tail().unwrap_or_else(|_| head.clone());
let header_head = unwrap_or_restart_loop!(self.chain.header_head());
// "sync_head" allows us to sync against a large fork on the header chain
// we track this during an extended header sync
let sync_status = self.sync_state.status();
let sync_head = match sync_status {
SyncStatus::HeaderSync { sync_head, .. } => sync_head,
_ => header_head,
};
// run each sync stage, each of them deciding whether they're needed
// except for state sync that only runs if body sync return true (means txhashset is needed)
unwrap_or_restart_loop!(header_sync.check_run(&header_head, highest_height));
unwrap_or_restart_loop!(header_sync.check_run(sync_head));
let mut check_state_sync = false;
match self.sync_state.status() {
@ -206,7 +215,7 @@ impl SyncRunner {
| SyncStatus::TxHashsetDone => check_state_sync = true,
_ => {
// skip body sync if header chain is not synced.
if header_head.height < highest_height {
if sync_head.height < highest_height {
continue;
}

View file

@ -39,13 +39,14 @@ impl TUIStatusView {
SyncStatus::NoSync => Cow::Borrowed("Running"),
SyncStatus::AwaitingPeers(_) => Cow::Borrowed("Waiting for peers"),
SyncStatus::HeaderSync {
current_height,
sync_head,
highest_height,
..
} => {
let percent = if highest_height == 0 {
0
} else {
current_height * 100 / highest_height
sync_head.height * 100 / highest_height
};
Cow::Owned(format!("Sync step 1/7: Downloading headers: {}%", percent))
}