From 12be191ecd8433516d867d332e3fad6444888c8c Mon Sep 17 00:00:00 2001 From: Antioch Peverell Date: Thu, 1 Nov 2018 09:51:32 +0000 Subject: [PATCH] validate root of header MMR when processing headers (during sync and full blocks) (#1836) validate root of header MMR when processing headers (during sync and full blocks) (#1836) --- api/src/handlers.rs | 15 +- api/src/types.rs | 40 +++--- chain/src/chain.rs | 157 +++++++++++++++------ chain/src/pipe.rs | 188 ++++++++++++++++---------- chain/src/store.rs | 46 +++++-- chain/src/txhashset/txhashset.rs | 17 ++- chain/src/types.rs | 30 ++-- chain/tests/data_file_integrity.rs | 11 +- chain/tests/mine_simple_chain.rs | 49 +++---- chain/tests/store_indices.rs | 40 ++++-- chain/tests/test_coinbase_maturity.rs | 8 +- chain/tests/test_txhashset.rs | 5 +- core/src/core/block.rs | 16 +-- core/src/core/pmmr/db_pmmr.rs | 16 ++- core/src/core/pmmr/pmmr.rs | 4 +- core/src/genesis.rs | 10 +- pool/tests/block_building.rs | 23 ++-- pool/tests/block_reconciliation.rs | 17 ++- pool/tests/common/mod.rs | 7 +- servers/src/common/adapters.rs | 17 ++- servers/src/grin/sync/body_sync.rs | 2 +- servers/src/grin/sync/header_sync.rs | 36 ++--- servers/src/grin/sync/state_sync.rs | 5 +- servers/src/mining/mine_block.rs | 2 +- servers/src/mining/test_miner.rs | 5 +- wallet/tests/common/mod.rs | 2 +- 26 files changed, 466 insertions(+), 302 deletions(-) diff --git a/api/src/handlers.rs b/api/src/handlers.rs index d42bab871..b280568b8 100644 --- a/api/src/handlers.rs +++ b/api/src/handlers.rs @@ -25,7 +25,7 @@ use hyper::{Body, Request, StatusCode}; use auth::BasicAuthMiddleware; use chain; use core::core::hash::{Hash, Hashed}; -use core::core::{OutputFeatures, OutputIdentifier, Transaction}; +use core::core::{BlockHeader, OutputFeatures, OutputIdentifier, Transaction}; use core::ser; use p2p; use p2p::types::{PeerInfoDisplay, ReasonForBan}; @@ -569,7 +569,9 @@ impl HeaderHandler { } if let Ok(height) = input.parse() { match w(&self.chain).get_header_by_height(height) { - Ok(header) => return Ok(BlockHeaderPrintable::from_header(&header)), + Ok(header) => { + return self.convert_header(&header); + } Err(_) => return Err(ErrorKind::NotFound)?, } } @@ -580,13 +582,18 @@ impl HeaderHandler { let header = w(&self.chain) .get_block_header(&h) .context(ErrorKind::NotFound)?; - Ok(BlockHeaderPrintable::from_header(&header)) + self.convert_header(&header) + } + + /// Convert a header into a "printable" version for json serialization. 
+ fn convert_header(&self, header: &BlockHeader) -> Result<BlockHeaderPrintable, Error> {
+ return Ok(BlockHeaderPrintable::from_header(header));
+ }
 
 fn get_header_for_output(&self, commit_id: String) -> Result<BlockHeaderPrintable, Error> {
 let oid = get_output(&self.chain, &commit_id)?.1;
 match w(&self.chain).get_header_for_output(&oid) {
- Ok(header) => return Ok(BlockHeaderPrintable::from_header(&header)),
+ Ok(header) => return self.convert_header(&header),
 Err(_) => return Err(ErrorKind::NotFound)?,
 }
 }
diff --git a/api/src/types.rs b/api/src/types.rs
index 82a2a5ca9..379cfc923 100644
--- a/api/src/types.rs
+++ b/api/src/types.rs
@@ -472,11 +472,11 @@ pub struct BlockHeaderInfo {
 }
 
 impl BlockHeaderInfo {
- pub fn from_header(h: &core::BlockHeader) -> BlockHeaderInfo {
+ pub fn from_header(header: &core::BlockHeader) -> BlockHeaderInfo {
 BlockHeaderInfo {
- hash: util::to_hex(h.hash().to_vec()),
- height: h.height,
- previous: util::to_hex(h.previous.to_vec()),
+ hash: util::to_hex(header.hash().to_vec()),
+ height: header.height,
+ previous: util::to_hex(header.prev_hash.to_vec()),
 }
 }
 }
@@ -516,23 +516,23 @@ pub struct BlockHeaderPrintable {
 }
 
 impl BlockHeaderPrintable {
- pub fn from_header(h: &core::BlockHeader) -> BlockHeaderPrintable {
+ pub fn from_header(header: &core::BlockHeader) -> BlockHeaderPrintable {
 BlockHeaderPrintable {
- hash: util::to_hex(h.hash().to_vec()),
- version: h.version,
- height: h.height,
- previous: util::to_hex(h.previous.to_vec()),
- prev_root: util::to_hex(h.prev_root.to_vec()),
- timestamp: h.timestamp.to_rfc3339(),
- output_root: util::to_hex(h.output_root.to_vec()),
- range_proof_root: util::to_hex(h.range_proof_root.to_vec()),
- kernel_root: util::to_hex(h.kernel_root.to_vec()),
- nonce: h.pow.nonce,
- edge_bits: h.pow.edge_bits(),
- cuckoo_solution: h.pow.proof.nonces.clone(),
- total_difficulty: h.pow.total_difficulty.to_num(),
- secondary_scaling: h.pow.secondary_scaling,
- total_kernel_offset: h.total_kernel_offset.to_hex(),
+ hash: util::to_hex(header.hash().to_vec()),
+ version: header.version,
+ height: header.height,
+ previous: util::to_hex(header.prev_hash.to_vec()),
+ prev_root: util::to_hex(header.prev_root.to_vec()),
+ timestamp: header.timestamp.to_rfc3339(),
+ output_root: util::to_hex(header.output_root.to_vec()),
+ range_proof_root: util::to_hex(header.range_proof_root.to_vec()),
+ kernel_root: util::to_hex(header.kernel_root.to_vec()),
+ nonce: header.pow.nonce,
+ edge_bits: header.pow.edge_bits(),
+ cuckoo_solution: header.pow.proof.nonces.clone(),
+ total_difficulty: header.pow.total_difficulty.to_num(),
+ secondary_scaling: header.pow.secondary_scaling,
+ total_kernel_offset: header.total_kernel_offset.to_hex(),
 }
 }
 }
diff --git a/chain/src/chain.rs b/chain/src/chain.rs
index f08c501c1..5ffd58d2e 100644
--- a/chain/src/chain.rs
+++ b/chain/src/chain.rs
@@ -181,13 +181,35 @@ impl Chain {
 
 setup_head(genesis.clone(), store.clone(), &mut txhashset)?;
 
- let head = store.head()?;
- debug!(
- "Chain init: {} @ {} [{}]",
- head.total_difficulty.to_num(),
- head.height,
- head.last_block_h,
- );
+ {
+ let head = store.head()?;
+ debug!(
+ "init: head: {} @ {} [{}]",
+ head.total_difficulty.to_num(),
+ head.height,
+ head.last_block_h,
+ );
+ }
+
+ {
+ let header_head = store.header_head()?;
+ debug!(
+ "init: header_head: {} @ {} [{}]",
+ header_head.total_difficulty.to_num(),
+ header_head.height,
+ header_head.last_block_h,
+ );
+ }
+
+ {
+ let sync_head = store.get_sync_head()?;
+ debug!(
+ "init: sync_head: {} @ {} [{}]",
+ sync_head.total_difficulty.to_num(),
+ sync_head.height,
+ sync_head.last_block_h,
+ );
+ }
 
 Ok(Chain {
 db_root: db_root,
@@ -475,15 +497,23 @@ impl Chain {
 })
 }
 
- /// Sets the txhashset roots on a brand new block by applying the block on
- /// the current txhashset state.
- pub fn set_txhashset_roots(&self, b: &mut Block, is_fork: bool) -> Result<(), Error> {
+ /// *** Only used in tests. ***
+ /// Convenience for setting roots on a block header when
+ /// creating a chain fork during tests.
+ pub fn set_txhashset_roots_forked(
+ &self,
+ b: &mut Block,
+ prev: &BlockHeader,
+ ) -> Result<(), Error> {
+ let prev_block = self.get_block(&prev.hash())?;
 let mut txhashset = self.txhashset.write();
 let (prev_root, roots, sizes) =
 txhashset::extending_readonly(&mut txhashset, |extension| {
- if is_fork {
- pipe::rewind_and_apply_fork(b, extension)?;
- }
+ // Put the txhashset in the correct state as of the previous block.
+ // We cannot use the new block to do this because we have no
+ // explicit previous linkage (and prev_root not yet setup).
+ pipe::rewind_and_apply_fork(&prev_block, extension)?;
+ extension.apply_block(&prev_block)?;
 
 // Retrieve the header root before we apply the new block
 let prev_root = extension.header_root();
@@ -513,6 +543,40 @@ impl Chain {
 Ok(())
 }
 
+ /// Sets the txhashset roots on a brand new block by applying the block on
+ /// the current txhashset state.
+ pub fn set_txhashset_roots(&self, b: &mut Block) -> Result<(), Error> {
+ let mut txhashset = self.txhashset.write();
+ let (prev_root, roots, sizes) =
+ txhashset::extending_readonly(&mut txhashset, |extension| {
+ // Retrieve the header root before we apply the new block
+ let prev_root = extension.header_root();
+
+ // Apply the latest block to the chain state via the extension.
+ extension.apply_block(b)?;
+
+ Ok((prev_root, extension.roots(), extension.sizes()))
+ })?;
+
+ // Set the prev_root on the header.
+ b.header.prev_root = prev_root;
+
+ // Set the output, rangeproof and kernel MMR roots.
+ b.header.output_root = roots.output_root;
+ b.header.range_proof_root = roots.rproof_root;
+ b.header.kernel_root = roots.kernel_root;
+
+ // Set the output and kernel MMR sizes.
+ {
+ // Carefully destructure these correctly...
+ let (_, output_mmr_size, _, kernel_mmr_size) = sizes;
+ b.header.output_mmr_size = output_mmr_size;
+ b.header.kernel_mmr_size = kernel_mmr_size;
+ }
+
+ Ok(())
+ }
+
 /// Return a pre-built Merkle proof for the given commitment from the store.
 pub fn get_merkle_proof(
 &self,
@@ -580,9 +644,7 @@ impl Chain {
 header: &BlockHeader,
 txhashset: &txhashset::TxHashSet,
 ) -> Result<(), Error> {
- debug!(
- "chain: validate_kernel_history: rewinding and validating kernel history (readonly)"
- );
+ debug!("validate_kernel_history: rewinding and validating kernel history (readonly)");
 
 let mut count = 0;
 let mut current = header.clone();
 
 while current.height > 0 {
 view.rewind(&current)?;
 view.validate_root()?;
- current = view.batch().get_block_header(&current.previous)?;
+ current = view.batch().get_previous_header(&current)?;
 count += 1;
 }
 Ok(())
 })?;
 
 debug!(
- "chain: validate_kernel_history: validated kernel root on {} headers",
+ "validate_kernel_history: validated kernel root on {} headers",
 count,
 );
 
@@ -667,13 +729,13 @@ impl Chain {
 
 // The txhashset.zip contains the output, rangeproof and kernel MMRs.
 // We must rebuild the header MMR ourselves based on the headers in our db.
- self.rebuild_header_mmr(&Tip::from_block(&header), &mut txhashset)?;
+ self.rebuild_header_mmr(&Tip::from_header(&header), &mut txhashset)?;
 
 // Validate the full kernel history (kernel MMR root for every block header).
 self.validate_kernel_history(&header, &txhashset)?;
 
 // all good, prepare a new batch and update all the required records
- debug!("chain: txhashset_write: rewinding a 2nd time (writeable)");
+ debug!("txhashset_write: rewinding a 2nd time (writeable)");
 
 let mut batch = self.store.batch()?;
@@ -697,13 +759,13 @@
 Ok(())
 })?;
 
- debug!("chain: txhashset_write: finished validating and rebuilding");
+ debug!("txhashset_write: finished validating and rebuilding");
 
 status.on_save();
 
 // Save the new head to the db and rebuild the header by height index.
 {
- let tip = Tip::from_block(&header);
+ let tip = Tip::from_header(&header);
 batch.save_body_head(&tip)?;
 batch.save_header_height(&header)?;
 batch.build_by_height_index(&header, true)?;
@@ -712,7 +774,7 @@
 // Commit all the changes to the db.
 batch.commit()?;
 
- debug!("chain: txhashset_write: finished committing the batch (head etc.)");
+ debug!("txhashset_write: finished committing the batch (head etc.)");
 
 // Replace the chain txhashset with the newly built one.
 {
@@ -720,7 +782,7 @@
 *txhashset_ref = txhashset;
 }
 
- debug!("chain: txhashset_write: replaced our txhashset with the new one");
+ debug!("txhashset_write: replaced our txhashset with the new one");
 
 // Check for any orphan blocks and process them based on the new chain state.
 self.check_orphans(header.height + 1);
@@ -761,7 +823,7 @@
 let cutoff = head.height.saturating_sub(horizon);
 
 debug!(
- "chain: compact_blocks_db: head height: {}, horizon: {}, cutoff: {}",
+ "compact_blocks_db: head height: {}, horizon: {}, cutoff: {}",
 head.height, horizon, cutoff,
 );
@@ -791,14 +853,14 @@
 if current.height <= 1 {
 break;
 }
- match batch.get_block_header(&current.previous) {
+ match batch.get_previous_header(&current) {
 Ok(h) => current = h,
 Err(NotFoundErr(_)) => break,
 Err(e) => return Err(From::from(e)),
 }
 }
 batch.commit()?;
- debug!("chain: compact_blocks_db: removed {} blocks.", count);
+ debug!("compact_blocks_db: removed {} blocks.", count);
 Ok(())
 }
@@ -909,6 +971,13 @@
 .map_err(|e| ErrorKind::StoreErr(e, "chain get header".to_owned()).into())
 }
 
+ /// Get previous block header.
+ pub fn get_previous_header(&self, header: &BlockHeader) -> Result<BlockHeader, Error> {
+ self.store
+ .get_previous_header(header)
+ .map_err(|e| ErrorKind::StoreErr(e, "chain get previous header".to_owned()).into())
+ }
+
 /// Get block_sums by header hash.
 pub fn get_block_sums(&self, h: &Hash) -> Result<BlockSums, Error> {
 self.store
@@ -928,12 +997,16 @@
 &self,
 output_ref: &OutputIdentifier,
 ) -> Result<BlockHeader, Error> {
- let mut txhashset = self.txhashset.write();
- let (_, pos) = txhashset.is_unspent(output_ref)?;
+ let pos = {
+ let mut txhashset = self.txhashset.write();
+ let (_, pos) = txhashset.is_unspent(output_ref)?;
+ pos
+ };
+
 let mut min = 1;
 let mut max = {
- let h = self.head()?;
- h.height
+ let head = self.head()?;
+ head.height
 };
 
 loop {
@@ -1034,7 +1107,7 @@ fn setup_head(
 
 if header.height > 0 && extension.batch.get_block_sums(&header.hash()).is_err() {
 debug!(
- "chain: init: building (missing) block sums for {} @ {}",
+ "init: building (missing) block sums for {} @ {}",
 header.height,
 header.hash()
 );
@@ -1054,7 +1127,7 @@
 }
 
 debug!(
- "chain: init: rewinding and validating before we start... {} at {}",
+ "init: rewinding and validating before we start... {} at {}",
 header.hash(),
 header.height,
 );
@@ -1070,27 +1143,33 @@
 let prev_header = batch.get_block_header(&head.prev_block_h)?;
 let _ = batch.delete_block(&header.hash());
 let _ = batch.setup_height(&prev_header, &head)?;
- head = Tip::from_block(&prev_header);
+ head = Tip::from_header(&prev_header);
 batch.save_head(&head)?;
 }
 }
 }
 Err(NotFoundErr(_)) => {
+ // Save the genesis header with a "zero" header_root.
+ // We will update this later once we have the correct header_root.
+ batch.save_block_header(&genesis.header)?;
 batch.save_block(&genesis)?;
- let tip = Tip::from_block(&genesis.header);
+
+ let tip = Tip::from_header(&genesis.header);
 batch.save_head(&tip)?;
 batch.setup_height(&genesis.header, &tip)?;
 
- // Apply the genesis block to our empty MMRs.
- txhashset::extending(txhashset, &mut batch, |extension| {
- extension.apply_block(&genesis)?;
+ // Initialize our header MMR with the genesis header.
+ txhashset::header_extending(txhashset, &mut batch, |extension| {
+ extension.apply_header(&genesis.header)?;
 Ok(())
 })?;
 
+ batch.save_block_header(&genesis.header)?;
+
 // Save the block_sums to the db for use later.
 batch.save_block_sums(&genesis.hash(), &BlockSums::default())?;
 
- info!("chain: init: saved genesis: {:?}", genesis.hash());
+ info!("init: saved genesis: {:?}", genesis.hash());
 }
 Err(e) => return Err(ErrorKind::StoreErr(e, "chain init load head".to_owned()))?,
 };
diff --git a/chain/src/pipe.rs b/chain/src/pipe.rs
index 95d98d3ff..64c4259de 100644
--- a/chain/src/pipe.rs
+++ b/chain/src/pipe.rs
@@ -56,10 +56,35 @@ pub struct BlockContext<'a> {
 pub orphans: Arc<OrphanBlockPool>,
 }
 
-// Check if this block is the next block *immediately*
-// after our current chain head.
-fn is_next_block(header: &BlockHeader, head: &Tip) -> bool {
- header.previous == head.last_block_h
+/// Process a block header as part of processing a full block.
+/// We want to make sure the header is valid before we process the full block.
+fn process_header_for_block(header: &BlockHeader, ctx: &mut BlockContext) -> Result<(), Error> {
+ // If we do not have the previous header then treat the block for this header
+ // as an orphan.
+ if ctx.batch.get_previous_header(header).is_err() {
+ return Err(ErrorKind::Orphan.into());
+ }
+
+ txhashset::header_extending(&mut ctx.txhashset, &mut ctx.batch, |extension| {
+ extension.force_rollback();
+
+ // Optimize this if "next" header
+ rewind_and_apply_header_fork(header, extension)?;
+
+ // Check the current root is correct.
+ extension.validate_root(header)?;
+
+ // Apply the new header to our header extension.
+ extension.apply_header(header)?;
+
+ Ok(())
+ })?;
+
+ validate_header(header, ctx)?;
+ add_block_header(header, &ctx.batch)?;
+ update_header_head(header, ctx)?;
+
+ Ok(())
 }
 
 /// Runs the block processing pipeline, including validation and finding a
@@ -70,7 +95,7 @@ pub fn process_block(b: &Block, ctx: &mut BlockContext) -> Result<Option<Tip>, E
 // spend resources reading the full block when its header is invalid
 debug!(
- "pipe: process_block {} at {} with {} inputs, {} outputs, {} kernels",
+ "pipe: process_block {} at {}, in/out/kern: {}/{}/{}",
 b.hash(),
 b.header.height,
 b.inputs().len(),
@@ -96,15 +121,12 @@
 }
 
 // Header specific processing.
- { - validate_header(&b.header, ctx)?; - add_block_header(&b.header, ctx)?; - update_header_head(&b.header, ctx)?; - } + process_header_for_block(&b.header, ctx)?; // Check if are processing the "next" block relative to the current chain head. + let prev_header = ctx.batch.get_previous_header(&b.header)?; let head = ctx.batch.head()?; - if is_next_block(&b.header, &head) { + if prev_header.hash() == head.last_block_h { // If this is the "next" block then either - // * common case where we process blocks sequentially. // * special case where this is the first fast sync full block @@ -123,11 +145,9 @@ pub fn process_block(b: &Block, ctx: &mut BlockContext) -> Result, E // Start a chain extension unit of work dependent on the success of the // internal validation and saving operations txhashset::extending(&mut ctx.txhashset, &mut ctx.batch, |mut extension| { - // First we rewind the txhashset extension if necessary - // to put it into a consistent state for validating the block. - // We can skip this step if the previous header is the latest header we saw. - if is_next_block(&b.header, &head) { - // No need to rewind if we are processing the next block. + let prev = extension.batch.get_previous_header(&b.header)?; + if prev.hash() == head.last_block_h { + // Not a fork so we do not need to rewind or reapply any blocks. } else { // Rewind and re-apply blocks on the forked chain to // put the txhashset in the correct forked state @@ -165,14 +185,10 @@ pub fn process_block(b: &Block, ctx: &mut BlockContext) -> Result, E Ok(()) })?; - trace!( - "pipe: process_block: {} at {} is valid, save and append.", - b.hash(), - b.header.height, - ); - - // Add the newly accepted block and header to our index. - add_block(b, ctx)?; + // Add the validated block to the db. + // We do this even if we have not increased the total cumulative work + // so we can maintain multiple (in progress) forks. + add_block(b, &ctx.batch)?; // Update the chain head if total work is increased. let res = update_head(b, ctx)?; @@ -203,24 +219,30 @@ pub fn sync_block_headers( }; if !all_known { - for header in headers { - validate_header(header, ctx)?; - add_block_header(header, ctx)?; - } - let first_header = headers.first().unwrap(); - let prev_header = ctx.batch.get_block_header(&first_header.previous)?; + let prev_header = ctx.batch.get_previous_header(&first_header)?; txhashset::sync_extending(&mut ctx.txhashset, &mut ctx.batch, |extension| { - // Optimize this if "next" header extension.rewind(&prev_header)?; for header in headers { + // Check the current root is correct. extension.validate_root(header)?; + + // Apply the header to the header MMR. extension.apply_header(header)?; + + // Save the header to the db. + add_block_header(header, &extension.batch)?; } Ok(()) })?; + + // Validate all our headers now that we have added each "previous" + // header to the db in this batch above. + for header in headers { + validate_header(header, ctx)?; + } } // Update header_head (if most work) and sync_head (regardless) in all cases, @@ -329,7 +351,8 @@ fn check_known_store(header: &BlockHeader, ctx: &mut BlockContext) -> Result<(), // We cannot assume we can use the chain head for this // as we may be dealing with a fork (with less work currently). 
fn check_prev_store(header: &BlockHeader, batch: &mut store::Batch) -> Result<(), Error> { - match batch.block_exists(&header.previous) { + let prev = batch.get_previous_header(&header)?; + match batch.block_exists(&prev.hash()) { Ok(true) => { // We have the previous block in the store, so we can proceed. Ok(()) @@ -381,13 +404,14 @@ fn validate_header(header: &BlockHeader, ctx: &mut BlockContext) -> Result<(), E } // first I/O cost, better as late as possible - let prev = match ctx.batch.get_block_header(&header.previous) { + let prev = match ctx.batch.get_previous_header(&header) { Ok(prev) => prev, Err(grin_store::Error::NotFoundErr(_)) => return Err(ErrorKind::Orphan.into()), Err(e) => { - return Err( - ErrorKind::StoreErr(e, format!("previous header {}", header.previous)).into(), - ) + return Err(ErrorKind::StoreErr( + e, + format!("Failed to find previous header to {}", header.hash()), + ).into()) } }; @@ -424,8 +448,9 @@ fn validate_header(header: &BlockHeader, ctx: &mut BlockContext) -> Result<(), E // explicit check to ensure total_difficulty has increased by exactly // the _network_ difficulty of the previous block // (during testnet1 we use _block_ difficulty here) + let prev = ctx.batch.get_previous_header(&header)?; let child_batch = ctx.batch.child()?; - let diff_iter = store::DifficultyIter::from_batch(header.previous, child_batch); + let diff_iter = store::DifficultyIter::from_batch(prev.hash(), child_batch); let next_header_info = consensus::next_difficulty(header.height, diff_iter); if target_difficulty != next_header_info.difficulty { info!( @@ -439,8 +464,7 @@ fn validate_header(header: &BlockHeader, ctx: &mut BlockContext) -> Result<(), E if header.pow.secondary_scaling != next_header_info.secondary_scaling { info!( "validate_header: header secondary scaling {} != {}", - header.pow.secondary_scaling, - next_header_info.secondary_scaling + header.pow.secondary_scaling, next_header_info.secondary_scaling ); return Err(ErrorKind::InvalidScaling.into()); } @@ -450,7 +474,7 @@ fn validate_header(header: &BlockHeader, ctx: &mut BlockContext) -> Result<(), E } fn validate_block(block: &Block, ctx: &mut BlockContext) -> Result<(), Error> { - let prev = ctx.batch.get_block_header(&block.header.previous)?; + let prev = ctx.batch.get_previous_header(&block.header)?; block .validate(&prev.total_kernel_offset, ctx.verifier_cache.clone()) .map_err(|e| ErrorKind::InvalidBlockProof(e))?; @@ -472,8 +496,10 @@ fn verify_coinbase_maturity(block: &Block, ext: &mut txhashset::Extension) -> Re /// based on block_sums of previous block, accounting for the inputs|outputs|kernels /// of the new block. fn verify_block_sums(b: &Block, ext: &mut txhashset::Extension) -> Result<(), Error> { + // TODO - this is 2 db calls, can we optimize this? // Retrieve the block_sums for the previous block. - let block_sums = ext.batch.get_block_sums(&b.header.previous)?; + let prev = ext.batch.get_previous_header(&b.header)?; + let block_sums = ext.batch.get_block_sums(&prev.hash())?; // Overage is based purely on the new block. // Previous block_sums have taken all previous overage into account. @@ -509,22 +535,20 @@ fn apply_block_to_txhashset(block: &Block, ext: &mut txhashset::Extension) -> Re } /// Officially adds the block to our chain. -fn add_block(b: &Block, ctx: &mut BlockContext) -> Result<(), Error> { - // Save the block itself to the db (via the batch). - ctx.batch +/// Header must be added separately (assume this has been done previously). 
+fn add_block(b: &Block, batch: &store::Batch) -> Result<(), Error> {
+ batch
 .save_block(b)
 .map_err(|e| ErrorKind::StoreErr(e, "pipe save block".to_owned()))?;
-
- // Build the block_input_bitmap, save to the db (via the batch) and cache locally.
- ctx.batch.build_and_cache_block_input_bitmap(&b)?;
 Ok(())
 }
 
 /// Officially adds the block header to our header chain.
-fn add_block_header(bh: &BlockHeader, ctx: &mut BlockContext) -> Result<(), Error> {
- ctx.batch
+fn add_block_header(bh: &BlockHeader, batch: &store::Batch) -> Result<(), Error> {
+ batch
 .save_block_header(bh)
- .map_err(|e| ErrorKind::StoreErr(e, "pipe save header".to_owned()).into())
+ .map_err(|e| ErrorKind::StoreErr(e, "pipe save header".to_owned()))?;
+ Ok(())
 }
 
 /// Directly updates the head if we've just appended a new block to it or handle
@@ -540,7 +564,7 @@ fn update_head(b: &Block, ctx: &BlockContext) -> Result<Option<Tip>, Error> {
 .setup_height(&b.header, &head)
 .map_err(|e| ErrorKind::StoreErr(e, "pipe setup height".to_owned()))?;
 
- let tip = Tip::from_block(&b.header);
+ let tip = Tip::from_header(&b.header);
 
 ctx.batch
 .save_body_head(&tip)
@@ -564,7 +588,7 @@ fn has_more_work(header: &BlockHeader, head: &Tip) -> bool {
 
 /// Update the sync head so we can keep syncing from where we left off.
 fn update_sync_head(bh: &BlockHeader, batch: &mut store::Batch) -> Result<(), Error> {
- let tip = Tip::from_block(bh);
+ let tip = Tip::from_header(bh);
 batch
 .save_sync_head(&tip)
 .map_err(|e| ErrorKind::StoreErr(e, "pipe save sync head".to_owned()))?;
@@ -576,7 +600,7 @@
 fn update_header_head(bh: &BlockHeader, ctx: &mut BlockContext) -> Result<Option<Tip>, Error> {
 let header_head = ctx.batch.header_head()?;
 if has_more_work(&bh, &header_head) {
- let tip = Tip::from_block(bh);
+ let tip = Tip::from_header(bh);
 ctx.batch
 .save_header_head(&tip)
 .map_err(|e| ErrorKind::StoreErr(e, "pipe save header head".to_owned()))?;
@@ -592,6 +616,35 @@ fn update_header_head(bh: &BlockHeader, ctx: &mut BlockContext) -> Result