diff --git a/api/src/types.rs b/api/src/types.rs index 84fb423fc..555004741 100644 --- a/api/src/types.rs +++ b/api/src/types.rs @@ -434,8 +434,7 @@ impl<'de> serde::de::Deserialize<'de> for OutputPrintable { } if output_type.is_none() - || commit.is_none() - || spent.is_none() + || commit.is_none() || spent.is_none() || proof_hash.is_none() || mmr_index.is_none() { diff --git a/chain/src/chain.rs b/chain/src/chain.rs index c0d38bad7..5d359b513 100644 --- a/chain/src/chain.rs +++ b/chain/src/chain.rs @@ -33,7 +33,7 @@ use crate::types::{ BlockStatus, ChainAdapter, NoStatus, Options, Tip, TxHashSetRoots, TxHashsetWriteStatus, }; use crate::util::secp::pedersen::{Commitment, RangeProof}; -use crate::util::{Mutex, RwLock, StopState}; +use crate::util::RwLock; use grin_store::Error::NotFoundErr; use std::collections::HashMap; use std::fs::{self, File}; @@ -152,7 +152,6 @@ pub struct Chain { // POW verification function pow_verifier: fn(&BlockHeader) -> Result<(), pow::Error>, archive_mode: bool, - stop_state: Arc>, genesis: BlockHeader, } @@ -167,16 +166,7 @@ impl Chain { pow_verifier: fn(&BlockHeader) -> Result<(), pow::Error>, verifier_cache: Arc>, archive_mode: bool, - stop_state: Arc>, ) -> Result { - // Note: We take a lock on the stop_state here and do not release it until - // we have finished chain initialization. - let stop_state_local = stop_state.clone(); - let stop_lock = stop_state_local.lock(); - if stop_lock.is_stopped() { - return Err(ErrorKind::Stopped.into()); - } - let store = Arc::new(store::ChainStore::new(&db_root)?); // open the txhashset, creating a new one if necessary @@ -194,7 +184,6 @@ impl Chain { pow_verifier, verifier_cache, archive_mode, - stop_state, genesis: genesis.header.clone(), }) } @@ -283,15 +272,6 @@ impl Chain { /// or false if it has added to a fork (or orphan?). 
fn process_block_single(&self, b: Block, opts: Options) -> Result, Error> { let (maybe_new_head, prev_head) = { - // Note: We take a lock on the stop_state here and do not release it until - // we have finished processing this single block. - // We take care to write both the txhashset *and* the batch while we - // have the stop_state lock. - let stop_lock = self.stop_state.lock(); - if stop_lock.is_stopped() { - return Err(ErrorKind::Stopped.into()); - } - let mut txhashset = self.txhashset.write(); let batch = self.store.batch()?; let mut ctx = self.new_ctx(opts, batch, &mut txhashset)?; @@ -381,15 +361,6 @@ impl Chain { /// This is only ever used during sync and is based on sync_head. /// We update header_head here if our total work increases. pub fn sync_block_headers(&self, headers: &[BlockHeader], opts: Options) -> Result<(), Error> { - // Note: We take a lock on the stop_state here and do not release it until - // we have finished processing this single block. - // We take care to write both the txhashset *and* the batch while we - // have the stop_state lock. - let stop_lock = self.stop_state.lock(); - if stop_lock.is_stopped() { - return Err(ErrorKind::Stopped.into()); - } - let mut txhashset = self.txhashset.write(); let batch = self.store.batch()?; let mut ctx = self.new_ctx(opts, batch, &mut txhashset)?; @@ -1098,14 +1069,6 @@ impl Chain { } } - // Note: We take a lock on the stop_state here and do not release it until - // we have finished processing this chain compaction operation. - // We want to avoid shutting the node down in the middle of compacting the data. - let stop_lock = self.stop_state.lock(); - if stop_lock.is_stopped() { - return Err(ErrorKind::Stopped.into()); - } - // Take a write lock on the txhashet and start a new writeable db batch. 
let mut txhashset = self.txhashset.write(); let mut batch = self.store.batch()?; diff --git a/chain/src/store.rs b/chain/src/store.rs index 58c5b7216..8acbdc2b5 100644 --- a/chain/src/store.rs +++ b/chain/src/store.rs @@ -384,7 +384,6 @@ impl<'a> Batch<'a> { let key = to_key(BLOCK_PREFIX, &mut "".to_string().into_bytes()); self.db.iter(&key) } - } /// An iterator on blocks, from latest to earliest, specialized to return diff --git a/chain/tests/data_file_integrity.rs b/chain/tests/data_file_integrity.rs index 9e8b9a816..f4f90cce5 100644 --- a/chain/tests/data_file_integrity.rs +++ b/chain/tests/data_file_integrity.rs @@ -21,7 +21,7 @@ use self::core::libtx; use self::core::pow::{self, Difficulty}; use self::core::{consensus, genesis}; use self::keychain::{ExtKeychain, ExtKeychainPath, Keychain}; -use self::util::{Mutex, RwLock, StopState}; +use self::util::RwLock; use chrono::Duration; use grin_chain as chain; use grin_core as core; @@ -47,7 +47,6 @@ fn setup(dir_name: &str) -> Chain { pow::verify_size, verifier_cache, false, - Arc::new(Mutex::new(StopState::new())), ) .unwrap() } @@ -61,7 +60,6 @@ fn reload_chain(dir_name: &str) -> Chain { pow::verify_size, verifier_cache, false, - Arc::new(Mutex::new(StopState::new())), ) .unwrap() } diff --git a/chain/tests/mine_simple_chain.rs b/chain/tests/mine_simple_chain.rs index 32fc81173..80da3b0ef 100644 --- a/chain/tests/mine_simple_chain.rs +++ b/chain/tests/mine_simple_chain.rs @@ -23,7 +23,7 @@ use self::core::libtx::{self, build, reward}; use self::core::pow::Difficulty; use self::core::{consensus, global, pow}; use self::keychain::{ExtKeychain, ExtKeychainPath, Keychain}; -use self::util::{Mutex, RwLock, StopState}; +use self::util::{RwLock, StopState}; use chrono::Duration; use grin_chain as chain; use grin_core as core; @@ -47,7 +47,6 @@ fn setup(dir_name: &str, genesis: Block) -> Chain { pow::verify_size, verifier_cache, false, - Arc::new(Mutex::new(StopState::new())), ) .unwrap() } @@ -565,7 +564,6 @@ fn 
actual_diff_iter_output() { pow::verify_size, verifier_cache, false, - Arc::new(Mutex::new(StopState::new())), ) .unwrap(); let iter = chain.difficulty_iter().unwrap(); diff --git a/chain/tests/test_coinbase_maturity.rs b/chain/tests/test_coinbase_maturity.rs index 6f4575c1e..540c6ef8d 100644 --- a/chain/tests/test_coinbase_maturity.rs +++ b/chain/tests/test_coinbase_maturity.rs @@ -20,7 +20,7 @@ use self::core::libtx::{self, build}; use self::core::pow::Difficulty; use self::core::{consensus, pow}; use self::keychain::{ExtKeychain, ExtKeychainPath, Keychain}; -use self::util::{Mutex, RwLock, StopState}; +use self::util::{RwLock, StopState}; use chrono::Duration; use env_logger; use grin_chain as chain; @@ -53,7 +53,6 @@ fn test_coinbase_maturity() { pow::verify_size, verifier_cache, false, - Arc::new(Mutex::new(StopState::new())), ) .unwrap(); diff --git a/etc/gen_gen/src/bin/gen_gen.rs b/etc/gen_gen/src/bin/gen_gen.rs index 829ba4234..b9abbb2c6 100644 --- a/etc/gen_gen/src/bin/gen_gen.rs +++ b/etc/gen_gen/src/bin/gen_gen.rs @@ -281,7 +281,7 @@ fn setup_chain(dir_name: &str, genesis: core::core::Block) -> chain::Chain { core::pow::verify_size, verifier_cache, false, - Arc::new(util::Mutex::new(util::StopState::new())), + Arc::new(util::StopState::new()), ) .unwrap() } diff --git a/p2p/src/conn.rs b/p2p/src/conn.rs index 344ddef59..7bff66dc9 100644 --- a/p2p/src/conn.rs +++ b/p2p/src/conn.rs @@ -24,7 +24,11 @@ use std::fs::File; use std::io::{self, Read, Write}; use std::net::{Shutdown, TcpStream}; use std::sync::{mpsc, Arc}; -use std::{cmp, thread, time}; +use std::{ + cmp, + thread::{self, JoinHandle}, + time, +}; use crate::core::ser; use crate::core::ser::FixedLength; @@ -43,7 +47,7 @@ pub trait MessageHandler: Send + 'static { &self, msg: Message<'a>, writer: &'a mut dyn Write, - received_bytes: Arc>, + tracker: Arc, ) -> Result>, Error>; } @@ -130,15 +134,12 @@ impl<'a> Response<'a> { }) } - fn write(mut self, sent_bytes: Arc>) -> Result<(), Error> { + fn 
write(mut self, tracker: Arc) -> Result<(), Error> { let mut msg = ser::ser_vec(&MsgHeader::new(self.resp_type, self.body.len() as u64))?; msg.append(&mut self.body); write_all(&mut self.stream, &msg[..], time::Duration::from_secs(10))?; - // Increase sent bytes counter - { - let mut sent_bytes = sent_bytes.write(); - sent_bytes.inc(msg.len() as u64); - } + tracker.inc_sent(msg.len() as u64); + if let Some(mut file) = self.attachment { let mut buf = [0u8; 8000]; loop { @@ -148,8 +149,7 @@ impl<'a> Response<'a> { write_all(&mut self.stream, &buf[..n], time::Duration::from_secs(10))?; // Increase sent bytes "quietly" without incrementing the counter. // (In a loop here for the single attachment). - let mut sent_bytes = sent_bytes.write(); - sent_bytes.inc_quiet(n as u64); + tracker.inc_quiet_sent(n as u64); } Err(e) => return Err(From::from(e)), } @@ -165,72 +165,120 @@ impl<'a> Response<'a> { pub const SEND_CHANNEL_CAP: usize = 10; -pub struct Tracker { - /// Bytes we've sent. - pub sent_bytes: Arc>, - /// Bytes we've received. - pub received_bytes: Arc>, - /// Channel to allow sending data through the connection - pub send_channel: mpsc::SyncSender>, +pub struct StopHandle { /// Channel to close the connection pub close_channel: mpsc::Sender<()>, + // we need Option to take ownership of the handle in stop_and_wait() + peer_thread: Option>, } -impl Tracker { - pub fn send(&self, body: T, msg_type: Type) -> Result<(), Error> +impl StopHandle { + /// Schedule this connection to safely close via the async close_channel. 
+ pub fn stop(&self) { + if self.close_channel.send(()).is_err() { + debug!("peer's close_channel is disconnected, must be stopped already"); + return; + } + } + + pub fn stop_and_wait(&mut self) { + self.stop(); + if let Some(peer_thread) = self.peer_thread.take() { + // wait only if other thread is calling us, eg shutdown + if thread::current().id() != peer_thread.thread().id() { + debug!("waiting for thread {:?} exit", peer_thread.thread().id()); + if let Err(e) = peer_thread.join() { + error!("failed to stop peer thread: {:?}", e); + } + } else { + debug!( + "attempt to stop thread {:?} from itself", + peer_thread.thread().id() + ); + } + } + } +} + +pub struct ConnHandle { + /// Channel to allow sending data through the connection + pub send_channel: mpsc::SyncSender>, +} + +impl ConnHandle { + pub fn send(&self, body: T, msg_type: Type) -> Result where T: ser::Writeable, { let buf = write_to_buf(body, msg_type)?; let buf_len = buf.len(); self.send_channel.try_send(buf)?; + Ok(buf_len as u64) + } +} - // Increase sent bytes counter - let mut sent_bytes = self.sent_bytes.write(); - sent_bytes.inc(buf_len as u64); +pub struct Tracker { + /// Bytes we've sent. + pub sent_bytes: Arc>, + /// Bytes we've received. + pub received_bytes: Arc>, +} - Ok(()) +impl Tracker { + pub fn new() -> Tracker { + let received_bytes = Arc::new(RwLock::new(RateCounter::new())); + let sent_bytes = Arc::new(RwLock::new(RateCounter::new())); + Tracker { + received_bytes, + sent_bytes, + } } - /// Schedule this connection to safely close via the async close_channel. 
- pub fn close(&self) { - let _ = self.close_channel.send(()); + pub fn inc_received(&self, size: u64) { + self.received_bytes.write().inc(size); + } + + pub fn inc_sent(&self, size: u64) { + self.sent_bytes.write().inc(size); + } + + pub fn inc_quiet_received(&self, size: u64) { + self.received_bytes.write().inc_quiet(size); + } + + pub fn inc_quiet_sent(&self, size: u64) { + self.sent_bytes.write().inc_quiet(size); } } /// Start listening on the provided connection and wraps it. Does not hang /// the current thread, instead just returns a future and the Connection /// itself. -pub fn listen(stream: TcpStream, handler: H) -> Tracker +pub fn listen( + stream: TcpStream, + tracker: Arc, + handler: H, +) -> io::Result<(ConnHandle, StopHandle)> where H: MessageHandler, { let (send_tx, send_rx) = mpsc::sync_channel(SEND_CHANNEL_CAP); let (close_tx, close_rx) = mpsc::channel(); - // Counter of number of bytes received - let received_bytes = Arc::new(RwLock::new(RateCounter::new())); - // Counter of number of bytes sent - let sent_bytes = Arc::new(RwLock::new(RateCounter::new())); - stream .set_nonblocking(true) .expect("Non-blocking IO not available."); - poll( - stream, - handler, - send_rx, - close_rx, - received_bytes.clone(), - sent_bytes.clone(), - ); + let peer_thread = poll(stream, handler, send_rx, close_rx, tracker)?; - Tracker { - sent_bytes: sent_bytes.clone(), - received_bytes: received_bytes.clone(), - send_channel: send_tx, - close_channel: close_tx, - } + Ok(( + ConnHandle { + send_channel: send_tx, + }, + StopHandle { + close_channel: close_tx, + peer_thread: Some(peer_thread), + }, + )) } fn poll( @@ -238,16 +286,16 @@ fn poll( handler: H, send_rx: mpsc::Receiver>, close_rx: mpsc::Receiver<()>, - received_bytes: Arc>, - sent_bytes: Arc>, -) where + tracker: Arc, +) -> io::Result> +where H: MessageHandler, { // Split out tcp stream out into separate reader/writer halves. 
let mut reader = conn.try_clone().expect("clone conn for reader failed"); let mut writer = conn.try_clone().expect("clone conn for writer failed"); - let _ = thread::Builder::new() + thread::Builder::new() .name("peer".to_string()) .spawn(move || { let sleep_time = time::Duration::from_millis(5); @@ -265,19 +313,17 @@ fn poll( ); // Increase received bytes counter - received_bytes - .write() - .inc(MsgHeader::LEN as u64 + msg.header.msg_len); + tracker.inc_received(MsgHeader::LEN as u64 + msg.header.msg_len); if let Some(Some(resp)) = - try_break!(handler.consume(msg, &mut writer, received_bytes.clone())) + try_break!(handler.consume(msg, &mut writer, tracker.clone())) { - try_break!(resp.write(sent_bytes.clone())); + try_break!(resp.write(tracker.clone())); } } Some(MsgHeaderWrapper::Unknown(msg_len)) => { // Increase received bytes counter - received_bytes.write().inc(MsgHeader::LEN as u64 + msg_len); + tracker.inc_received(MsgHeader::LEN as u64 + msg_len); try_break!(read_discard(msg_len, &mut reader)); } @@ -288,7 +334,12 @@ fn poll( let maybe_data = retry_send.or_else(|_| send_rx.try_recv()); retry_send = Err(()); if let Ok(data) = maybe_data { - let written = try_break!(writer.write_all(&data[..]).map_err(&From::from)); + let written = try_break!(write_all( + &mut writer, + &data[..], + std::time::Duration::from_secs(10) + ) + .map_err(&From::from)); if written.is_none() { retry_send = Ok(data); } @@ -309,5 +360,5 @@ fn poll( .unwrap_or("?".to_owned()) ); let _ = conn.shutdown(Shutdown::Both); - }); + }) } diff --git a/p2p/src/peer.rs b/p2p/src/peer.rs index 08f5b697b..46bdda9f6 100644 --- a/p2p/src/peer.rs +++ b/p2p/src/peer.rs @@ -54,7 +54,12 @@ pub struct Peer { state: Arc>, // set of all hashes known to this peer (so no need to send) tracking_adapter: TrackingAdapter, - connection: Mutex, + tracker: Arc, + send_handle: Mutex, + // we need a special lock for stop operation, can't reuse handle mutex for that + // because it may be locked by different 
reasons, so we should wait for that, close + // mutex can be taken only during shutdown, it happens once + stop_handle: Mutex, } impl fmt::Debug for Peer { @@ -65,17 +70,22 @@ impl fmt::Debug for Peer { impl Peer { // Only accept and connect can be externally used to build a peer - fn new(info: PeerInfo, conn: TcpStream, adapter: Arc) -> Peer { + fn new(info: PeerInfo, conn: TcpStream, adapter: Arc) -> std::io::Result { let state = Arc::new(RwLock::new(State::Connected)); let tracking_adapter = TrackingAdapter::new(adapter); let handler = Protocol::new(Arc::new(tracking_adapter.clone()), info.clone()); - let connection = Mutex::new(conn::listen(conn, handler)); - Peer { + let tracker = Arc::new(conn::Tracker::new()); + let (sendh, stoph) = conn::listen(conn, tracker.clone(), handler)?; + let send_handle = Mutex::new(sendh); + let stop_handle = Mutex::new(stoph); + Ok(Peer { info, state, tracking_adapter, - connection, - } + tracker, + send_handle, + stop_handle, + }) } pub fn accept( @@ -88,7 +98,7 @@ impl Peer { debug!("accept: handshaking from {:?}", conn.peer_addr()); let info = hs.accept(capab, total_difficulty, &mut conn); match info { - Ok(info) => Ok(Peer::new(info, conn, adapter)), + Ok(info) => Ok(Peer::new(info, conn, adapter)?), Err(e) => { debug!( "accept: handshaking from {:?} failed with error: {:?}", @@ -114,7 +124,7 @@ impl Peer { debug!("connect: handshaking with {:?}", conn.peer_addr()); let info = hs.initiate(capab, total_difficulty, self_addr, &mut conn); match info { - Ok(info) => Ok(Peer::new(info, conn, adapter)), + Ok(info) => Ok(Peer::new(info, conn, adapter)?), Err(e) => { debug!( "connect: handshaking with {:?} failed with error: {:?}", @@ -184,30 +194,26 @@ impl Peer { /// Whether the peer is considered abusive, mostly for spammy nodes pub fn is_abusive(&self) -> bool { - let conn = self.connection.lock(); - let rec = conn.received_bytes.read(); - let sent = conn.sent_bytes.read(); + let rec = self.tracker.received_bytes.read(); + let 
sent = self.tracker.sent_bytes.read(); rec.count_per_min() > MAX_PEER_MSG_PER_MIN || sent.count_per_min() > MAX_PEER_MSG_PER_MIN } /// Number of bytes sent to the peer pub fn last_min_sent_bytes(&self) -> Option { - let conn = self.connection.lock(); - let sent_bytes = conn.sent_bytes.read(); + let sent_bytes = self.tracker.sent_bytes.read(); Some(sent_bytes.bytes_per_min()) } /// Number of bytes received from the peer pub fn last_min_received_bytes(&self) -> Option { - let conn = self.connection.lock(); - let received_bytes = conn.received_bytes.read(); + let received_bytes = self.tracker.received_bytes.read(); Some(received_bytes.bytes_per_min()) } pub fn last_min_message_counts(&self) -> Option<(u64, u64)> { - let conn = self.connection.lock(); - let received_bytes = conn.received_bytes.read(); - let sent_bytes = conn.sent_bytes.read(); + let received_bytes = self.tracker.received_bytes.read(); + let sent_bytes = self.tracker.sent_bytes.read(); Some((sent_bytes.count_per_min(), received_bytes.count_per_min())) } @@ -218,7 +224,9 @@ impl Peer { /// Send a msg with given msg_type to our peer via the connection. 
fn send(&self, msg: T, msg_type: Type) -> Result<(), Error> { - self.connection.lock().send(msg, msg_type) + let bytes = self.send_handle.lock().send(msg, msg_type)?; + self.tracker.inc_sent(bytes); + Ok(()) } /// Send a ping to the remote peer, providing our local difficulty and @@ -384,14 +392,25 @@ impl Peer { pub fn send_kernel_data_request(&self) -> Result<(), Error> { debug!("Asking {} for kernel data.", self.info.addr); - self.connection - .lock() - .send(&KernelDataRequest {}, msg::Type::KernelDataRequest) + self.send(&KernelDataRequest {}, msg::Type::KernelDataRequest) } - /// Stops the peer, closing its connection + /// Stops the peer pub fn stop(&self) { - self.connection.lock().close(); + debug!("Stopping peer without waiting {:?}", self.info.addr); + match self.stop_handle.try_lock() { + Some(handle) => handle.stop(), + None => error!("can't get stop lock for peer"), + } + } + + /// Stops the peer and wait until peer's thread exit + pub fn stop_and_wait(&self) { + debug!("Stopping peer {:?}", self.info.addr); + match self.stop_handle.try_lock() { + Some(mut handle) => handle.stop_and_wait(), + None => error!("can't get stop lock for peer"), + } } } diff --git a/p2p/src/peers.rs b/p2p/src/peers.rs index 527a28359..36bfe018e 100644 --- a/p2p/src/peers.rs +++ b/p2p/src/peers.rs @@ -27,15 +27,16 @@ use crate::core::core; use crate::core::core::hash::{Hash, Hashed}; use crate::core::global; use crate::core::pow::Difficulty; -use chrono::prelude::*; -use chrono::Duration; - use crate::peer::Peer; use crate::store::{PeerData, PeerStore, State}; use crate::types::{ Capabilities, ChainAdapter, Error, NetAdapter, P2PConfig, PeerAddr, PeerInfo, ReasonForBan, TxHashSetRead, MAX_PEER_ADDRS, }; +use chrono::prelude::*; +use chrono::Duration; + +const LOCK_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(2); pub struct Peers { pub adapter: Arc, @@ -57,6 +58,13 @@ impl Peers { /// Adds the peer to our internal peer mapping. 
Note that the peer is still /// returned so the server can run it. pub fn add_connected(&self, peer: Arc) -> Result<(), Error> { + let mut peers = match self.peers.try_write_for(LOCK_TIMEOUT) { + Some(peers) => peers, + None => { + error!("add_connected: failed to get peers lock"); + return Err(Error::Timeout); + } + }; let peer_data = PeerData { addr: peer.info.addr, capabilities: peer.info.capabilities, @@ -68,7 +76,7 @@ impl Peers { }; debug!("Saving newly connected peer {}.", peer_data.addr); self.save_peer(&peer_data)?; - self.peers.write().insert(peer_data.addr, peer.clone()); + peers.insert(peer_data.addr, peer.clone()); Ok(()) } @@ -90,14 +98,26 @@ impl Peers { } pub fn is_known(&self, addr: PeerAddr) -> bool { - self.peers.read().contains_key(&addr) + let peers = match self.peers.try_read_for(LOCK_TIMEOUT) { + Some(peers) => peers, + None => { + error!("is_known: failed to get peers lock"); + return false; + } + }; + peers.contains_key(&addr) } /// Get vec of peers we are currently connected to. pub fn connected_peers(&self) -> Vec> { - let mut res = self - .peers - .read() + let peers = match self.peers.try_read_for(LOCK_TIMEOUT) { + Some(peers) => peers, + None => { + error!("connected_peers: failed to get peers lock"); + return vec![]; + } + }; + let mut res = peers .values() .filter(|p| p.is_connected()) .cloned() @@ -115,7 +135,14 @@ impl Peers { /// Get a peer we're connected to by address. pub fn get_connected_peer(&self, addr: PeerAddr) -> Option> { - self.peers.read().get(&addr).map(|p| p.clone()) + let peers = match self.peers.try_read_for(LOCK_TIMEOUT) { + Some(peers) => peers, + None => { + error!("get_connected_peer: failed to get peers lock"); + return None; + } + }; + peers.get(&addr).map(|p| p.clone()) } /// Number of peers currently connected to. 
@@ -204,9 +231,7 @@ impl Peers { pub fn is_banned(&self, peer_addr: PeerAddr) -> bool { if let Ok(peer) = self.store.get_peer(peer_addr) { - if peer.flags == State::Banned { - return true; - } + return peer.flags == State::Banned; } false } @@ -215,6 +240,7 @@ impl Peers { pub fn ban_peer(&self, peer_addr: PeerAddr, ban_reason: ReasonForBan) { if let Err(e) = self.update_state(peer_addr, State::Banned) { error!("Couldn't ban {}: {:?}", peer_addr, e); + return; } if let Some(peer) = self.get_connected_peer(peer_addr) { @@ -226,7 +252,15 @@ impl Peers { }; peer.set_banned(); peer.stop(); - self.peers.write().remove(&peer.info.addr); + + let mut peers = match self.peers.try_write_for(LOCK_TIMEOUT) { + Some(peers) => peers, + None => { + error!("ban_peer: failed to get peers lock"); + return; + } + }; + peers.remove(&peer.info.addr); } } @@ -264,8 +298,16 @@ impl Peers { "Error sending {:?} to peer {:?}: {:?}", obj_name, &p.info.addr, e ); + + let mut peers = match self.peers.try_write_for(LOCK_TIMEOUT) { + Some(peers) => peers, + None => { + error!("broadcast: failed to get peers lock"); + break; + } + }; p.stop(); - self.peers.write().remove(&p.info.addr); + peers.remove(&p.info.addr); } } @@ -331,8 +373,15 @@ impl Peers { for p in self.connected_peers().iter() { if let Err(e) = p.send_ping(total_difficulty, height) { debug!("Error pinging peer {:?}: {:?}", &p.info.addr, e); + let mut peers = match self.peers.try_write_for(LOCK_TIMEOUT) { + Some(peers) => peers, + None => { + error!("check_all: failed to get peers lock"); + break; + } + }; p.stop(); - self.peers.write().remove(&p.info.addr); + peers.remove(&p.info.addr); } } } @@ -388,33 +437,42 @@ impl Peers { let mut rm = vec![]; // build a list of peers to be cleaned up - for peer in self.peers.read().values() { - if peer.is_banned() { - debug!("clean_peers {:?}, peer banned", peer.info.addr); - rm.push(peer.info.addr.clone()); - } else if !peer.is_connected() { - debug!("clean_peers {:?}, not connected", 
peer.info.addr); - rm.push(peer.info.addr.clone()); - } else if peer.is_abusive() { - if let Some(counts) = peer.last_min_message_counts() { - debug!( - "clean_peers {:?}, abusive ({} sent, {} recv)", - peer.info.addr, counts.0, counts.1, - ); + { + let peers = match self.peers.try_read_for(LOCK_TIMEOUT) { + Some(peers) => peers, + None => { + error!("clean_peers: can't get peers lock"); + return; } - let _ = self.update_state(peer.info.addr, State::Banned); - rm.push(peer.info.addr.clone()); - } else { - let (stuck, diff) = peer.is_stuck(); - match self.adapter.total_difficulty() { - Ok(total_difficulty) => { - if stuck && diff < total_difficulty { - debug!("clean_peers {:?}, stuck peer", peer.info.addr); - let _ = self.update_state(peer.info.addr, State::Defunct); - rm.push(peer.info.addr.clone()); - } + }; + for peer in peers.values() { + if peer.is_banned() { + debug!("clean_peers {:?}, peer banned", peer.info.addr); + rm.push(peer.info.addr.clone()); + } else if !peer.is_connected() { + debug!("clean_peers {:?}, not connected", peer.info.addr); + rm.push(peer.info.addr.clone()); + } else if peer.is_abusive() { + if let Some(counts) = peer.last_min_message_counts() { + debug!( + "clean_peers {:?}, abusive ({} sent, {} recv)", + peer.info.addr, counts.0, counts.1, + ); + } + let _ = self.update_state(peer.info.addr, State::Banned); + rm.push(peer.info.addr.clone()); + } else { + let (stuck, diff) = peer.is_stuck(); + match self.adapter.total_difficulty() { + Ok(total_difficulty) => { + if stuck && diff < total_difficulty { + debug!("clean_peers {:?}, stuck peer", peer.info.addr); + let _ = self.update_state(peer.info.addr, State::Defunct); + rm.push(peer.info.addr.clone()); + } + } + Err(e) => error!("failed to get total difficulty: {:?}", e), } - Err(e) => error!("failed to get total difficulty: {:?}", e), } } } @@ -436,7 +494,13 @@ impl Peers { // now clean up peer map based on the list to remove { - let mut peers = self.peers.write(); + let mut peers = match 
self.peers.try_write_for(LOCK_TIMEOUT) { + Some(peers) => peers, + None => { + error!("clean_peers: failed to get peers lock"); + return; + } + }; for addr in rm { let _ = peers.get(&addr).map(|peer| peer.stop()); peers.remove(&addr); @@ -447,7 +511,7 @@ impl Peers { pub fn stop(&self) { let mut peers = self.peers.write(); for (_, peer) in peers.drain() { - peer.stop(); + peer.stop_and_wait(); } } diff --git a/p2p/src/protocol.rs b/p2p/src/protocol.rs index 02f8fe983..9c186a985 100644 --- a/p2p/src/protocol.rs +++ b/p2p/src/protocol.rs @@ -12,24 +12,20 @@ // See the License for the specific language governing permissions and // limitations under the License. -use rand::{thread_rng, Rng}; -use std::cmp; -use std::fs::{self, File, OpenOptions}; -use std::io::{BufWriter, Seek, SeekFrom, Write}; -use std::sync::Arc; - -use chrono::prelude::Utc; -use tempfile::tempfile; - -use crate::conn::{Message, MessageHandler, Response}; +use crate::conn::{Message, MessageHandler, Response, Tracker}; use crate::core::core::{self, hash::Hash, CompactBlock}; -use crate::util::{RateCounter, RwLock}; - use crate::msg::{ BanReason, GetPeerAddrs, Headers, KernelDataResponse, Locator, PeerAddrs, Ping, Pong, TxHashSetArchive, TxHashSetRequest, Type, }; use crate::types::{Error, NetAdapter, PeerInfo}; +use chrono::prelude::Utc; +use rand::{thread_rng, Rng}; +use std::cmp; +use std::fs::{self, File, OpenOptions}; +use std::io::{BufWriter, Seek, SeekFrom, Write}; +use std::sync::Arc; +use tempfile::tempfile; pub struct Protocol { adapter: Arc, @@ -47,7 +43,7 @@ impl MessageHandler for Protocol { &self, mut msg: Message<'a>, writer: &'a mut dyn Write, - received_bytes: Arc>, + tracker: Arc, ) -> Result>, Error> { let adapter = &self.adapter; @@ -275,7 +271,7 @@ impl MessageHandler for Protocol { // Increase received bytes quietly (without affecting the counters). // Otherwise we risk banning a peer as "abusive". 
- received_bytes.write().inc_quiet(size as u64); + tracker.inc_quiet_received(size as u64); } // Remember to seek back to start of the file as the caller is likely @@ -362,10 +358,7 @@ impl MessageHandler for Protocol { // Increase received bytes quietly (without affecting the counters). // Otherwise we risk banning a peer as "abusive". - { - let mut received_bytes = received_bytes.write(); - received_bytes.inc_quiet(size as u64); - } + tracker.inc_quiet_received(size as u64) } tmp_zip .into_inner() diff --git a/p2p/src/serv.rs b/p2p/src/serv.rs index 5e439ecbc..471ea94ab 100644 --- a/p2p/src/serv.rs +++ b/p2p/src/serv.rs @@ -33,7 +33,7 @@ use crate::types::{ Capabilities, ChainAdapter, Error, NetAdapter, P2PConfig, PeerAddr, PeerInfo, ReasonForBan, TxHashSetRead, }; -use crate::util::{Mutex, StopState}; +use crate::util::StopState; use chrono::prelude::{DateTime, Utc}; /// P2P server implementation, handling bootstrapping to find and connect to @@ -43,7 +43,7 @@ pub struct Server { capabilities: Capabilities, handshake: Arc, pub peers: Arc, - stop_state: Arc>, + stop_state: Arc, } // TODO TLS @@ -55,7 +55,7 @@ impl Server { config: P2PConfig, adapter: Arc, genesis: Hash, - stop_state: Arc>, + stop_state: Arc, ) -> Result { Ok(Server { config: config.clone(), @@ -77,7 +77,7 @@ impl Server { let sleep_time = Duration::from_millis(5); loop { // Pause peer ingress connection request. Only for tests. 
- if self.stop_state.lock().is_paused() { + if self.stop_state.is_paused() { thread::sleep(Duration::from_secs(1)); continue; } @@ -89,9 +89,13 @@ impl Server { if self.check_undesirable(&stream) { continue; } - if let Err(e) = self.handle_new_peer(stream) { - debug!("Error accepting peer {}: {:?}", peer_addr.to_string(), e); - let _ = self.peers.add_banned(peer_addr, ReasonForBan::BadHandshake); + match self.handle_new_peer(stream) { + Err(Error::ConnectionClose) => debug!("shutting down, ignoring a new peer"), + Err(e) => { + debug!("Error accepting peer {}: {:?}", peer_addr.to_string(), e); + let _ = self.peers.add_banned(peer_addr, ReasonForBan::BadHandshake); + } + Ok(_) => {} } } Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { @@ -101,7 +105,7 @@ impl Server { debug!("Couldn't establish new client connection: {:?}", e); } } - if self.stop_state.lock().is_stopped() { + if self.stop_state.is_stopped() { break; } thread::sleep(sleep_time); @@ -112,6 +116,10 @@ impl Server { /// Asks the server to connect to a new peer. Directly returns the peer if /// we're already connected to the provided address. 
pub fn connect(&self, addr: PeerAddr) -> Result, Error> { + if self.stop_state.is_stopped() { + return Err(Error::ConnectionClose); + } + if Peer::is_denied(&self.config, addr) { debug!("connect_peer: peer {} denied, not connecting.", addr); return Err(Error::ConnectionClose); @@ -169,6 +177,9 @@ impl Server { } fn handle_new_peer(&self, stream: TcpStream) -> Result<(), Error> { + if self.stop_state.is_stopped() { + return Err(Error::ConnectionClose); + } let total_diff = self.peers.total_difficulty()?; // accept the peer and add it to the server map @@ -214,7 +225,7 @@ impl Server { } pub fn stop(&self) { - self.stop_state.lock().stop(); + self.stop_state.stop(); self.peers.stop(); } diff --git a/p2p/tests/peer_handshake.rs b/p2p/tests/peer_handshake.rs index 3afe32c80..3f72bd2df 100644 --- a/p2p/tests/peer_handshake.rs +++ b/p2p/tests/peer_handshake.rs @@ -16,7 +16,7 @@ use grin_core as core; use grin_p2p as p2p; use grin_util as util; -use grin_util::{Mutex, StopState}; +use grin_util::StopState; use std::net::{SocketAddr, TcpListener, TcpStream}; use std::sync::Arc; @@ -56,7 +56,7 @@ fn peer_handshake() { p2p_config.clone(), net_adapter.clone(), Hash::from_vec(&vec![]), - Arc::new(Mutex::new(StopState::new())), + Arc::new(StopState::new()), ) .unwrap(), ); diff --git a/servers/src/common/types.rs b/servers/src/common/types.rs index 1b7c9f34d..9b119446c 100644 --- a/servers/src/common/types.rs +++ b/servers/src/common/types.rs @@ -344,6 +344,7 @@ pub enum SyncStatus { current_height: u64, highest_height: u64, }, + Shutdown, } /// Current sync state. Encapsulates the current SyncStatus. 
diff --git a/servers/src/grin/dandelion_monitor.rs b/servers/src/grin/dandelion_monitor.rs index 3316d4d34..685d36c9d 100644 --- a/servers/src/grin/dandelion_monitor.rs +++ b/servers/src/grin/dandelion_monitor.rs @@ -16,14 +16,14 @@ use chrono::prelude::Utc; use rand::{thread_rng, Rng}; use std::sync::Arc; use std::thread; -use std::time::Duration; +use std::time::{Duration, Instant}; use crate::common::adapters::DandelionAdapter; use crate::core::core::hash::Hashed; use crate::core::core::transaction; use crate::core::core::verifier_cache::VerifierCache; use crate::pool::{DandelionConfig, Pool, PoolEntry, PoolError, TransactionPool, TxSource}; -use crate::util::{Mutex, RwLock, StopState}; +use crate::util::{RwLock, StopState}; /// A process to monitor transactions in the stempool. /// With Dandelion, transaction can be broadcasted in stem or fluff phase. @@ -38,43 +38,54 @@ pub fn monitor_transactions( tx_pool: Arc>, adapter: Arc, verifier_cache: Arc>, - stop_state: Arc>, -) { + stop_state: Arc, +) -> std::io::Result> { debug!("Started Dandelion transaction monitor."); - let _ = thread::Builder::new() + thread::Builder::new() .name("dandelion".to_string()) .spawn(move || { + let run_interval = Duration::from_secs(10); + let mut last_run = Instant::now() + .checked_sub(Duration::from_secs(20)) + .unwrap_or_else(|| Instant::now()); loop { // Halt Dandelion monitor if we have been notified that we are stopping. - if stop_state.lock().is_stopped() { + if stop_state.is_stopped() { break; } - if !adapter.is_stem() { - let _ = - process_fluff_phase(&dandelion_config, &tx_pool, &adapter, &verifier_cache) - .map_err(|e| { - error!("dand_mon: Problem processing fluff phase. {:?}", e); - }); + if last_run.elapsed() > run_interval { + if !adapter.is_stem() { + let _ = process_fluff_phase( + &dandelion_config, + &tx_pool, + &adapter, + &verifier_cache, + ) + .map_err(|e| { + error!("dand_mon: Problem processing fluff phase. 
{:?}", e); + }); + } + + // Now find all expired entries based on embargo timer. + let _ = process_expired_entries(&dandelion_config, &tx_pool).map_err(|e| { + error!("dand_mon: Problem processing expired entries. {:?}", e); + }); + + // Handle the tx above *before* we transition to next epoch. + // This gives us an opportunity to do the final "fluff" before we start + // stemming on the subsequent epoch. + if adapter.is_expired() { + adapter.next_epoch(); + } + last_run = Instant::now(); } - // Now find all expired entries based on embargo timer. - let _ = process_expired_entries(&dandelion_config, &tx_pool).map_err(|e| { - error!("dand_mon: Problem processing expired entries. {:?}", e); - }); - - // Handle the tx above *before* we transition to next epoch. - // This gives us an opportunity to do the final "fluff" before we start - // stemming on the subsequent epoch. - if adapter.is_expired() { - adapter.next_epoch(); - } - - // Monitor loops every 10s. - thread::sleep(Duration::from_secs(10)); + // Monitor loops every 10s, but check stop flag every second. + thread::sleep(Duration::from_secs(1)); } - }); + }) } // Query the pool for transactions older than the cutoff. 
diff --git a/servers/src/grin/seed.rs b/servers/src/grin/seed.rs index cda246254..5ccd9af99 100644 --- a/servers/src/grin/seed.rs +++ b/servers/src/grin/seed.rs @@ -30,7 +30,7 @@ use crate::core::global; use crate::p2p; use crate::p2p::types::PeerAddr; use crate::p2p::ChainAdapter; -use crate::util::{Mutex, StopState}; +use crate::util::StopState; // DNS Seeds with contact email associated const MAINNET_DNS_SEEDS: &'static [&'static str] = &[ @@ -54,9 +54,9 @@ pub fn connect_and_monitor( capabilities: p2p::Capabilities, seed_list: Box Vec + Send>, preferred_peers: Option>, - stop_state: Arc>, -) { - let _ = thread::Builder::new() + stop_state: Arc, +) -> std::io::Result> { + thread::Builder::new() .name("seed".to_string()) .spawn(move || { let peers = p2p_server.peers.clone(); @@ -77,16 +77,15 @@ pub fn connect_and_monitor( let mut prev_expire_check = MIN_DATE.and_hms(0, 0, 0); let mut prev_ping = Utc::now(); let mut start_attempt = 0; - let mut connecting_history: HashMap> = HashMap::new(); loop { - if stop_state.lock().is_stopped() { + if stop_state.is_stopped() { break; } // Pause egress peer connection request. Only for tests. 
- if stop_state.lock().is_paused() { + if stop_state.is_paused() { thread::sleep(time::Duration::from_secs(1)); continue; } @@ -136,7 +135,7 @@ pub fn connect_and_monitor( thread::sleep(time::Duration::from_secs(1)); } - }); + }) } fn monitor_peers( @@ -326,17 +325,19 @@ fn listen_for_addrs( let peers_c = peers.clone(); let p2p_c = p2p.clone(); - let _ = thread::Builder::new() + thread::Builder::new() .name("peer_connect".to_string()) .spawn(move || match p2p_c.connect(addr) { Ok(p) => { - let _ = p.send_peer_request(capab); - let _ = peers_c.update_state(addr, p2p::State::Healthy); + if p.send_peer_request(capab).is_ok() { + let _ = peers_c.update_state(addr, p2p::State::Healthy); + } } Err(_) => { let _ = peers_c.update_state(addr, p2p::State::Defunct); } - }); + }) + .expect("failed to launch peer_connect thread"); } // shrink the connecting history. diff --git a/servers/src/grin/server.rs b/servers/src/grin/server.rs index 379228742..9a9e4c398 100644 --- a/servers/src/grin/server.rs +++ b/servers/src/grin/server.rs @@ -21,7 +21,10 @@ use std::fs::File; use std::io::prelude::*; use std::path::Path; use std::sync::Arc; -use std::{thread, time}; +use std::{ + thread::{self, JoinHandle}, + time, +}; use fs2::FileExt; @@ -44,7 +47,7 @@ use crate::p2p; use crate::p2p::types::PeerAddr; use crate::pool; use crate::util::file::get_first_line; -use crate::util::{Mutex, RwLock, StopState}; +use crate::util::{RwLock, StopState}; /// Grin server holding internal structures. pub struct Server { @@ -64,9 +67,13 @@ pub struct Server { /// To be passed around to collect stats and info state_info: ServerStateInfo, /// Stop flag - pub stop_state: Arc>, + pub stop_state: Arc, /// Maintain a lock_file so we do not run multiple Grin nodes from same dir. 
lock_file: Arc, + connect_thread: Option>, + sync_thread: JoinHandle<()>, + dandelion_thread: JoinHandle<()>, + p2p_thread: JoinHandle<()>, } impl Server { @@ -142,7 +149,7 @@ impl Server { Some(b) => b, }; - let stop_state = Arc::new(Mutex::new(StopState::new())); + let stop_state = Arc::new(StopState::new()); // Shared cache for verification results. // We cache rangeproof verification and kernel signature verification. @@ -180,7 +187,6 @@ impl Server { pow::verify_size, verifier_cache.clone(), archive_mode, - stop_state.clone(), )?); pool_adapter.set_chain(shared_chain.clone()); @@ -208,6 +214,8 @@ impl Server { pool_net_adapter.init(p2p_server.peers.clone()); net_adapter.init(p2p_server.peers.clone()); + let mut connect_thread = None; + if config.p2p_config.seeding_type != p2p::Seeding::Programmatic { let seeder = match config.p2p_config.seeding_type { p2p::Seeding::None => { @@ -226,13 +234,13 @@ impl Server { _ => unreachable!(), }; - seed::connect_and_monitor( + connect_thread = Some(seed::connect_and_monitor( p2p_server.clone(), config.p2p_config.capabilities, seeder, config.p2p_config.peers_preferred.clone(), stop_state.clone(), - ); + )?); } // Defaults to None (optional) in config file. 
@@ -240,17 +248,21 @@ impl Server { let skip_sync_wait = config.skip_sync_wait.unwrap_or(false); sync_state.update(SyncStatus::AwaitingPeers(!skip_sync_wait)); - sync::run_sync( + let sync_thread = sync::run_sync( sync_state.clone(), p2p_server.peers.clone(), shared_chain.clone(), stop_state.clone(), - ); + )?; let p2p_inner = p2p_server.clone(); - let _ = thread::Builder::new() + let p2p_thread = thread::Builder::new() .name("p2p-server".to_string()) - .spawn(move || p2p_inner.listen()); + .spawn(move || { + if let Err(e) = p2p_inner.listen() { + error!("P2P server failed with error: {:?}", e); + } + })?; info!("Starting rest apis at: {}", &config.api_http_addr); let api_secret = get_first_line(config.api_secret_path.clone()); @@ -269,6 +281,7 @@ impl Server { } }; + // TODO fix API shutdown and join this thread api::start_rest_apis( config.api_http_addr.clone(), shared_chain.clone(), @@ -279,13 +292,13 @@ impl Server { ); info!("Starting dandelion monitor: {}", &config.api_http_addr); - dandelion_monitor::monitor_transactions( + let dandelion_thread = dandelion_monitor::monitor_transactions( config.dandelion_config.clone(), tx_pool.clone(), pool_net_adapter.clone(), verifier_cache.clone(), stop_state.clone(), - ); + )?; warn!("Grin server started."); Ok(Server { @@ -300,6 +313,10 @@ impl Server { }, stop_state, lock_file, + connect_thread, + sync_thread, + p2p_thread, + dandelion_thread, }) } @@ -348,7 +365,7 @@ impl Server { pub fn start_test_miner( &self, wallet_listener_url: Option, - stop_state: Arc>, + stop_state: Arc, ) { info!("start_test_miner - start",); let sync_state = self.sync_state.clone(); @@ -489,15 +506,41 @@ impl Server { } /// Stop the server. 
- pub fn stop(&self) { + pub fn stop(self) { + { + self.sync_state.update(SyncStatus::Shutdown); + self.stop_state.stop(); + + if let Some(connect_thread) = self.connect_thread { + match connect_thread.join() { + Err(e) => error!("failed to join to connect_and_monitor thread: {:?}", e), + Ok(_) => info!("connect_and_monitor thread stopped"), + } + } else { + info!("No active connect_and_monitor thread") + } + + match self.sync_thread.join() { + Err(e) => error!("failed to join to sync thread: {:?}", e), + Ok(_) => info!("sync thread stopped"), + } + + match self.dandelion_thread.join() { + Err(e) => error!("failed to join to dandelion_monitor thread: {:?}", e), + Ok(_) => info!("dandelion_monitor thread stopped"), + } + } self.p2p.stop(); - self.stop_state.lock().stop(); + match self.p2p_thread.join() { + Err(e) => error!("failed to join to p2p thread: {:?}", e), + Ok(_) => info!("p2p thread stopped"), + } let _ = self.lock_file.unlock(); } /// Pause the p2p server. pub fn pause(&self) { - self.stop_state.lock().pause(); + self.stop_state.pause(); thread::sleep(time::Duration::from_secs(1)); self.p2p.pause(); } @@ -505,12 +548,12 @@ impl Server { /// Resume p2p server. /// TODO - We appear not to resume the p2p server (peer connections) here? 
pub fn resume(&self) { - self.stop_state.lock().resume(); + self.stop_state.resume(); } /// Stops the test miner without stopping the p2p layer - pub fn stop_test_miner(&self, stop: Arc>) { - stop.lock().stop(); + pub fn stop_test_miner(&self, stop: Arc) { + stop.stop(); info!("stop_test_miner - stop",); } } diff --git a/servers/src/grin/sync/body_sync.rs b/servers/src/grin/sync/body_sync.rs index 15c2b9332..687258642 100644 --- a/servers/src/grin/sync/body_sync.rs +++ b/servers/src/grin/sync/body_sync.rs @@ -141,6 +141,7 @@ impl BodySync { if let Some(peer) = peers_iter.next() { if let Err(e) = peer.send_block_request(*hash) { debug!("Skipped request to {}: {:?}", peer.info.addr, e); + peer.stop(); } else { self.blocks_requested += 1; } diff --git a/servers/src/grin/sync/syncer.rs b/servers/src/grin/sync/syncer.rs index 99d504bec..3ef061b7e 100644 --- a/servers/src/grin/sync/syncer.rs +++ b/servers/src/grin/sync/syncer.rs @@ -24,27 +24,27 @@ use crate::grin::sync::body_sync::BodySync; use crate::grin::sync::header_sync::HeaderSync; use crate::grin::sync::state_sync::StateSync; use crate::p2p; -use crate::util::{Mutex, StopState}; +use crate::util::StopState; pub fn run_sync( sync_state: Arc, peers: Arc, chain: Arc, - stop_state: Arc>, -) { - let _ = thread::Builder::new() + stop_state: Arc, +) -> std::io::Result> { + thread::Builder::new() .name("sync".to_string()) .spawn(move || { let runner = SyncRunner::new(sync_state, peers, chain, stop_state); runner.sync_loop(); - }); + }) } pub struct SyncRunner { sync_state: Arc, peers: Arc, chain: Arc, - stop_state: Arc>, + stop_state: Arc, } impl SyncRunner { @@ -52,7 +52,7 @@ impl SyncRunner { sync_state: Arc, peers: Arc, chain: Arc, - stop_state: Arc>, + stop_state: Arc, ) -> SyncRunner { SyncRunner { sync_state, @@ -77,6 +77,9 @@ impl SyncRunner { let mut n = 0; const MIN_PEERS: usize = 3; loop { + if self.stop_state.is_stopped() { + break; + } let wp = self.peers.more_or_same_work_peers()?; // exit loop when: // * we 
have more than MIN_PEERS more_or_same_work peers @@ -140,7 +143,7 @@ impl SyncRunner { // Main syncing loop loop { - if self.stop_state.lock().is_stopped() { + if self.stop_state.is_stopped() { break; } @@ -167,7 +170,13 @@ impl SyncRunner { unwrap_or_restart_loop!(self.chain.compact()); } - thread::sleep(time::Duration::from_secs(10)); + // sleep for 10 secs but check stop signal every second + for _ in 0..10 { + thread::sleep(time::Duration::from_secs(1)); + if self.stop_state.is_stopped() { + break; + } + } continue; } diff --git a/servers/src/mining/test_miner.rs b/servers/src/mining/test_miner.rs index d671855ea..832fc3244 100644 --- a/servers/src/mining/test_miner.rs +++ b/servers/src/mining/test_miner.rs @@ -29,14 +29,14 @@ use crate::core::core::{Block, BlockHeader}; use crate::core::global; use crate::mining::mine_block; use crate::pool; -use crate::util::{Mutex, StopState}; +use crate::util::StopState; pub struct Miner { config: StratumServerConfig, chain: Arc, tx_pool: Arc>, verifier_cache: Arc>, - stop_state: Arc>, + stop_state: Arc, // Just to hold the port we're on, so this miner can be identified // while watching debug output @@ -51,7 +51,7 @@ impl Miner { chain: Arc, tx_pool: Arc>, verifier_cache: Arc>, - stop_state: Arc>, + stop_state: Arc, ) -> Miner { Miner { config, @@ -136,7 +136,7 @@ impl Miner { let mut key_id = None; loop { - if self.stop_state.lock().is_stopped() { + if self.stop_state.is_stopped() { break; } diff --git a/src/bin/tui/status.rs b/src/bin/tui/status.rs index 6651c66b3..d5bb77657 100644 --- a/src/bin/tui/status.rs +++ b/src/bin/tui/status.rs @@ -199,6 +199,7 @@ impl TUIStatusListener for TUIStatusView { }; format!("Downloading blocks: {}%, step 4/4", percent) } + SyncStatus::Shutdown => "Shutting down, closing connections".to_string(), } }; /*let basic_mining_config_status = { diff --git a/src/bin/tui/ui.rs b/src/bin/tui/ui.rs index 6c16143ff..60071f003 100644 --- a/src/bin/tui/ui.rs +++ b/src/bin/tui/ui.rs @@ -173,8 +173,10 
@@ impl Controller { while let Some(message) = self.rx.try_iter().next() { match message { ControllerMessage::Shutdown => { - server.stop(); self.ui.stop(); + println!("Shutdown in progress, please wait"); + server.stop(); + return; } } } diff --git a/util/src/lib.rs b/util/src/lib.rs index 93883e951..d4577ed43 100644 --- a/util/src/lib.rs +++ b/util/src/lib.rs @@ -53,8 +53,8 @@ pub mod read_write; // other utils #[allow(unused_imports)] use std::ops::Deref; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; - mod hex; pub use crate::hex::*; @@ -112,49 +112,45 @@ pub fn to_base64(s: &str) -> String { /// Global stopped/paused state shared across various subcomponents of Grin. /// -/// Arc> allows the chain to lock the stop_state during critical processing. -/// Other subcomponents cannot abruptly shutdown the server during block/header processing. -/// This should prevent the chain ever ending up in an inconsistent state on restart. -/// /// "Stopped" allows a clean shutdown of the Grin server. /// "Paused" is used in some tests to allow nodes to reach steady state etc. /// pub struct StopState { - stopped: bool, - paused: bool, + stopped: AtomicBool, + paused: AtomicBool, } impl StopState { /// Create a new stop_state in default "running" state. pub fn new() -> StopState { StopState { - stopped: false, - paused: false, + stopped: AtomicBool::new(false), + paused: AtomicBool::new(false), } } /// Check if we are stopped. pub fn is_stopped(&self) -> bool { - self.stopped + self.stopped.load(Ordering::Relaxed) } /// Check if we are paused. pub fn is_paused(&self) -> bool { - self.paused + self.paused.load(Ordering::Relaxed) } /// Stop the server. - pub fn stop(&mut self) { - self.stopped = true; + pub fn stop(&self) { + self.stopped.store(true, Ordering::Relaxed) } /// Pause the server (only used in tests). 
- pub fn pause(&mut self) { - self.paused = true; + pub fn pause(&self) { + self.paused.store(true, Ordering::Relaxed) } /// Resume a paused server (only used in tests). - pub fn resume(&mut self) { - self.paused = false; + pub fn resume(&self) { + self.paused.store(false, Ordering::Relaxed) } }