Refactor rewind single header (#1390)

* refactor txhashset rewind to take only a single header;
  no need to pass in the current head_header each time

* rustfmt

* fix bug introduced in rewind with LMDB

* rustfmt

* add some comments
Antioch Peverell, 2018-08-20 20:34:12 +01:00, committed by GitHub
parent 8440aad7ea
commit bb7bc2c4f2
9 changed files with 118 additions and 127 deletions
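
The change in one line: `Extension::rewind` drops its `head_header` parameter and resolves the chain head internally. The nine files' hunks below (chain itself, its tests, and the pool test adapter) are mostly call sites shedding that extra argument. A self-contained toy sketch of the before/after shape (`Header`, `Store`, and `Extension` here are stand-ins, not Grin's actual types):

```rust
struct Header {
	height: u64,
}

struct Store {
	head: Header,
}

impl Store {
	// Stand-in for ChainStore::head_header() in the hunks below.
	fn head_header(&self) -> &Header {
		&self.head
	}
}

struct Extension<'a> {
	store: &'a Store,
}

impl<'a> Extension<'a> {
	// Before: rewind(&mut self, block_header: &BlockHeader, head_header: &BlockHeader).
	// After: callers pass only the header to rewind to; the head is looked up here.
	fn rewind(&mut self, header: &Header) -> Result<(), String> {
		let head_header = self.store.head_header();
		if head_header.height < header.height {
			// Mirrors the guard added to input_pos_to_rewind: nothing to rewind.
			return Ok(());
		}
		// ... undo output/kernel MMR operations back down to `header` ...
		Ok(())
	}
}

fn main() {
	let store = Store { head: Header { height: 100 } };
	let mut ext = Extension { store: &store };
	ext.rewind(&Header { height: 90 }).unwrap();
}
```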


@@ -416,13 +416,12 @@ impl Chain {
 		pre_tx: Option<Transaction>,
 		block_hash: &Hash,
 	) -> Result<Vec<Transaction>, Error> {
-		// Get headers so we can rewind chain state correctly.
+		// Get header so we can rewind chain state correctly.
 		let header = self.store.get_block_header(block_hash)?;
-		let head_header = self.store.head_header()?;

 		let mut txhashset = self.txhashset.write().unwrap();
 		txhashset::extending_readonly(&mut txhashset, |extension| {
-			extension.rewind(&header, &head_header)?;
+			extension.rewind(&header)?;
 			let valid_txs = extension.validate_raw_txs(txs, pre_tx)?;
 			Ok(valid_txs)
 		})
@@ -465,7 +464,7 @@ impl Chain {
 		// latest block header. Rewind the extension to the specified header to
 		// ensure the view is consistent.
 		txhashset::extending_readonly(&mut txhashset, |extension| {
-			extension.rewind(&header, &header)?;
+			extension.rewind(&header)?;
 			extension.validate(&header, skip_rproofs, &NoStatus)?;
 			Ok(())
 		})
@@ -531,11 +530,10 @@ impl Chain {
 		// The fast sync client does *not* have the necessary data
 		// to rewind after receiving the txhashset zip.
 		let header = self.store.get_block_header(&h)?;
-		let head_header = self.store.head_header()?;
 		{
 			let mut txhashset = self.txhashset.write().unwrap();
 			txhashset::extending_readonly(&mut txhashset, |extension| {
-				extension.rewind(&header, &head_header)?;
+				extension.rewind(&header)?;
 				extension.snapshot(&header)?;
 				Ok(())
 			})?;
@@ -579,7 +577,7 @@
 			"chain: txhashset_write: rewinding and validating (read-only)"
 		);
 		txhashset::extending_readonly(&mut txhashset, |extension| {
-			extension.rewind(&header, &header)?;
+			extension.rewind(&header)?;
 			extension.validate(&header, false, status)?;

 			// Now validate kernel sums at each historical header height
@@ -596,7 +594,7 @@
 		);
 		let mut batch = self.store.batch()?;
 		txhashset::extending(&mut txhashset, &mut batch, |extension| {
-			extension.rewind(&header, &header)?;
+			extension.rewind(&header)?;
 			extension.rebuild_index()?;
 			Ok(())
 		})?;
@@ -892,7 +890,6 @@ fn setup_head(
 	match head_res {
 		Ok(h) => {
 			head = h;
-			let head_header = store.head_header()?;
 			loop {
 				// Use current chain tip if we have one.
 				// Note: We are rewinding and validating against a writeable extension.
@@ -901,7 +898,7 @@ fn setup_head(

 				let header = store.get_block_header(&head.last_block_h)?;
 				let res = txhashset::extending(txhashset, &mut batch, |extension| {
-					extension.rewind(&header, &head_header)?;
+					extension.rewind(&header)?;
 					extension.validate_roots(&header)?;
 					debug!(
 						LOGGER,
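
Every chain.rs call site above now follows the same shape: open a read-only extension, rewind to the target header, validate, and let the changes be discarded. A toy sketch of that `extending_readonly` pattern (illustrative only; the real function in txhashset.rs also rolls the PMMR backends back after the closure returns):

```rust
struct TxHashSet; // stand-in for the real MMR-backed set

struct Extension;

impl Extension {
	fn rewind(&mut self, _target_height: u64) -> Result<(), String> {
		// Undo output/kernel MMR operations above the target header.
		Ok(())
	}
}

// Run `inner` against a temporary extension and always discard its changes,
// which is what makes rewinding safe for validation-only work.
fn extending_readonly<F, T>(_txhashset: &mut TxHashSet, inner: F) -> Result<T, String>
where
	F: FnOnce(&mut Extension) -> Result<T, String>,
{
	let mut extension = Extension;
	let res = inner(&mut extension);
	// The real implementation rewinds the backends here regardless of `res`.
	res
}

fn main() {
	let mut txhashset = TxHashSet;
	let res = extending_readonly(&mut txhashset, |ext| {
		ext.rewind(90)?;
		Ok("validated")
	});
	assert!(res.is_ok());
}
```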


@@ -130,7 +130,7 @@ pub fn process_block(b: &Block, ctx: &mut BlockContext) -> Result<Option<Tip>, E
 			b.hash(),
 			b.header.height,
 		);
-		add_block(b, ctx.store.clone(), &mut batch)?;
+		add_block(b, &mut batch)?;
 		let res = update_head(b, &ctx, &mut batch);
 		if res.is_ok() {
 			batch.commit()?;
@@ -370,16 +370,14 @@ fn validate_block_via_txhashset(b: &Block, ext: &mut txhashset::Extension) -> Re
 }

 /// Officially adds the block to our chain.
-fn add_block(
-	b: &Block,
-	store: Arc<store::ChainStore>,
-	batch: &mut store::Batch,
-) -> Result<(), Error> {
+fn add_block(b: &Block, batch: &mut store::Batch) -> Result<(), Error> {
+	// Save the block itself to the db (via the batch).
 	batch
 		.save_block(b)
 		.map_err(|e| ErrorKind::StoreErr(e, "pipe save block".to_owned()))?;
-
-	let bitmap = store.build_and_cache_block_input_bitmap(&b)?;
-	batch.save_block_input_bitmap(&b.hash(), &bitmap)?;
+
+	// Build the block_input_bitmap, save to the db (via the batch) and cache locally.
+	batch.build_and_cache_block_input_bitmap(&b)?;
+
 	Ok(())
 }
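
This is the payoff of moving the bitmap helpers onto `Batch` (see the store hunks below): once the batch can build, save, and cache the input bitmap itself, `add_block` needs nothing but the batch. A toy sketch of the narrowed dependency (`HashMap`s standing in for the db and the bitmap cache, not Grin's actual types):

```rust
use std::collections::HashMap;

struct Block {
	hash: u64,
	inputs: Vec<u32>,
}

// Stand-in for store::Batch: pending db writes plus access to the store's
// bitmap cache, making it the only handle add_block needs.
struct Batch<'a> {
	pending_blocks: Vec<u64>,
	bitmap_cache: &'a mut HashMap<u64, Vec<u32>>,
}

impl<'a> Batch<'a> {
	fn save_block(&mut self, b: &Block) -> Result<(), String> {
		self.pending_blocks.push(b.hash);
		Ok(())
	}

	fn build_and_cache_block_input_bitmap(&mut self, b: &Block) -> Result<(), String> {
		// A Vec<u32> stands in for the roaring bitmap of spent positions.
		self.bitmap_cache.insert(b.hash, b.inputs.clone());
		Ok(())
	}
}

// Mirrors the new signature: add_block(b: &Block, batch: &mut store::Batch).
fn add_block(b: &Block, batch: &mut Batch) -> Result<(), String> {
	batch.save_block(b)?;
	batch.build_and_cache_block_input_bitmap(b)?;
	Ok(())
}

fn main() {
	let mut cache = HashMap::new();
	let mut batch = Batch {
		pending_blocks: vec![],
		bitmap_cache: &mut cache,
	};
	add_block(&Block { hash: 1, inputs: vec![3, 5] }, &mut batch).unwrap();
	assert_eq!(batch.pending_blocks.len(), 1);
}
```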
@@ -490,7 +488,6 @@ pub fn rewind_and_apply_fork(
 		}
 	}

-	let head_header = store.head_header()?;
 	let forked_header = store.get_block_header(&current)?;

 	trace!(
@@ -503,7 +500,7 @@
 	);

 	// rewind the sum trees up to the forking block
-	ext.rewind(&forked_header, &head_header)?;
+	ext.rewind(&forked_header)?;

 	trace!(
 		LOGGER,


@@ -155,64 +155,6 @@ impl ChainStore {
 		)
 	}

-	pub fn build_block_input_bitmap(&self, block: &Block) -> Result<Bitmap, Error> {
-		let bitmap = block
-			.inputs()
-			.iter()
-			.filter_map(|x| self.get_output_pos(&x.commitment()).ok())
-			.map(|x| x as u32)
-			.collect();
-		Ok(bitmap)
-	}
-
-	pub fn build_and_cache_block_input_bitmap(&self, block: &Block) -> Result<Bitmap, Error> {
-		let bitmap = self.build_block_input_bitmap(block)?;
-		let mut cache = self.block_input_bitmap_cache.write().unwrap();
-		cache.insert(block.hash(), bitmap.serialize());
-		Ok(bitmap)
-	}
-
-	pub fn get_block_input_bitmap(&self, bh: &Hash) -> Result<(bool, Bitmap), Error> {
-		{
-			let mut cache = self.block_input_bitmap_cache.write().unwrap();
-
-			// cache hit - return the value from the cache
-			if let Some(bytes) = cache.get_mut(bh) {
-				return Ok((true, Bitmap::deserialize(&bytes)));
-			}
-		}
-
-		// cache miss - get it from db and cache it for next time
-		// if we found one in db
-		let res = self.get_block_input_bitmap_db(bh);
-		if let Ok((found, bitmap)) = res {
-			let mut cache = self.block_input_bitmap_cache.write().unwrap();
-			cache.insert(*bh, bitmap.serialize());
-			return Ok((found, bitmap));
-		}
-		res
-	}
-
-	// Get the block input bitmap from the db or build the bitmap from
-	// the full block from the db (if the block is found).
-	// (bool, Bitmap) : (false if bitmap was built and not found in db)
-	fn get_block_input_bitmap_db(&self, bh: &Hash) -> Result<(bool, Bitmap), Error> {
-		if let Ok(Some(bytes)) = self
-			.db
-			.get(&to_key(BLOCK_INPUT_BITMAP_PREFIX, &mut bh.to_vec()))
-		{
-			Ok((true, Bitmap::deserialize(&bytes)))
-		} else {
-			match self.get_block(bh) {
-				Ok(block) => {
-					let bitmap = self.build_and_cache_block_input_bitmap(&block)?;
-					Ok((false, bitmap))
-				}
-				Err(e) => Err(e),
-			}
-		}
-	}
-
 	/// Builds a new batch to be used with this store.
 	pub fn batch(&self) -> Result<Batch, Error> {
 		Ok(Batch {
@@ -335,7 +277,7 @@ impl<'a> Batch<'a> {
 		)
 	}

-	pub fn save_block_input_bitmap(&self, bh: &Hash, bm: &Bitmap) -> Result<(), Error> {
+	fn save_block_input_bitmap(&self, bh: &Hash, bm: &Bitmap) -> Result<(), Error> {
 		self.db.put(
 			&to_key(BLOCK_INPUT_BITMAP_PREFIX, &mut bh.to_vec())[..],
 			bm.serialize(),
@@ -381,6 +323,64 @@ impl<'a> Batch<'a> {
 		Ok(())
 	}

+	fn build_block_input_bitmap(&self, block: &Block) -> Result<Bitmap, Error> {
+		let bitmap = block
+			.inputs()
+			.iter()
+			.filter_map(|x| self.get_output_pos(&x.commitment()).ok())
+			.map(|x| x as u32)
+			.collect();
+		Ok(bitmap)
+	}
+
+	pub fn build_and_cache_block_input_bitmap(&self, block: &Block) -> Result<Bitmap, Error> {
+		// Build the bitmap.
+		let bitmap = self.build_block_input_bitmap(block)?;
+
+		// Save the bitmap to the db (via the batch).
+		self.save_block_input_bitmap(&block.hash(), &bitmap)?;
+
+		// Finally cache it locally for use later.
+		let mut cache = self.store.block_input_bitmap_cache.write().unwrap();
+		cache.insert(block.hash(), bitmap.serialize());
+
+		Ok(bitmap)
+	}
+
+	pub fn get_block_input_bitmap(&self, bh: &Hash) -> Result<Bitmap, Error> {
+		{
+			let mut cache = self.store.block_input_bitmap_cache.write().unwrap();
+
+			// cache hit - return the value from the cache
+			if let Some(bytes) = cache.get_mut(bh) {
+				return Ok(Bitmap::deserialize(&bytes));
+			}
+		}
+
+		// cache miss - get it from db (build it, store it and cache it as necessary)
+		self.get_block_input_bitmap_db(bh)
+	}
+
+	// Get the block input bitmap from the db or build the bitmap from
+	// the full block from the db (if the block is found).
+	// (Builds, saves and caches the bitmap if it was not found in the db.)
+	fn get_block_input_bitmap_db(&self, bh: &Hash) -> Result<Bitmap, Error> {
+		if let Ok(Some(bytes)) = self
+			.db
+			.get(&to_key(BLOCK_INPUT_BITMAP_PREFIX, &mut bh.to_vec()))
+		{
+			Ok(Bitmap::deserialize(&bytes))
+		} else {
+			match self.get_block(bh) {
+				Ok(block) => {
+					let bitmap = self.build_and_cache_block_input_bitmap(&block)?;
+					Ok(bitmap)
+				}
+				Err(e) => Err(e),
+			}
+		}
+	}
+
 	/// Commits this batch. If it's a child batch, it will be merged with the
 	/// parent, otherwise the batch is written to db.
 	pub fn commit(self) -> Result<(), Error> {
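
After this refactor the bitmap lookup path is: in-memory cache, then db, then rebuild from the full block, with any rebuilt bitmap saved and cached through the same batch. A toy sketch of that fallback chain (`Vec<u32>` standing in for `croaring::Bitmap`, `HashMap`s for the LMDB tables and the cache; not Grin's actual types):

```rust
use std::collections::HashMap;

type Bitmap = Vec<u32>; // stand-in for croaring::Bitmap

struct Batch<'a> {
	db_bitmaps: &'a mut HashMap<u64, Bitmap>,
	db_blocks: &'a HashMap<u64, Vec<u32>>, // block hash -> input positions
	cache: &'a mut HashMap<u64, Bitmap>,
}

impl<'a> Batch<'a> {
	fn get_block_input_bitmap(&mut self, bh: u64) -> Result<Bitmap, String> {
		// 1. cache hit - return the value from the cache
		if let Some(b) = self.cache.get(&bh) {
			return Ok(b.clone());
		}
		// 2. cache miss - try the db, else rebuild from the block
		self.get_block_input_bitmap_db(bh)
	}

	fn get_block_input_bitmap_db(&mut self, bh: u64) -> Result<Bitmap, String> {
		if let Some(b) = self.db_bitmaps.get(&bh) {
			return Ok(b.clone());
		}
		// 3. rebuild from the full block, then save and cache via the batch
		// (build_and_cache_block_input_bitmap in the real code).
		let inputs = self.db_blocks.get(&bh).ok_or("block not found")?;
		let bitmap: Bitmap = inputs.clone();
		self.db_bitmaps.insert(bh, bitmap.clone());
		self.cache.insert(bh, bitmap.clone());
		Ok(bitmap)
	}
}

fn main() {
	let (mut db_bitmaps, mut cache) = (HashMap::new(), HashMap::new());
	let mut db_blocks = HashMap::new();
	db_blocks.insert(7u64, vec![10u32, 42]);
	let mut batch = Batch {
		db_bitmaps: &mut db_bitmaps,
		db_blocks: &db_blocks,
		cache: &mut cache,
	};
	assert_eq!(batch.get_block_input_bitmap(7).unwrap(), vec![10, 42]);
}
```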


@@ -232,31 +232,34 @@ impl TxHashSet {
 		let horizon = current_height.saturating_sub(global::cut_through_horizon().into());
 		let horizon_header = self.commit_index.get_header_by_height(horizon)?;

-		let rewind_rm_pos =
-			input_pos_to_rewind(self.commit_index.clone(), &horizon_header, &head_header)?;
-
 		let batch = self.commit_index.batch()?;
-		if !rewind_rm_pos.0 {
-			batch.save_block_input_bitmap(&head_header.hash(), &rewind_rm_pos.1)?;
-		}
+
+		let rewind_rm_pos = input_pos_to_rewind(
+			self.commit_index.clone(),
+			&horizon_header,
+			&head_header,
+			&batch,
+		)?;

 		{
 			let clean_output_index = |commit: &[u8]| {
+				// do we care if this fails?
 				let _ = batch.delete_output_pos(commit);
 			};

 			self.output_pmmr_h.backend.check_compact(
 				horizon_header.output_mmr_size,
-				&rewind_rm_pos.1,
+				&rewind_rm_pos,
 				clean_output_index,
 			)?;

 			self.rproof_pmmr_h.backend.check_compact(
 				horizon_header.output_mmr_size,
-				&rewind_rm_pos.1,
+				&rewind_rm_pos,
 				&prune_noop,
 			)?;
 		}

+		// Finally commit the batch, saving everything to the db.
 		batch.commit()?;

 		Ok(())
@@ -712,8 +715,7 @@ impl<'a> Extension<'a> {
 		);

 		// rewind to the specified block for a consistent view
-		let head_header = self.commit_index.head_header()?;
-		self.rewind(block_header, &head_header)?;
+		self.rewind(block_header)?;

 		// then calculate the Merkle Proof based on the known pos
 		let pos = self.batch.get_output_pos(&output.commit)?;
@@ -742,11 +744,7 @@

 	/// Rewinds the MMRs to the provided block, rewinding to the last output pos
 	/// and last kernel pos of that block.
-	pub fn rewind(
-		&mut self,
-		block_header: &BlockHeader,
-		head_header: &BlockHeader,
-	) -> Result<(), Error> {
+	pub fn rewind(&mut self, block_header: &BlockHeader) -> Result<(), Error> {
 		trace!(
 			LOGGER,
 			"Rewind to header {} @ {}",
@@ -754,23 +752,25 @@
 			block_header.hash(),
 		);

+		let head_header = self.commit_index.head_header()?;
+
 		// We need to build bitmaps of added and removed output positions
 		// so we can correctly rewind all operations applied to the output MMR
 		// after the position we are rewinding to (these operations will be
 		// undone during rewind).
 		// Rewound output pos will be removed from the MMR.
 		// Rewound input (spent) pos will be added back to the MMR.
-		let rewind_rm_pos =
-			input_pos_to_rewind(self.commit_index.clone(), block_header, head_header)?;
-		if !rewind_rm_pos.0 {
-			self.batch
-				.save_block_input_bitmap(&head_header.hash(), &rewind_rm_pos.1)?;
-		}
+		let rewind_rm_pos = input_pos_to_rewind(
+			self.commit_index.clone(),
+			block_header,
+			&head_header,
+			&self.batch,
+		)?;

 		self.rewind_to_pos(
 			block_header.output_mmr_size,
 			block_header.kernel_mmr_size,
-			&rewind_rm_pos.1,
+			&rewind_rm_pos,
 		)
 	}

@@ -1206,10 +1206,21 @@ fn input_pos_to_rewind(
 	commit_index: Arc<ChainStore>,
 	block_header: &BlockHeader,
 	head_header: &BlockHeader,
-) -> Result<(bool, Bitmap), Error> {
+	batch: &Batch,
+) -> Result<Bitmap, Error> {
 	let mut bitmap = Bitmap::create();
 	let mut current = head_header.hash();
-	let mut found = false;
+
+	if head_header.height < block_header.height {
+		debug!(
+			LOGGER,
+			"input_pos_to_rewind: {} < {}, nothing to rewind",
+			head_header.height,
+			block_header.height
+		);
+		return Ok(bitmap);
+	}
+
 	loop {
 		if current == block_header.hash() {
 			break;
@@ -1220,12 +1231,11 @@ fn input_pos_to_rewind(
 		// I/O should be minimized or eliminated here for most
 		// rewind scenarios.
 		let current_header = commit_index.get_block_header(&current)?;
-		let input_bitmap_res = commit_index.get_block_input_bitmap(&current);
+		let input_bitmap_res = batch.get_block_input_bitmap(&current);
 		if let Ok(b) = input_bitmap_res {
-			found = b.0;
-			bitmap.or_inplace(&b.1);
+			bitmap.or_inplace(&b);
 		}
 		current = current_header.previous;
 	}
-	Ok((found, bitmap))
+	Ok(bitmap)
 }
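
Two things in this function add up to the LMDB fix the commit message mentions. First, the new guard: the loop walks `previous` links back from the head, so a head *below* the rewind target could never reach the target hash; returning the empty bitmap early avoids walking off the end of the chain. Second, bitmap lookups now go through the open `batch` rather than the store directly, so any bitmaps rebuilt along the way are written via the one open write transaction (the likely LMDB interaction behind the bug). A simplified, self-contained sketch of the walk (toy types; plain maps in place of the batch):

```rust
use std::collections::HashMap;

type Bitmap = Vec<u32>; // stand-in for croaring::Bitmap

struct Header {
	hash: u64,
	previous: u64,
	height: u64,
}

// Accumulate the spent-input positions of every block after the rewind
// target, walking previous-links back from the head. These positions get
// added back to the output MMR when the rewind is applied.
fn input_pos_to_rewind(
	headers: &HashMap<u64, Header>,
	input_bitmaps: &HashMap<u64, Bitmap>,
	block_header: &Header,
	head_header: &Header,
) -> Result<Bitmap, String> {
	let mut bitmap = Bitmap::new();
	let mut current = head_header.hash;

	// The guard added in this commit: if the head is below the target,
	// walking back from the head can never reach the target hash.
	if head_header.height < block_header.height {
		return Ok(bitmap);
	}

	loop {
		if current == block_header.hash {
			return Ok(bitmap);
		}
		let current_header = headers.get(&current).ok_or("header not found")?;
		if let Some(b) = input_bitmaps.get(&current) {
			// or_inplace on the real Bitmap; extend on the toy one.
			bitmap.extend_from_slice(b);
		}
		current = current_header.previous;
	}
}

fn main() {
	let mut headers = HashMap::new();
	headers.insert(1u64, Header { hash: 1, previous: 0, height: 1 });
	headers.insert(2u64, Header { hash: 2, previous: 1, height: 2 });
	let mut bitmaps = HashMap::new();
	bitmaps.insert(2u64, vec![5u32]);
	let rm = input_pos_to_rewind(&headers, &bitmaps, &headers[&1], &headers[&2]).unwrap();
	assert_eq!(rm, vec![5]);
}
```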


@@ -12,6 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.

+extern crate chrono;
 extern crate env_logger;
 extern crate grin_chain as chain;
 extern crate grin_core as core;
@@ -20,14 +21,13 @@ extern crate grin_store as store;
 extern crate grin_util as util;
 extern crate grin_wallet as wallet;
 extern crate rand;
-extern crate chrono;

-use chrono::Duration;
 use std::fs;
 use std::sync::Arc;

-use chain::Chain;
+use chrono::Duration;
 use chain::types::{NoopAdapter, Tip};
+use chain::Chain;
 use core::core::target::Difficulty;
 use core::core::{Block, BlockHeader, Transaction};
 use core::global::{self, ChainTypes};
@@ -96,8 +96,6 @@ fn data_files() {
 			.process_block(b.clone(), chain::Options::MINE)
 			.unwrap();

-		let head = Tip::from_block(&b.header);
-
 		chain.validate(false).unwrap();
 	}
 }


@@ -242,7 +242,6 @@ fn spend_in_fork_and_compact() {
 	// mine the first block and keep track of the block_hash
 	// so we can spend the coinbase later
 	let b = prepare_block(&kc, &fork_head, &chain, 2);
-	let block_hash = b.hash();
 	let out_id = OutputIdentifier::from_output(&b.outputs()[0]);
 	assert!(out_id.features.contains(OutputFeatures::COINBASE_OUTPUT));
 	fork_head = b.header.clone();
@@ -250,8 +249,6 @@
 		.process_block(b.clone(), chain::Options::SKIP_POW)
 		.unwrap();

-	let merkle_proof = chain.get_merkle_proof(&out_id, &b.header).unwrap();
-
 	// now mine three further blocks
 	for n in 3..6 {
 		let b = prepare_block(&kc, &fork_head, &chain, n);


@@ -23,7 +23,7 @@ extern crate rand;
 use std::fs;
 use std::sync::Arc;

-use chain::{ChainStore, Tip};
+use chain::Tip;
 use core::core::hash::Hashed;
 use core::core::target::Difficulty;
 use core::core::{Block, BlockHeader};


@@ -26,7 +26,7 @@ use std::fs;
 use std::sync::Arc;

 use chain::types::NoopAdapter;
-use chain::{Error, ErrorKind};
+use chain::ErrorKind;
 use core::core::target::Difficulty;
 use core::core::{transaction, OutputIdentifier};
 use core::global::{self, ChainTypes};
@@ -86,17 +86,10 @@ fn test_coinbase_maturity() {
 			.contains(transaction::OutputFeatures::COINBASE_OUTPUT)
 	);

-	let out_id = OutputIdentifier::from_output(&coinbase_output);
-
-	// we will need this later when we want to spend the coinbase output
-	let block_hash = block.hash();
-
 	chain
 		.process_block(block.clone(), chain::Options::MINE)
 		.unwrap();

-	let merkle_proof = chain.get_merkle_proof(&out_id, &block.header).unwrap();
-
 	let prev = chain.head_header().unwrap();
 	let amount = consensus::REWARD;


@@ -83,11 +83,10 @@ impl BlockChain for ChainAdapter {
 			.store
 			.get_block_header(&block_hash)
 			.map_err(|_| PoolError::Other(format!("failed to get header")))?;
-		let head_header = self.chain_head()?;

 		let mut txhashset = self.txhashset.write().unwrap();
 		let res = txhashset::extending_readonly(&mut txhashset, |extension| {
-			extension.rewind(&header, &head_header)?;
+			extension.rewind(&header)?;
 			let valid_txs = extension.validate_raw_txs(txs, pre_tx)?;
 			Ok(valid_txs)
 		}).map_err(|e| PoolError::Other(format!("Error: test chain adapter: {:?}", e)))?;