read header_head and sync_head from the header and sync MMR respectively (#3045)

No need to maintain header_head and sync_head in the db explicitly.
This commit is contained in:
Antioch Peverell 2019-10-29 16:47:08 +00:00 committed by GitHub
parent a39ff54a7f
commit a362888ab9
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 95 additions and 187 deletions

View file

@@ -201,7 +201,6 @@ impl Chain {
&mut sync_pmmr,
&mut txhashset,
)?;
Chain::log_heads(&store)?;
let chain = Chain {
db_root,
@@ -226,6 +225,8 @@ impl Chain {
chain.rebuild_height_for_pos()?;
}
chain.log_heads()?;
Ok(chain)
}
@@ -244,46 +245,22 @@ impl Chain {
self.store.clone()
}
fn log_heads(store: &store::ChainStore) -> Result<(), Error> {
let head = store.head()?;
debug!(
"init: head: {} @ {} [{}]",
head.total_difficulty.to_num(),
head.height,
head.last_block_h,
);
let header_head = store.header_head()?;
debug!(
"init: header_head: {} @ {} [{}]",
header_head.total_difficulty.to_num(),
header_head.height,
header_head.last_block_h,
);
let sync_head = store.get_sync_head()?;
debug!(
"init: sync_head: {} @ {} [{}]",
sync_head.total_difficulty.to_num(),
sync_head.height,
sync_head.last_block_h,
);
/// Log the current chain heads (head, header_head, sync_head) at debug level.
fn log_heads(&self) -> Result<(), Error> {
	// Local helper so each head is reported in an identical format.
	fn log_head(name: &str, head: Tip) {
		debug!(
			"{}: {} @ {} [{}]",
			name,
			head.total_difficulty.to_num(),
			head.height,
			head.hash(),
		);
	}
	log_head("head", self.head()?);
	log_head("header_head", self.header_head()?);
	log_head("sync_head", self.get_sync_head()?);
	Ok(())
}
/// Reset sync_head to current header_head.
/// We do this when we first transition to header_sync to ensure we extend
/// the "sync" header MMR from a known consistent state and to ensure we track
/// the header chain correctly at the fork point.
pub fn reset_sync_head(&self) -> Result<Tip, Error> {
let batch = self.store.batch()?;
batch.reset_sync_head()?;
let head = batch.get_sync_head()?;
batch.commit()?;
Ok(head)
}
/// Processes a single block, then checks for orphans, processing
/// those as well if they're found
pub fn process_block(&self, b: Block, opts: Options) -> Result<Option<Tip>, Error> {
@@ -773,9 +750,8 @@ impl Chain {
pub fn rebuild_sync_mmr(&self, head: &Tip) -> Result<(), Error> {
let mut sync_pmmr = self.sync_pmmr.write();
let mut batch = self.store.batch()?;
let sync_head = batch.get_sync_head()?;
let header = batch.get_block_header(&head.hash())?;
txhashset::header_extending(&mut sync_pmmr, &sync_head, &mut batch, |extension| {
txhashset::header_extending(&mut sync_pmmr, &mut batch, |extension| {
pipe::rewind_and_apply_header_fork(&header, extension)?;
Ok(())
})?;
@@ -1217,9 +1193,9 @@ impl Chain {
/// Tip (head) of the header chain.
pub fn header_head(&self) -> Result<Tip, Error> {
self.store
.header_head()
.map_err(|e| ErrorKind::StoreErr(e, "chain header head".to_owned()).into())
let hash = self.header_pmmr.read().head_hash()?;
let header = self.store.get_block_header(&hash)?;
Ok(Tip::from_header(&header))
}
/// Block header for the chain head
@@ -1429,9 +1405,9 @@ impl Chain {
/// Get the tip of the current "sync" header chain.
/// This may be significantly different from the current header chain.
pub fn get_sync_head(&self) -> Result<Tip, Error> {
self.store
.get_sync_head()
.map_err(|e| ErrorKind::StoreErr(e, "chain get sync head".to_owned()).into())
let hash = self.sync_pmmr.read().head_hash()?;
let header = self.store.get_block_header(&hash)?;
Ok(Tip::from_header(&header))
}
/// Builds an iterator on blocks starting from the current chain head and
@@ -1460,6 +1436,22 @@ fn setup_head(
) -> Result<(), Error> {
let mut batch = store.batch()?;
// Apply the genesis header to header and sync MMRs to ensure they are non-empty.
// We read header_head and sync_head directly from the MMR and assume they are non-empty.
{
if header_pmmr.last_pos == 0 {
txhashset::header_extending(header_pmmr, &mut batch, |extension| {
extension.apply_header(&genesis.header)
})?;
}
if sync_pmmr.last_pos == 0 {
txhashset::header_extending(sync_pmmr, &mut batch, |extension| {
extension.apply_header(&genesis.header)
})?;
}
}
// check if we have a head in store, otherwise the genesis block is it
let head_res = batch.head();
let mut head: Tip;
@@ -1534,13 +1526,7 @@ fn setup_head(
// We will update this later once we have the correct header_root.
batch.save_block_header(&genesis.header)?;
batch.save_block(&genesis)?;
let tip = Tip::from_header(&genesis.header);
// Save these ahead of time as we need head and header_head to be initialized
// with *something* when creating a txhashset extension below.
batch.save_body_head(&tip)?;
batch.save_header_head(&tip)?;
batch.save_body_head(&Tip::from_header(&genesis.header))?;
if genesis.kernels().len() > 0 {
let (utxo_sum, kernel_sum) = (sums, genesis as &dyn Committed).verify_kernel_sums(
@@ -1552,14 +1538,6 @@ fn setup_head(
kernel_sum,
};
}
txhashset::header_extending(header_pmmr, &tip, &mut batch, |extension| {
extension.apply_header(&genesis.header)?;
Ok(())
})?;
txhashset::header_extending(sync_pmmr, &tip, &mut batch, |extension| {
extension.apply_header(&genesis.header)?;
Ok(())
})?;
txhashset::extending(header_pmmr, txhashset, &mut batch, |ext| {
let ref mut extension = ext.extension;
extension.apply_block(&genesis)?;
@@ -1575,29 +1553,6 @@ fn setup_head(
}
Err(e) => return Err(ErrorKind::StoreErr(e, "chain init load head".to_owned()))?,
};
// Check we have the header corresponding to the header_head.
// If not then something is corrupted and we should reset our header_head.
// Either way we want to reset sync_head to match header_head.
let head = batch.head()?;
let header_head = batch.header_head()?;
if batch.get_block_header(&header_head.last_block_h).is_ok() {
// Reset sync_head to be consistent with current header_head.
batch.reset_sync_head()?;
} else {
// Reset both header_head and sync_head to be consistent with current head.
warn!(
"setup_head: header missing for {}, {}, resetting header_head and sync_head to head: {}, {}",
header_head.last_block_h,
header_head.height,
head.last_block_h,
head.height,
);
batch.reset_header_head()?;
batch.reset_sync_head()?;
}
batch.commit()?;
Ok(())
}

View file

@@ -183,7 +183,12 @@ pub fn sync_block_headers(
// Check if we know about all these headers. If so we can accept them quickly.
// If they *do not* increase total work on the sync chain we are done.
// If they *do* increase total work then we should process them to update sync_head.
let sync_head = ctx.batch.get_sync_head()?;
let sync_head = {
let hash = ctx.header_pmmr.head_hash()?;
let header = ctx.batch.get_block_header(&hash)?;
Tip::from_header(&header)
};
if let Ok(existing) = ctx.batch.get_block_header(&last_header.hash()) {
if !has_more_work(&existing, &sync_head) {
return Ok(());
@@ -197,16 +202,12 @@ pub fn sync_block_headers(
add_block_header(header, &ctx.batch)?;
}
// Now apply this entire chunk of headers to the sync MMR.
txhashset::header_extending(&mut ctx.header_pmmr, &sync_head, &mut ctx.batch, |ext| {
// Now apply this entire chunk of headers to the sync MMR (ctx is sync MMR specific).
txhashset::header_extending(&mut ctx.header_pmmr, &mut ctx.batch, |ext| {
rewind_and_apply_header_fork(&last_header, ext)?;
Ok(())
})?;
if has_more_work(&last_header, &sync_head) {
update_sync_head(&Tip::from_header(&last_header), &mut ctx.batch)?;
}
Ok(())
}
@@ -227,14 +228,19 @@ pub fn process_block_header(header: &BlockHeader, ctx: &mut BlockContext<'_>) ->
// If it does not increase total_difficulty beyond our current header_head
// then we can (re)accept this header and process the full block (or request it).
// This header is on a fork and we should still accept it as the fork may eventually win.
let header_head = ctx.batch.header_head()?;
let header_head = {
let hash = ctx.header_pmmr.head_hash()?;
let header = ctx.batch.get_block_header(&hash)?;
Tip::from_header(&header)
};
if let Ok(existing) = ctx.batch.get_block_header(&header.hash()) {
if !has_more_work(&existing, &header_head) {
return Ok(());
}
}
txhashset::header_extending(&mut ctx.header_pmmr, &header_head, &mut ctx.batch, |ext| {
txhashset::header_extending(&mut ctx.header_pmmr, &mut ctx.batch, |ext| {
rewind_and_apply_header_fork(&prev_header, ext)?;
ext.validate_root(header)?;
ext.apply_header(header)?;
@@ -247,15 +253,6 @@ pub fn process_block_header(header: &BlockHeader, ctx: &mut BlockContext<'_>) ->
validate_header(header, ctx)?;
add_block_header(header, &ctx.batch)?;
// Update header_head independently of chain head (full blocks).
// If/when we process the corresponding full block we will update the
// chain head to match. This allows our header chain to extend safely beyond
// the full chain in a fork scenario without needing excessive rewinds to handle
// the temporarily divergent chains.
if has_more_work(&header, &header_head) {
update_header_head(&Tip::from_header(&header), &mut ctx.batch)?;
}
Ok(())
}
@@ -470,30 +467,6 @@ fn has_more_work(header: &BlockHeader, head: &Tip) -> bool {
header.total_difficulty() > head.total_difficulty
}
/// Update the sync head so we can keep syncing from where we left off.
fn update_sync_head(head: &Tip, batch: &mut store::Batch<'_>) -> Result<(), Error> {
batch
.save_sync_head(&head)
.map_err(|e| ErrorKind::StoreErr(e, "pipe save sync head".to_owned()))?;
debug!(
"sync_head updated to {} at {}",
head.last_block_h, head.height
);
Ok(())
}
/// Update the header_head.
fn update_header_head(head: &Tip, batch: &mut store::Batch<'_>) -> Result<(), Error> {
batch
.save_header_head(&head)
.map_err(|e| ErrorKind::StoreErr(e, "pipe save header head".to_owned()))?;
debug!(
"header_head updated to {} at {}",
head.last_block_h, head.height
);
Ok(())
}
/// Rewind the header chain and reapply headers on a fork.
pub fn rewind_and_apply_header_fork(
header: &BlockHeader,

View file

@@ -32,8 +32,6 @@ const BLOCK_HEADER_PREFIX: u8 = 'h' as u8;
const BLOCK_PREFIX: u8 = 'b' as u8;
const HEAD_PREFIX: u8 = 'H' as u8;
const TAIL_PREFIX: u8 = 'T' as u8;
const HEADER_HEAD_PREFIX: u8 = 'I' as u8;
const SYNC_HEAD_PREFIX: u8 = 's' as u8;
const COMMIT_POS_PREFIX: u8 = 'c' as u8;
const COMMIT_POS_HGT_PREFIX: u8 = 'p' as u8;
const BLOCK_INPUT_BITMAP_PREFIX: u8 = 'B' as u8;
@@ -79,20 +77,6 @@ impl ChainStore {
self.get_block_header(&self.head()?.last_block_h)
}
/// Head of the header chain (not the same thing as head_header).
pub fn header_head(&self) -> Result<Tip, Error> {
option_to_not_found(self.db.get_ser(&vec![HEADER_HEAD_PREFIX]), || {
"HEADER_HEAD".to_owned()
})
}
/// The "sync" head.
pub fn get_sync_head(&self) -> Result<Tip, Error> {
option_to_not_found(self.db.get_ser(&vec![SYNC_HEAD_PREFIX]), || {
"SYNC_HEAD".to_owned()
})
}
/// Get full block.
pub fn get_block(&self, h: &Hash) -> Result<Block, Error> {
option_to_not_found(
@@ -198,20 +182,6 @@ impl<'a> Batch<'a> {
self.get_block_header(&self.head()?.last_block_h)
}
/// Head of the header chain (not the same thing as head_header).
pub fn header_head(&self) -> Result<Tip, Error> {
option_to_not_found(self.db.get_ser(&vec![HEADER_HEAD_PREFIX]), || {
"HEADER_HEAD".to_owned()
})
}
/// Get "sync" head.
pub fn get_sync_head(&self) -> Result<Tip, Error> {
option_to_not_found(self.db.get_ser(&vec![SYNC_HEAD_PREFIX]), || {
"SYNC_HEAD".to_owned()
})
}
/// Save body head to db.
pub fn save_body_head(&self, t: &Tip) -> Result<(), Error> {
self.db.put_ser(&vec![HEAD_PREFIX], t)
@@ -222,28 +192,6 @@ impl<'a> Batch<'a> {
self.db.put_ser(&vec![TAIL_PREFIX], t)
}
/// Save header_head to db.
pub fn save_header_head(&self, t: &Tip) -> Result<(), Error> {
self.db.put_ser(&vec![HEADER_HEAD_PREFIX], t)
}
/// Save "sync" head to db.
pub fn save_sync_head(&self, t: &Tip) -> Result<(), Error> {
self.db.put_ser(&vec![SYNC_HEAD_PREFIX], t)
}
/// Reset sync_head to the current head of the header chain.
pub fn reset_sync_head(&self) -> Result<(), Error> {
let head = self.header_head()?;
self.save_sync_head(&head)
}
/// Reset header_head to the current head of the body chain.
pub fn reset_header_head(&self) -> Result<(), Error> {
let tip = self.head()?;
self.save_header_head(&tip)
}
/// get block
pub fn get_block(&self, h: &Hash) -> Result<Block, Error> {
option_to_not_found(

View file

@@ -86,6 +86,21 @@ impl PMMRHandle<BlockHeader> {
Err(ErrorKind::Other(format!("get header hash by height")).into())
}
}
/// Get the header hash for the head of the header chain based on current MMR state.
/// Find the last leaf pos based on MMR size and return its header hash.
pub fn head_hash(&self) -> Result<Hash, Error> {
	// An empty MMR has no leaves and therefore no head.
	if self.last_pos == 0 {
		return Err(ErrorKind::Other("MMR empty, no head".to_string()).into());
	}
	let header_pmmr = ReadonlyPMMR::at(&self.backend, self.last_pos);
	// The head corresponds to the rightmost leaf in the MMR.
	let leaf_pos = pmmr::bintree_rightmost(self.last_pos);
	if let Some(entry) = header_pmmr.get_data(leaf_pos) {
		Ok(entry.hash())
	} else {
		// Data missing for the computed leaf pos — backend is inconsistent with last_pos.
		Err(ErrorKind::Other("failed to find head hash".to_string()).into())
	}
}
}
/// An easy to manipulate structure holding the 3 sum trees necessary to
@@ -436,7 +451,13 @@ where
trace!("Starting new txhashset (readonly) extension.");
let head = batch.head()?;
let header_head = batch.header_head()?;
// Find header head based on current header MMR (the rightmost leaf node in the MMR).
let header_head = {
let hash = handle.head_hash()?;
let header = batch.get_block_header(&hash)?;
Tip::from_header(&header)
};
let res = {
let header_pmmr = PMMR::at(&mut handle.backend, handle.last_pos);
@@ -532,7 +553,13 @@ where
let rollback: bool;
let head = batch.head()?;
let header_head = batch.header_head()?;
// Find header head based on current header MMR (the rightmost leaf node in the MMR).
let header_head = {
let hash = header_pmmr.head_hash()?;
let header = batch.get_block_header(&hash)?;
Tip::from_header(&header)
};
// create a child transaction so if the state is rolled back by itself, all
// index saving can be undone
@@ -588,12 +615,11 @@ where
}
}
/// Start a new header MMR unit of work. This MMR tracks the header_head.
/// Start a new header MMR unit of work.
/// This MMR can be extended individually beyond the other (output, rangeproof and kernel) MMRs
/// to allow headers to be validated before we receive the full block data.
pub fn header_extending<'a, F, T>(
handle: &'a mut PMMRHandle<BlockHeader>,
head: &Tip,
batch: &'a mut Batch<'_>,
inner: F,
) -> Result<T, Error>
@@ -607,9 +633,19 @@ where
// create a child transaction so if the state is rolled back by itself, all
// index saving can be undone
let child_batch = batch.child()?;
// Find chain head based on current MMR (the rightmost leaf node in the MMR).
let head = match handle.head_hash() {
Ok(hash) => {
let header = child_batch.get_block_header(&hash)?;
Tip::from_header(&header)
}
Err(_) => Tip::default(),
};
{
let pmmr = PMMR::at(&mut handle.backend, handle.last_pos);
let mut extension = HeaderExtension::new(pmmr, &child_batch, head.clone());
let mut extension = HeaderExtension::new(pmmr, &child_batch, head);
res = inner(&mut extension);
rollback = extension.rollback;

View file

@@ -59,7 +59,6 @@ impl ChainAdapter {
batch.save_block_header(header).unwrap();
batch.save_body_head(&tip).unwrap();
batch.save_header_head(&tip).unwrap();
// Retrieve previous block_sums from the db.
let prev_sums = if let Ok(prev_sums) = batch.get_block_sums(&tip.prev_block_h) {

View file

@@ -81,9 +81,6 @@ impl HeaderSync {
// correctly, so reset any previous (and potentially stale) sync_head to match
// our last known "good" header_head.
//
self.chain.reset_sync_head()?;
// Rebuild the sync MMR to match our updated sync_head.
self.chain.rebuild_sync_mmr(&header_head)?;
self.history_locator.retain(|&x| x.0 == 0);