Simplify chain, remove head. (#1657)

* Go back to db for head and header_head
* Use batch consistently for db access
* Pass active txhashset and batch in the ctx
* Only update head in db if total work increases
* Only update header_head if total work (on header chain) increases
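
The last two bullets boil down to a single comparison of cumulative work. A minimal sketch of that rule, using the `BlockHeader` and `Tip` types from the chain crate (it mirrors the `has_more_work` helper added in pipe.rs further down):

```rust
// Sketch of the "only advance on more total work" rule applied to both
// head and header_head in this change (see has_more_work in pipe.rs below).
fn has_more_work(header: &BlockHeader, head: &Tip) -> bool {
    // total_difficulty is cumulative, so a strict increase means the candidate
    // header sits on a chain with more work than the tip currently in the db.
    header.total_difficulty() > head.total_difficulty
}
```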
Antioch Peverell 2018-10-05 08:29:33 +01:00 committed by GitHub
parent 77dfff7c94
commit bcf41438dc
7 changed files with 232 additions and 281 deletions


@ -18,7 +18,7 @@
use std::collections::HashMap;
use std::fs::File;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex, RwLock};
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};
use lmdb;
@ -29,12 +29,11 @@ use core::core::merkle_proof::MerkleProof;
use core::core::verifier_cache::VerifierCache;
use core::core::{Block, BlockHeader, BlockSums, Output, OutputIdentifier, Transaction, TxKernel};
use core::global;
use core::pow::{self, Difficulty};
use core::pow;
use error::{Error, ErrorKind};
use grin_store::Error::NotFoundErr;
use pipe;
use store;
use store::Batch;
use txhashset;
use types::{ChainAdapter, NoStatus, Options, Tip, TxHashsetWriteStatus};
use util::secp::pedersen::{Commitment, RangeProof};
@ -146,8 +145,6 @@ pub struct Chain {
db_root: String,
store: Arc<store::ChainStore>,
adapter: Arc<ChainAdapter>,
head: Arc<Mutex<Tip>>,
orphans: Arc<OrphanBlockPool>,
txhashset: Arc<RwLock<txhashset::TxHashSet>>,
// Recently processed blocks to avoid double-processing
@ -183,9 +180,7 @@ impl Chain {
setup_head(genesis, store.clone(), &mut txhashset)?;
// Now reload the chain head (either existing head or genesis from above)
let head = store.head()?;
debug!(
LOGGER,
"Chain init: {} @ {} [{}]",
@ -198,7 +193,6 @@ impl Chain {
db_root: db_root,
store: store,
adapter: adapter,
head: Arc::new(Mutex::new(head)),
orphans: Arc::new(OrphanBlockPool::new()),
txhashset: Arc::new(RwLock::new(txhashset)),
pow_verifier,
@ -210,73 +204,42 @@ impl Chain {
/// Processes a single block, then checks for orphans, processing
/// those as well if they're found
pub fn process_block(
&self,
b: Block,
opts: Options,
) -> Result<(Option<Tip>, Option<Block>), Error> {
match self.process_block_single(b, opts) {
Ok((t, b)) => {
// We accepted a block, so see if we can accept any orphans
if let Some(ref b) = b {
self.check_orphans(b.header.height + 1);
}
Ok((t, b))
}
Err(e) => Err(e),
pub fn process_block(&self, b: Block, opts: Options) -> Result<Option<Tip>, Error> {
let height = b.header.height;
let res = self.process_block_single(b, opts);
if res.is_ok() {
self.check_orphans(height + 1);
}
res
}
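
Callers now get a single `Option<Tip>` back instead of the old `(Option<Tip>, Option<Block>)` pair. A rough sketch of handling the new result, mirroring the adapter change further down (`chain`, `block` and `opts` assumed to be in scope):

```rust
// Sketch only: reading the simplified return value of process_block.
match chain.process_block(block, opts) {
    // Some(tip): the block extended (or reorged onto) the most-work chain.
    Ok(Some(tip)) => debug!(LOGGER, "new head {} at {}", tip.last_block_h, tip.height),
    // None: accepted, but on a fork that does not add total work.
    Ok(None) => debug!(LOGGER, "block accepted, head unchanged"),
    Err(e) => debug!(LOGGER, "block rejected: {:?}", e),
}
```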
/// Attempt to add a new block to the chain. Returns the new chain tip if it
/// has been added to the longest chain, None if it's added to an (as of
/// now) orphan chain.
fn process_block_single(
&self,
b: Block,
opts: Options,
) -> Result<(Option<Tip>, Option<Block>), Error> {
let mut batch = self.store.batch()?;
let bhash = b.hash();
let mut ctx = self.new_ctx(opts, &mut batch)?;
/// Attempt to add a new block to the chain.
/// Returns the new chain tip if it has been added to the longest chain,
/// or None if it was added to a fork (or is an orphan).
fn process_block_single(&self, b: Block, opts: Options) -> Result<Option<Tip>, Error> {
let batch = self.store.batch()?;
let mut txhashset = self.txhashset.write().unwrap();
let mut ctx = self.new_ctx(opts, batch, &mut txhashset)?;
let res = pipe::process_block(&b, &mut ctx, &mut batch);
let add_to_hash_cache = || {
// let hash = b.hash();
let add_to_hash_cache = |hash: Hash| {
// only add to hash cache below if block is definitively accepted
// or rejected
let mut cache = self.block_hashes_cache.write().unwrap();
cache.insert(bhash, true);
cache.insert(hash, true);
};
match res {
Ok(Some(ref tip)) => {
batch.commit()?;
match pipe::process_block(&b, &mut ctx) {
Ok(head) => {
// Commit the batch in the ctx to the db.
ctx.batch.commit()?;
// block got accepted and extended the head, updating our head
let chain_head = self.head.clone();
{
let mut head = chain_head.lock().unwrap();
*head = tip.clone();
}
add_to_hash_cache();
add_to_hash_cache(b.hash());
// notifying other parts of the system of the update
self.adapter.block_accepted(&b, opts);
Ok((Some(tip.clone()), Some(b)))
}
Ok(None) => {
batch.commit()?;
add_to_hash_cache();
// block got accepted but we did not extend the head
// so its on a fork (or is the start of a new fork)
// broadcast the block out so everyone knows about the fork
// broadcast the block
self.adapter.block_accepted(&b, opts);
Ok((None, Some(b)))
Ok(head)
}
Err(e) => {
match e.kind() {
@ -325,7 +288,7 @@ impl Chain {
b.header.height,
e
);
add_to_hash_cache();
add_to_hash_cache(b.hash());
Err(ErrorKind::Other(format!("{:?}", e).to_owned()).into())
}
}
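
The heart of the refactor is visible above: a single db batch and the write-locked txhashset move into the `BlockContext`, and the batch is committed exactly once when the pipeline succeeds. A condensed sketch of that flow (simplified from `process_block_single`, error branches omitted):

```rust
// Condensed sketch of the new single-batch flow.
let batch = self.store.batch()?;                          // one batch per block
let mut txhashset = self.txhashset.write().unwrap();      // hold the write lock for the whole call
let mut ctx = self.new_ctx(opts, batch, &mut txhashset)?; // batch + txhashset travel in the ctx
let head = pipe::process_block(&b, &mut ctx)?;            // pipe reads head/header_head via ctx.batch
ctx.batch.commit()?;                                      // single commit point on success
```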
@ -335,38 +298,45 @@ impl Chain {
/// Process a block header received during "header first" propagation.
pub fn process_block_header(&self, bh: &BlockHeader, opts: Options) -> Result<(), Error> {
let mut batch = self.store.batch()?;
let mut ctx = self.new_ctx(opts, &mut batch)?;
pipe::process_block_header(bh, &mut ctx, &mut batch)?;
batch.commit()?;
let batch = self.store.batch()?;
let mut txhashset = self.txhashset.write().unwrap();
let mut ctx = self.new_ctx(opts, batch, &mut txhashset)?;
pipe::process_block_header(bh, &mut ctx)?;
ctx.batch.commit()?;
Ok(())
}
/// Attempt to add a new header to the header chain.
/// This is only ever used during sync and uses sync_head.
/// Attempt to add new headers to the header chain (or fork).
/// This is only ever used during sync and is based on sync_head.
/// We update header_head here if our total work increases.
pub fn sync_block_headers(
&self,
headers: &Vec<BlockHeader>,
opts: Options,
) -> Result<(), Error> {
let mut batch = self.store.batch()?;
let mut ctx = self.new_ctx(opts, &mut batch)?;
pipe::sync_block_headers(headers, &mut ctx, &mut batch)?;
batch.commit()?;
let batch = self.store.batch()?;
let mut txhashset = self.txhashset.write().unwrap();
let mut ctx = self.new_ctx(opts, batch, &mut txhashset)?;
pipe::sync_block_headers(headers, &mut ctx)?;
ctx.batch.commit()?;
Ok(())
}
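
Header sync therefore funnels through one call that always advances `sync_head` and moves `header_head` only when total work increases. A hedged usage sketch (the `Options::SYNC` flag is shown for illustration; the actual caller lives in the network adapter):

```rust
// Sketch: apply a batch of headers received during sync.
// sync_head follows the last header unconditionally; header_head is only
// updated if these headers add total work (see pipe::sync_block_headers).
chain.sync_block_headers(&headers, Options::SYNC)?;
```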
fn new_ctx(&self, opts: Options, batch: &mut Batch) -> Result<pipe::BlockContext, Error> {
let head = batch.head()?;
let header_head = batch.get_header_head()?;
fn new_ctx<'a>(
&self,
opts: Options,
batch: store::Batch<'a>,
txhashset: &'a mut txhashset::TxHashSet,
) -> Result<pipe::BlockContext<'a>, Error> {
Ok(pipe::BlockContext {
opts,
head,
header_head,
pow_verifier: self.pow_verifier,
block_hashes_cache: self.block_hashes_cache.clone(),
verifier_cache: self.verifier_cache.clone(),
txhashset: self.txhashset.clone(),
txhashset,
batch,
orphans: self.orphans.clone(),
})
}
@ -411,10 +381,11 @@ impl Chain {
String::new()
},
);
let height = orphan.block.header.height;
let res = self.process_block_single(orphan.block, orphan.opts);
if let Ok((_, Some(b))) = res {
if res.is_ok() {
orphan_accepted = true;
height_accepted = b.header.height;
height_accepted = height;
}
}
@ -568,7 +539,7 @@ impl Chain {
// so we can send these across as part of the zip file.
// The fast sync client does *not* have the necessary data
// to rewind after receiving the txhashset zip.
let header = self.store.get_block_header(&h)?;
let header = self.get_block_header(&h)?;
{
let mut txhashset = self.txhashset.write().unwrap();
txhashset::extending_readonly(&mut txhashset, |extension| {
@ -633,13 +604,17 @@ impl Chain {
status: &TxHashsetWriteStatus,
) -> Result<(), Error> {
status.on_setup();
let head = self.head().unwrap();
let header_head = self.get_header_head().unwrap();
if header_head.height - head.height < global::cut_through_horizon() as u64 {
return Err(ErrorKind::InvalidTxHashSet("not needed".to_owned()).into());
// Initial check based on relative heights of current head and header_head.
{
let head = self.head().unwrap();
let header_head = self.header_head().unwrap();
if header_head.height - head.height < global::cut_through_horizon() as u64 {
return Err(ErrorKind::InvalidTxHashSet("not needed".to_owned()).into());
}
}
let header = self.store.get_block_header(&h)?;
let header = self.get_block_header(&h)?;
txhashset::zip_write(self.db_root.clone(), txhashset_data, &header)?;
let mut txhashset =
@ -653,7 +628,9 @@ impl Chain {
LOGGER,
"chain: txhashset_write: rewinding a 2nd time (writeable)"
);
let mut batch = self.store.batch()?;
txhashset::extending(&mut txhashset, &mut batch, |extension| {
extension.rewind(&header)?;
@ -686,23 +663,26 @@ impl Chain {
);
status.on_save();
// Replace the chain txhashset with the newly built one.
{
let mut txhashset_ref = self.txhashset.write().unwrap();
*txhashset_ref = txhashset;
}
// Setup new head.
let head = {
let mut head = self.head.lock().unwrap();
*head = Tip::from_block(&header);
head.clone()
};
debug!(
LOGGER,
"chain: txhashset_write: replaced our txhashset with the new one"
);
// Save the new head to the db and rebuild the header by height index.
{
batch.save_body_head(&head)?;
let tip = Tip::from_block(&header);
batch.save_body_head(&tip)?;
batch.save_header_height(&header)?;
batch.build_by_height_index(&header, true)?;
}
// Commit all the changes to the db.
batch.commit()?;
@ -741,11 +721,11 @@ impl Chain {
debug!(LOGGER, "Starting blockchain compaction.");
// Compact the txhashset via the extension.
{
let mut txhashes = self.txhashset.write().unwrap();
txhashes.compact()?;
let mut txhashset = self.txhashset.write().unwrap();
txhashset.compact()?;
// print out useful debug info after compaction
txhashset::extending_readonly(&mut txhashes, |extension| {
txhashset::extending_readonly(&mut txhashset, |extension| {
extension.dump_output_pmmr();
Ok(())
})?;
@ -771,9 +751,10 @@ impl Chain {
head.height - horizon
);
let mut count = 0;
let mut current = self.store.get_header_by_height(head.height - horizon - 1)?;
let batch = self.store.batch()?;
let mut current = batch.get_header_by_height(head.height - horizon - 1)?;
loop {
// Go to the store directly so we can handle NotFoundErr robustly.
match self.store.get_block(&current.hash()) {
Ok(b) => {
batch.delete_block(&b.hash())?;
@ -791,7 +772,7 @@ impl Chain {
if current.height <= 1 {
break;
}
match self.store.get_block_header(&current.previous) {
match batch.get_block_header(&current.previous) {
Ok(h) => current = h,
Err(NotFoundErr(_)) => break,
Err(e) => return Err(From::from(e)),
@ -846,21 +827,11 @@ impl Chain {
Ok((outputs.0, max_index, output_vec))
}
/// Total difficulty at the head of the chain
pub fn total_difficulty(&self) -> Difficulty {
self.head.lock().unwrap().clone().total_difficulty
}
/// Orphans pool size
pub fn orphans_len(&self) -> usize {
self.orphans.len()
}
/// Total difficulty at the head of the header chain
pub fn total_header_difficulty(&self) -> Result<Difficulty, Error> {
Ok(self.store.get_header_head()?.total_difficulty)
}
/// Reset header_head and sync_head to head of current body chain
pub fn reset_head(&self) -> Result<(), Error> {
let batch = self.store.batch()?;
@ -869,9 +840,18 @@ impl Chain {
Ok(())
}
/// Get the tip that's also the head of the chain
/// Tip (head) of the block chain.
pub fn head(&self) -> Result<Tip, Error> {
Ok(self.head.lock().unwrap().clone())
self.store
.head()
.map_err(|e| ErrorKind::StoreErr(e, "chain head".to_owned()).into())
}
/// Tip (head) of the header chain.
pub fn header_head(&self) -> Result<Tip, Error> {
self.store
.header_head()
.map_err(|e| ErrorKind::StoreErr(e, "chain header head".to_owned()).into())
}
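
With the in-memory head removed, both tips are plain db reads. A small sketch of the call-site pattern this enables (the sync and adapter changes below switch to exactly this):

```rust
// Sketch: tips are read from the db-backed accessors on demand.
let head = chain.head()?;               // Tip of the block (body) chain
let header_head = chain.header_head()?; // Tip of the header chain
// Replaces the removed Chain::total_difficulty() helper:
let local_diff = head.total_difficulty;
```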
/// Block header for the chain head
@ -918,7 +898,7 @@ impl Chain {
let (_, pos) = txhashset.is_unspent(output_ref)?;
let mut min = 1;
let mut max = {
let h = self.head.lock().unwrap();
let h = self.head()?;
h.height
};
@ -957,19 +937,12 @@ impl Chain {
.map_err(|e| ErrorKind::StoreErr(e, "chain get sync head".to_owned()).into())
}
/// Get the tip of the header chain.
pub fn get_header_head(&self) -> Result<Tip, Error> {
self.store
.get_header_head()
.map_err(|e| ErrorKind::StoreErr(e, "chain get header head".to_owned()).into())
}
/// Builds an iterator on blocks starting from the current chain head and
/// running backward. Specialized to return information pertaining to block
/// difficulty calculation (timestamp and previous difficulties).
pub fn difficulty_iter(&self) -> store::DifficultyIter {
let head = self.head().unwrap();
let batch = self.store.batch().unwrap();
let head = self.head.lock().unwrap();
store::DifficultyIter::from(head.last_block_h, batch)
}


@ -40,17 +40,16 @@ use failure::ResultExt;
/// Contextual information required to process a new block and either reject or
/// accept it.
pub struct BlockContext {
pub struct BlockContext<'a> {
/// The options
pub opts: Options,
/// The head
pub head: Tip,
/// The header head
pub header_head: Tip,
/// The POW verification function
/// The pow verifier to use when processing a block.
pub pow_verifier: fn(&BlockHeader, u8) -> Result<(), pow::Error>,
/// MMR sum tree states
pub txhashset: Arc<RwLock<txhashset::TxHashSet>>,
/// The active txhashset (rewindable MMRs) to use for block processing.
pub txhashset: &'a mut txhashset::TxHashSet,
/// The active batch to use for block processing.
pub batch: store::Batch<'a>,
/// Recently processed blocks to avoid double-processing
pub block_hashes_cache: Arc<RwLock<LruCache<Hash, bool>>>,
/// The verifier cache (caching verifier for rangeproofs and kernel signatures)
@ -61,18 +60,14 @@ pub struct BlockContext {
// Check if this block is the next block *immediately*
// after our current chain head.
fn is_next_block(header: &BlockHeader, ctx: &mut BlockContext) -> bool {
header.previous == ctx.head.last_block_h
fn is_next_block(header: &BlockHeader, head: &Tip) -> bool {
header.previous == head.last_block_h
}
/// Runs the block processing pipeline, including validation and finding a
/// place for the new block in the chain. Returns the new chain head if
/// updated.
pub fn process_block(
b: &Block,
ctx: &mut BlockContext,
batch: &mut store::Batch,
) -> Result<Option<Tip>, Error> {
/// place for the new block in the chain.
/// Returns new head if chain head updated.
pub fn process_block(b: &Block, ctx: &mut BlockContext) -> Result<Option<Tip>, Error> {
// TODO should just take a promise for a block with a full header so we don't
// spend resources reading the full block when its header is invalid
@ -86,13 +81,6 @@ pub fn process_block(
b.kernels().len(),
);
// First thing we do is take a write lock on the txhashset.
// We may receive the same block from multiple peers simultaneously.
// We want to process the first one fully to avoid redundant work
// processing the duplicates.
let txhashset = ctx.txhashset.clone();
let mut txhashset = txhashset.write().unwrap();
// Fast in-memory checks to avoid re-processing a block we recently processed.
{
// Check if we have recently processed this block (via ctx chain head).
@ -106,16 +94,11 @@ pub fn process_block(
}
// Header specific processing.
{
handle_block_header(&b.header, ctx, batch)?;
// Update header_head (but only if this header increases our total known work).
// i.e. Only if this header is now the head of the current "most work" chain.
update_header_head(&b.header, ctx, batch)?;
}
handle_block_header(&b.header, ctx)?;
// Check if are processing the "next" block relative to the current chain head.
if is_next_block(&b.header, ctx) {
let head = ctx.batch.head()?;
if is_next_block(&b.header, &head) {
// If this is the "next" block then either -
// * common case where we process blocks sequentially.
// * special case where this is the first fast sync full block
@ -124,32 +107,32 @@ pub fn process_block(
// Check we have *this* block in the store.
// Stop if we have processed this block previously (it is in the store).
// This is more expensive than the earlier check_known() as we hit the store.
check_known_store(&b.header, ctx, batch)?;
check_known_store(&b.header, ctx)?;
// Check existing MMR (via rewind) to see if this block is known to us already.
// This should catch old blocks before we check to see if they appear to be
// orphaned due to compacting/pruning on a fast-sync node.
// This is more expensive than check_known_store() as we rewind the txhashset.
// But we only incur the cost of the rewind if this is an earlier block on the same chain.
check_known_mmr(&b.header, ctx, batch, &mut txhashset)?;
check_known_mmr(&b.header, ctx)?;
// At this point it looks like this is a new block that we have not yet processed.
// Check we have the *previous* block in the store.
// If we do not then treat this block as an orphan.
check_prev_store(&b.header, batch)?;
check_prev_store(&b.header, &mut ctx.batch)?;
}
// Validate the block itself, make sure it is internally consistent.
// Use the verifier_cache for verifying rangeproofs and kernel signatures.
validate_block(b, batch, ctx.verifier_cache.clone())?;
validate_block(b, ctx)?;
// Start a chain extension unit of work dependent on the success of the
// internal validation and saving operations
txhashset::extending(&mut txhashset, batch, |mut extension| {
txhashset::extending(&mut ctx.txhashset, &mut ctx.batch, |mut extension| {
// First we rewind the txhashset extension if necessary
// to put it into a consistent state for validating the block.
// We can skip this step if the previous header is the latest header we saw.
if is_next_block(&b.header, ctx) {
if is_next_block(&b.header, &head) {
// No need to rewind if we are processing the next block.
} else {
// Rewind the re-apply blocks on the forked chain to
@ -180,7 +163,8 @@ pub fn process_block(
// If applying this block does not increase the work on the chain then
// we know we have not yet updated the chain to produce a new chain head.
if !block_has_more_work(&b.header, &ctx.head) {
let head = extension.batch.head()?;
if !has_more_work(&b.header, &head) {
extension.force_rollback();
}
@ -195,13 +179,14 @@ pub fn process_block(
);
// Add the newly accepted block and header to our index.
add_block(b, batch)?;
add_block(b, ctx)?;
// Update the chain head in the index (if necessary)
let res = update_head(b, ctx, batch)?;
// Return the new chain tip if we added work, or
// None if this block has not added work.
// Update the chain head (and header_head) if total work is increased.
let res = {
let _ = update_header_head(&b.header, ctx)?;
let res = update_head(b, ctx)?;
res
};
Ok(res)
}
@ -210,8 +195,7 @@ pub fn process_block(
pub fn sync_block_headers(
headers: &Vec<BlockHeader>,
ctx: &mut BlockContext,
batch: &mut store::Batch,
) -> Result<(), Error> {
) -> Result<Option<Tip>, Error> {
if let Some(header) = headers.first() {
debug!(
LOGGER,
@ -221,18 +205,18 @@ pub fn sync_block_headers(
header.height,
);
} else {
return Ok(());
return Ok(None);
}
let all_known = if let Some(last_header) = headers.last() {
batch.get_block_header(&last_header.hash()).is_ok()
ctx.batch.get_block_header(&last_header.hash()).is_ok()
} else {
false
};
if !all_known {
for header in headers {
handle_block_header(header, ctx, batch)?;
handle_block_header(header, ctx)?;
}
}
@ -242,24 +226,21 @@ pub fn sync_block_headers(
// progressing.
// We only need to do this once at the end of this batch of headers.
if let Some(header) = headers.last() {
// Update sync_head regardless of total work.
update_sync_head(header, &mut ctx.batch)?;
// Update header_head (but only if this header increases our total known work).
// i.e. Only if this header is now the head of the current "most work" chain.
update_header_head(header, ctx, batch)?;
// Update sync_head regardless of total work.
update_sync_head(header, batch)?;
let res = update_header_head(header, ctx)?;
Ok(res)
} else {
Ok(None)
}
Ok(())
}
fn handle_block_header(
header: &BlockHeader,
ctx: &mut BlockContext,
batch: &mut store::Batch,
) -> Result<(), Error> {
validate_header(header, ctx, batch)?;
add_block_header(header, batch)?;
fn handle_block_header(header: &BlockHeader, ctx: &mut BlockContext) -> Result<(), Error> {
validate_header(header, ctx)?;
add_block_header(header, ctx)?;
Ok(())
}
@ -267,27 +248,25 @@ fn handle_block_header(
/// We validate the header but we do not store it or update header head based
/// on this. We will update these once we get the block back after requesting
/// it.
pub fn process_block_header(
bh: &BlockHeader,
ctx: &mut BlockContext,
batch: &mut store::Batch,
) -> Result<(), Error> {
pub fn process_block_header(header: &BlockHeader, ctx: &mut BlockContext) -> Result<(), Error> {
debug!(
LOGGER,
"pipe: process_block_header at {} [{}]",
bh.height,
bh.hash()
"pipe: process_block_header: {} at {}",
header.hash(),
header.height,
); // keep this
check_header_known(bh.hash(), ctx)?;
validate_header(&bh, ctx, batch)
check_header_known(header, ctx)?;
validate_header(header, ctx)?;
Ok(())
}
/// Quick in-memory check to fast-reject any block header we've already handled
/// recently. Keeps duplicates from the network in check.
/// ctx here is specific to the header_head (tip of the header chain)
fn check_header_known(bh: Hash, ctx: &mut BlockContext) -> Result<(), Error> {
if bh == ctx.header_head.last_block_h || bh == ctx.header_head.prev_block_h {
fn check_header_known(header: &BlockHeader, ctx: &mut BlockContext) -> Result<(), Error> {
let header_head = ctx.batch.header_head()?;
if header.hash() == header_head.last_block_h || header.hash() == header_head.prev_block_h {
return Err(ErrorKind::Unfit("header already known".to_string()).into());
}
Ok(())
@ -297,8 +276,9 @@ fn check_header_known(bh: Hash, ctx: &mut BlockContext) -> Result<(), Error> {
/// Keeps duplicates from the network in check.
/// Checks against the last_block_h and prev_block_h of the chain head.
fn check_known_head(header: &BlockHeader, ctx: &mut BlockContext) -> Result<(), Error> {
let head = ctx.batch.head()?;
let bh = header.hash();
if bh == ctx.head.last_block_h || bh == ctx.head.prev_block_h {
if bh == head.last_block_h || bh == head.prev_block_h {
return Err(ErrorKind::Unfit("already known in head".to_string()).into());
}
Ok(())
@ -325,14 +305,11 @@ fn check_known_orphans(header: &BlockHeader, ctx: &mut BlockContext) -> Result<(
}
// Check if this block is in the store already.
fn check_known_store(
header: &BlockHeader,
ctx: &mut BlockContext,
batch: &mut store::Batch,
) -> Result<(), Error> {
match batch.block_exists(&header.hash()) {
fn check_known_store(header: &BlockHeader, ctx: &mut BlockContext) -> Result<(), Error> {
match ctx.batch.block_exists(&header.hash()) {
Ok(true) => {
if header.height < ctx.head.height.saturating_sub(50) {
let head = ctx.batch.head()?;
if header.height < head.height.saturating_sub(50) {
// TODO - we flag this as an "abusive peer" but only in the case
// where we have the full block in our store.
// So this is not a particularly exhaustive check.
@ -377,20 +354,16 @@ fn check_prev_store(header: &BlockHeader, batch: &mut store::Batch) -> Result<()
// First check the header matches via current height index.
// Then peek directly into the MMRs at the appropriate pos.
// We can avoid a full rewind in this case.
fn check_known_mmr(
header: &BlockHeader,
ctx: &mut BlockContext,
batch: &mut store::Batch,
write_txhashset: &mut txhashset::TxHashSet,
) -> Result<(), Error> {
fn check_known_mmr(header: &BlockHeader, ctx: &mut BlockContext) -> Result<(), Error> {
// No point checking the MMR if this block is not earlier in the chain.
if header.height > ctx.head.height {
let head = ctx.batch.head()?;
if header.height > head.height {
return Ok(());
}
// Use "header by height" index to look at current most work chain.
// Header is not "known" if the header differs at the given height.
let local_header = batch.get_header_by_height(header.height)?;
let local_header = ctx.batch.get_header_by_height(header.height)?;
if local_header.hash() != header.hash() {
return Ok(());
}
@ -399,7 +372,7 @@ fn check_known_mmr(
// roots and sizes against the header.
// If everything matches then this is a "known" block
// and we do not need to spend any more effort
txhashset::extending_readonly(write_txhashset, |extension| {
txhashset::extending_readonly(&mut ctx.txhashset, |extension| {
extension.rewind(header)?;
// We want to return an error here (block already known)
@ -423,11 +396,7 @@ fn check_known_mmr(
/// First level of block validation that only needs to act on the block header
/// to make it as cheap as possible. The different validations are also
/// arranged by order of cost to have as little DoS surface as possible.
fn validate_header(
header: &BlockHeader,
ctx: &mut BlockContext,
batch: &mut store::Batch,
) -> Result<(), Error> {
fn validate_header(header: &BlockHeader, ctx: &mut BlockContext) -> Result<(), Error> {
// check version, enforces scheduled hard fork
if !consensus::valid_header_version(header.height, header.version) {
error!(
@ -467,7 +436,7 @@ fn validate_header(
}
// first I/O cost, better as late as possible
let prev = match batch.get_block_header(&header.previous) {
let prev = match ctx.batch.get_block_header(&header.previous) {
Ok(prev) => prev,
Err(grin_store::Error::NotFoundErr(_)) => return Err(ErrorKind::Orphan.into()),
Err(e) => {
@ -516,7 +485,7 @@ fn validate_header(
// explicit check to ensure total_difficulty has increased by exactly
// the _network_ difficulty of the previous block
// (during testnet1 we use _block_ difficulty here)
let child_batch = batch.child()?;
let child_batch = ctx.batch.child()?;
let diff_iter = store::DifficultyIter::from(header.previous, child_batch);
let network_difficulty = consensus::next_difficulty(diff_iter)
.context(ErrorKind::Other("network difficulty".to_owned()))?;
@ -534,17 +503,13 @@ fn validate_header(
Ok(())
}
fn validate_block(
block: &Block,
batch: &mut store::Batch,
verifier_cache: Arc<RwLock<VerifierCache>>,
) -> Result<(), Error> {
let prev = batch.get_block_header(&block.header.previous)?;
fn validate_block(block: &Block, ctx: &mut BlockContext) -> Result<(), Error> {
let prev = ctx.batch.get_block_header(&block.header.previous)?;
block
.validate(
&prev.total_kernel_offset,
&prev.total_kernel_sum,
verifier_cache,
ctx.verifier_cache.clone(),
).map_err(|e| ErrorKind::InvalidBlockProof(e))?;
Ok(())
}
@ -610,20 +575,20 @@ fn apply_block_to_txhashset(block: &Block, ext: &mut txhashset::Extension) -> Re
}
/// Officially adds the block to our chain.
fn add_block(b: &Block, batch: &mut store::Batch) -> Result<(), Error> {
fn add_block(b: &Block, ctx: &mut BlockContext) -> Result<(), Error> {
// Save the block itself to the db (via the batch).
batch
ctx.batch
.save_block(b)
.map_err(|e| ErrorKind::StoreErr(e, "pipe save block".to_owned()))?;
// Build the block_input_bitmap, save to the db (via the batch) and cache locally.
batch.build_and_cache_block_input_bitmap(&b)?;
ctx.batch.build_and_cache_block_input_bitmap(&b)?;
Ok(())
}
/// Officially adds the block header to our header chain.
fn add_block_header(bh: &BlockHeader, batch: &mut store::Batch) -> Result<(), Error> {
batch
fn add_block_header(bh: &BlockHeader, ctx: &mut BlockContext) -> Result<(), Error> {
ctx.batch
.save_block_header(bh)
.map_err(|e| ErrorKind::StoreErr(e, "pipe save header".to_owned()).into())
}
@ -631,38 +596,27 @@ fn add_block_header(bh: &BlockHeader, batch: &mut store::Batch) -> Result<(), Er
/// Directly updates the head if we've just appended a new block to it or handle
/// the situation where we've just added enough work to have a fork with more
/// work than the head.
fn update_head(
b: &Block,
ctx: &BlockContext,
batch: &mut store::Batch,
) -> Result<Option<Tip>, Error> {
fn update_head(b: &Block, ctx: &BlockContext) -> Result<Option<Tip>, Error> {
// if we made a fork with more work than the head (which should also be true
// when extending the head), update it
if block_has_more_work(&b.header, &ctx.head) {
// update the block height index
batch
.setup_height(&b.header, &ctx.head)
let head = ctx.batch.head()?;
if has_more_work(&b.header, &head) {
// Update the block height index based on this new head.
ctx.batch
.setup_height(&b.header, &head)
.map_err(|e| ErrorKind::StoreErr(e, "pipe setup height".to_owned()))?;
// in sync mode, only update the "body chain", otherwise update both the
// "header chain" and "body chain", updating the header chain in sync resets
// all additional "future" headers we've received
let tip = Tip::from_block(&b.header);
if ctx.opts.contains(Options::SYNC) {
batch
.save_body_head(&tip)
.map_err(|e| ErrorKind::StoreErr(e, "pipe save body".to_owned()))?;
} else {
batch
.save_head(&tip)
.map_err(|e| ErrorKind::StoreErr(e, "pipe save head".to_owned()))?;
}
ctx.batch
.save_body_head(&tip)
.map_err(|e| ErrorKind::StoreErr(e, "pipe save body".to_owned()))?;
debug!(
LOGGER,
"pipe: chain head {} @ {}",
b.hash(),
b.header.height
"pipe: head updated to {} at {}", tip.last_block_h, tip.height
);
Ok(Some(tip))
} else {
Ok(None)
@ -670,9 +624,8 @@ fn update_head(
}
// Whether the provided block totals more work than the chain tip
fn block_has_more_work(header: &BlockHeader, tip: &Tip) -> bool {
let block_tip = Tip::from_block(header);
block_tip.total_difficulty > tip.total_difficulty
fn has_more_work(header: &BlockHeader, head: &Tip) -> bool {
header.total_difficulty() > head.total_difficulty
}
/// Update the sync head so we can keep syncing from where we left off.
@ -686,18 +639,19 @@ fn update_sync_head(bh: &BlockHeader, batch: &mut store::Batch) -> Result<(), Er
}
/// Update the header head if this header has most work.
fn update_header_head(
bh: &BlockHeader,
ctx: &mut BlockContext,
batch: &mut store::Batch,
) -> Result<Option<Tip>, Error> {
let tip = Tip::from_block(bh);
if tip.total_difficulty > ctx.header_head.total_difficulty {
batch
fn update_header_head(bh: &BlockHeader, ctx: &mut BlockContext) -> Result<Option<Tip>, Error> {
let header_head = ctx.batch.header_head()?;
if has_more_work(&bh, &header_head) {
let tip = Tip::from_block(bh);
ctx.batch
.save_header_head(&tip)
.map_err(|e| ErrorKind::StoreErr(e, "pipe save header head".to_owned()))?;
ctx.header_head = tip.clone();
debug!(LOGGER, "header head {} @ {}", bh.hash(), bh.height);
debug!(
LOGGER,
"pipe: header_head updated to {} at {}", tip.last_block_h, tip.height
);
Ok(Some(tip))
} else {
Ok(None)


@ -67,11 +67,13 @@ impl ChainStore {
option_to_not_found(self.db.get_ser(&vec![HEAD_PREFIX]), "HEAD")
}
/// Header of the block at the head of the block chain (not the same thing as header_head).
pub fn head_header(&self) -> Result<BlockHeader, Error> {
self.get_block_header(&self.head()?.last_block_h)
}
pub fn get_header_head(&self) -> Result<Tip, Error> {
/// Head of the header chain (not the same thing as head_header).
pub fn header_head(&self) -> Result<Tip, Error> {
option_to_not_found(self.db.get_ser(&vec![HEADER_HEAD_PREFIX]), "HEADER_HEAD")
}
@ -170,11 +172,13 @@ impl<'a> Batch<'a> {
option_to_not_found(self.db.get_ser(&vec![HEAD_PREFIX]), "HEAD")
}
/// Header of the block at the head of the block chain (not the same thing as header_head).
pub fn head_header(&self) -> Result<BlockHeader, Error> {
self.get_block_header(&self.head()?.last_block_h)
}
pub fn get_header_head(&self) -> Result<Tip, Error> {
/// Head of the header chain (not the same thing as head_header).
pub fn header_head(&self) -> Result<Tip, Error> {
option_to_not_found(self.db.get_ser(&vec![HEADER_HEAD_PREFIX]), "HEADER_HEAD")
}
@ -207,7 +211,7 @@ impl<'a> Batch<'a> {
}
pub fn reset_sync_head(&self) -> Result<(), Error> {
let head = self.get_header_head()?;
let head = self.header_head()?;
self.save_sync_head(&head)
}
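
The renamed accessors are easy to mix up, so a quick sketch of what each one returns (both `ChainStore` and `Batch` expose the same three; `store` assumed to be in scope):

```rust
// Sketch of the three related lookups and their result types.
let head: Tip = store.head()?;                        // tip of the block chain
let header_head: Tip = store.header_head()?;          // tip of the header chain (was get_header_head)
let head_header: BlockHeader = store.head_header()?;  // full header at the block chain head
```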


@ -61,7 +61,7 @@ pub struct NetToChainAdapter {
impl p2p::ChainAdapter for NetToChainAdapter {
fn total_difficulty(&self) -> Difficulty {
w(&self.chain).total_difficulty()
w(&self.chain).head().unwrap().total_difficulty
}
fn total_height(&self) -> u64 {
@ -450,7 +450,7 @@ impl NetToChainAdapter {
let prev_hash = b.header.previous;
let bhash = b.hash();
match chain.process_block(b, self.chain_opts()) {
Ok((tip, _)) => {
Ok(tip) => {
self.validate_chain(bhash);
self.check_compact(tip);
true
@ -523,13 +523,13 @@ impl NetToChainAdapter {
}
}
fn check_compact(&self, tip_res: Option<Tip>) {
fn check_compact(&self, tip: Option<Tip>) {
// no compaction during sync or if we're in historical mode
if self.archive_mode || self.sync_state.is_syncing() {
return;
}
if let Some(tip) = tip_res {
if let Some(tip) = tip {
// trigger compaction every 2000 blocks, uses a different thread to avoid
// blocking the caller thread (likely a peer)
if tip.height % 2000 == 0 {


@ -379,7 +379,7 @@ impl Server {
/// The head of the block header chain
pub fn header_head(&self) -> chain::Tip {
self.chain.get_header_head().unwrap()
self.chain.header_head().unwrap()
}
/// Returns a set of stats about this server. This and the ServerStats


@ -148,7 +148,7 @@ pub fn run_sync(
// if syncing is needed
let head = chain.head().unwrap();
let header_head = chain.get_header_head().unwrap();
let header_head = chain.header_head().unwrap();
// run the header sync in every 10s at least
if si.header_sync_due(sync_state.as_ref(), &header_head) {
@ -365,7 +365,7 @@ impl BodySyncInfo {
fn body_sync(peers: Arc<Peers>, chain: Arc<chain::Chain>, body_sync_info: &mut BodySyncInfo) {
let horizon = global::cut_through_horizon() as u64;
let body_head: chain::Tip = chain.head().unwrap();
let header_head: chain::Tip = chain.get_header_head().unwrap();
let header_head: chain::Tip = chain.header_head().unwrap();
let sync_head: chain::Tip = chain.get_sync_head().unwrap();
body_sync_info.reset();
@ -455,7 +455,7 @@ fn header_sync(
chain: Arc<chain::Chain>,
history_locators: &mut Vec<(u64, Hash)>,
) {
if let Ok(header_head) = chain.get_header_head() {
if let Ok(header_head) = chain.header_head() {
let difficulty = header_head.total_difficulty;
if let Some(peer) = peers.most_work_peer() {
@ -522,7 +522,7 @@ fn needs_syncing(
peers: Arc<Peers>,
chain: Arc<chain::Chain>,
) -> (bool, u64) {
let local_diff = chain.total_difficulty();
let local_diff = chain.head().unwrap().total_difficulty;
let peer = peers.most_work_peer();
let is_syncing = sync_state.is_syncing();
let mut most_work_height = 0;
@ -596,7 +596,7 @@ fn get_locator(
// for security, clear history_locators[] in any case of header chain rollback,
// the easiest way is to check whether the sync head and the header head are identical.
if history_locators.len() > 0 && tip.hash() != chain.get_header_head()?.hash() {
if history_locators.len() > 0 && tip.hash() != chain.header_head()?.hash() {
history_locators.clear();
}


@ -41,13 +41,27 @@ impl TUIStatusListener for TUIStatusView {
LinearLayout::new(Orientation::Horizontal)
.child(TextView::new("Connected Peers: "))
.child(TextView::new("0").with_id("connected_peers")),
).child(
LinearLayout::new(Orientation::Horizontal)
.child(TextView::new("------------------------")),
).child(
LinearLayout::new(Orientation::Horizontal)
.child(TextView::new("Header Chain Height: "))
.child(TextView::new(" ").with_id("basic_header_chain_height")),
).child(
LinearLayout::new(Orientation::Horizontal)
.child(TextView::new("Header Cumulative Difficulty: "))
.child(TextView::new(" ").with_id("basic_header_total_difficulty")),
).child(
LinearLayout::new(Orientation::Horizontal)
.child(TextView::new("------------------------")),
).child(
LinearLayout::new(Orientation::Horizontal)
.child(TextView::new("Chain Height: "))
.child(TextView::new(" ").with_id("chain_height")),
).child(
LinearLayout::new(Orientation::Horizontal)
.child(TextView::new("Total Difficulty: "))
.child(TextView::new("Cumulative Difficulty: "))
.child(TextView::new(" ").with_id("basic_total_difficulty")),
).child(
LinearLayout::new(Orientation::Horizontal)
@ -178,6 +192,12 @@ impl TUIStatusListener for TUIStatusView {
c.call_on_id("basic_total_difficulty", |t: &mut TextView| {
t.set_content(stats.head.total_difficulty.to_string());
});
c.call_on_id("basic_header_chain_height", |t: &mut TextView| {
t.set_content(stats.header_head.height.to_string());
});
c.call_on_id("basic_header_total_difficulty", |t: &mut TextView| {
t.set_content(stats.header_head.total_difficulty.to_string());
});
/*c.call_on_id("basic_mining_config_status", |t: &mut TextView| {
t.set_content(basic_mining_config_status);
});