diff --git a/chain/src/chain.rs b/chain/src/chain.rs index a9e115924..4ccc0fd89 100644 --- a/chain/src/chain.rs +++ b/chain/src/chain.rs @@ -32,7 +32,7 @@ use crate::types::{ BlockStatus, ChainAdapter, NoStatus, Options, Tip, TxHashSetRoots, TxHashsetWriteStatus, }; use crate::util::secp::pedersen::{Commitment, RangeProof}; -use crate::util::RwLock; +use crate::util::{Mutex, RwLock, StopState}; use grin_store::Error::NotFoundErr; use std::collections::HashMap; use std::fs::File; @@ -149,6 +149,7 @@ pub struct Chain { // POW verification function pow_verifier: fn(&BlockHeader) -> Result<(), pow::Error>, archive_mode: bool, + stop_state: Arc>, genesis: BlockHeader, } @@ -164,59 +165,66 @@ impl Chain { pow_verifier: fn(&BlockHeader) -> Result<(), pow::Error>, verifier_cache: Arc>, archive_mode: bool, + stop_state: Arc>, ) -> Result { - let chain_store = store::ChainStore::new(db_env)?; + // Note: We take a lock on the stop_state here and do not release it until + // we have finished chain initialization. + let stop_state_local = stop_state.clone(); + let stop_lock = stop_state_local.lock(); + if stop_lock.is_stopped() { + return Err(ErrorKind::Stopped.into()); + } - let store = Arc::new(chain_store); + let store = Arc::new(store::ChainStore::new(db_env)?); // open the txhashset, creating a new one if necessary let mut txhashset = txhashset::TxHashSet::open(db_root.clone(), store.clone(), None)?; - setup_head(genesis.clone(), store.clone(), &mut txhashset)?; - - { - let head = store.head()?; - debug!( - "init: head: {} @ {} [{}]", - head.total_difficulty.to_num(), - head.height, - head.last_block_h, - ); - } - - { - let header_head = store.header_head()?; - debug!( - "init: header_head: {} @ {} [{}]", - header_head.total_difficulty.to_num(), - header_head.height, - header_head.last_block_h, - ); - } - - { - let sync_head = store.get_sync_head()?; - debug!( - "init: sync_head: {} @ {} [{}]", - sync_head.total_difficulty.to_num(), - sync_head.height, - sync_head.last_block_h, - ); - } + setup_head(&genesis, &store, &mut txhashset)?; + Chain::log_heads(&store)?; Ok(Chain { - db_root: db_root, - store: store, - adapter: adapter, + db_root, + store, + adapter, orphans: Arc::new(OrphanBlockPool::new()), txhashset: Arc::new(RwLock::new(txhashset)), pow_verifier, verifier_cache, archive_mode, + stop_state, genesis: genesis.header.clone(), }) } + fn log_heads(store: &store::ChainStore) -> Result<(), Error> { + let head = store.head()?; + debug!( + "init: head: {} @ {} [{}]", + head.total_difficulty.to_num(), + head.height, + head.last_block_h, + ); + + let header_head = store.header_head()?; + debug!( + "init: header_head: {} @ {} [{}]", + header_head.total_difficulty.to_num(), + header_head.height, + header_head.last_block_h, + ); + + let sync_head = store.get_sync_head()?; + debug!( + "init: sync_head: {} @ {} [{}]", + sync_head.total_difficulty.to_num(), + sync_head.height, + sync_head.last_block_h, + ); + + Ok(()) + } + /// Processes a single block, then checks for orphans, processing /// those as well if they're found pub fn process_block(&self, b: Block, opts: Options) -> Result, Error> { @@ -251,6 +259,15 @@ impl Chain { /// or false if it has added to a fork (or orphan?). fn process_block_single(&self, b: Block, opts: Options) -> Result, Error> { let (maybe_new_head, prev_head) = { + // Note: We take a lock on the stop_state here and do not release it until + // we have finished processing this single block. + // We take care to write both the txhashset *and* the batch while we + // have the stop_state lock. + let stop_lock = self.stop_state.lock(); + if stop_lock.is_stopped() { + return Err(ErrorKind::Stopped.into()); + } + let mut txhashset = self.txhashset.write(); let batch = self.store.batch()?; let mut ctx = self.new_ctx(opts, batch, &mut txhashset)?; @@ -258,6 +275,11 @@ impl Chain { let prev_head = ctx.batch.head()?; let maybe_new_head = pipe::process_block(&b, &mut ctx); + + // We have flushed txhashset extension changes to disk + // but not yet committed the batch. + // A node shutdown at this point can be catastrophic... + // We prevent this via the stop_lock (see above). if let Ok(_) = maybe_new_head { ctx.batch.commit()?; } @@ -322,11 +344,12 @@ impl Chain { /// Process a block header received during "header first" propagation. pub fn process_block_header(&self, bh: &BlockHeader, opts: Options) -> Result<(), Error> { + // We take a write lock on the txhashset and create a new batch + // but this is strictly readonly so we do not commit the batch. let mut txhashset = self.txhashset.write(); let batch = self.store.batch()?; let mut ctx = self.new_ctx(opts, batch, &mut txhashset)?; pipe::process_block_header(bh, &mut ctx)?; - ctx.batch.commit()?; Ok(()) } @@ -334,6 +357,15 @@ impl Chain { /// This is only ever used during sync and is based on sync_head. /// We update header_head here if our total work increases. pub fn sync_block_headers(&self, headers: &[BlockHeader], opts: Options) -> Result<(), Error> { + // Note: We take a lock on the stop_state here and do not release it until + // we have finished processing this single block. + // We take care to write both the txhashset *and* the batch while we + // have the stop_state lock. + let stop_lock = self.stop_state.lock(); + if stop_lock.is_stopped() { + return Err(ErrorKind::Stopped.into()); + } + let mut txhashset = self.txhashset.write(); let batch = self.store.batch()?; let mut ctx = self.new_ctx(opts, batch, &mut txhashset)?; @@ -865,6 +897,13 @@ impl Chain { fn compact_txhashset(&self) -> Result<(), Error> { debug!("Starting blockchain compaction."); { + // Note: We take a lock on the stop_state here and do not release it until + // we have finished processing this compaction operation. + let stop_lock = self.stop_state.lock(); + if stop_lock.is_stopped() { + return Err(ErrorKind::Stopped.into()); + } + let mut txhashset = self.txhashset.write(); txhashset.compact()?; txhashset::extending_readonly(&mut txhashset, |extension| { @@ -1145,8 +1184,8 @@ impl Chain { } fn setup_head( - genesis: Block, - store: Arc, + genesis: &Block, + store: &store::ChainStore, txhashset: &mut txhashset::TxHashSet, ) -> Result<(), Error> { let mut batch = store.batch()?; @@ -1240,10 +1279,8 @@ fn setup_head( let tip = Tip::from_header(&genesis.header); batch.save_head(&tip)?; - batch.save_block_header(&genesis.header)?; - if genesis.kernels().len() > 0 { - let (utxo_sum, kernel_sum) = (sums, &genesis as &Committed).verify_kernel_sums( + let (utxo_sum, kernel_sum) = (sums, genesis as &Committed).verify_kernel_sums( genesis.header.overage(), genesis.header.total_kernel_offset(), )?; diff --git a/chain/src/error.rs b/chain/src/error.rs index 1ef741280..99f570e2c 100644 --- a/chain/src/error.rs +++ b/chain/src/error.rs @@ -128,6 +128,9 @@ pub enum ErrorKind { /// Error from summing and verifying kernel sums via committed trait. #[fail(display = "Committed Trait: Error summing and verifying kernel sums")] Committed(committed::Error), + /// We cannot process data once the Grin server has been stopped. + #[fail(display = "Stopped (Grin Shutting Down)")] + Stopped, } impl Display for Error { diff --git a/chain/src/store.rs b/chain/src/store.rs index 0de164326..a3ed032df 100644 --- a/chain/src/store.rs +++ b/chain/src/store.rs @@ -199,8 +199,8 @@ impl<'a> Batch<'a> { /// Save the block and the associated input bitmap. /// Note: the block header is not saved to the db here, assumes this has already been done. pub fn save_block(&self, b: &Block) -> Result<(), Error> { - // Build the "input bitmap" for this new block and cache it locally. - self.build_and_cache_block_input_bitmap(&b)?; + // Build the "input bitmap" for this new block and store it in the db. + self.build_and_store_block_input_bitmap(&b)?; // Save the block itself to the db. self.db @@ -305,7 +305,7 @@ impl<'a> Batch<'a> { Ok(bitmap) } - fn build_and_cache_block_input_bitmap(&self, block: &Block) -> Result { + fn build_and_store_block_input_bitmap(&self, block: &Block) -> Result { // Build the bitmap. let bitmap = self.build_block_input_bitmap(block)?; @@ -326,7 +326,7 @@ impl<'a> Batch<'a> { } else { match self.get_block(bh) { Ok(block) => { - let bitmap = self.build_and_cache_block_input_bitmap(&block)?; + let bitmap = self.build_and_store_block_input_bitmap(&block)?; Ok(bitmap) } Err(e) => Err(e), diff --git a/chain/tests/data_file_integrity.rs b/chain/tests/data_file_integrity.rs index 93c45451e..fdd10273c 100644 --- a/chain/tests/data_file_integrity.rs +++ b/chain/tests/data_file_integrity.rs @@ -21,7 +21,7 @@ use self::core::libtx; use self::core::pow::{self, Difficulty}; use self::core::{consensus, genesis}; use self::keychain::{ExtKeychain, ExtKeychainPath, Keychain}; -use self::util::RwLock; +use self::util::{Mutex, RwLock, StopState}; use chrono::Duration; use grin_chain as chain; use grin_core as core; @@ -50,6 +50,7 @@ fn setup(dir_name: &str) -> Chain { pow::verify_size, verifier_cache, false, + Arc::new(Mutex::new(StopState::new())), ) .unwrap() } @@ -65,6 +66,7 @@ fn reload_chain(dir_name: &str) -> Chain { pow::verify_size, verifier_cache, false, + Arc::new(Mutex::new(StopState::new())), ) .unwrap() } diff --git a/chain/tests/mine_simple_chain.rs b/chain/tests/mine_simple_chain.rs index 18325847c..9ab310bf5 100644 --- a/chain/tests/mine_simple_chain.rs +++ b/chain/tests/mine_simple_chain.rs @@ -23,7 +23,7 @@ use self::core::libtx::{self, build, reward}; use self::core::pow::Difficulty; use self::core::{consensus, global, pow}; use self::keychain::{ExtKeychain, ExtKeychainPath, Keychain}; -use self::util::RwLock; +use self::util::{Mutex, RwLock, StopState}; use chrono::Duration; use grin_chain as chain; use grin_core as core; @@ -50,6 +50,7 @@ fn setup(dir_name: &str, genesis: Block) -> Chain { pow::verify_size, verifier_cache, false, + Arc::new(Mutex::new(StopState::new())), ) .unwrap() } @@ -541,6 +542,7 @@ fn actual_diff_iter_output() { pow::verify_size, verifier_cache, false, + Arc::new(Mutex::new(StopState::new())), ) .unwrap(); let iter = chain.difficulty_iter(); diff --git a/chain/tests/test_coinbase_maturity.rs b/chain/tests/test_coinbase_maturity.rs index ae60ae6c8..14d948a49 100644 --- a/chain/tests/test_coinbase_maturity.rs +++ b/chain/tests/test_coinbase_maturity.rs @@ -21,7 +21,7 @@ use self::core::libtx::{self, build}; use self::core::pow::Difficulty; use self::core::{consensus, pow}; use self::keychain::{ExtKeychain, ExtKeychainPath, Keychain}; -use self::util::RwLock; +use self::util::{Mutex, RwLock, StopState}; use chrono::Duration; use env_logger; use grin_chain as chain; @@ -55,6 +55,7 @@ fn test_coinbase_maturity() { pow::verify_size, verifier_cache, false, + Arc::new(Mutex::new(StopState::new())), ) .unwrap(); diff --git a/p2p/src/serv.rs b/p2p/src/serv.rs index f52251025..75bb3a701 100644 --- a/p2p/src/serv.rs +++ b/p2p/src/serv.rs @@ -14,7 +14,6 @@ use std::fs::File; use std::net::{Shutdown, SocketAddr, TcpListener, TcpStream}; -use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::time::Duration; use std::{io, thread}; @@ -29,6 +28,7 @@ use crate::peer::Peer; use crate::peers::Peers; use crate::store::PeerStore; use crate::types::{Capabilities, ChainAdapter, Error, NetAdapter, P2PConfig, TxHashSetRead}; +use crate::util::{Mutex, StopState}; use chrono::prelude::{DateTime, Utc}; /// P2P server implementation, handling bootstrapping to find and connect to @@ -38,8 +38,7 @@ pub struct Server { capabilities: Capabilities, handshake: Arc, pub peers: Arc, - stop: Arc, - pause: Arc, + stop_state: Arc>, } // TODO TLS @@ -51,16 +50,14 @@ impl Server { config: P2PConfig, adapter: Arc, genesis: Hash, - stop: Arc, - pause: Arc, + stop_state: Arc>, ) -> Result { Ok(Server { config: config.clone(), capabilities: capab, handshake: Arc::new(Handshake::new(genesis, config.clone())), peers: Arc::new(Peers::new(PeerStore::new(db_env)?, adapter, config)), - stop, - pause, + stop_state, }) } @@ -75,7 +72,7 @@ impl Server { let sleep_time = Duration::from_millis(1); loop { // Pause peer ingress connection request. Only for tests. - if self.pause.load(Ordering::Relaxed) { + if self.stop_state.lock().is_paused() { thread::sleep(Duration::from_secs(1)); continue; } @@ -95,7 +92,7 @@ impl Server { warn!("Couldn't establish new client connection: {:?}", e); } } - if self.stop.load(Ordering::Relaxed) { + if self.stop_state.lock().is_stopped() { break; } thread::sleep(sleep_time); @@ -194,7 +191,7 @@ impl Server { } pub fn stop(&self) { - self.stop.store(true, Ordering::Relaxed); + self.stop_state.lock().stop(); self.peers.stop(); } diff --git a/p2p/tests/peer_handshake.rs b/p2p/tests/peer_handshake.rs index e0874a2a4..ae229f473 100644 --- a/p2p/tests/peer_handshake.rs +++ b/p2p/tests/peer_handshake.rs @@ -17,9 +17,9 @@ use grin_p2p as p2p; use grin_store as store; use grin_util as util; +use grin_util::{Mutex, StopState}; use std::net::{SocketAddr, TcpListener, TcpStream}; -use std::sync::atomic::AtomicBool; use std::sync::Arc; use std::{thread, time}; @@ -57,8 +57,7 @@ fn peer_handshake() { p2p_config.clone(), net_adapter.clone(), Hash::from_vec(&vec![]), - Arc::new(AtomicBool::new(false)), - Arc::new(AtomicBool::new(false)), + Arc::new(Mutex::new(StopState::new())), ) .unwrap(), ); diff --git a/pool/tests/block_building.rs b/pool/tests/block_building.rs index ad7b071be..f31824a41 100644 --- a/pool/tests/block_building.rs +++ b/pool/tests/block_building.rs @@ -24,7 +24,6 @@ use self::util::RwLock; use crate::common::*; use grin_core as core; use grin_keychain as keychain; -use grin_pool as pool; use grin_util as util; use std::sync::Arc; diff --git a/pool/tests/block_reconciliation.rs b/pool/tests/block_reconciliation.rs index 3689f152c..b9a324f80 100644 --- a/pool/tests/block_reconciliation.rs +++ b/pool/tests/block_reconciliation.rs @@ -25,7 +25,6 @@ use crate::common::ChainAdapter; use crate::common::*; use grin_core as core; use grin_keychain as keychain; -use grin_pool as pool; use grin_util as util; use std::sync::Arc; diff --git a/pool/tests/common.rs b/pool/tests/common.rs index 4329eda0c..b4bd83554 100644 --- a/pool/tests/common.rs +++ b/pool/tests/common.rs @@ -25,7 +25,6 @@ use self::pool::types::*; use self::pool::TransactionPool; use self::util::secp::pedersen::Commitment; use self::util::RwLock; -use crate::pool::types::*; use grin_chain as chain; use grin_core as core; use grin_keychain as keychain; diff --git a/pool/tests/transaction_pool.rs b/pool/tests/transaction_pool.rs index 7a3d4c552..20799dfdf 100644 --- a/pool/tests/transaction_pool.rs +++ b/pool/tests/transaction_pool.rs @@ -23,7 +23,6 @@ use self::util::RwLock; use crate::common::*; use grin_core as core; use grin_keychain as keychain; -use grin_pool as pool; use grin_util as util; use std::sync::Arc; diff --git a/servers/src/grin/dandelion_monitor.rs b/servers/src/grin/dandelion_monitor.rs index fa8e90527..c859e9758 100644 --- a/servers/src/grin/dandelion_monitor.rs +++ b/servers/src/grin/dandelion_monitor.rs @@ -12,10 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. -use crate::util::RwLock; +use crate::util::{Mutex, RwLock, StopState}; use chrono::prelude::Utc; use rand::{thread_rng, Rng}; -use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::thread; use std::time::Duration; @@ -37,7 +36,7 @@ pub fn monitor_transactions( dandelion_config: DandelionConfig, tx_pool: Arc>, verifier_cache: Arc>, - stop: Arc, + stop_state: Arc>, ) { debug!("Started Dandelion transaction monitor."); @@ -45,7 +44,7 @@ pub fn monitor_transactions( .name("dandelion".to_string()) .spawn(move || { loop { - if stop.load(Ordering::Relaxed) { + if stop_state.lock().is_stopped() { break; } diff --git a/servers/src/grin/seed.rs b/servers/src/grin/seed.rs index c8f8103ac..3e5a39117 100644 --- a/servers/src/grin/seed.rs +++ b/servers/src/grin/seed.rs @@ -21,13 +21,13 @@ use chrono::prelude::Utc; use chrono::{Duration, MIN_DATE}; use rand::{thread_rng, Rng}; use std::net::{SocketAddr, ToSocketAddrs}; -use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{mpsc, Arc}; use std::{cmp, io, str, thread, time}; use crate::p2p; use crate::p2p::ChainAdapter; use crate::pool::DandelionConfig; +use crate::util::{Mutex, StopState}; // DNS Seeds with contact email associated const DNS_SEEDS: &'static [&'static str] = &[ @@ -40,8 +40,7 @@ pub fn connect_and_monitor( dandelion_config: DandelionConfig, seed_list: Box Vec + Send>, preferred_peers: Option>, - stop: Arc, - pause: Arc, + stop_state: Arc>, ) { let _ = thread::Builder::new() .name("seed".to_string()) @@ -65,9 +64,13 @@ pub fn connect_and_monitor( let mut prev_ping = Utc::now(); let mut start_attempt = 0; - while !stop.load(Ordering::Relaxed) { + loop { + if stop_state.lock().is_stopped() { + break; + } + // Pause egress peer connection request. Only for tests. - if pause.load(Ordering::Relaxed) { + if stop_state.lock().is_paused() { thread::sleep(time::Duration::from_secs(1)); continue; } diff --git a/servers/src/grin/server.rs b/servers/src/grin/server.rs index 965aacf5c..df798633a 100644 --- a/servers/src/grin/server.rs +++ b/servers/src/grin/server.rs @@ -16,7 +16,6 @@ //! the peer-to-peer server, the blockchain and the transaction pool) and acts //! as a facade. -use crate::util::RwLock; use std::net::SocketAddr; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; @@ -38,6 +37,7 @@ use crate::p2p; use crate::pool; use crate::store; use crate::util::file::get_first_line; +use crate::util::{Mutex, RwLock, StopState}; /// Grin server holding internal structures. pub struct Server { @@ -57,9 +57,7 @@ pub struct Server { /// To be passed around to collect stats and info state_info: ServerStateInfo, /// Stop flag - pub stop: Arc, - /// Pause flag - pub pause: Arc, + pub stop_state: Arc>, } impl Server { @@ -90,14 +88,14 @@ impl Server { if let Some(s) = enable_test_miner { if s { - serv.start_test_miner(test_miner_wallet_url, serv.stop.clone()); + serv.start_test_miner(test_miner_wallet_url, serv.stop_state.clone()); } } info_callback(serv.clone()); loop { thread::sleep(time::Duration::from_secs(1)); - if serv.stop.load(Ordering::Relaxed) { + if serv.stop_state.lock().is_stopped() { return Ok(()); } } @@ -112,8 +110,7 @@ impl Server { Some(b) => b, }; - let stop = Arc::new(AtomicBool::new(false)); - let pause = Arc::new(AtomicBool::new(false)); + let stop_state = Arc::new(Mutex::new(StopState::new())); // Shared cache for verification results. // We cache rangeproof verification and kernel signature verification. @@ -156,6 +153,7 @@ impl Server { pow::verify_size, verifier_cache.clone(), archive_mode, + stop_state.clone(), )?); pool_adapter.set_chain(shared_chain.clone()); @@ -175,8 +173,7 @@ impl Server { config.p2p_config.clone(), net_adapter.clone(), genesis.hash(), - stop.clone(), - pause.clone(), + stop_state.clone(), )?); chain_adapter.init(p2p_server.peers.clone()); pool_net_adapter.init(p2p_server.peers.clone()); @@ -206,8 +203,7 @@ impl Server { config.dandelion_config.clone(), seeder, peers_preferred, - stop.clone(), - pause.clone(), + stop_state.clone(), ); } @@ -220,7 +216,7 @@ impl Server { sync_state.clone(), p2p_server.peers.clone(), shared_chain.clone(), - stop.clone(), + stop_state.clone(), ); let p2p_inner = p2p_server.clone(); @@ -244,7 +240,7 @@ impl Server { config.dandelion_config.clone(), tx_pool.clone(), verifier_cache.clone(), - stop.clone(), + stop_state.clone(), ); warn!("Grin server started."); @@ -258,8 +254,7 @@ impl Server { state_info: ServerStateInfo { ..Default::default() }, - stop, - pause, + stop_state, }) } @@ -305,7 +300,11 @@ impl Server { /// Start mining for blocks internally on a separate thread. Relies on /// internal miner, and should only be used for automated testing. Burns /// reward if wallet_listener_url is 'None' - pub fn start_test_miner(&self, wallet_listener_url: Option, stop: Arc) { + pub fn start_test_miner( + &self, + wallet_listener_url: Option, + stop_state: Arc>, + ) { info!("start_test_miner - start",); let sync_state = self.sync_state.clone(); let config_wallet_url = match wallet_listener_url.clone() { @@ -327,7 +326,7 @@ impl Server { self.chain.clone(), self.tx_pool.clone(), self.verifier_cache.clone(), - stop, + stop_state, ); miner.set_debug_output_id(format!("Port {}", self.config.p2p_config.port)); let _ = thread::Builder::new() @@ -429,24 +428,25 @@ impl Server { /// Stop the server. pub fn stop(&self) { self.p2p.stop(); - self.stop.store(true, Ordering::Relaxed); + self.stop_state.lock().stop(); } /// Pause the p2p server. pub fn pause(&self) { - self.pause.store(true, Ordering::Relaxed); + self.stop_state.lock().pause(); thread::sleep(time::Duration::from_secs(1)); self.p2p.pause(); } - /// Resume the p2p server. + /// Resume p2p server. + /// TODO - We appear not to resume the p2p server (peer connections) here? pub fn resume(&self) { - self.pause.store(false, Ordering::Relaxed); + self.stop_state.lock().resume(); } /// Stops the test miner without stopping the p2p layer - pub fn stop_test_miner(&self, stop: Arc) { - stop.store(true, Ordering::Relaxed); + pub fn stop_test_miner(&self, stop: Arc>) { + stop.lock().stop(); info!("stop_test_miner - stop",); } } diff --git a/servers/src/grin/sync/syncer.rs b/servers/src/grin/sync/syncer.rs index 62eeb1903..4caa03071 100644 --- a/servers/src/grin/sync/syncer.rs +++ b/servers/src/grin/sync/syncer.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::thread; use std::time; @@ -24,17 +23,18 @@ use crate::grin::sync::body_sync::BodySync; use crate::grin::sync::header_sync::HeaderSync; use crate::grin::sync::state_sync::StateSync; use crate::p2p; +use crate::util::{Mutex, StopState}; pub fn run_sync( sync_state: Arc, peers: Arc, chain: Arc, - stop: Arc, + stop_state: Arc>, ) { let _ = thread::Builder::new() .name("sync".to_string()) .spawn(move || { - let runner = SyncRunner::new(sync_state, peers, chain, stop); + let runner = SyncRunner::new(sync_state, peers, chain, stop_state); runner.sync_loop(); }); } @@ -43,7 +43,7 @@ pub struct SyncRunner { sync_state: Arc, peers: Arc, chain: Arc, - stop: Arc, + stop_state: Arc>, } impl SyncRunner { @@ -51,13 +51,13 @@ impl SyncRunner { sync_state: Arc, peers: Arc, chain: Arc, - stop: Arc, + stop_state: Arc>, ) -> SyncRunner { SyncRunner { sync_state, peers, chain, - stop, + stop_state, } } @@ -121,7 +121,11 @@ impl SyncRunner { let mut highest_height = 0; // Main syncing loop - while !self.stop.load(Ordering::Relaxed) { + loop { + if self.stop_state.lock().is_stopped() { + break; + } + thread::sleep(time::Duration::from_millis(10)); // check whether syncing is generally needed, when we compare our state with others diff --git a/servers/src/mining/test_miner.rs b/servers/src/mining/test_miner.rs index 45774cc17..d671855ea 100644 --- a/servers/src/mining/test_miner.rs +++ b/servers/src/mining/test_miner.rs @@ -19,7 +19,6 @@ use crate::util::RwLock; use chrono::prelude::Utc; -use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use crate::chain; @@ -30,13 +29,14 @@ use crate::core::core::{Block, BlockHeader}; use crate::core::global; use crate::mining::mine_block; use crate::pool; +use crate::util::{Mutex, StopState}; pub struct Miner { config: StratumServerConfig, chain: Arc, tx_pool: Arc>, verifier_cache: Arc>, - stop: Arc, + stop_state: Arc>, // Just to hold the port we're on, so this miner can be identified // while watching debug output @@ -51,7 +51,7 @@ impl Miner { chain: Arc, tx_pool: Arc>, verifier_cache: Arc>, - stop: Arc, + stop_state: Arc>, ) -> Miner { Miner { config, @@ -59,7 +59,7 @@ impl Miner { tx_pool, verifier_cache, debug_output_id: String::from("none"), - stop, + stop_state, } } @@ -135,7 +135,11 @@ impl Miner { // nothing has changed. We only want to create a new key_id for each new block. let mut key_id = None; - while !self.stop.load(Ordering::Relaxed) { + loop { + if self.stop_state.lock().is_stopped() { + break; + } + trace!("in miner loop. key_id: {:?}", key_id); // get the latest chain state and build a block on top of it diff --git a/servers/tests/framework.rs b/servers/tests/framework.rs index ff9fee94a..8b645cb42 100644 --- a/servers/tests/framework.rs +++ b/servers/tests/framework.rs @@ -222,7 +222,7 @@ impl LocalServerContainer { "starting test Miner on port {}", self.config.p2p_server_port ); - s.start_test_miner(wallet_url, s.stop.clone()); + s.start_test_miner(wallet_url, s.stop_state.clone()); } for p in &mut self.peer_list { diff --git a/servers/tests/simulnet.rs b/servers/tests/simulnet.rs index 7a3370e2b..a91157221 100644 --- a/servers/tests/simulnet.rs +++ b/servers/tests/simulnet.rs @@ -19,7 +19,7 @@ mod framework; use self::core::core::hash::Hashed; use self::core::global::{self, ChainTypes}; -use self::util::Mutex; +use self::util::{Mutex, StopState}; use self::wallet::controller; use self::wallet::libwallet::types::{WalletBackend, WalletInst}; use self::wallet::lmdb_wallet::LMDBBackend; @@ -35,7 +35,6 @@ use grin_wallet as wallet; use std::cmp; use std::default::Default; use std::process::exit; -use std::sync::atomic::AtomicBool; use std::sync::Arc; use std::{thread, time}; @@ -219,7 +218,7 @@ fn simulate_block_propagation() { } // start mining - let stop = Arc::new(AtomicBool::new(false)); + let stop = Arc::new(Mutex::new(StopState::new())); servers[0].start_test_miner(None, stop.clone()); // monitor for a change of head on a different server and check whether @@ -272,7 +271,7 @@ fn simulate_full_sync() { let s1 = servers::Server::new(framework::config(1000, "grin-sync", 1000)).unwrap(); // mine a few blocks on server 1 - let stop = Arc::new(AtomicBool::new(false)); + let stop = Arc::new(Mutex::new(StopState::new())); s1.start_test_miner(None, stop.clone()); thread::sleep(time::Duration::from_secs(8)); s1.stop_test_miner(stop); @@ -328,7 +327,7 @@ fn simulate_fast_sync() { // start s1 and mine enough blocks to get beyond the fast sync horizon let s1 = servers::Server::new(framework::config(2000, "grin-fast", 2000)).unwrap(); - let stop = Arc::new(AtomicBool::new(false)); + let stop = Arc::new(Mutex::new(StopState::new())); s1.start_test_miner(None, stop.clone()); while s1.head().height < 20 { @@ -460,7 +459,7 @@ fn long_fork_test_preparation() -> Vec { let s0 = servers::Server::new(conf).unwrap(); thread::sleep(time::Duration::from_millis(1_000)); s.push(s0); - let stop = Arc::new(AtomicBool::new(false)); + let stop = Arc::new(Mutex::new(StopState::new())); s[0].start_test_miner(None, stop.clone()); while s[0].head().height < global::cut_through_horizon() as u64 + 10 { @@ -545,7 +544,7 @@ fn long_fork_test_mining(blocks: u64, n: u16, s: &servers::Server) { let sn_header = s.chain.head().unwrap(); // Mining - let stop = Arc::new(AtomicBool::new(false)); + let stop = Arc::new(Mutex::new(StopState::new())); s.start_test_miner(None, stop.clone()); while s.head().height < sn_header.height + blocks { @@ -928,7 +927,7 @@ fn replicate_tx_fluff_failure() { s1_config.dandelion_config.relay_secs = Some(1); let s1 = servers::Server::new(s1_config.clone()).unwrap(); // Mine off of server 1 - s1.start_test_miner(s1_config.test_miner_wallet_url, s1.stop.clone()); + s1.start_test_miner(s1_config.test_miner_wallet_url, s1.stop_state.clone()); thread::sleep(time::Duration::from_secs(5)); // Server 2 (another node) diff --git a/servers/tests/stratum.rs b/servers/tests/stratum.rs index ced5dc6aa..938dcf5e3 100644 --- a/servers/tests/stratum.rs +++ b/servers/tests/stratum.rs @@ -23,11 +23,11 @@ use bufstream::BufStream; use grin_core as core; use grin_servers as servers; use grin_util as util; +use grin_util::{Mutex, StopState}; use serde_json::Value; use std::io::prelude::{BufRead, Write}; use std::net::TcpStream; use std::process; -use std::sync::atomic::AtomicBool; use std::sync::Arc; use std::{thread, time}; @@ -140,7 +140,7 @@ fn basic_stratum_server() { info!("stratum server and worker stats verification ok"); // Start mining blocks - let stop = Arc::new(AtomicBool::new(false)); + let stop = Arc::new(Mutex::new(StopState::new())); s.start_test_miner(None, stop.clone()); info!("test miner started"); diff --git a/store/src/pmmr.rs b/store/src/pmmr.rs index d2b7aeb99..c21681a58 100644 --- a/store/src/pmmr.rs +++ b/store/src/pmmr.rs @@ -13,7 +13,7 @@ //! Implementation of the persistent Backend for the prunable MMR tree. -use std::{fs, io, marker, time}; +use std::{fs, io, time}; use crate::core::core::hash::{Hash, Hashed}; use crate::core::core::pmmr::{self, family, Backend}; diff --git a/util/src/lib.rs b/util/src/lib.rs index 2725fa835..ba5ed0e0d 100644 --- a/util/src/lib.rs +++ b/util/src/lib.rs @@ -109,3 +109,52 @@ where pub fn to_base64(s: &str) -> String { base64::encode(s) } + +/// Global stopped/paused state shared across various subcomponents of Grin. +/// +/// Arc> allows the chain to lock the stop_state during critical processing. +/// Other subcomponents cannot abruptly shutdown the server during block/header processing. +/// This should prevent the chain ever ending up in an inconsistent state on restart. +/// +/// "Stopped" allows a clean shutdown of the Grin server. +/// "Paused" is used in some tests to allow nodes to reach steady state etc. +/// +pub struct StopState { + stopped: bool, + paused: bool, +} + +impl StopState { + /// Create a new stop_state in default "running" state. + pub fn new() -> StopState { + StopState { + stopped: false, + paused: false, + } + } + + /// Check if we are stopped. + pub fn is_stopped(&self) -> bool { + self.stopped + } + + /// Check if we are paused. + pub fn is_paused(&self) -> bool { + self.paused + } + + /// Stop the server. + pub fn stop(&mut self) { + self.stopped = true; + } + + /// Pause the server (only used in tests). + pub fn pause(&mut self) { + self.paused = true; + } + + /// Resume a paused server (only used in tests). + pub fn resume(&mut self) { + self.paused = false; + } +} diff --git a/wallet/src/test_framework/testclient.rs b/wallet/src/test_framework/testclient.rs index 18723be5b..2af6d8b6e 100644 --- a/wallet/src/test_framework/testclient.rs +++ b/wallet/src/test_framework/testclient.rs @@ -26,7 +26,7 @@ use self::core::{pow, ser}; use self::keychain::Keychain; use self::util::secp::pedersen; use self::util::secp::pedersen::Commitment; -use self::util::{Mutex, RwLock}; +use self::util::{Mutex, RwLock, StopState}; use crate::libwallet::types::*; use crate::{controller, libwallet, WalletCommAdapter, WalletConfig}; use failure::ResultExt; @@ -110,8 +110,8 @@ where pow::verify_size, verifier_cache, false, - ) - .unwrap(); + Arc::new(Mutex::new(StopState::new())), + ).unwrap(); let (tx, rx) = channel(); let retval = WalletProxy { chain_dir: chain_dir.to_owned(),