mirror of
https://github.com/mimblewimble/grin.git
synced 2025-02-08 12:21:09 +03:00
Block accepted reorg aware (#2003)
* block_accepted via adapter is now reorg aware we skip reconciling the txpool is block is not most work we skip reconciling the reorg_cache is not a reorg * rustfmt * logging tweaks * rework block_accepted interface * rustfmt * rework reorg determination * introduce BlockStatus to represent next vs reorg vs fork * rustfmt * cleanup logging * log from adapter, even during sync
This commit is contained in:
parent
61f1f6103b
commit
66f2545186
5 changed files with 119 additions and 55 deletions
|
@ -38,7 +38,9 @@ use grin_store::Error::NotFoundErr;
|
||||||
use pipe;
|
use pipe;
|
||||||
use store;
|
use store;
|
||||||
use txhashset;
|
use txhashset;
|
||||||
use types::{ChainAdapter, NoStatus, Options, Tip, TxHashSetRoots, TxHashsetWriteStatus};
|
use types::{
|
||||||
|
BlockStatus, ChainAdapter, NoStatus, Options, Tip, TxHashSetRoots, TxHashsetWriteStatus,
|
||||||
|
};
|
||||||
use util::secp::pedersen::{Commitment, RangeProof};
|
use util::secp::pedersen::{Commitment, RangeProof};
|
||||||
|
|
||||||
/// Orphan pool size is limited by MAX_ORPHAN_SIZE
|
/// Orphan pool size is limited by MAX_ORPHAN_SIZE
|
||||||
|
@ -235,22 +237,43 @@ impl Chain {
|
||||||
res
|
res
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn determine_status(&self, head: Option<Tip>, prev_head: Tip) -> BlockStatus {
|
||||||
|
// We have more work if the chain head is updated.
|
||||||
|
let is_more_work = head.is_some();
|
||||||
|
|
||||||
|
let mut is_next_block = false;
|
||||||
|
if let Some(head) = head {
|
||||||
|
if head.prev_block_h == prev_head.last_block_h {
|
||||||
|
is_next_block = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
match (is_more_work, is_next_block) {
|
||||||
|
(true, true) => BlockStatus::Next,
|
||||||
|
(true, false) => BlockStatus::Reorg,
|
||||||
|
(false, _) => BlockStatus::Fork,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Attempt to add a new block to the chain.
|
/// Attempt to add a new block to the chain.
|
||||||
/// Returns true if it has been added to the longest chain
|
/// Returns true if it has been added to the longest chain
|
||||||
/// or false if it has added to a fork (or orphan?).
|
/// or false if it has added to a fork (or orphan?).
|
||||||
fn process_block_single(&self, b: Block, opts: Options) -> Result<Option<Tip>, Error> {
|
fn process_block_single(&self, b: Block, opts: Options) -> Result<Option<Tip>, Error> {
|
||||||
let maybe_new_head: Result<Option<Tip>, Error>;
|
let (maybe_new_head, prev_head) = {
|
||||||
{
|
|
||||||
let mut txhashset = self.txhashset.write();
|
let mut txhashset = self.txhashset.write();
|
||||||
let batch = self.store.batch()?;
|
let batch = self.store.batch()?;
|
||||||
let mut ctx = self.new_ctx(opts, batch, &mut txhashset)?;
|
let mut ctx = self.new_ctx(opts, batch, &mut txhashset)?;
|
||||||
|
|
||||||
maybe_new_head = pipe::process_block(&b, &mut ctx);
|
let prev_head = ctx.batch.head()?;
|
||||||
|
|
||||||
|
let maybe_new_head = pipe::process_block(&b, &mut ctx);
|
||||||
if let Ok(_) = maybe_new_head {
|
if let Ok(_) = maybe_new_head {
|
||||||
ctx.batch.commit()?;
|
ctx.batch.commit()?;
|
||||||
}
|
}
|
||||||
|
|
||||||
// release the lock and let the batch go before post-processing
|
// release the lock and let the batch go before post-processing
|
||||||
}
|
(maybe_new_head, prev_head)
|
||||||
|
};
|
||||||
|
|
||||||
let add_to_hash_cache = |hash: Hash| {
|
let add_to_hash_cache = |hash: Hash| {
|
||||||
// only add to hash cache below if block is definitively accepted
|
// only add to hash cache below if block is definitively accepted
|
||||||
|
@ -263,8 +286,10 @@ impl Chain {
|
||||||
Ok(head) => {
|
Ok(head) => {
|
||||||
add_to_hash_cache(b.hash());
|
add_to_hash_cache(b.hash());
|
||||||
|
|
||||||
|
let status = self.determine_status(head.clone(), prev_head);
|
||||||
|
|
||||||
// notifying other parts of the system of the update
|
// notifying other parts of the system of the update
|
||||||
self.adapter.block_accepted(&b, opts);
|
self.adapter.block_accepted(&b, status, opts);
|
||||||
|
|
||||||
Ok(head)
|
Ok(head)
|
||||||
}
|
}
|
||||||
|
@ -983,8 +1008,7 @@ impl Chain {
|
||||||
if outputs.0 != rangeproofs.0 || outputs.1.len() != rangeproofs.1.len() {
|
if outputs.0 != rangeproofs.0 || outputs.1.len() != rangeproofs.1.len() {
|
||||||
return Err(ErrorKind::TxHashSetErr(String::from(
|
return Err(ErrorKind::TxHashSetErr(String::from(
|
||||||
"Output and rangeproof sets don't match",
|
"Output and rangeproof sets don't match",
|
||||||
))
|
)).into());
|
||||||
.into());
|
|
||||||
}
|
}
|
||||||
let mut output_vec: Vec<Output> = vec![];
|
let mut output_vec: Vec<Output> = vec![];
|
||||||
for (ref x, &y) in outputs.1.iter().zip(rangeproofs.1.iter()) {
|
for (ref x, &y) in outputs.1.iter().zip(rangeproofs.1.iter()) {
|
||||||
|
|
|
@ -53,4 +53,4 @@ pub mod types;
|
||||||
pub use chain::{Chain, MAX_ORPHAN_SIZE};
|
pub use chain::{Chain, MAX_ORPHAN_SIZE};
|
||||||
pub use error::{Error, ErrorKind};
|
pub use error::{Error, ErrorKind};
|
||||||
pub use store::ChainStore;
|
pub use store::ChainStore;
|
||||||
pub use types::{ChainAdapter, Options, Tip, TxHashsetWriteStatus};
|
pub use types::{BlockStatus, ChainAdapter, Options, Tip, TxHashsetWriteStatus};
|
||||||
|
|
|
@ -118,7 +118,7 @@ impl ser::Readable for Tip {
|
||||||
pub trait ChainAdapter {
|
pub trait ChainAdapter {
|
||||||
/// The blockchain pipeline has accepted this block as valid and added
|
/// The blockchain pipeline has accepted this block as valid and added
|
||||||
/// it to our chain.
|
/// it to our chain.
|
||||||
fn block_accepted(&self, b: &Block, opts: Options);
|
fn block_accepted(&self, block: &Block, status: BlockStatus, opts: Options);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Inform the caller of the current status of a txhashset write operation,
|
/// Inform the caller of the current status of a txhashset write operation,
|
||||||
|
@ -151,5 +151,17 @@ impl TxHashsetWriteStatus for NoStatus {
|
||||||
pub struct NoopAdapter {}
|
pub struct NoopAdapter {}
|
||||||
|
|
||||||
impl ChainAdapter for NoopAdapter {
|
impl ChainAdapter for NoopAdapter {
|
||||||
fn block_accepted(&self, _: &Block, _: Options) {}
|
fn block_accepted(&self, _b: &Block, _status: BlockStatus, _opts: Options) {}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Status of an accepted block.
|
||||||
|
#[derive(Debug, Clone, PartialEq)]
|
||||||
|
pub enum BlockStatus {
|
||||||
|
/// Block is the "next" block, updating the chain head.
|
||||||
|
Next,
|
||||||
|
/// Block does not update the chain head and is a fork.
|
||||||
|
Fork,
|
||||||
|
/// Block updates the chain head via a (potentially disruptive) "reorg".
|
||||||
|
/// Previous block was not our previous chain head.
|
||||||
|
Reorg,
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,7 +22,6 @@ use std::sync::Arc;
|
||||||
use util::RwLock;
|
use util::RwLock;
|
||||||
|
|
||||||
use chrono::prelude::*;
|
use chrono::prelude::*;
|
||||||
use chrono::Duration;
|
|
||||||
|
|
||||||
use core::core::hash::{Hash, Hashed};
|
use core::core::hash::{Hash, Hashed};
|
||||||
use core::core::id::ShortId;
|
use core::core::id::ShortId;
|
||||||
|
@ -97,15 +96,6 @@ impl TransactionPool {
|
||||||
debug!("added tx to reorg_cache: size now {}", cache.len());
|
debug!("added tx to reorg_cache: size now {}", cache.len());
|
||||||
}
|
}
|
||||||
|
|
||||||
// Old txs will "age out" after 30 mins.
|
|
||||||
fn truncate_reorg_cache(&mut self, cutoff: DateTime<Utc>) {
|
|
||||||
let mut cache = self.reorg_cache.write();
|
|
||||||
|
|
||||||
while cache.front().map(|x| x.tx_at < cutoff).unwrap_or(false) {
|
|
||||||
let _ = cache.pop_front();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn add_to_txpool(
|
fn add_to_txpool(
|
||||||
&mut self,
|
&mut self,
|
||||||
mut entry: PoolEntry,
|
mut entry: PoolEntry,
|
||||||
|
@ -179,17 +169,31 @@ impl TransactionPool {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn reconcile_reorg_cache(&mut self, header: &BlockHeader) -> Result<(), PoolError> {
|
// Old txs will "age out" after 30 mins.
|
||||||
// First "age out" any old txs in the reorg cache.
|
pub fn truncate_reorg_cache(&mut self, cutoff: DateTime<Utc>) {
|
||||||
let cutoff = Utc::now() - Duration::minutes(30);
|
let mut cache = self.reorg_cache.write();
|
||||||
self.truncate_reorg_cache(cutoff);
|
|
||||||
|
|
||||||
|
while cache.front().map(|x| x.tx_at < cutoff).unwrap_or(false) {
|
||||||
|
let _ = cache.pop_front();
|
||||||
|
}
|
||||||
|
|
||||||
|
debug!("truncate_reorg_cache: size: {}", cache.len());
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn reconcile_reorg_cache(&mut self, header: &BlockHeader) -> Result<(), PoolError> {
|
||||||
let entries = self.reorg_cache.read().iter().cloned().collect::<Vec<_>>();
|
let entries = self.reorg_cache.read().iter().cloned().collect::<Vec<_>>();
|
||||||
debug!("reconcile_reorg_cache: size: {} ...", entries.len());
|
debug!(
|
||||||
|
"reconcile_reorg_cache: size: {}, block: {:?} ...",
|
||||||
|
entries.len(),
|
||||||
|
header.hash(),
|
||||||
|
);
|
||||||
for entry in entries {
|
for entry in entries {
|
||||||
let _ = &self.add_to_txpool(entry.clone(), header);
|
let _ = &self.add_to_txpool(entry.clone(), header);
|
||||||
}
|
}
|
||||||
debug!("reconcile_reorg_cache: ... done.");
|
debug!(
|
||||||
|
"reconcile_reorg_cache: block: {:?} ... done.",
|
||||||
|
header.hash()
|
||||||
|
);
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -200,10 +204,6 @@ impl TransactionPool {
|
||||||
self.txpool.reconcile_block(block);
|
self.txpool.reconcile_block(block);
|
||||||
self.txpool.reconcile(None, &block.header)?;
|
self.txpool.reconcile(None, &block.header)?;
|
||||||
|
|
||||||
// Take our "reorg_cache" and see if this block means
|
|
||||||
// we need to (re)add old txs due to a fork and re-org.
|
|
||||||
self.reconcile_reorg_cache(&block.header)?;
|
|
||||||
|
|
||||||
// Now reconcile our stempool, accounting for the updated txpool txs.
|
// Now reconcile our stempool, accounting for the updated txpool txs.
|
||||||
self.stempool.reconcile_block(block);
|
self.stempool.reconcile_block(block);
|
||||||
{
|
{
|
||||||
|
|
|
@ -22,8 +22,9 @@ use std::thread;
|
||||||
use std::time::Instant;
|
use std::time::Instant;
|
||||||
use util::RwLock;
|
use util::RwLock;
|
||||||
|
|
||||||
use chain::{self, ChainAdapter, Options};
|
use chain::{self, BlockStatus, ChainAdapter, Options};
|
||||||
use chrono::prelude::{DateTime, Utc};
|
use chrono::prelude::*;
|
||||||
|
use chrono::Duration;
|
||||||
use common::types::{self, ChainValidationMode, ServerConfig, SyncState, SyncStatus};
|
use common::types::{self, ChainValidationMode, ServerConfig, SyncState, SyncStatus};
|
||||||
use core::core::hash::{Hash, Hashed};
|
use core::core::hash::{Hash, Hashed};
|
||||||
use core::core::transaction::Transaction;
|
use core::core::transaction::Transaction;
|
||||||
|
@ -182,7 +183,7 @@ impl p2p::ChainAdapter for NetToChainAdapter {
|
||||||
.validate(&prev.total_kernel_offset, self.verifier_cache.clone())
|
.validate(&prev.total_kernel_offset, self.verifier_cache.clone())
|
||||||
.is_ok()
|
.is_ok()
|
||||||
{
|
{
|
||||||
debug!("adapter: successfully hydrated block from tx pool!");
|
debug!("successfully hydrated block from tx pool!");
|
||||||
self.process_block(block, addr)
|
self.process_block(block, addr)
|
||||||
} else {
|
} else {
|
||||||
if self.sync_state.status() == SyncStatus::NoSync {
|
if self.sync_state.status() == SyncStatus::NoSync {
|
||||||
|
@ -190,14 +191,12 @@ impl p2p::ChainAdapter for NetToChainAdapter {
|
||||||
self.request_block(&cb.header, &addr);
|
self.request_block(&cb.header, &addr);
|
||||||
true
|
true
|
||||||
} else {
|
} else {
|
||||||
debug!(
|
debug!("block invalid after hydration, ignoring it, cause still syncing");
|
||||||
"adapter: block invalid after hydration, ignoring it, cause still syncing"
|
|
||||||
);
|
|
||||||
true
|
true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
debug!("adapter: failed to retrieve previous block header (still syncing?)");
|
debug!("failed to retrieve previous block header (still syncing?)");
|
||||||
true
|
true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -469,10 +468,7 @@ impl NetToChainAdapter {
|
||||||
true
|
true
|
||||||
}
|
}
|
||||||
Err(ref e) if e.is_bad_data() => {
|
Err(ref e) if e.is_bad_data() => {
|
||||||
debug!(
|
debug!("process_block: {} is a bad block, resetting head", bhash);
|
||||||
"adapter: process_block: {} is a bad block, resetting head",
|
|
||||||
bhash
|
|
||||||
);
|
|
||||||
let _ = self.chain().reset_head();
|
let _ = self.chain().reset_head();
|
||||||
|
|
||||||
// we potentially changed the state of the system here
|
// we potentially changed the state of the system here
|
||||||
|
@ -489,7 +485,7 @@ impl NetToChainAdapter {
|
||||||
if !self.chain().is_orphan(&previous.hash())
|
if !self.chain().is_orphan(&previous.hash())
|
||||||
&& !self.sync_state.is_syncing()
|
&& !self.sync_state.is_syncing()
|
||||||
{
|
{
|
||||||
debug!("adapter: process_block: received an orphan block, checking the parent: {:}", previous.hash());
|
debug!("process_block: received an orphan block, checking the parent: {:}", previous.hash());
|
||||||
self.request_block_by_hash(previous.hash(), &addr)
|
self.request_block_by_hash(previous.hash(), &addr)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -497,7 +493,7 @@ impl NetToChainAdapter {
|
||||||
}
|
}
|
||||||
_ => {
|
_ => {
|
||||||
debug!(
|
debug!(
|
||||||
"adapter: process_block: block {} refused by chain: {}",
|
"process_block: block {} refused by chain: {}",
|
||||||
bhash,
|
bhash,
|
||||||
e.kind()
|
e.kind()
|
||||||
);
|
);
|
||||||
|
@ -521,7 +517,7 @@ impl NetToChainAdapter {
|
||||||
let now = Instant::now();
|
let now = Instant::now();
|
||||||
|
|
||||||
debug!(
|
debug!(
|
||||||
"adapter: process_block: ***** validating full chain state at {}",
|
"process_block: ***** validating full chain state at {}",
|
||||||
bhash,
|
bhash,
|
||||||
);
|
);
|
||||||
|
|
||||||
|
@ -530,7 +526,7 @@ impl NetToChainAdapter {
|
||||||
.expect("chain validation failed, hard stop");
|
.expect("chain validation failed, hard stop");
|
||||||
|
|
||||||
debug!(
|
debug!(
|
||||||
"adapter: process_block: ***** done validating full chain state, took {}s",
|
"process_block: ***** done validating full chain state, took {}s",
|
||||||
now.elapsed().as_secs(),
|
now.elapsed().as_secs(),
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
@ -644,13 +640,38 @@ pub struct ChainToPoolAndNetAdapter {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ChainAdapter for ChainToPoolAndNetAdapter {
|
impl ChainAdapter for ChainToPoolAndNetAdapter {
|
||||||
fn block_accepted(&self, b: &core::Block, opts: Options) {
|
fn block_accepted(&self, b: &core::Block, status: BlockStatus, opts: Options) {
|
||||||
|
match status {
|
||||||
|
BlockStatus::Reorg => {
|
||||||
|
warn!(
|
||||||
|
"block_accepted (REORG!): {:?} at {} (diff: {})",
|
||||||
|
b.hash(),
|
||||||
|
b.header.height,
|
||||||
|
b.header.total_difficulty(),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
BlockStatus::Fork => {
|
||||||
|
debug!(
|
||||||
|
"block_accepted (fork?): {:?} at {} (diff: {})",
|
||||||
|
b.hash(),
|
||||||
|
b.header.height,
|
||||||
|
b.header.total_difficulty(),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
BlockStatus::Next => {
|
||||||
|
debug!(
|
||||||
|
"block_accepted (head+): {:?} at {} (diff: {})",
|
||||||
|
b.hash(),
|
||||||
|
b.header.height,
|
||||||
|
b.header.total_difficulty(),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if self.sync_state.is_syncing() {
|
if self.sync_state.is_syncing() {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
debug!("adapter: block_accepted: {:?}", b.hash());
|
|
||||||
|
|
||||||
// If we mined the block then we want to broadcast the compact block.
|
// If we mined the block then we want to broadcast the compact block.
|
||||||
// If we received the block from another node then broadcast "header first"
|
// If we received the block from another node then broadcast "header first"
|
||||||
// to minimize network traffic.
|
// to minimize network traffic.
|
||||||
|
@ -665,12 +686,19 @@ impl ChainAdapter for ChainToPoolAndNetAdapter {
|
||||||
|
|
||||||
// Reconcile the txpool against the new block *after* we have broadcast it too our peers.
|
// Reconcile the txpool against the new block *after* we have broadcast it too our peers.
|
||||||
// This may be slow and we do not want to delay block propagation.
|
// This may be slow and we do not want to delay block propagation.
|
||||||
if let Err(e) = self.tx_pool.write().reconcile_block(b) {
|
// We only want to reconcile the txpool against the new block *if* total work has increased.
|
||||||
error!(
|
if status == BlockStatus::Next || status == BlockStatus::Reorg {
|
||||||
"Pool could not update itself at block {}: {:?}",
|
let mut tx_pool = self.tx_pool.write();
|
||||||
b.hash(),
|
|
||||||
e,
|
let _ = tx_pool.reconcile_block(b);
|
||||||
);
|
|
||||||
|
// First "age out" any old txs in the reorg_cache.
|
||||||
|
let cutoff = Utc::now() - Duration::minutes(30);
|
||||||
|
tx_pool.truncate_reorg_cache(cutoff);
|
||||||
|
}
|
||||||
|
|
||||||
|
if status == BlockStatus::Reorg {
|
||||||
|
let _ = self.tx_pool.write().reconcile_reorg_cache(&b.header);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue