Clean shutdown via stop_state (#2117)

* clean shutdown wip

* rustfmt

* introduce StopState that we can lock on

* rustfmt

* take lock on stop_state during critical processing (process_block_single etc.)

* rustfmt

* take lock on stop_state during chain::init()

* cleanup

* cleanup

* rustfmt

* docs/comments

* fixup servers tests

* cleanup p2p tests
Author: Antioch Peverell
Date: 2018-12-11 11:07:41 +00:00 (committed by GitHub)
Parent: ec9cdf0ecd
Commit: 2d4538c428
23 changed files with 222 additions and 127 deletions
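The whole change set follows one pattern: a single StopState, shared behind Arc<Mutex<...>>, is cloned into every long-running component. Critical sections (chain init, block processing, compaction) hold the lock for their full duration, and the shutdown path must take the same lock before it can flip the stopped flag, so it cannot interrupt a half-finished write. A minimal standalone sketch of that property, using std::sync::Mutex and a cut-down StopState instead of Grin's util types (in Grin itself the Mutex is the parking_lot re-export, so lock() returns the guard directly without the unwrap()):

use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

// Cut-down stand-in for util::StopState (the real one is added in util below).
struct StopState {
    stopped: bool,
}

fn main() {
    // One shared stop_state, cloned into every subsystem.
    let stop_state = Arc::new(Mutex::new(StopState { stopped: false }));

    // A worker doing "critical processing" (think process_block_single):
    // it holds the lock for the whole txhashset write + batch commit.
    let worker_state = stop_state.clone();
    let worker = thread::spawn(move || {
        let guard = worker_state.lock().unwrap();
        if guard.stopped {
            return; // refuse new work once shutdown has started
        }
        // ... flush the txhashset extension and commit the db batch here ...
        thread::sleep(Duration::from_millis(100));
        // guard dropped here, shutdown may now proceed
    });

    // The shutdown path must take the same lock, so it blocks until the
    // critical section above has fully completed.
    thread::sleep(Duration::from_millis(10));
    stop_state.lock().unwrap().stopped = true;

    worker.join().unwrap();
}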

View file

@@ -32,7 +32,7 @@ use crate::types::{
 	BlockStatus, ChainAdapter, NoStatus, Options, Tip, TxHashSetRoots, TxHashsetWriteStatus,
 };
 use crate::util::secp::pedersen::{Commitment, RangeProof};
-use crate::util::RwLock;
+use crate::util::{Mutex, RwLock, StopState};
 use grin_store::Error::NotFoundErr;
 use std::collections::HashMap;
 use std::fs::File;
@@ -149,6 +149,7 @@ pub struct Chain {
 	// POW verification function
 	pow_verifier: fn(&BlockHeader) -> Result<(), pow::Error>,
 	archive_mode: bool,
+	stop_state: Arc<Mutex<StopState>>,
 	genesis: BlockHeader,
 }
@@ -164,17 +165,39 @@ impl Chain {
 		pow_verifier: fn(&BlockHeader) -> Result<(), pow::Error>,
 		verifier_cache: Arc<RwLock<dyn VerifierCache>>,
 		archive_mode: bool,
+		stop_state: Arc<Mutex<StopState>>,
 	) -> Result<Chain, Error> {
-		let chain_store = store::ChainStore::new(db_env)?;
+		// Note: We take a lock on the stop_state here and do not release it until
+		// we have finished chain initialization.
+		let stop_state_local = stop_state.clone();
+		let stop_lock = stop_state_local.lock();
+		if stop_lock.is_stopped() {
+			return Err(ErrorKind::Stopped.into());
+		}
-		let store = Arc::new(chain_store);
+		let store = Arc::new(store::ChainStore::new(db_env)?);
 		// open the txhashset, creating a new one if necessary
 		let mut txhashset = txhashset::TxHashSet::open(db_root.clone(), store.clone(), None)?;
-		setup_head(genesis.clone(), store.clone(), &mut txhashset)?;
+		setup_head(&genesis, &store, &mut txhashset)?;
+		Chain::log_heads(&store)?;
-		{
+		Ok(Chain {
+			db_root,
+			store,
+			adapter,
+			orphans: Arc::new(OrphanBlockPool::new()),
+			txhashset: Arc::new(RwLock::new(txhashset)),
+			pow_verifier,
+			verifier_cache,
+			archive_mode,
+			stop_state,
+			genesis: genesis.header.clone(),
+		})
+	}
+	fn log_heads(store: &store::ChainStore) -> Result<(), Error> {
 		let head = store.head()?;
 		debug!(
 			"init: head: {} @ {} [{}]",
@@ -182,9 +205,7 @@ impl Chain {
 			head.height,
 			head.last_block_h,
 		);
-		}
-		{
 		let header_head = store.header_head()?;
 		debug!(
 			"init: header_head: {} @ {} [{}]",
@@ -192,9 +213,7 @@ impl Chain {
 			header_head.height,
 			header_head.last_block_h,
 		);
-		}
-		{
 		let sync_head = store.get_sync_head()?;
 		debug!(
 			"init: sync_head: {} @ {} [{}]",
@@ -202,19 +221,8 @@ impl Chain {
 			sync_head.height,
 			sync_head.last_block_h,
 		);
-		}
-		Ok(Chain {
-			db_root: db_root,
-			store: store,
-			adapter: adapter,
-			orphans: Arc::new(OrphanBlockPool::new()),
-			txhashset: Arc::new(RwLock::new(txhashset)),
-			pow_verifier,
-			verifier_cache,
-			archive_mode,
-			genesis: genesis.header.clone(),
-		})
+		Ok(())
 	}

 	/// Processes a single block, then checks for orphans, processing
@@ -251,6 +259,15 @@ impl Chain {
 	/// or false if it has added to a fork (or orphan?).
 	fn process_block_single(&self, b: Block, opts: Options) -> Result<Option<Tip>, Error> {
 		let (maybe_new_head, prev_head) = {
+			// Note: We take a lock on the stop_state here and do not release it until
+			// we have finished processing this single block.
+			// We take care to write both the txhashset *and* the batch while we
+			// have the stop_state lock.
+			let stop_lock = self.stop_state.lock();
+			if stop_lock.is_stopped() {
+				return Err(ErrorKind::Stopped.into());
+			}
 			let mut txhashset = self.txhashset.write();
 			let batch = self.store.batch()?;
 			let mut ctx = self.new_ctx(opts, batch, &mut txhashset)?;
@@ -258,6 +275,11 @@ impl Chain {
 			let prev_head = ctx.batch.head()?;
 			let maybe_new_head = pipe::process_block(&b, &mut ctx);
+			// We have flushed txhashset extension changes to disk
+			// but not yet committed the batch.
+			// A node shutdown at this point can be catastrophic...
+			// We prevent this via the stop_lock (see above).
 			if let Ok(_) = maybe_new_head {
 				ctx.batch.commit()?;
 			}
@@ -322,11 +344,12 @@ impl Chain {
 	/// Process a block header received during "header first" propagation.
 	pub fn process_block_header(&self, bh: &BlockHeader, opts: Options) -> Result<(), Error> {
+		// We take a write lock on the txhashset and create a new batch
+		// but this is strictly readonly so we do not commit the batch.
 		let mut txhashset = self.txhashset.write();
 		let batch = self.store.batch()?;
 		let mut ctx = self.new_ctx(opts, batch, &mut txhashset)?;
 		pipe::process_block_header(bh, &mut ctx)?;
-		ctx.batch.commit()?;
 		Ok(())
 	}
@@ -334,6 +357,15 @@ impl Chain {
 	/// This is only ever used during sync and is based on sync_head.
 	/// We update header_head here if our total work increases.
 	pub fn sync_block_headers(&self, headers: &[BlockHeader], opts: Options) -> Result<(), Error> {
+		// Note: We take a lock on the stop_state here and do not release it until
+		// we have finished processing this single block.
+		// We take care to write both the txhashset *and* the batch while we
+		// have the stop_state lock.
+		let stop_lock = self.stop_state.lock();
+		if stop_lock.is_stopped() {
+			return Err(ErrorKind::Stopped.into());
+		}
 		let mut txhashset = self.txhashset.write();
 		let batch = self.store.batch()?;
 		let mut ctx = self.new_ctx(opts, batch, &mut txhashset)?;
@@ -865,6 +897,13 @@ impl Chain {
 	fn compact_txhashset(&self) -> Result<(), Error> {
 		debug!("Starting blockchain compaction.");
 		{
+			// Note: We take a lock on the stop_state here and do not release it until
+			// we have finished processing this compaction operation.
+			let stop_lock = self.stop_state.lock();
+			if stop_lock.is_stopped() {
+				return Err(ErrorKind::Stopped.into());
+			}
 			let mut txhashset = self.txhashset.write();
 			txhashset.compact()?;
 			txhashset::extending_readonly(&mut txhashset, |extension| {
@@ -1145,8 +1184,8 @@ impl Chain {
 }

 fn setup_head(
-	genesis: Block,
-	store: Arc<store::ChainStore>,
+	genesis: &Block,
+	store: &store::ChainStore,
 	txhashset: &mut txhashset::TxHashSet,
 ) -> Result<(), Error> {
 	let mut batch = store.batch()?;
@@ -1240,10 +1279,8 @@ fn setup_head(
 			let tip = Tip::from_header(&genesis.header);
 			batch.save_head(&tip)?;
-			batch.save_block_header(&genesis.header)?;
 			if genesis.kernels().len() > 0 {
-				let (utxo_sum, kernel_sum) = (sums, &genesis as &Committed).verify_kernel_sums(
+				let (utxo_sum, kernel_sum) = (sums, genesis as &Committed).verify_kernel_sums(
 					genesis.header.overage(),
 					genesis.header.total_kernel_offset(),
 				)?;
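The same three-line preamble (lock the stop_state, bail out with ErrorKind::Stopped, keep the guard for the rest of the scope) now appears in Chain::new, process_block_single, sync_block_headers and compact_txhashset. Purely as an illustration, and not part of this commit, the pattern could be captured by a small helper; the sketch below is self-contained and uses std::sync::Mutex and a local Error type rather than the chain crate's:

use std::sync::{Arc, Mutex};

#[derive(Debug)]
enum Error {
    Stopped,
}

struct StopState {
    stopped: bool,
}

// Hypothetical helper (not part of this commit): run a critical closure while
// holding the stop_state guard, refusing to start if shutdown is under way.
fn with_stop_guard<T>(
    stop_state: &Arc<Mutex<StopState>>,
    critical: impl FnOnce() -> Result<T, Error>,
) -> Result<T, Error> {
    // Hold the guard for the entire critical section so a concurrent
    // shutdown (which must take the same lock) cannot interleave with it.
    let guard = stop_state.lock().unwrap();
    if guard.stopped {
        return Err(Error::Stopped);
    }
    let res = critical();
    drop(guard);
    res
}

fn main() -> Result<(), Error> {
    let stop_state = Arc::new(Mutex::new(StopState { stopped: false }));
    with_stop_guard(&stop_state, || {
        // e.g. write the txhashset extension *and* commit the db batch here
        Ok(())
    })
}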

View file

@@ -128,6 +128,9 @@ pub enum ErrorKind {
 	/// Error from summing and verifying kernel sums via committed trait.
 	#[fail(display = "Committed Trait: Error summing and verifying kernel sums")]
 	Committed(committed::Error),
+	/// We cannot process data once the Grin server has been stopped.
+	#[fail(display = "Stopped (Grin Shutting Down)")]
+	Stopped,
 }

 impl Display for Error {

View file

@@ -199,8 +199,8 @@ impl<'a> Batch<'a> {
 	/// Save the block and the associated input bitmap.
 	/// Note: the block header is not saved to the db here, assumes this has already been done.
 	pub fn save_block(&self, b: &Block) -> Result<(), Error> {
-		// Build the "input bitmap" for this new block and cache it locally.
-		self.build_and_cache_block_input_bitmap(&b)?;
+		// Build the "input bitmap" for this new block and store it in the db.
+		self.build_and_store_block_input_bitmap(&b)?;

 		// Save the block itself to the db.
 		self.db
@@ -305,7 +305,7 @@ impl<'a> Batch<'a> {
 		Ok(bitmap)
 	}

-	fn build_and_cache_block_input_bitmap(&self, block: &Block) -> Result<Bitmap, Error> {
+	fn build_and_store_block_input_bitmap(&self, block: &Block) -> Result<Bitmap, Error> {
 		// Build the bitmap.
 		let bitmap = self.build_block_input_bitmap(block)?;
@@ -326,7 +326,7 @@ impl<'a> Batch<'a> {
 		} else {
 			match self.get_block(bh) {
 				Ok(block) => {
-					let bitmap = self.build_and_cache_block_input_bitmap(&block)?;
+					let bitmap = self.build_and_store_block_input_bitmap(&block)?;
 					Ok(bitmap)
 				}
 				Err(e) => Err(e),

View file

@@ -21,7 +21,7 @@ use self::core::libtx;
 use self::core::pow::{self, Difficulty};
 use self::core::{consensus, genesis};
 use self::keychain::{ExtKeychain, ExtKeychainPath, Keychain};
-use self::util::RwLock;
+use self::util::{Mutex, RwLock, StopState};
 use chrono::Duration;
 use grin_chain as chain;
 use grin_core as core;
@@ -50,6 +50,7 @@ fn setup(dir_name: &str) -> Chain {
 		pow::verify_size,
 		verifier_cache,
 		false,
+		Arc::new(Mutex::new(StopState::new())),
 	)
 	.unwrap()
 }
@@ -65,6 +66,7 @@ fn reload_chain(dir_name: &str) -> Chain {
 		pow::verify_size,
 		verifier_cache,
 		false,
+		Arc::new(Mutex::new(StopState::new())),
 	)
 	.unwrap()
 }

View file

@@ -23,7 +23,7 @@ use self::core::libtx::{self, build, reward};
 use self::core::pow::Difficulty;
 use self::core::{consensus, global, pow};
 use self::keychain::{ExtKeychain, ExtKeychainPath, Keychain};
-use self::util::RwLock;
+use self::util::{Mutex, RwLock, StopState};
 use chrono::Duration;
 use grin_chain as chain;
 use grin_core as core;
@@ -50,6 +50,7 @@ fn setup(dir_name: &str, genesis: Block) -> Chain {
 		pow::verify_size,
 		verifier_cache,
 		false,
+		Arc::new(Mutex::new(StopState::new())),
 	)
 	.unwrap()
 }
@@ -541,6 +542,7 @@ fn actual_diff_iter_output() {
 		pow::verify_size,
 		verifier_cache,
 		false,
+		Arc::new(Mutex::new(StopState::new())),
 	)
 	.unwrap();
 	let iter = chain.difficulty_iter();

View file

@@ -21,7 +21,7 @@ use self::core::libtx::{self, build};
 use self::core::pow::Difficulty;
 use self::core::{consensus, pow};
 use self::keychain::{ExtKeychain, ExtKeychainPath, Keychain};
-use self::util::RwLock;
+use self::util::{Mutex, RwLock, StopState};
 use chrono::Duration;
 use env_logger;
 use grin_chain as chain;
@@ -55,6 +55,7 @@ fn test_coinbase_maturity() {
 		pow::verify_size,
 		verifier_cache,
 		false,
+		Arc::new(Mutex::new(StopState::new())),
 	)
 	.unwrap();

View file

@@ -14,7 +14,6 @@
 use std::fs::File;
 use std::net::{Shutdown, SocketAddr, TcpListener, TcpStream};
-use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::Arc;
 use std::time::Duration;
 use std::{io, thread};
@@ -29,6 +28,7 @@ use crate::peer::Peer;
 use crate::peers::Peers;
 use crate::store::PeerStore;
 use crate::types::{Capabilities, ChainAdapter, Error, NetAdapter, P2PConfig, TxHashSetRead};
+use crate::util::{Mutex, StopState};
 use chrono::prelude::{DateTime, Utc};

 /// P2P server implementation, handling bootstrapping to find and connect to
@@ -38,8 +38,7 @@ pub struct Server {
 	capabilities: Capabilities,
 	handshake: Arc<Handshake>,
 	pub peers: Arc<Peers>,
-	stop: Arc<AtomicBool>,
-	pause: Arc<AtomicBool>,
+	stop_state: Arc<Mutex<StopState>>,
 }

 // TODO TLS
@@ -51,16 +50,14 @@ impl Server {
 		config: P2PConfig,
 		adapter: Arc<dyn ChainAdapter>,
 		genesis: Hash,
-		stop: Arc<AtomicBool>,
-		pause: Arc<AtomicBool>,
+		stop_state: Arc<Mutex<StopState>>,
 	) -> Result<Server, Error> {
 		Ok(Server {
 			config: config.clone(),
 			capabilities: capab,
 			handshake: Arc::new(Handshake::new(genesis, config.clone())),
 			peers: Arc::new(Peers::new(PeerStore::new(db_env)?, adapter, config)),
-			stop,
-			pause,
+			stop_state,
 		})
 	}
@@ -75,7 +72,7 @@ impl Server {
 		let sleep_time = Duration::from_millis(1);
 		loop {
 			// Pause peer ingress connection request. Only for tests.
-			if self.pause.load(Ordering::Relaxed) {
+			if self.stop_state.lock().is_paused() {
 				thread::sleep(Duration::from_secs(1));
 				continue;
 			}
@@ -95,7 +92,7 @@ impl Server {
 					warn!("Couldn't establish new client connection: {:?}", e);
 				}
 			}
-			if self.stop.load(Ordering::Relaxed) {
+			if self.stop_state.lock().is_stopped() {
 				break;
 			}
 			thread::sleep(sleep_time);
@@ -194,7 +191,7 @@ impl Server {
 	}

 	pub fn stop(&self) {
-		self.stop.store(true, Ordering::Relaxed);
+		self.stop_state.lock().stop();
 		self.peers.stop();
 	}
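For the p2p server the two AtomicBool flags (stop, pause) collapse into the one shared StopState, and the accept loop now checks is_paused()/is_stopped() through the mutex on each iteration. One consequence worth noting: those quick lock() calls will briefly block whenever another component (typically the chain) is holding the stop_state lock for critical processing. A rough self-contained sketch of the loop shape, again with std::sync::Mutex and a cut-down StopState standing in for the util types:

use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

// Cut-down stand-in for util::StopState: one struct replaces the two
// AtomicBool flags (stop, pause) the p2p server used to carry around.
#[derive(Default)]
struct StopState {
    stopped: bool,
    paused: bool,
}

fn accept_loop(stop_state: Arc<Mutex<StopState>>) {
    loop {
        // Pause peer ingress (tests only): sleep and retry.
        if stop_state.lock().unwrap().paused {
            thread::sleep(Duration::from_secs(1));
            continue;
        }
        // ... accept and handle one incoming connection here ...
        if stop_state.lock().unwrap().stopped {
            break;
        }
        thread::sleep(Duration::from_millis(1));
    }
}

fn main() {
    let stop_state = Arc::new(Mutex::new(StopState::default()));
    let handle = {
        let state = stop_state.clone();
        thread::spawn(move || accept_loop(state))
    };
    thread::sleep(Duration::from_millis(5));
    stop_state.lock().unwrap().stopped = true;
    handle.join().unwrap();
}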

View file

@@ -17,9 +17,9 @@ use grin_p2p as p2p;
 use grin_store as store;
 use grin_util as util;
+use grin_util::{Mutex, StopState};
 use std::net::{SocketAddr, TcpListener, TcpStream};
-use std::sync::atomic::AtomicBool;
 use std::sync::Arc;
 use std::{thread, time};
@@ -57,8 +57,7 @@ fn peer_handshake() {
 			p2p_config.clone(),
 			net_adapter.clone(),
 			Hash::from_vec(&vec![]),
-			Arc::new(AtomicBool::new(false)),
-			Arc::new(AtomicBool::new(false)),
+			Arc::new(Mutex::new(StopState::new())),
 		)
 		.unwrap(),
 	);

View file

@@ -24,7 +24,6 @@ use self::util::RwLock;
 use crate::common::*;
 use grin_core as core;
 use grin_keychain as keychain;
-use grin_pool as pool;
 use grin_util as util;
 use std::sync::Arc;

View file

@@ -25,7 +25,6 @@ use crate::common::ChainAdapter;
 use crate::common::*;
 use grin_core as core;
 use grin_keychain as keychain;
-use grin_pool as pool;
 use grin_util as util;
 use std::sync::Arc;

View file

@@ -25,7 +25,6 @@ use self::pool::types::*;
 use self::pool::TransactionPool;
 use self::util::secp::pedersen::Commitment;
 use self::util::RwLock;
-use crate::pool::types::*;
 use grin_chain as chain;
 use grin_core as core;
 use grin_keychain as keychain;

View file

@@ -23,7 +23,6 @@ use self::util::RwLock;
 use crate::common::*;
 use grin_core as core;
 use grin_keychain as keychain;
-use grin_pool as pool;
 use grin_util as util;
 use std::sync::Arc;

View file

@@ -12,10 +12,9 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.

-use crate::util::RwLock;
+use crate::util::{Mutex, RwLock, StopState};
 use chrono::prelude::Utc;
 use rand::{thread_rng, Rng};
-use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::Arc;
 use std::thread;
 use std::time::Duration;
@@ -37,7 +36,7 @@ pub fn monitor_transactions(
 	dandelion_config: DandelionConfig,
 	tx_pool: Arc<RwLock<TransactionPool>>,
 	verifier_cache: Arc<RwLock<dyn VerifierCache>>,
-	stop: Arc<AtomicBool>,
+	stop_state: Arc<Mutex<StopState>>,
 ) {
 	debug!("Started Dandelion transaction monitor.");
@@ -45,7 +44,7 @@ pub fn monitor_transactions(
 		.name("dandelion".to_string())
 		.spawn(move || {
 			loop {
-				if stop.load(Ordering::Relaxed) {
+				if stop_state.lock().is_stopped() {
 					break;
 				}

View file

@@ -21,13 +21,13 @@ use chrono::prelude::Utc;
 use chrono::{Duration, MIN_DATE};
 use rand::{thread_rng, Rng};
 use std::net::{SocketAddr, ToSocketAddrs};
-use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::{mpsc, Arc};
 use std::{cmp, io, str, thread, time};

 use crate::p2p;
 use crate::p2p::ChainAdapter;
 use crate::pool::DandelionConfig;
+use crate::util::{Mutex, StopState};

 // DNS Seeds with contact email associated
 const DNS_SEEDS: &'static [&'static str] = &[
@@ -40,8 +40,7 @@ pub fn connect_and_monitor(
 	dandelion_config: DandelionConfig,
 	seed_list: Box<dyn Fn() -> Vec<SocketAddr> + Send>,
 	preferred_peers: Option<Vec<SocketAddr>>,
-	stop: Arc<AtomicBool>,
-	pause: Arc<AtomicBool>,
+	stop_state: Arc<Mutex<StopState>>,
 ) {
 	let _ = thread::Builder::new()
 		.name("seed".to_string())
@@ -65,9 +64,13 @@ pub fn connect_and_monitor(
 			let mut prev_ping = Utc::now();
 			let mut start_attempt = 0;

-			while !stop.load(Ordering::Relaxed) {
+			loop {
+				if stop_state.lock().is_stopped() {
+					break;
+				}

 				// Pause egress peer connection request. Only for tests.
-				if pause.load(Ordering::Relaxed) {
+				if stop_state.lock().is_paused() {
 					thread::sleep(time::Duration::from_secs(1));
 					continue;
 				}

View file

@@ -16,7 +16,6 @@
 //! the peer-to-peer server, the blockchain and the transaction pool) and acts
 //! as a facade.

-use crate::util::RwLock;
 use std::net::SocketAddr;
 use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::Arc;
@@ -38,6 +37,7 @@ use crate::p2p;
 use crate::pool;
 use crate::store;
 use crate::util::file::get_first_line;
+use crate::util::{Mutex, RwLock, StopState};

 /// Grin server holding internal structures.
 pub struct Server {
@@ -57,9 +57,7 @@ pub struct Server {
 	/// To be passed around to collect stats and info
 	state_info: ServerStateInfo,
 	/// Stop flag
-	pub stop: Arc<AtomicBool>,
-	/// Pause flag
-	pub pause: Arc<AtomicBool>,
+	pub stop_state: Arc<Mutex<StopState>>,
 }

 impl Server {
@@ -90,14 +88,14 @@ impl Server {
 		if let Some(s) = enable_test_miner {
 			if s {
-				serv.start_test_miner(test_miner_wallet_url, serv.stop.clone());
+				serv.start_test_miner(test_miner_wallet_url, serv.stop_state.clone());
 			}
 		}

 		info_callback(serv.clone());

 		loop {
 			thread::sleep(time::Duration::from_secs(1));
-			if serv.stop.load(Ordering::Relaxed) {
+			if serv.stop_state.lock().is_stopped() {
 				return Ok(());
 			}
 		}
@@ -112,8 +110,7 @@ impl Server {
 			Some(b) => b,
 		};

-		let stop = Arc::new(AtomicBool::new(false));
-		let pause = Arc::new(AtomicBool::new(false));
+		let stop_state = Arc::new(Mutex::new(StopState::new()));

 		// Shared cache for verification results.
 		// We cache rangeproof verification and kernel signature verification.
@@ -156,6 +153,7 @@ impl Server {
 			pow::verify_size,
 			verifier_cache.clone(),
 			archive_mode,
+			stop_state.clone(),
 		)?);

 		pool_adapter.set_chain(shared_chain.clone());
@@ -175,8 +173,7 @@ impl Server {
 			config.p2p_config.clone(),
 			net_adapter.clone(),
 			genesis.hash(),
-			stop.clone(),
-			pause.clone(),
+			stop_state.clone(),
 		)?);
 		chain_adapter.init(p2p_server.peers.clone());
 		pool_net_adapter.init(p2p_server.peers.clone());
@@ -206,8 +203,7 @@ impl Server {
 				config.dandelion_config.clone(),
 				seeder,
 				peers_preferred,
-				stop.clone(),
-				pause.clone(),
+				stop_state.clone(),
 			);
 		}
@@ -220,7 +216,7 @@ impl Server {
 			sync_state.clone(),
 			p2p_server.peers.clone(),
 			shared_chain.clone(),
-			stop.clone(),
+			stop_state.clone(),
 		);

 		let p2p_inner = p2p_server.clone();
@@ -244,7 +240,7 @@ impl Server {
 			config.dandelion_config.clone(),
 			tx_pool.clone(),
 			verifier_cache.clone(),
-			stop.clone(),
+			stop_state.clone(),
 		);

 		warn!("Grin server started.");
@@ -258,8 +254,7 @@ impl Server {
 			state_info: ServerStateInfo {
 				..Default::default()
 			},
-			stop,
-			pause,
+			stop_state,
 		})
 	}
@@ -305,7 +300,11 @@ impl Server {
 	/// Start mining for blocks internally on a separate thread. Relies on
 	/// internal miner, and should only be used for automated testing. Burns
 	/// reward if wallet_listener_url is 'None'
-	pub fn start_test_miner(&self, wallet_listener_url: Option<String>, stop: Arc<AtomicBool>) {
+	pub fn start_test_miner(
+		&self,
+		wallet_listener_url: Option<String>,
+		stop_state: Arc<Mutex<StopState>>,
+	) {
 		info!("start_test_miner - start",);
 		let sync_state = self.sync_state.clone();
 		let config_wallet_url = match wallet_listener_url.clone() {
@@ -327,7 +326,7 @@ impl Server {
 			self.chain.clone(),
 			self.tx_pool.clone(),
 			self.verifier_cache.clone(),
-			stop,
+			stop_state,
 		);
 		miner.set_debug_output_id(format!("Port {}", self.config.p2p_config.port));
 		let _ = thread::Builder::new()
@@ -429,24 +428,25 @@ impl Server {
 	/// Stop the server.
 	pub fn stop(&self) {
 		self.p2p.stop();
-		self.stop.store(true, Ordering::Relaxed);
+		self.stop_state.lock().stop();
 	}

 	/// Pause the p2p server.
 	pub fn pause(&self) {
-		self.pause.store(true, Ordering::Relaxed);
+		self.stop_state.lock().pause();
 		thread::sleep(time::Duration::from_secs(1));
 		self.p2p.pause();
 	}

-	/// Resume the p2p server.
+	/// Resume p2p server.
+	/// TODO - We appear not to resume the p2p server (peer connections) here?
 	pub fn resume(&self) {
-		self.pause.store(false, Ordering::Relaxed);
+		self.stop_state.lock().resume();
 	}

 	/// Stops the test miner without stopping the p2p layer
-	pub fn stop_test_miner(&self, stop: Arc<AtomicBool>) {
-		stop.store(true, Ordering::Relaxed);
+	pub fn stop_test_miner(&self, stop: Arc<Mutex<StopState>>) {
+		stop.lock().stop();
 		info!("stop_test_miner - stop",);
 	}
 }
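Server::new creates the stop_state once and hands a clone to the chain, the p2p server, the seeder, the sync runner, the dandelion monitor and (optionally) the test miner, so a single Server::stop() reaches all of them; the p2p layer is stopped first and only then is the shared flag flipped, which has to wait for any in-flight critical chain work because flipping it requires the lock. A simplified sketch of that wiring follows; the component wrappers here are illustrative stand-ins, not the real types:

use std::sync::{Arc, Mutex};

// Cut-down stand-in for util::StopState.
#[derive(Default)]
struct StopState {
    stopped: bool,
}

// Illustrative component wrappers only; the real types are chain::Chain,
// p2p::Server, the sync runner, and so on.
struct Chain {
    stop_state: Arc<Mutex<StopState>>,
}
struct P2pServer {
    stop_state: Arc<Mutex<StopState>>,
}
struct SyncRunner {
    stop_state: Arc<Mutex<StopState>>,
}

struct Server {
    stop_state: Arc<Mutex<StopState>>,
    chain: Chain,
    p2p: P2pServer,
    sync: SyncRunner,
}

impl Server {
    // One stop_state is created up front and a clone handed to every
    // long-running component, mirroring the wiring in Server::new above.
    fn new() -> Server {
        let stop_state = Arc::new(Mutex::new(StopState::default()));
        Server {
            chain: Chain { stop_state: stop_state.clone() },
            p2p: P2pServer { stop_state: stop_state.clone() },
            sync: SyncRunner { stop_state: stop_state.clone() },
            stop_state,
        }
    }

    fn stop(&self) {
        // In the real Server::stop() the p2p layer is shut down first, then
        // the shared flag is flipped; flipping it requires the lock, so it
        // waits out any critical chain processing still in flight.
        self.stop_state.lock().unwrap().stopped = true;
    }
}

fn main() {
    let server = Server::new();
    server.stop();
    // Every component observes the same shutdown signal.
    assert!(server.chain.stop_state.lock().unwrap().stopped);
    assert!(server.p2p.stop_state.lock().unwrap().stopped);
    assert!(server.sync.stop_state.lock().unwrap().stopped);
}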

View file

@@ -12,7 +12,6 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.

-use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::Arc;
 use std::thread;
 use std::time;
@@ -24,17 +23,18 @@ use crate::grin::sync::body_sync::BodySync;
 use crate::grin::sync::header_sync::HeaderSync;
 use crate::grin::sync::state_sync::StateSync;
 use crate::p2p;
+use crate::util::{Mutex, StopState};

 pub fn run_sync(
 	sync_state: Arc<SyncState>,
 	peers: Arc<p2p::Peers>,
 	chain: Arc<chain::Chain>,
-	stop: Arc<AtomicBool>,
+	stop_state: Arc<Mutex<StopState>>,
 ) {
 	let _ = thread::Builder::new()
 		.name("sync".to_string())
 		.spawn(move || {
-			let runner = SyncRunner::new(sync_state, peers, chain, stop);
+			let runner = SyncRunner::new(sync_state, peers, chain, stop_state);
 			runner.sync_loop();
 		});
 }
@@ -43,7 +43,7 @@ pub struct SyncRunner {
 	sync_state: Arc<SyncState>,
 	peers: Arc<p2p::Peers>,
 	chain: Arc<chain::Chain>,
-	stop: Arc<AtomicBool>,
+	stop_state: Arc<Mutex<StopState>>,
 }

 impl SyncRunner {
@@ -51,13 +51,13 @@ impl SyncRunner {
 		sync_state: Arc<SyncState>,
 		peers: Arc<p2p::Peers>,
 		chain: Arc<chain::Chain>,
-		stop: Arc<AtomicBool>,
+		stop_state: Arc<Mutex<StopState>>,
 	) -> SyncRunner {
 		SyncRunner {
 			sync_state,
 			peers,
 			chain,
-			stop,
+			stop_state,
 		}
 	}
@@ -121,7 +121,11 @@ impl SyncRunner {
 		let mut highest_height = 0;

 		// Main syncing loop
-		while !self.stop.load(Ordering::Relaxed) {
+		loop {
+			if self.stop_state.lock().is_stopped() {
+				break;
+			}
 			thread::sleep(time::Duration::from_millis(10));

 			// check whether syncing is generally needed, when we compare our state with others

View file

@@ -19,7 +19,6 @@
 use crate::util::RwLock;
 use chrono::prelude::Utc;
-use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::Arc;

 use crate::chain;
@@ -30,13 +29,14 @@ use crate::core::core::{Block, BlockHeader};
 use crate::core::global;
 use crate::mining::mine_block;
 use crate::pool;
+use crate::util::{Mutex, StopState};

 pub struct Miner {
 	config: StratumServerConfig,
 	chain: Arc<chain::Chain>,
 	tx_pool: Arc<RwLock<pool::TransactionPool>>,
 	verifier_cache: Arc<RwLock<dyn VerifierCache>>,
-	stop: Arc<AtomicBool>,
+	stop_state: Arc<Mutex<StopState>>,

 	// Just to hold the port we're on, so this miner can be identified
 	// while watching debug output
@@ -51,7 +51,7 @@ impl Miner {
 		chain: Arc<chain::Chain>,
 		tx_pool: Arc<RwLock<pool::TransactionPool>>,
 		verifier_cache: Arc<RwLock<dyn VerifierCache>>,
-		stop: Arc<AtomicBool>,
+		stop_state: Arc<Mutex<StopState>>,
 	) -> Miner {
 		Miner {
 			config,
@@ -59,7 +59,7 @@ impl Miner {
 			tx_pool,
 			verifier_cache,
 			debug_output_id: String::from("none"),
-			stop,
+			stop_state,
 		}
 	}
@@ -135,7 +135,11 @@ impl Miner {
 		// nothing has changed. We only want to create a new key_id for each new block.
 		let mut key_id = None;

-		while !self.stop.load(Ordering::Relaxed) {
+		loop {
+			if self.stop_state.lock().is_stopped() {
+				break;
+			}
 			trace!("in miner loop. key_id: {:?}", key_id);

 			// get the latest chain state and build a block on top of it

View file

@@ -222,7 +222,7 @@ impl LocalServerContainer {
 				"starting test Miner on port {}",
 				self.config.p2p_server_port
 			);
-			s.start_test_miner(wallet_url, s.stop.clone());
+			s.start_test_miner(wallet_url, s.stop_state.clone());
 		}

 		for p in &mut self.peer_list {

View file

@@ -19,7 +19,7 @@ mod framework;
 use self::core::core::hash::Hashed;
 use self::core::global::{self, ChainTypes};
-use self::util::Mutex;
+use self::util::{Mutex, StopState};
 use self::wallet::controller;
 use self::wallet::libwallet::types::{WalletBackend, WalletInst};
 use self::wallet::lmdb_wallet::LMDBBackend;
@@ -35,7 +35,6 @@ use grin_wallet as wallet;
 use std::cmp;
 use std::default::Default;
 use std::process::exit;
-use std::sync::atomic::AtomicBool;
 use std::sync::Arc;
 use std::{thread, time};
@@ -219,7 +218,7 @@ fn simulate_block_propagation() {
 	}

 	// start mining
-	let stop = Arc::new(AtomicBool::new(false));
+	let stop = Arc::new(Mutex::new(StopState::new()));
 	servers[0].start_test_miner(None, stop.clone());

 	// monitor for a change of head on a different server and check whether
@@ -272,7 +271,7 @@ fn simulate_full_sync() {
 	let s1 = servers::Server::new(framework::config(1000, "grin-sync", 1000)).unwrap();

 	// mine a few blocks on server 1
-	let stop = Arc::new(AtomicBool::new(false));
+	let stop = Arc::new(Mutex::new(StopState::new()));
 	s1.start_test_miner(None, stop.clone());
 	thread::sleep(time::Duration::from_secs(8));
 	s1.stop_test_miner(stop);
@@ -328,7 +327,7 @@ fn simulate_fast_sync() {
 	// start s1 and mine enough blocks to get beyond the fast sync horizon
 	let s1 = servers::Server::new(framework::config(2000, "grin-fast", 2000)).unwrap();

-	let stop = Arc::new(AtomicBool::new(false));
+	let stop = Arc::new(Mutex::new(StopState::new()));
 	s1.start_test_miner(None, stop.clone());

 	while s1.head().height < 20 {
@@ -460,7 +459,7 @@ fn long_fork_test_preparation() -> Vec<servers::Server> {
 	let s0 = servers::Server::new(conf).unwrap();
 	thread::sleep(time::Duration::from_millis(1_000));
 	s.push(s0);
-	let stop = Arc::new(AtomicBool::new(false));
+	let stop = Arc::new(Mutex::new(StopState::new()));
 	s[0].start_test_miner(None, stop.clone());

 	while s[0].head().height < global::cut_through_horizon() as u64 + 10 {
@@ -545,7 +544,7 @@ fn long_fork_test_mining(blocks: u64, n: u16, s: &servers::Server) {
 	let sn_header = s.chain.head().unwrap();

 	// Mining
-	let stop = Arc::new(AtomicBool::new(false));
+	let stop = Arc::new(Mutex::new(StopState::new()));
 	s.start_test_miner(None, stop.clone());

 	while s.head().height < sn_header.height + blocks {
@@ -928,7 +927,7 @@ fn replicate_tx_fluff_failure() {
 	s1_config.dandelion_config.relay_secs = Some(1);
 	let s1 = servers::Server::new(s1_config.clone()).unwrap();
 	// Mine off of server 1
-	s1.start_test_miner(s1_config.test_miner_wallet_url, s1.stop.clone());
+	s1.start_test_miner(s1_config.test_miner_wallet_url, s1.stop_state.clone());
 	thread::sleep(time::Duration::from_secs(5));

 	// Server 2 (another node)

View file

@@ -23,11 +23,11 @@ use bufstream::BufStream;
 use grin_core as core;
 use grin_servers as servers;
 use grin_util as util;
+use grin_util::{Mutex, StopState};
 use serde_json::Value;
 use std::io::prelude::{BufRead, Write};
 use std::net::TcpStream;
 use std::process;
-use std::sync::atomic::AtomicBool;
 use std::sync::Arc;
 use std::{thread, time};
@@ -140,7 +140,7 @@ fn basic_stratum_server() {
 	info!("stratum server and worker stats verification ok");

 	// Start mining blocks
-	let stop = Arc::new(AtomicBool::new(false));
+	let stop = Arc::new(Mutex::new(StopState::new()));
 	s.start_test_miner(None, stop.clone());
 	info!("test miner started");

View file

@@ -13,7 +13,7 @@
 //! Implementation of the persistent Backend for the prunable MMR tree.

-use std::{fs, io, marker, time};
+use std::{fs, io, time};

 use crate::core::core::hash::{Hash, Hashed};
 use crate::core::core::pmmr::{self, family, Backend};

View file

@@ -109,3 +109,52 @@ where
 pub fn to_base64(s: &str) -> String {
 	base64::encode(s)
 }
+
+/// Global stopped/paused state shared across various subcomponents of Grin.
+///
+/// Arc<Mutex<StopState>> allows the chain to lock the stop_state during critical processing.
+/// Other subcomponents cannot abruptly shutdown the server during block/header processing.
+/// This should prevent the chain ever ending up in an inconsistent state on restart.
+///
+/// "Stopped" allows a clean shutdown of the Grin server.
+/// "Paused" is used in some tests to allow nodes to reach steady state etc.
+///
+pub struct StopState {
+	stopped: bool,
+	paused: bool,
+}
+
+impl StopState {
+	/// Create a new stop_state in default "running" state.
+	pub fn new() -> StopState {
+		StopState {
+			stopped: false,
+			paused: false,
+		}
+	}
+
+	/// Check if we are stopped.
+	pub fn is_stopped(&self) -> bool {
+		self.stopped
+	}
+
+	/// Check if we are paused.
+	pub fn is_paused(&self) -> bool {
+		self.paused
+	}
+
+	/// Stop the server.
+	pub fn stop(&mut self) {
+		self.stopped = true;
+	}
+
+	/// Pause the server (only used in tests).
+	pub fn pause(&mut self) {
+		self.paused = true;
+	}
+
+	/// Resume a paused server (only used in tests).
+	pub fn resume(&mut self) {
+		self.paused = false;
+	}
+}
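Callers are expected to wrap this type in Arc<Mutex<StopState>>, with util::Mutex being the parking_lot re-export whose lock() returns the guard directly. A small self-contained usage sketch, reproducing the struct above but using std::sync::Mutex so it compiles on its own:

use std::sync::{Arc, Mutex};

// StopState reproduced from the diff above so the example is self-contained.
pub struct StopState {
    stopped: bool,
    paused: bool,
}

impl StopState {
    pub fn new() -> StopState {
        StopState {
            stopped: false,
            paused: false,
        }
    }
    pub fn is_stopped(&self) -> bool {
        self.stopped
    }
    pub fn is_paused(&self) -> bool {
        self.paused
    }
    pub fn stop(&mut self) {
        self.stopped = true;
    }
    pub fn pause(&mut self) {
        self.paused = true;
    }
    pub fn resume(&mut self) {
        self.paused = false;
    }
}

fn main() {
    // In Grin this is Arc<Mutex<StopState>> with util::Mutex (the parking_lot
    // re-export), whose lock() returns the guard directly; with std::sync::Mutex
    // the lock() call returns a Result, hence the unwrap() below.
    let stop_state = Arc::new(Mutex::new(StopState::new()));

    // Tests pause and resume the server around steady-state checks.
    stop_state.lock().unwrap().pause();
    assert!(stop_state.lock().unwrap().is_paused());
    stop_state.lock().unwrap().resume();
    assert!(!stop_state.lock().unwrap().is_paused());

    // Shutdown flips the stopped flag; worker loops poll is_stopped() and exit.
    stop_state.lock().unwrap().stop();
    assert!(stop_state.lock().unwrap().is_stopped());
}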

View file

@@ -26,7 +26,7 @@ use self::core::{pow, ser};
 use self::keychain::Keychain;
 use self::util::secp::pedersen;
 use self::util::secp::pedersen::Commitment;
-use self::util::{Mutex, RwLock};
+use self::util::{Mutex, RwLock, StopState};
 use crate::libwallet::types::*;
 use crate::{controller, libwallet, WalletCommAdapter, WalletConfig};
 use failure::ResultExt;
@@ -110,8 +110,8 @@ where
 			pow::verify_size,
 			verifier_cache,
 			false,
-		)
-		.unwrap();
+			Arc::new(Mutex::new(StopState::new())),
+		).unwrap();
 		let (tx, rx) = channel();
 		let retval = WalletProxy {
 			chain_dir: chain_dir.to_owned(),
chain_dir: chain_dir.to_owned(), chain_dir: chain_dir.to_owned(),