From d3a33c790e5eee86ac31a8f942cde362245142e1 Mon Sep 17 00:00:00 2001 From: Ignotus Peverell Date: Fri, 11 May 2018 17:58:52 +0100 Subject: [PATCH] Automatically trigger compaction every 2000 blocks (#1054) Does so in a separate thread as it can take some time. Also remove validation pre-compaction as it's hopefully unecessary now. --- chain/src/chain.rs | 19 ++++++++++------- servers/src/common/adapters.rs | 37 +++++++++++++++++++++++++++++----- servers/src/grin/server.rs | 1 + 3 files changed, 45 insertions(+), 12 deletions(-) diff --git a/chain/src/chain.rs b/chain/src/chain.rs index 50ddd74a4..709425c93 100644 --- a/chain/src/chain.rs +++ b/chain/src/chain.rs @@ -636,12 +636,8 @@ impl Chain { /// Meanwhile, the chain will not be able to accept new blocks. It should /// therefore be called judiciously. pub fn compact(&self) -> Result<(), Error> { - // First check we can successfully validate the full chain state. - // If we cannot then do not attempt to compact. - // This should not be required long term - but doing this for debug purposes. - self.validate(true)?; - - // Now compact the txhashset via the extension. + debug!(LOGGER, "Starting blockchain compaction."); + // Compact the txhashset via the extension. { let mut txhashes = self.txhashset.write().unwrap(); txhashes.compact()?; @@ -654,7 +650,8 @@ impl Chain { } // Now check we can still successfully validate the chain state after - // compacting. + // compacting, shouldn't be necessary once all of this is well-oiled + debug!(LOGGER, "Validating state after compaction."); self.validate(true)?; // we need to be careful here in testing as 20 blocks is not that long @@ -666,10 +663,17 @@ impl Chain { return Ok(()); } + debug!( + LOGGER, + "Compaction remove blocks older than {}.", + head.height - horizon + ); + let mut count = 0; let mut current = self.store.get_header_by_height(head.height - horizon - 1)?; loop { match self.store.get_block(¤t.hash()) { Ok(b) => { + count += 1; self.store.delete_block(&b.hash())?; self.store.delete_block_marker(&b.hash())?; self.store.delete_block_sums(&b.hash())?; @@ -688,6 +692,7 @@ impl Chain { Err(e) => return Err(From::from(e)), } } + debug!(LOGGER, "Compaction removed {} blocks, done.", count); Ok(()) } diff --git a/servers/src/common/adapters.rs b/servers/src/common/adapters.rs index 2a549671a..cbb74a286 100644 --- a/servers/src/common/adapters.rs +++ b/servers/src/common/adapters.rs @@ -20,11 +20,12 @@ use std::net::SocketAddr; use std::ops::Deref; use std::sync::{Arc, RwLock, Weak}; use std::sync::atomic::{AtomicBool, Ordering}; +use std::thread; use std::time::Instant; use rand; use rand::Rng; -use chain::{self, ChainAdapter, Options}; +use chain::{self, ChainAdapter, Options, Tip}; use core::core; use core::core::block::BlockHeader; use core::core::hash::{Hash, Hashed}; @@ -53,6 +54,7 @@ fn wo(weak_one: &OneTime>) -> Arc { /// implementations. pub struct NetToChainAdapter { currently_syncing: Arc, + archive_mode: bool, chain: Weak, tx_pool: Arc>>, peers: OneTime>, @@ -354,16 +356,18 @@ impl NetToChainAdapter { /// Construct a new NetToChainAdapter instance pub fn new( currently_syncing: Arc, + archive_mode: bool, chain_ref: Weak, tx_pool: Arc>>, config: ServerConfig, ) -> NetToChainAdapter { NetToChainAdapter { - currently_syncing: currently_syncing, + currently_syncing, + archive_mode, chain: chain_ref, - tx_pool: tx_pool, + tx_pool, peers: OneTime::new(), - config: config, + config, } } @@ -415,8 +419,9 @@ impl NetToChainAdapter { let bhash = b.hash(); let chain = w(&self.chain); match chain.process_block(b, self.chain_opts()) { - Ok(_) => { + Ok((tip, _)) => { self.validate_chain(bhash); + self.check_compact(tip); true } Err(chain::Error::Orphan) => { @@ -480,6 +485,28 @@ impl NetToChainAdapter { } } + fn check_compact(&self, tip_res: Option) { + // no compaction during sync or if we're in historical mode + if self.archive_mode || self.currently_syncing.load(Ordering::Relaxed) { + return; + } + + if let Some(tip) = tip_res { + // trigger compaction every 2000 blocks, uses a different thread to avoid + // blocking the caller thread (likely a peer) + if tip.height % 2000 == 0 { + let chain = w(&self.chain); + let _ = thread::Builder::new() + .name("compactor".to_string()) + .spawn(move || { + if let Err(e) = chain.compact() { + error!(LOGGER, "Could not compact chain: {:?}", e); + } + }); + } + } + } + // After receiving a compact block if we cannot successfully hydrate // it into a full block then fallback to requesting the full block // from the same peer that gave us the compact block diff --git a/servers/src/grin/server.rs b/servers/src/grin/server.rs index 813970f54..45fc07196 100644 --- a/servers/src/grin/server.rs +++ b/servers/src/grin/server.rs @@ -146,6 +146,7 @@ impl Server { let net_adapter = Arc::new(NetToChainAdapter::new( currently_syncing.clone(), + archive_mode, Arc::downgrade(&shared_chain), tx_pool.clone(), config.clone(),