From 66f25451865cfa51cc321e253b077a57f33da71f Mon Sep 17 00:00:00 2001
From: Antioch Peverell <apeverell@protonmail.com>
Date: Wed, 21 Nov 2018 14:35:38 +0000
Subject: [PATCH] Block accepted reorg aware (#2003)

* block_accepted via adapter is now reorg aware
we skip reconciling the txpool is block is not most work
we skip reconciling the reorg_cache is not a reorg

* rustfmt

* logging tweaks

* rework block_accepted interface

* rustfmt

* rework reorg determination

* introduce BlockStatus to represent next vs reorg vs fork

* rustfmt

* cleanup logging

* log from adapter, even during sync
---
 chain/src/chain.rs             | 40 ++++++++++++++----
 chain/src/lib.rs               |  2 +-
 chain/src/types.rs             | 16 ++++++-
 pool/src/transaction_pool.rs   | 40 +++++++++---------
 servers/src/common/adapters.rs | 76 +++++++++++++++++++++++-----------
 5 files changed, 119 insertions(+), 55 deletions(-)

diff --git a/chain/src/chain.rs b/chain/src/chain.rs
index e628b12df..defab16ec 100644
--- a/chain/src/chain.rs
+++ b/chain/src/chain.rs
@@ -38,7 +38,9 @@ use grin_store::Error::NotFoundErr;
 use pipe;
 use store;
 use txhashset;
-use types::{ChainAdapter, NoStatus, Options, Tip, TxHashSetRoots, TxHashsetWriteStatus};
+use types::{
+	BlockStatus, ChainAdapter, NoStatus, Options, Tip, TxHashSetRoots, TxHashsetWriteStatus,
+};
 use util::secp::pedersen::{Commitment, RangeProof};
 
 /// Orphan pool size is limited by MAX_ORPHAN_SIZE
@@ -235,22 +237,43 @@ impl Chain {
 		res
 	}
 
+	fn determine_status(&self, head: Option<Tip>, prev_head: Tip) -> BlockStatus {
+		// We have more work if the chain head is updated.
+		let is_more_work = head.is_some();
+
+		let mut is_next_block = false;
+		if let Some(head) = head {
+			if head.prev_block_h == prev_head.last_block_h {
+				is_next_block = true;
+			}
+		}
+
+		match (is_more_work, is_next_block) {
+			(true, true) => BlockStatus::Next,
+			(true, false) => BlockStatus::Reorg,
+			(false, _) => BlockStatus::Fork,
+		}
+	}
+
 	/// Attempt to add a new block to the chain.
 	/// Returns true if it has been added to the longest chain
 	/// or false if it has added to a fork (or orphan?).
 	fn process_block_single(&self, b: Block, opts: Options) -> Result<Option<Tip>, Error> {
-		let maybe_new_head: Result<Option<Tip>, Error>;
-		{
+		let (maybe_new_head, prev_head) = {
 			let mut txhashset = self.txhashset.write();
 			let batch = self.store.batch()?;
 			let mut ctx = self.new_ctx(opts, batch, &mut txhashset)?;
 
-			maybe_new_head = pipe::process_block(&b, &mut ctx);
+			let prev_head = ctx.batch.head()?;
+
+			let maybe_new_head = pipe::process_block(&b, &mut ctx);
 			if let Ok(_) = maybe_new_head {
 				ctx.batch.commit()?;
 			}
+
 			// release the lock and let the batch go before post-processing
-		}
+			(maybe_new_head, prev_head)
+		};
 
 		let add_to_hash_cache = |hash: Hash| {
 			// only add to hash cache below if block is definitively accepted
@@ -263,8 +286,10 @@ impl Chain {
 			Ok(head) => {
 				add_to_hash_cache(b.hash());
 
+				let status = self.determine_status(head.clone(), prev_head);
+
 				// notifying other parts of the system of the update
-				self.adapter.block_accepted(&b, opts);
+				self.adapter.block_accepted(&b, status, opts);
 
 				Ok(head)
 			}
@@ -983,8 +1008,7 @@ impl Chain {
 		if outputs.0 != rangeproofs.0 || outputs.1.len() != rangeproofs.1.len() {
 			return Err(ErrorKind::TxHashSetErr(String::from(
 				"Output and rangeproof sets don't match",
-			))
-			.into());
+			)).into());
 		}
 		let mut output_vec: Vec<Output> = vec![];
 		for (ref x, &y) in outputs.1.iter().zip(rangeproofs.1.iter()) {
diff --git a/chain/src/lib.rs b/chain/src/lib.rs
index 82db548fb..9b6e65ff6 100644
--- a/chain/src/lib.rs
+++ b/chain/src/lib.rs
@@ -53,4 +53,4 @@ pub mod types;
 pub use chain::{Chain, MAX_ORPHAN_SIZE};
 pub use error::{Error, ErrorKind};
 pub use store::ChainStore;
-pub use types::{ChainAdapter, Options, Tip, TxHashsetWriteStatus};
+pub use types::{BlockStatus, ChainAdapter, Options, Tip, TxHashsetWriteStatus};
diff --git a/chain/src/types.rs b/chain/src/types.rs
index d0a4cdbbf..5b665b17f 100644
--- a/chain/src/types.rs
+++ b/chain/src/types.rs
@@ -118,7 +118,7 @@ impl ser::Readable for Tip {
 pub trait ChainAdapter {
 	/// The blockchain pipeline has accepted this block as valid and added
 	/// it to our chain.
-	fn block_accepted(&self, b: &Block, opts: Options);
+	fn block_accepted(&self, block: &Block, status: BlockStatus, opts: Options);
 }
 
 /// Inform the caller of the current status of a txhashset write operation,
@@ -151,5 +151,17 @@ impl TxHashsetWriteStatus for NoStatus {
 pub struct NoopAdapter {}
 
 impl ChainAdapter for NoopAdapter {
-	fn block_accepted(&self, _: &Block, _: Options) {}
+	fn block_accepted(&self, _b: &Block, _status: BlockStatus, _opts: Options) {}
+}
+
+/// Status of an accepted block.
+#[derive(Debug, Clone, PartialEq)]
+pub enum BlockStatus {
+	/// Block is the "next" block, updating the chain head.
+	Next,
+	/// Block does not update the chain head and is a fork.
+	Fork,
+	/// Block updates the chain head via a (potentially disruptive) "reorg".
+	/// Previous block was not our previous chain head.
+	Reorg,
 }
diff --git a/pool/src/transaction_pool.rs b/pool/src/transaction_pool.rs
index 911af6b56..1432f06f5 100644
--- a/pool/src/transaction_pool.rs
+++ b/pool/src/transaction_pool.rs
@@ -22,7 +22,6 @@ use std::sync::Arc;
 use util::RwLock;
 
 use chrono::prelude::*;
-use chrono::Duration;
 
 use core::core::hash::{Hash, Hashed};
 use core::core::id::ShortId;
@@ -97,15 +96,6 @@ impl TransactionPool {
 		debug!("added tx to reorg_cache: size now {}", cache.len());
 	}
 
-	// Old txs will "age out" after 30 mins.
-	fn truncate_reorg_cache(&mut self, cutoff: DateTime<Utc>) {
-		let mut cache = self.reorg_cache.write();
-
-		while cache.front().map(|x| x.tx_at < cutoff).unwrap_or(false) {
-			let _ = cache.pop_front();
-		}
-	}
-
 	fn add_to_txpool(
 		&mut self,
 		mut entry: PoolEntry,
@@ -179,17 +169,31 @@ impl TransactionPool {
 		Ok(())
 	}
 
-	fn reconcile_reorg_cache(&mut self, header: &BlockHeader) -> Result<(), PoolError> {
-		// First "age out" any old txs in the reorg cache.
-		let cutoff = Utc::now() - Duration::minutes(30);
-		self.truncate_reorg_cache(cutoff);
+	// Old txs will "age out" after 30 mins.
+	pub fn truncate_reorg_cache(&mut self, cutoff: DateTime<Utc>) {
+		let mut cache = self.reorg_cache.write();
 
+		while cache.front().map(|x| x.tx_at < cutoff).unwrap_or(false) {
+			let _ = cache.pop_front();
+		}
+
+		debug!("truncate_reorg_cache: size: {}", cache.len());
+	}
+
+	pub fn reconcile_reorg_cache(&mut self, header: &BlockHeader) -> Result<(), PoolError> {
 		let entries = self.reorg_cache.read().iter().cloned().collect::<Vec<_>>();
-		debug!("reconcile_reorg_cache: size: {} ...", entries.len());
+		debug!(
+			"reconcile_reorg_cache: size: {}, block: {:?} ...",
+			entries.len(),
+			header.hash(),
+		);
 		for entry in entries {
 			let _ = &self.add_to_txpool(entry.clone(), header);
 		}
-		debug!("reconcile_reorg_cache: ... done.");
+		debug!(
+			"reconcile_reorg_cache: block: {:?} ... done.",
+			header.hash()
+		);
 		Ok(())
 	}
 
@@ -200,10 +204,6 @@ impl TransactionPool {
 		self.txpool.reconcile_block(block);
 		self.txpool.reconcile(None, &block.header)?;
 
-		// Take our "reorg_cache" and see if this block means
-		// we need to (re)add old txs due to a fork and re-org.
-		self.reconcile_reorg_cache(&block.header)?;
-
 		// Now reconcile our stempool, accounting for the updated txpool txs.
 		self.stempool.reconcile_block(block);
 		{
diff --git a/servers/src/common/adapters.rs b/servers/src/common/adapters.rs
index a8e4c864e..96e9dc2e2 100644
--- a/servers/src/common/adapters.rs
+++ b/servers/src/common/adapters.rs
@@ -22,8 +22,9 @@ use std::thread;
 use std::time::Instant;
 use util::RwLock;
 
-use chain::{self, ChainAdapter, Options};
-use chrono::prelude::{DateTime, Utc};
+use chain::{self, BlockStatus, ChainAdapter, Options};
+use chrono::prelude::*;
+use chrono::Duration;
 use common::types::{self, ChainValidationMode, ServerConfig, SyncState, SyncStatus};
 use core::core::hash::{Hash, Hashed};
 use core::core::transaction::Transaction;
@@ -182,7 +183,7 @@ impl p2p::ChainAdapter for NetToChainAdapter {
 					.validate(&prev.total_kernel_offset, self.verifier_cache.clone())
 					.is_ok()
 				{
-					debug!("adapter: successfully hydrated block from tx pool!");
+					debug!("successfully hydrated block from tx pool!");
 					self.process_block(block, addr)
 				} else {
 					if self.sync_state.status() == SyncStatus::NoSync {
@@ -190,14 +191,12 @@ impl p2p::ChainAdapter for NetToChainAdapter {
 						self.request_block(&cb.header, &addr);
 						true
 					} else {
-						debug!(
-							"adapter: block invalid after hydration, ignoring it, cause still syncing"
-						);
+						debug!("block invalid after hydration, ignoring it, cause still syncing");
 						true
 					}
 				}
 			} else {
-				debug!("adapter: failed to retrieve previous block header (still syncing?)");
+				debug!("failed to retrieve previous block header (still syncing?)");
 				true
 			}
 		}
@@ -469,10 +468,7 @@ impl NetToChainAdapter {
 				true
 			}
 			Err(ref e) if e.is_bad_data() => {
-				debug!(
-					"adapter: process_block: {} is a bad block, resetting head",
-					bhash
-				);
+				debug!("process_block: {} is a bad block, resetting head", bhash);
 				let _ = self.chain().reset_head();
 
 				// we potentially changed the state of the system here
@@ -489,7 +485,7 @@ impl NetToChainAdapter {
 							if !self.chain().is_orphan(&previous.hash())
 								&& !self.sync_state.is_syncing()
 							{
-								debug!("adapter: process_block: received an orphan block, checking the parent: {:}", previous.hash());
+								debug!("process_block: received an orphan block, checking the parent: {:}", previous.hash());
 								self.request_block_by_hash(previous.hash(), &addr)
 							}
 						}
@@ -497,7 +493,7 @@ impl NetToChainAdapter {
 					}
 					_ => {
 						debug!(
-							"adapter: process_block: block {} refused by chain: {}",
+							"process_block: block {} refused by chain: {}",
 							bhash,
 							e.kind()
 						);
@@ -521,7 +517,7 @@ impl NetToChainAdapter {
 			let now = Instant::now();
 
 			debug!(
-				"adapter: process_block: ***** validating full chain state at {}",
+				"process_block: ***** validating full chain state at {}",
 				bhash,
 			);
 
@@ -530,7 +526,7 @@ impl NetToChainAdapter {
 				.expect("chain validation failed, hard stop");
 
 			debug!(
-				"adapter: process_block: ***** done validating full chain state, took {}s",
+				"process_block: ***** done validating full chain state, took {}s",
 				now.elapsed().as_secs(),
 			);
 		}
@@ -644,13 +640,38 @@ pub struct ChainToPoolAndNetAdapter {
 }
 
 impl ChainAdapter for ChainToPoolAndNetAdapter {
-	fn block_accepted(&self, b: &core::Block, opts: Options) {
+	fn block_accepted(&self, b: &core::Block, status: BlockStatus, opts: Options) {
+		match status {
+			BlockStatus::Reorg => {
+				warn!(
+					"block_accepted (REORG!): {:?} at {} (diff: {})",
+					b.hash(),
+					b.header.height,
+					b.header.total_difficulty(),
+				);
+			}
+			BlockStatus::Fork => {
+				debug!(
+					"block_accepted (fork?): {:?} at {} (diff: {})",
+					b.hash(),
+					b.header.height,
+					b.header.total_difficulty(),
+				);
+			}
+			BlockStatus::Next => {
+				debug!(
+					"block_accepted (head+): {:?} at {} (diff: {})",
+					b.hash(),
+					b.header.height,
+					b.header.total_difficulty(),
+				);
+			}
+		}
+
 		if self.sync_state.is_syncing() {
 			return;
 		}
 
-		debug!("adapter: block_accepted: {:?}", b.hash());
-
 		// If we mined the block then we want to broadcast the compact block.
 		// If we received the block from another node then broadcast "header first"
 		// to minimize network traffic.
@@ -665,12 +686,19 @@ impl ChainAdapter for ChainToPoolAndNetAdapter {
 
 		// Reconcile the txpool against the new block *after* we have broadcast it too our peers.
 		// This may be slow and we do not want to delay block propagation.
-		if let Err(e) = self.tx_pool.write().reconcile_block(b) {
-			error!(
-				"Pool could not update itself at block {}: {:?}",
-				b.hash(),
-				e,
-			);
+		// We only want to reconcile the txpool against the new block *if* total work has increased.
+		if status == BlockStatus::Next || status == BlockStatus::Reorg {
+			let mut tx_pool = self.tx_pool.write();
+
+			let _ = tx_pool.reconcile_block(b);
+
+			// First "age out" any old txs in the reorg_cache.
+			let cutoff = Utc::now() - Duration::minutes(30);
+			tx_pool.truncate_reorg_cache(cutoff);
+		}
+
+		if status == BlockStatus::Reorg {
+			let _ = self.tx_pool.write().reconcile_reorg_cache(&b.header);
 		}
 	}
 }