Automatically trigger compaction every 2000 blocks (#1054)

Does so in a separate thread as it can take some time. Also
remove validation pre-compaction as it's hopefully unecessary
now.
This commit is contained in:
Ignotus Peverell 2018-05-11 17:58:52 +01:00 committed by GitHub
parent 4121ea1240
commit d3a33c790e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 45 additions and 12 deletions

View file

@ -636,12 +636,8 @@ impl Chain {
/// Meanwhile, the chain will not be able to accept new blocks. It should /// Meanwhile, the chain will not be able to accept new blocks. It should
/// therefore be called judiciously. /// therefore be called judiciously.
pub fn compact(&self) -> Result<(), Error> { pub fn compact(&self) -> Result<(), Error> {
// First check we can successfully validate the full chain state. debug!(LOGGER, "Starting blockchain compaction.");
// If we cannot then do not attempt to compact. // Compact the txhashset via the extension.
// This should not be required long term - but doing this for debug purposes.
self.validate(true)?;
// Now compact the txhashset via the extension.
{ {
let mut txhashes = self.txhashset.write().unwrap(); let mut txhashes = self.txhashset.write().unwrap();
txhashes.compact()?; txhashes.compact()?;
@ -654,7 +650,8 @@ impl Chain {
} }
// Now check we can still successfully validate the chain state after // Now check we can still successfully validate the chain state after
// compacting. // compacting, shouldn't be necessary once all of this is well-oiled
debug!(LOGGER, "Validating state after compaction.");
self.validate(true)?; self.validate(true)?;
// we need to be careful here in testing as 20 blocks is not that long // we need to be careful here in testing as 20 blocks is not that long
@ -666,10 +663,17 @@ impl Chain {
return Ok(()); return Ok(());
} }
debug!(
LOGGER,
"Compaction remove blocks older than {}.",
head.height - horizon
);
let mut count = 0;
let mut current = self.store.get_header_by_height(head.height - horizon - 1)?; let mut current = self.store.get_header_by_height(head.height - horizon - 1)?;
loop { loop {
match self.store.get_block(&current.hash()) { match self.store.get_block(&current.hash()) {
Ok(b) => { Ok(b) => {
count += 1;
self.store.delete_block(&b.hash())?; self.store.delete_block(&b.hash())?;
self.store.delete_block_marker(&b.hash())?; self.store.delete_block_marker(&b.hash())?;
self.store.delete_block_sums(&b.hash())?; self.store.delete_block_sums(&b.hash())?;
@ -688,6 +692,7 @@ impl Chain {
Err(e) => return Err(From::from(e)), Err(e) => return Err(From::from(e)),
} }
} }
debug!(LOGGER, "Compaction removed {} blocks, done.", count);
Ok(()) Ok(())
} }

View file

@ -20,11 +20,12 @@ use std::net::SocketAddr;
use std::ops::Deref; use std::ops::Deref;
use std::sync::{Arc, RwLock, Weak}; use std::sync::{Arc, RwLock, Weak};
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
use std::time::Instant; use std::time::Instant;
use rand; use rand;
use rand::Rng; use rand::Rng;
use chain::{self, ChainAdapter, Options}; use chain::{self, ChainAdapter, Options, Tip};
use core::core; use core::core;
use core::core::block::BlockHeader; use core::core::block::BlockHeader;
use core::core::hash::{Hash, Hashed}; use core::core::hash::{Hash, Hashed};
@ -53,6 +54,7 @@ fn wo<T>(weak_one: &OneTime<Weak<T>>) -> Arc<T> {
/// implementations. /// implementations.
pub struct NetToChainAdapter { pub struct NetToChainAdapter {
currently_syncing: Arc<AtomicBool>, currently_syncing: Arc<AtomicBool>,
archive_mode: bool,
chain: Weak<chain::Chain>, chain: Weak<chain::Chain>,
tx_pool: Arc<RwLock<pool::TransactionPool<PoolToChainAdapter>>>, tx_pool: Arc<RwLock<pool::TransactionPool<PoolToChainAdapter>>>,
peers: OneTime<Weak<p2p::Peers>>, peers: OneTime<Weak<p2p::Peers>>,
@ -354,16 +356,18 @@ impl NetToChainAdapter {
/// Construct a new NetToChainAdapter instance /// Construct a new NetToChainAdapter instance
pub fn new( pub fn new(
currently_syncing: Arc<AtomicBool>, currently_syncing: Arc<AtomicBool>,
archive_mode: bool,
chain_ref: Weak<chain::Chain>, chain_ref: Weak<chain::Chain>,
tx_pool: Arc<RwLock<pool::TransactionPool<PoolToChainAdapter>>>, tx_pool: Arc<RwLock<pool::TransactionPool<PoolToChainAdapter>>>,
config: ServerConfig, config: ServerConfig,
) -> NetToChainAdapter { ) -> NetToChainAdapter {
NetToChainAdapter { NetToChainAdapter {
currently_syncing: currently_syncing, currently_syncing,
archive_mode,
chain: chain_ref, chain: chain_ref,
tx_pool: tx_pool, tx_pool,
peers: OneTime::new(), peers: OneTime::new(),
config: config, config,
} }
} }
@ -415,8 +419,9 @@ impl NetToChainAdapter {
let bhash = b.hash(); let bhash = b.hash();
let chain = w(&self.chain); let chain = w(&self.chain);
match chain.process_block(b, self.chain_opts()) { match chain.process_block(b, self.chain_opts()) {
Ok(_) => { Ok((tip, _)) => {
self.validate_chain(bhash); self.validate_chain(bhash);
self.check_compact(tip);
true true
} }
Err(chain::Error::Orphan) => { Err(chain::Error::Orphan) => {
@ -480,6 +485,28 @@ impl NetToChainAdapter {
} }
} }
fn check_compact(&self, tip_res: Option<Tip>) {
// no compaction during sync or if we're in historical mode
if self.archive_mode || self.currently_syncing.load(Ordering::Relaxed) {
return;
}
if let Some(tip) = tip_res {
// trigger compaction every 2000 blocks, uses a different thread to avoid
// blocking the caller thread (likely a peer)
if tip.height % 2000 == 0 {
let chain = w(&self.chain);
let _ = thread::Builder::new()
.name("compactor".to_string())
.spawn(move || {
if let Err(e) = chain.compact() {
error!(LOGGER, "Could not compact chain: {:?}", e);
}
});
}
}
}
// After receiving a compact block if we cannot successfully hydrate // After receiving a compact block if we cannot successfully hydrate
// it into a full block then fallback to requesting the full block // it into a full block then fallback to requesting the full block
// from the same peer that gave us the compact block // from the same peer that gave us the compact block

View file

@ -146,6 +146,7 @@ impl Server {
let net_adapter = Arc::new(NetToChainAdapter::new( let net_adapter = Arc::new(NetToChainAdapter::new(
currently_syncing.clone(), currently_syncing.clone(),
archive_mode,
Arc::downgrade(&shared_chain), Arc::downgrade(&shared_chain),
tx_pool.clone(), tx_pool.clone(),
config.clone(), config.clone(),