Various improvements and cleanup in chain crate ()

* Remove now unnecessary txhashset write lock
* Ring buffer of hashes the chain has already processed
* Specifically report too-old blocks, as the peer should be banned
* Move sync check for block relay, clean TODO
* No use processing transactions when syncing
* Ignore blocks older than horizon in pruning nodes
Ignotus Peverell 2018-07-16 21:58:56 +01:00 committed by GitHub
parent bff0c6b6c6
commit 3d89e86906
5 changed files with 85 additions and 41 deletions
chain/src
servers/src

chain/src

@@ -15,7 +15,7 @@
 //! Facade and handler for the rest of the blockchain implementation
 //! and mostly the chain pipeline.
 
-use std::collections::HashMap;
+use std::collections::{HashMap, VecDeque};
 use std::fs::File;
 use std::sync::{Arc, Mutex, RwLock};
 use std::time::{Duration, Instant};
@@ -42,6 +42,9 @@ pub const MAX_ORPHAN_SIZE: usize = 200;
 /// When evicting, very old orphans are evicted first
 const MAX_ORPHAN_AGE_SECS: u64 = 300;
 
+/// Number of recent hashes we keep to de-duplicate block or header sends
+const HASHES_CACHE_SIZE: usize = 50;
+
 #[derive(Debug, Clone)]
 struct Orphan {
     block: Block,
@@ -130,8 +133,11 @@ pub struct Chain {
     head: Arc<Mutex<Tip>>,
     orphans: Arc<OrphanBlockPool>,
-    txhashset_lock: Arc<Mutex<bool>>,
     txhashset: Arc<RwLock<txhashset::TxHashSet>>,
+    // Recently processed blocks to avoid double-processing
+    block_hashes_cache: Arc<RwLock<VecDeque<Hash>>>,
+    // Recently processed headers to avoid double-processing
+    header_hashes_cache: Arc<RwLock<VecDeque<Hash>>>,
 
     // POW verification function
     pow_verifier: fn(&BlockHeader, u8) -> bool,
@@ -177,9 +183,10 @@ impl Chain {
             adapter: adapter,
             head: Arc::new(Mutex::new(head)),
             orphans: Arc::new(OrphanBlockPool::new()),
-            txhashset_lock: Arc::new(Mutex::new(false)),
             txhashset: Arc::new(RwLock::new(txhashset)),
             pow_verifier: pow_verifier,
+            block_hashes_cache: Arc::new(RwLock::new(VecDeque::with_capacity(HASHES_CACHE_SIZE))),
+            header_hashes_cache: Arc::new(RwLock::new(VecDeque::with_capacity(HASHES_CACHE_SIZE))),
         })
     }
@@ -215,6 +222,12 @@ impl Chain {
         let mut ctx = self.ctx_from_head(head, opts)?;
         let res = pipe::process_block(&b, &mut ctx);
+
+        {
+            let mut cache = self.block_hashes_cache.write().unwrap();
+            cache.push_front(b.hash());
+            cache.truncate(HASHES_CACHE_SIZE);
+        }
 
         match res {
             Ok(Some(ref tip)) => {
@@ -226,29 +239,17 @@
                 }
 
                 // notifying other parts of the system of the update
-                if !opts.contains(Options::SYNC) {
-                    // broadcast the block
-                    let adapter = self.adapter.clone();
-                    adapter.block_accepted(&b, opts);
-                }
+                self.adapter.block_accepted(&b, opts);
                 Ok((Some(tip.clone()), Some(b)))
             }
             Ok(None) => {
                 // block got accepted but we did not extend the head
                 // so its on a fork (or is the start of a new fork)
                 // broadcast the block out so everyone knows about the fork
-                //
-                // TODO - This opens us to an amplification attack on blocks
-                // mined at a low difficulty. We should suppress really old blocks
-                // or less relevant blocks somehow.
-                // We should also probably consider banning nodes that send us really old
-                // blocks.
-                //
-                if !opts.contains(Options::SYNC) {
-                    // broadcast the block
-                    let adapter = self.adapter.clone();
-                    adapter.block_accepted(&b, opts);
-                }
+                self.adapter.block_accepted(&b, opts);
                 Ok((None, Some(b)))
             }
             Err(e) => {
@@ -304,7 +305,13 @@ impl Chain {
     pub fn process_block_header(&self, bh: &BlockHeader, opts: Options) -> Result<(), Error> {
         let header_head = self.get_header_head()?;
         let mut ctx = self.ctx_from_head(header_head, opts)?;
-        pipe::process_block_header(bh, &mut ctx)
+        let res = pipe::process_block_header(bh, &mut ctx);
+        {
+            let mut cache = self.header_hashes_cache.write().unwrap();
+            cache.push_front(bh.hash());
+            cache.truncate(HASHES_CACHE_SIZE);
+        }
+        res
     }
 
     /// Attempt to add a new header to the header chain.
@@ -328,6 +335,8 @@
             store: self.store.clone(),
             head: head,
             pow_verifier: self.pow_verifier,
+            block_hashes_cache: self.block_hashes_cache.clone(),
+            header_hashes_cache: self.header_hashes_cache.clone(),
             txhashset: self.txhashset.clone(),
         })
     }
@@ -529,7 +538,6 @@ impl Chain {
     where
         T: TxHashsetWriteStatus,
     {
-        let _ = self.txhashset_lock.lock().unwrap();
         status.on_setup();
         let head = self.head().unwrap();
         let header_head = self.get_header_head().unwrap();
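
The cache introduced above is a bounded ring buffer built from a VecDeque: push_front records the newest hash, truncate evicts the oldest entries beyond HASHES_CACHE_SIZE, and contains answers the "seen recently?" question with a linear scan, which is cheap at 50 entries. A minimal standalone sketch of the same pattern; DedupCache and the [u8; 32] hash alias are illustrative stand-ins, not names from the codebase:

use std::collections::VecDeque;

/// Illustrative stand-in for the chain's hash type.
type Hash = [u8; 32];

const CACHE_SIZE: usize = 50;

struct DedupCache {
    recent: VecDeque<Hash>,
}

impl DedupCache {
    fn new() -> DedupCache {
        DedupCache {
            recent: VecDeque::with_capacity(CACHE_SIZE),
        }
    }

    /// Record a processed hash; the oldest entry falls off once full.
    fn insert(&mut self, h: Hash) {
        self.recent.push_front(h);
        self.recent.truncate(CACHE_SIZE);
    }

    /// Cheap in-memory membership test, no store lookup required.
    fn contains(&self, h: &Hash) -> bool {
        self.recent.contains(h)
    }
}

fn main() {
    let mut cache = DedupCache::new();
    cache.insert([1u8; 32]);
    assert!(cache.contains(&[1u8; 32]));
    assert!(!cache.contains(&[2u8; 32]));
}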

chain/src

@@ -51,6 +51,9 @@ pub enum ErrorKind {
     /// The proof of work is invalid
     #[fail(display = "Invalid PoW")]
     InvalidPow,
+    /// Peer abusively sending us an old block we already have
+    #[fail(display = "Old Block")]
+    OldBlock,
     /// The block doesn't sum correctly or a tx signature is invalid
     #[fail(display = "Invalid Block Proof")]
     InvalidBlockProof(block::Error),
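
The dedicated OldBlock variant lets the networking layer tell an abusive resend apart from an ordinary gossip duplicate, which is still reported as Unfit; the ban itself is outside this diff. A reduced sketch of the dispatch a caller might perform, with ErrorKind cut down to two variants and PeerAction invented for illustration:

/// Cut-down stand-in for the crate's ErrorKind.
#[derive(Debug)]
enum ErrorKind {
    /// Peer abusively sent an old block we already have.
    OldBlock,
    /// Recent duplicate or otherwise not worth processing.
    Unfit(String),
}

/// Illustrative outcome type, not part of the codebase.
enum PeerAction {
    Ignore,
    Ban,
}

fn on_rejected(kind: &ErrorKind) -> PeerAction {
    match kind {
        // old blocks are cheap to mine at low difficulty, so resends count as abuse
        ErrorKind::OldBlock => PeerAction::Ban,
        // recent duplicates are normal gossip noise
        ErrorKind::Unfit(_) => PeerAction::Ignore,
    }
}

fn main() {
    assert!(matches!(on_rejected(&ErrorKind::OldBlock), PeerAction::Ban));
    assert!(matches!(
        on_rejected(&ErrorKind::Unfit("already known".into())),
        PeerAction::Ignore
    ));
}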

chain/src

@@ -14,6 +14,7 @@
 //! Implementation of the chain block acceptance (or refusal) pipeline.
 
+use std::collections::VecDeque;
 use std::sync::{Arc, RwLock};
 use time;
@@ -45,6 +46,10 @@ pub struct BlockContext {
     pub pow_verifier: fn(&BlockHeader, u8) -> bool,
     /// MMR sum tree states
     pub txhashset: Arc<RwLock<txhashset::TxHashSet>>,
+    /// Recently processed blocks to avoid double-processing
+    pub block_hashes_cache: Arc<RwLock<VecDeque<Hash>>>,
+    /// Recently processed headers to avoid double-processing
+    pub header_hashes_cache: Arc<RwLock<VecDeque<Hash>>>,
 }
 
 /// Runs the block processing pipeline, including validation and finding a
@@ -175,16 +180,12 @@ pub fn process_block_header(bh: &BlockHeader, ctx: &mut BlockContext) -> Result<(), Error> {
 /// recently. Keeps duplicates from the network in check.
 /// ctx here is specific to the header_head (tip of the header chain)
 fn check_header_known(bh: Hash, ctx: &mut BlockContext) -> Result<(), Error> {
-    // TODO ring buffer of the last few blocks that came through here
     if bh == ctx.head.last_block_h || bh == ctx.head.prev_block_h {
         return Err(ErrorKind::Unfit("already known".to_string()).into());
     }
-    if let Ok(h) = ctx.store.get_block_header(&bh) {
-        // there is a window where a block header can be saved but the chain head not
-        // updated yet, we plug that window here by re-accepting the block
-        if h.total_difficulty <= ctx.head.total_difficulty {
-            return Err(ErrorKind::Unfit("already in store".to_string()).into());
-        }
-    }
+    let cache = ctx.header_hashes_cache.read().unwrap();
+    if cache.contains(&bh) {
+        return Err(ErrorKind::Unfit("already known".to_string()).into());
+    }
     Ok(())
 }
@@ -192,16 +193,12 @@ fn check_header_known(bh: Hash, ctx: &mut BlockContext) -> Result<(), Error> {
 
 /// Quick in-memory check to fast-reject any block we've already handled
 /// recently. Keeps duplicates from the network in check.
 fn check_known(bh: Hash, ctx: &mut BlockContext) -> Result<(), Error> {
-    // TODO ring buffer of the last few blocks that came through here
     if bh == ctx.head.last_block_h || bh == ctx.head.prev_block_h {
         return Err(ErrorKind::Unfit("already known".to_string()).into());
     }
-    if let Ok(b) = ctx.store.get_block(&bh) {
-        // there is a window where a block can be saved but the chain head not
-        // updated yet, we plug that window here by re-accepting the block
-        if b.header.total_difficulty <= ctx.head.total_difficulty {
-            return Err(ErrorKind::Unfit("already in store".to_string()).into());
-        }
-    }
+    let cache = ctx.block_hashes_cache.read().unwrap();
+    if cache.contains(&bh) {
+        return Err(ErrorKind::Unfit("already known".to_string()).into());
+    }
     Ok(())
 }
@@ -209,7 +206,6 @@ fn check_known(bh: Hash, ctx: &mut BlockContext) -> Result<(), Error> {
 /// First level of block validation that only needs to act on the block header
 /// to make it as cheap as possible. The different validations are also
 /// arranged by order of cost to have as little DoS surface as possible.
-/// TODO require only the block header (with length information)
 fn validate_header(header: &BlockHeader, ctx: &mut BlockContext) -> Result<(), Error> {
     // check version, enforces scheduled hard fork
     if !consensus::valid_header_version(header.height, header.version) {
@@ -312,6 +308,13 @@ fn validate_header(header: &BlockHeader, ctx: &mut BlockContext) -> Result<(), Error> {
 }
 
 fn validate_block(b: &Block, ctx: &mut BlockContext) -> Result<(), Error> {
+    if ctx.store.block_exists(&b.hash())? {
+        if b.header.height < ctx.head.height.saturating_sub(50) {
+            return Err(ErrorKind::OldBlock.into());
+        } else {
+            return Err(ErrorKind::Unfit("already known".to_string()).into());
+        }
+    }
     let prev = ctx.store.get_block_header(&b.header.previous)?;
     b.validate(&prev.total_kernel_offset, &prev.total_kernel_sum)
         .map_err(|e| ErrorKind::InvalidBlockProof(e))?;
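
In validate_block the duplicate check now splits on a 50-block window below the head: a duplicate inside the window stays a harmless Unfit, one beneath it becomes OldBlock. saturating_sub keeps the cutoff well-defined on a young chain, clamping to zero instead of underflowing the u64, so nothing is flagged as too old before height 50. A standalone sketch of that decision (names are illustrative; the constant mirrors the hunk above):

/// Window below the head inside which a duplicate is considered recent.
const OLD_BLOCK_THRESHOLD: u64 = 50;

#[derive(Debug, PartialEq)]
enum DuplicateKind {
    /// Recently seen duplicate: reject quietly.
    Recent,
    /// Far behind the head: report so the peer can be penalized.
    TooOld,
}

fn classify_duplicate(block_height: u64, head_height: u64) -> DuplicateKind {
    // saturating_sub clamps at 0, so a head younger than the
    // threshold never classifies anything as too old
    if block_height < head_height.saturating_sub(OLD_BLOCK_THRESHOLD) {
        DuplicateKind::TooOld
    } else {
        DuplicateKind::Recent
    }
}

fn main() {
    assert_eq!(classify_duplicate(10, 1000), DuplicateKind::TooOld);
    assert_eq!(classify_duplicate(990, 1000), DuplicateKind::Recent);
    assert_eq!(classify_duplicate(0, 30), DuplicateKind::Recent); // young chain
}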

servers/src

@@ -25,7 +25,7 @@ use std::time::Instant;
 
 use chain::{self, ChainAdapter, Options, Tip};
 use common::types::{ChainValidationMode, ServerConfig, SyncState, SyncStatus};
-use core::core;
+use core::{core, global};
 use core::core::block::BlockHeader;
 use core::core::hash::{Hash, Hashed};
 use core::core::target::Difficulty;
@@ -68,6 +68,11 @@ impl p2p::ChainAdapter for NetToChainAdapter {
     }
 
     fn transaction_received(&self, tx: core::Transaction, stem: bool) {
+        // nothing much we can do with a new transaction while syncing
+        if self.sync_state.is_syncing() {
+            return;
+        }
+
         let source = pool::TxSource {
             debug_name: "p2p".to_string(),
             identifier: "?.?.?.?".to_string(),
@@ -339,6 +344,11 @@ impl p2p::ChainAdapter for NetToChainAdapter {
         txhashset_data: File,
         _peer_addr: SocketAddr,
     ) -> bool {
+        // check the status again after the download, in case a second txhashset somehow got through
+        if self.sync_state.status() != SyncStatus::TxHashsetDownload {
+            return true;
+        }
+
         if let Err(e) =
             w(&self.chain).txhashset_write(h, txhashset_data, self.sync_state.as_ref())
         {
@@ -416,9 +426,18 @@ impl NetToChainAdapter {
     // pushing the new block through the chain pipeline
     // remembering to reset the head if we have a bad block
     fn process_block(&self, b: core::Block, addr: SocketAddr) -> bool {
+        let chain = w(&self.chain);
+        if !self.archive_mode {
+            let head = chain.head().unwrap();
+            // we have a fast sync'd node and are sent a block older than our horizon,
+            // only sync can do something with that
+            if b.header.height < head.height.saturating_sub(global::cut_through_horizon() as u64) {
+                return true;
+            }
+        }
+
         let prev_hash = b.header.previous;
         let bhash = b.hash();
-        let chain = w(&self.chain);
         match chain.process_block(b, self.chain_opts()) {
             Ok((tip, _)) => {
                 self.validate_chain(bhash);
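
The guard above is the same shape of check with the cut-through horizon in place of the fixed 50-block window. A pruning (non-archive) node has discarded full blocks older than the horizon, so a relayed block beneath it can never be attached locally and only the sync path can use that range; returning true drops the block without penalizing the sender. A sketch of the predicate, assuming an illustrative horizon of 48 hours of one-minute blocks (the node reads the real value from global::cut_through_horizon()):

/// Illustrative horizon: 48 hours of one-minute blocks.
const CUT_THROUGH_HORIZON: u64 = 48 * 60;

/// True when a relayed block is beneath the pruning horizon and the
/// relay path should drop it, leaving that range to sync.
fn below_horizon(block_height: u64, head_height: u64) -> bool {
    // saturating_sub: a young chain (head below the horizon) rejects nothing
    block_height < head_height.saturating_sub(CUT_THROUGH_HORIZON)
}

fn main() {
    assert!(below_horizon(100, 10_000)); // far beneath the horizon: drop
    assert!(!below_horizon(9_000, 10_000)); // within the horizon: process
    assert!(!below_horizon(0, 100)); // young chain: nothing is dropped
}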
@@ -574,12 +593,17 @@ impl NetToChainAdapter {
 /// blockchain accepted a new block, asking the pool to update its state and
 /// the network to broadcast the block
 pub struct ChainToPoolAndNetAdapter {
+    sync_state: Arc<SyncState>,
     tx_pool: Arc<RwLock<pool::TransactionPool<PoolToChainAdapter>>>,
     peers: OneTime<Weak<p2p::Peers>>,
 }
 
 impl ChainAdapter for ChainToPoolAndNetAdapter {
     fn block_accepted(&self, b: &core::Block, opts: Options) {
+        if self.sync_state.is_syncing() {
+            return;
+        }
+
         debug!(LOGGER, "adapter: block_accepted: {:?}", b.hash());
         if let Err(e) = self.tx_pool.write().unwrap().reconcile_block(b) {
@@ -633,10 +657,12 @@ impl ChainAdapter for ChainToPoolAndNetAdapter {
 
 impl ChainToPoolAndNetAdapter {
     /// Construct a ChainToPoolAndNetAdapter instance.
     pub fn new(
+        sync_state: Arc<SyncState>,
         tx_pool: Arc<RwLock<pool::TransactionPool<PoolToChainAdapter>>>,
     ) -> ChainToPoolAndNetAdapter {
         ChainToPoolAndNetAdapter {
-            tx_pool: tx_pool,
+            sync_state,
+            tx_pool,
             peers: OneTime::new(),
         }
     }
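
Both adapter changes above gate on the same shared sync state: transaction_received drops relayed transactions mid-sync, and block_accepted returns early so the pool is not reconciled against every block the sync path batches through. The state is created once and handed to each component as an Arc clone. A reduced sketch of the sharing pattern, with SyncState collapsed to an AtomicBool stand-in:

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Reduced stand-in for the server's SyncState.
struct SyncState {
    syncing: AtomicBool,
}

impl SyncState {
    fn new() -> SyncState {
        SyncState {
            syncing: AtomicBool::new(false),
        }
    }
    fn is_syncing(&self) -> bool {
        self.syncing.load(Ordering::Relaxed)
    }
}

struct ChainToPoolAndNetAdapter {
    sync_state: Arc<SyncState>,
}

impl ChainToPoolAndNetAdapter {
    fn block_accepted(&self) {
        // mid-sync there is nothing useful to broadcast or reconcile
        if self.sync_state.is_syncing() {
            return;
        }
        // ... reconcile the pool and relay the block ...
    }
}

fn main() {
    // one SyncState, cloned (by Arc, not by value) into each adapter
    let sync_state = Arc::new(SyncState::new());
    let adapter = ChainToPoolAndNetAdapter {
        sync_state: sync_state.clone(),
    };
    adapter.block_accepted();
}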

servers/src

@@ -119,7 +119,12 @@ impl Server {
             pool_net_adapter.clone(),
         )));
 
-        let chain_adapter = Arc::new(ChainToPoolAndNetAdapter::new(tx_pool.clone()));
+        let sync_state = Arc::new(SyncState::new());
+
+        let chain_adapter = Arc::new(ChainToPoolAndNetAdapter::new(
+            sync_state.clone(),
+            tx_pool.clone(),
+        ));
 
         let genesis = match config.chain_type {
             global::ChainTypes::Testnet1 => genesis::genesis_testnet1(),
@@ -143,7 +148,6 @@ impl Server {
         pool_adapter.set_chain(Arc::downgrade(&shared_chain));
 
-        let sync_state = Arc::new(SyncState::new());
         let awaiting_peers = Arc::new(AtomicBool::new(false));
 
         let net_adapter = Arc::new(NetToChainAdapter::new(