mirror of
https://github.com/mimblewimble/grin.git
synced 2025-01-20 19:11:08 +03:00
Implement graceful shutdown (#2812)
* Remove stop status mutex * remove some deadlocks * Rewrite stop channel handling * fix deadlock in peers object * add missing test fixes
This commit is contained in:
parent
884851cdeb
commit
9ab23f6eef
23 changed files with 452 additions and 293 deletions
|
@ -434,8 +434,7 @@ impl<'de> serde::de::Deserialize<'de> for OutputPrintable {
|
|||
}
|
||||
|
||||
if output_type.is_none()
|
||||
|| commit.is_none()
|
||||
|| spent.is_none()
|
||||
|| commit.is_none() || spent.is_none()
|
||||
|| proof_hash.is_none()
|
||||
|| mmr_index.is_none()
|
||||
{
|
||||
|
|
|
@ -33,7 +33,7 @@ use crate::types::{
|
|||
BlockStatus, ChainAdapter, NoStatus, Options, Tip, TxHashSetRoots, TxHashsetWriteStatus,
|
||||
};
|
||||
use crate::util::secp::pedersen::{Commitment, RangeProof};
|
||||
use crate::util::{Mutex, RwLock, StopState};
|
||||
use crate::util::RwLock;
|
||||
use grin_store::Error::NotFoundErr;
|
||||
use std::collections::HashMap;
|
||||
use std::fs::{self, File};
|
||||
|
@ -152,7 +152,6 @@ pub struct Chain {
|
|||
// POW verification function
|
||||
pow_verifier: fn(&BlockHeader) -> Result<(), pow::Error>,
|
||||
archive_mode: bool,
|
||||
stop_state: Arc<Mutex<StopState>>,
|
||||
genesis: BlockHeader,
|
||||
}
|
||||
|
||||
|
@ -167,16 +166,7 @@ impl Chain {
|
|||
pow_verifier: fn(&BlockHeader) -> Result<(), pow::Error>,
|
||||
verifier_cache: Arc<RwLock<dyn VerifierCache>>,
|
||||
archive_mode: bool,
|
||||
stop_state: Arc<Mutex<StopState>>,
|
||||
) -> Result<Chain, Error> {
|
||||
// Note: We take a lock on the stop_state here and do not release it until
|
||||
// we have finished chain initialization.
|
||||
let stop_state_local = stop_state.clone();
|
||||
let stop_lock = stop_state_local.lock();
|
||||
if stop_lock.is_stopped() {
|
||||
return Err(ErrorKind::Stopped.into());
|
||||
}
|
||||
|
||||
let store = Arc::new(store::ChainStore::new(&db_root)?);
|
||||
|
||||
// open the txhashset, creating a new one if necessary
|
||||
|
@ -194,7 +184,6 @@ impl Chain {
|
|||
pow_verifier,
|
||||
verifier_cache,
|
||||
archive_mode,
|
||||
stop_state,
|
||||
genesis: genesis.header.clone(),
|
||||
})
|
||||
}
|
||||
|
@ -283,15 +272,6 @@ impl Chain {
|
|||
/// or false if it has added to a fork (or orphan?).
|
||||
fn process_block_single(&self, b: Block, opts: Options) -> Result<Option<Tip>, Error> {
|
||||
let (maybe_new_head, prev_head) = {
|
||||
// Note: We take a lock on the stop_state here and do not release it until
|
||||
// we have finished processing this single block.
|
||||
// We take care to write both the txhashset *and* the batch while we
|
||||
// have the stop_state lock.
|
||||
let stop_lock = self.stop_state.lock();
|
||||
if stop_lock.is_stopped() {
|
||||
return Err(ErrorKind::Stopped.into());
|
||||
}
|
||||
|
||||
let mut txhashset = self.txhashset.write();
|
||||
let batch = self.store.batch()?;
|
||||
let mut ctx = self.new_ctx(opts, batch, &mut txhashset)?;
|
||||
|
@ -381,15 +361,6 @@ impl Chain {
|
|||
/// This is only ever used during sync and is based on sync_head.
|
||||
/// We update header_head here if our total work increases.
|
||||
pub fn sync_block_headers(&self, headers: &[BlockHeader], opts: Options) -> Result<(), Error> {
|
||||
// Note: We take a lock on the stop_state here and do not release it until
|
||||
// we have finished processing this single block.
|
||||
// We take care to write both the txhashset *and* the batch while we
|
||||
// have the stop_state lock.
|
||||
let stop_lock = self.stop_state.lock();
|
||||
if stop_lock.is_stopped() {
|
||||
return Err(ErrorKind::Stopped.into());
|
||||
}
|
||||
|
||||
let mut txhashset = self.txhashset.write();
|
||||
let batch = self.store.batch()?;
|
||||
let mut ctx = self.new_ctx(opts, batch, &mut txhashset)?;
|
||||
|
@ -1098,14 +1069,6 @@ impl Chain {
|
|||
}
|
||||
}
|
||||
|
||||
// Note: We take a lock on the stop_state here and do not release it until
|
||||
// we have finished processing this chain compaction operation.
|
||||
// We want to avoid shutting the node down in the middle of compacting the data.
|
||||
let stop_lock = self.stop_state.lock();
|
||||
if stop_lock.is_stopped() {
|
||||
return Err(ErrorKind::Stopped.into());
|
||||
}
|
||||
|
||||
// Take a write lock on the txhashset and start a new writeable db batch.
|
||||
let mut txhashset = self.txhashset.write();
|
||||
let mut batch = self.store.batch()?;
|
||||
|
|
|
@ -384,7 +384,6 @@ impl<'a> Batch<'a> {
|
|||
let key = to_key(BLOCK_PREFIX, &mut "".to_string().into_bytes());
|
||||
self.db.iter(&key)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/// An iterator on blocks, from latest to earliest, specialized to return
|
||||
|
|
|
@ -21,7 +21,7 @@ use self::core::libtx;
|
|||
use self::core::pow::{self, Difficulty};
|
||||
use self::core::{consensus, genesis};
|
||||
use self::keychain::{ExtKeychain, ExtKeychainPath, Keychain};
|
||||
use self::util::{Mutex, RwLock, StopState};
|
||||
use self::util::RwLock;
|
||||
use chrono::Duration;
|
||||
use grin_chain as chain;
|
||||
use grin_core as core;
|
||||
|
@ -47,7 +47,6 @@ fn setup(dir_name: &str) -> Chain {
|
|||
pow::verify_size,
|
||||
verifier_cache,
|
||||
false,
|
||||
Arc::new(Mutex::new(StopState::new())),
|
||||
)
|
||||
.unwrap()
|
||||
}
|
||||
|
@ -61,7 +60,6 @@ fn reload_chain(dir_name: &str) -> Chain {
|
|||
pow::verify_size,
|
||||
verifier_cache,
|
||||
false,
|
||||
Arc::new(Mutex::new(StopState::new())),
|
||||
)
|
||||
.unwrap()
|
||||
}
|
||||
|
|
|
@ -23,7 +23,7 @@ use self::core::libtx::{self, build, reward};
|
|||
use self::core::pow::Difficulty;
|
||||
use self::core::{consensus, global, pow};
|
||||
use self::keychain::{ExtKeychain, ExtKeychainPath, Keychain};
|
||||
use self::util::{Mutex, RwLock, StopState};
|
||||
use self::util::{RwLock, StopState};
|
||||
use chrono::Duration;
|
||||
use grin_chain as chain;
|
||||
use grin_core as core;
|
||||
|
@ -47,7 +47,6 @@ fn setup(dir_name: &str, genesis: Block) -> Chain {
|
|||
pow::verify_size,
|
||||
verifier_cache,
|
||||
false,
|
||||
Arc::new(Mutex::new(StopState::new())),
|
||||
)
|
||||
.unwrap()
|
||||
}
|
||||
|
@ -565,7 +564,6 @@ fn actual_diff_iter_output() {
|
|||
pow::verify_size,
|
||||
verifier_cache,
|
||||
false,
|
||||
Arc::new(Mutex::new(StopState::new())),
|
||||
)
|
||||
.unwrap();
|
||||
let iter = chain.difficulty_iter().unwrap();
|
||||
|
|
|
@ -20,7 +20,7 @@ use self::core::libtx::{self, build};
|
|||
use self::core::pow::Difficulty;
|
||||
use self::core::{consensus, pow};
|
||||
use self::keychain::{ExtKeychain, ExtKeychainPath, Keychain};
|
||||
use self::util::{Mutex, RwLock, StopState};
|
||||
use self::util::{RwLock, StopState};
|
||||
use chrono::Duration;
|
||||
use env_logger;
|
||||
use grin_chain as chain;
|
||||
|
@ -53,7 +53,6 @@ fn test_coinbase_maturity() {
|
|||
pow::verify_size,
|
||||
verifier_cache,
|
||||
false,
|
||||
Arc::new(Mutex::new(StopState::new())),
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
|
|
|
@ -281,7 +281,7 @@ fn setup_chain(dir_name: &str, genesis: core::core::Block) -> chain::Chain {
|
|||
core::pow::verify_size,
|
||||
verifier_cache,
|
||||
false,
|
||||
Arc::new(util::Mutex::new(util::StopState::new())),
|
||||
Arc::new(util::StopState::new()),
|
||||
)
|
||||
.unwrap()
|
||||
}
|
||||
|
|
167
p2p/src/conn.rs
167
p2p/src/conn.rs
|
@ -24,7 +24,11 @@ use std::fs::File;
|
|||
use std::io::{self, Read, Write};
|
||||
use std::net::{Shutdown, TcpStream};
|
||||
use std::sync::{mpsc, Arc};
|
||||
use std::{cmp, thread, time};
|
||||
use std::{
|
||||
cmp,
|
||||
thread::{self, JoinHandle},
|
||||
time,
|
||||
};
|
||||
|
||||
use crate::core::ser;
|
||||
use crate::core::ser::FixedLength;
|
||||
|
@ -43,7 +47,7 @@ pub trait MessageHandler: Send + 'static {
|
|||
&self,
|
||||
msg: Message<'a>,
|
||||
writer: &'a mut dyn Write,
|
||||
received_bytes: Arc<RwLock<RateCounter>>,
|
||||
tracker: Arc<Tracker>,
|
||||
) -> Result<Option<Response<'a>>, Error>;
|
||||
}
|
||||
|
||||
|
@ -130,15 +134,12 @@ impl<'a> Response<'a> {
|
|||
})
|
||||
}
|
||||
|
||||
fn write(mut self, sent_bytes: Arc<RwLock<RateCounter>>) -> Result<(), Error> {
|
||||
fn write(mut self, tracker: Arc<Tracker>) -> Result<(), Error> {
|
||||
let mut msg = ser::ser_vec(&MsgHeader::new(self.resp_type, self.body.len() as u64))?;
|
||||
msg.append(&mut self.body);
|
||||
write_all(&mut self.stream, &msg[..], time::Duration::from_secs(10))?;
|
||||
// Increase sent bytes counter
|
||||
{
|
||||
let mut sent_bytes = sent_bytes.write();
|
||||
sent_bytes.inc(msg.len() as u64);
|
||||
}
|
||||
tracker.inc_sent(msg.len() as u64);
|
||||
|
||||
if let Some(mut file) = self.attachment {
|
||||
let mut buf = [0u8; 8000];
|
||||
loop {
|
||||
|
@ -148,8 +149,7 @@ impl<'a> Response<'a> {
|
|||
write_all(&mut self.stream, &buf[..n], time::Duration::from_secs(10))?;
|
||||
// Increase sent bytes "quietly" without incrementing the counter.
|
||||
// (In a loop here for the single attachment).
|
||||
let mut sent_bytes = sent_bytes.write();
|
||||
sent_bytes.inc_quiet(n as u64);
|
||||
tracker.inc_quiet_sent(n as u64);
|
||||
}
|
||||
Err(e) => return Err(From::from(e)),
|
||||
}
|
||||
|
@ -165,72 +165,120 @@ impl<'a> Response<'a> {
|
|||
|
||||
pub const SEND_CHANNEL_CAP: usize = 10;
|
||||
|
||||
pub struct Tracker {
|
||||
/// Bytes we've sent.
|
||||
pub sent_bytes: Arc<RwLock<RateCounter>>,
|
||||
/// Bytes we've received.
|
||||
pub received_bytes: Arc<RwLock<RateCounter>>,
|
||||
/// Channel to allow sending data through the connection
|
||||
pub send_channel: mpsc::SyncSender<Vec<u8>>,
|
||||
pub struct StopHandle {
|
||||
/// Channel to close the connection
|
||||
pub close_channel: mpsc::Sender<()>,
|
||||
// we need Option to take ownership of the handle in stop_and_wait()
|
||||
peer_thread: Option<JoinHandle<()>>,
|
||||
}
|
||||
|
||||
impl Tracker {
|
||||
pub fn send<T>(&self, body: T, msg_type: Type) -> Result<(), Error>
|
||||
impl StopHandle {
|
||||
/// Schedule this connection to safely close via the async close_channel.
|
||||
pub fn stop(&self) {
|
||||
if self.close_channel.send(()).is_err() {
|
||||
debug!("peer's close_channel is disconnected, must be stopped already");
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
pub fn stop_and_wait(&mut self) {
|
||||
self.stop();
|
||||
if let Some(peer_thread) = self.peer_thread.take() {
|
||||
// wait only if another thread is calling us, e.g. during shutdown
|
||||
if thread::current().id() != peer_thread.thread().id() {
|
||||
debug!("waiting for thread {:?} exit", peer_thread.thread().id());
|
||||
if let Err(e) = peer_thread.join() {
|
||||
error!("failed to stop peer thread: {:?}", e);
|
||||
}
|
||||
} else {
|
||||
debug!(
|
||||
"attempt to stop thread {:?} from itself",
|
||||
peer_thread.thread().id()
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct ConnHandle {
|
||||
/// Channel to allow sending data through the connection
|
||||
pub send_channel: mpsc::SyncSender<Vec<u8>>,
|
||||
}
|
||||
|
||||
impl ConnHandle {
|
||||
pub fn send<T>(&self, body: T, msg_type: Type) -> Result<u64, Error>
|
||||
where
|
||||
T: ser::Writeable,
|
||||
{
|
||||
let buf = write_to_buf(body, msg_type)?;
|
||||
let buf_len = buf.len();
|
||||
self.send_channel.try_send(buf)?;
|
||||
Ok(buf_len as u64)
|
||||
}
|
||||
}
|
||||
|
||||
// Increase sent bytes counter
|
||||
let mut sent_bytes = self.sent_bytes.write();
|
||||
sent_bytes.inc(buf_len as u64);
|
||||
pub struct Tracker {
|
||||
/// Bytes we've sent.
|
||||
pub sent_bytes: Arc<RwLock<RateCounter>>,
|
||||
/// Bytes we've received.
|
||||
pub received_bytes: Arc<RwLock<RateCounter>>,
|
||||
}
|
||||
|
||||
Ok(())
|
||||
impl Tracker {
|
||||
pub fn new() -> Tracker {
|
||||
let received_bytes = Arc::new(RwLock::new(RateCounter::new()));
|
||||
let sent_bytes = Arc::new(RwLock::new(RateCounter::new()));
|
||||
Tracker {
|
||||
received_bytes,
|
||||
sent_bytes,
|
||||
}
|
||||
}
|
||||
|
||||
/// Schedule this connection to safely close via the async close_channel.
|
||||
pub fn close(&self) {
|
||||
let _ = self.close_channel.send(());
|
||||
pub fn inc_received(&self, size: u64) {
|
||||
self.received_bytes.write().inc(size);
|
||||
}
|
||||
|
||||
pub fn inc_sent(&self, size: u64) {
|
||||
self.sent_bytes.write().inc(size);
|
||||
}
|
||||
|
||||
pub fn inc_quiet_received(&self, size: u64) {
|
||||
self.received_bytes.write().inc_quiet(size);
|
||||
}
|
||||
|
||||
pub fn inc_quiet_sent(&self, size: u64) {
|
||||
self.sent_bytes.write().inc_quiet(size);
|
||||
}
|
||||
}
|
||||
|
||||
/// Starts listening on the provided connection and wraps it. Does not hang
|
||||
/// the current thread, instead just returns a future and the Connection
|
||||
/// itself.
|
||||
pub fn listen<H>(stream: TcpStream, handler: H) -> Tracker
|
||||
pub fn listen<H>(
|
||||
stream: TcpStream,
|
||||
tracker: Arc<Tracker>,
|
||||
handler: H,
|
||||
) -> io::Result<(ConnHandle, StopHandle)>
|
||||
where
|
||||
H: MessageHandler,
|
||||
{
|
||||
let (send_tx, send_rx) = mpsc::sync_channel(SEND_CHANNEL_CAP);
|
||||
let (close_tx, close_rx) = mpsc::channel();
|
||||
|
||||
// Counter of number of bytes received
|
||||
let received_bytes = Arc::new(RwLock::new(RateCounter::new()));
|
||||
// Counter of number of bytes sent
|
||||
let sent_bytes = Arc::new(RwLock::new(RateCounter::new()));
|
||||
|
||||
stream
|
||||
.set_nonblocking(true)
|
||||
.expect("Non-blocking IO not available.");
|
||||
poll(
|
||||
stream,
|
||||
handler,
|
||||
send_rx,
|
||||
close_rx,
|
||||
received_bytes.clone(),
|
||||
sent_bytes.clone(),
|
||||
);
|
||||
let peer_thread = poll(stream, handler, send_rx, close_rx, tracker)?;
|
||||
|
||||
Tracker {
|
||||
sent_bytes: sent_bytes.clone(),
|
||||
received_bytes: received_bytes.clone(),
|
||||
send_channel: send_tx,
|
||||
close_channel: close_tx,
|
||||
}
|
||||
Ok((
|
||||
ConnHandle {
|
||||
send_channel: send_tx,
|
||||
},
|
||||
StopHandle {
|
||||
close_channel: close_tx,
|
||||
peer_thread: Some(peer_thread),
|
||||
},
|
||||
))
|
||||
}
|
||||
|
||||
fn poll<H>(
|
||||
|
@ -238,16 +286,16 @@ fn poll<H>(
|
|||
handler: H,
|
||||
send_rx: mpsc::Receiver<Vec<u8>>,
|
||||
close_rx: mpsc::Receiver<()>,
|
||||
received_bytes: Arc<RwLock<RateCounter>>,
|
||||
sent_bytes: Arc<RwLock<RateCounter>>,
|
||||
) where
|
||||
tracker: Arc<Tracker>,
|
||||
) -> io::Result<JoinHandle<()>>
|
||||
where
|
||||
H: MessageHandler,
|
||||
{
|
||||
// Split out tcp stream out into separate reader/writer halves.
|
||||
let mut reader = conn.try_clone().expect("clone conn for reader failed");
|
||||
let mut writer = conn.try_clone().expect("clone conn for writer failed");
|
||||
|
||||
let _ = thread::Builder::new()
|
||||
thread::Builder::new()
|
||||
.name("peer".to_string())
|
||||
.spawn(move || {
|
||||
let sleep_time = time::Duration::from_millis(5);
|
||||
|
@ -265,19 +313,17 @@ fn poll<H>(
|
|||
);
|
||||
|
||||
// Increase received bytes counter
|
||||
received_bytes
|
||||
.write()
|
||||
.inc(MsgHeader::LEN as u64 + msg.header.msg_len);
|
||||
tracker.inc_received(MsgHeader::LEN as u64 + msg.header.msg_len);
|
||||
|
||||
if let Some(Some(resp)) =
|
||||
try_break!(handler.consume(msg, &mut writer, received_bytes.clone()))
|
||||
try_break!(handler.consume(msg, &mut writer, tracker.clone()))
|
||||
{
|
||||
try_break!(resp.write(sent_bytes.clone()));
|
||||
try_break!(resp.write(tracker.clone()));
|
||||
}
|
||||
}
|
||||
Some(MsgHeaderWrapper::Unknown(msg_len)) => {
|
||||
// Increase received bytes counter
|
||||
received_bytes.write().inc(MsgHeader::LEN as u64 + msg_len);
|
||||
tracker.inc_received(MsgHeader::LEN as u64 + msg_len);
|
||||
|
||||
try_break!(read_discard(msg_len, &mut reader));
|
||||
}
|
||||
|
@ -288,7 +334,12 @@ fn poll<H>(
|
|||
let maybe_data = retry_send.or_else(|_| send_rx.try_recv());
|
||||
retry_send = Err(());
|
||||
if let Ok(data) = maybe_data {
|
||||
let written = try_break!(writer.write_all(&data[..]).map_err(&From::from));
|
||||
let written = try_break!(write_all(
|
||||
&mut writer,
|
||||
&data[..],
|
||||
std::time::Duration::from_secs(10)
|
||||
)
|
||||
.map_err(&From::from));
|
||||
if written.is_none() {
|
||||
retry_send = Ok(data);
|
||||
}
|
||||
|
@ -309,5 +360,5 @@ fn poll<H>(
|
|||
.unwrap_or("?".to_owned())
|
||||
);
|
||||
let _ = conn.shutdown(Shutdown::Both);
|
||||
});
|
||||
})
|
||||
}
|
||||
|
|
|
@ -54,7 +54,12 @@ pub struct Peer {
|
|||
state: Arc<RwLock<State>>,
|
||||
// set of all hashes known to this peer (so no need to send)
|
||||
tracking_adapter: TrackingAdapter,
|
||||
connection: Mutex<conn::Tracker>,
|
||||
tracker: Arc<conn::Tracker>,
|
||||
send_handle: Mutex<conn::ConnHandle>,
|
||||
// We need a dedicated lock for the stop operation. We can't reuse the send handle
|
||||
// mutex for that because it may be held for other reasons and we would have to wait
|
||||
// for it. The stop mutex is taken only during shutdown, which happens once.
|
||||
stop_handle: Mutex<conn::StopHandle>,
|
||||
}
|
||||
|
||||
impl fmt::Debug for Peer {
|
||||
|
@ -65,17 +70,22 @@ impl fmt::Debug for Peer {
|
|||
|
||||
impl Peer {
|
||||
// Only accept and connect can be externally used to build a peer
|
||||
fn new(info: PeerInfo, conn: TcpStream, adapter: Arc<dyn NetAdapter>) -> Peer {
|
||||
fn new(info: PeerInfo, conn: TcpStream, adapter: Arc<dyn NetAdapter>) -> std::io::Result<Peer> {
|
||||
let state = Arc::new(RwLock::new(State::Connected));
|
||||
let tracking_adapter = TrackingAdapter::new(adapter);
|
||||
let handler = Protocol::new(Arc::new(tracking_adapter.clone()), info.clone());
|
||||
let connection = Mutex::new(conn::listen(conn, handler));
|
||||
Peer {
|
||||
let tracker = Arc::new(conn::Tracker::new());
|
||||
let (sendh, stoph) = conn::listen(conn, tracker.clone(), handler)?;
|
||||
let send_handle = Mutex::new(sendh);
|
||||
let stop_handle = Mutex::new(stoph);
|
||||
Ok(Peer {
|
||||
info,
|
||||
state,
|
||||
tracking_adapter,
|
||||
connection,
|
||||
}
|
||||
tracker,
|
||||
send_handle,
|
||||
stop_handle,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn accept(
|
||||
|
@ -88,7 +98,7 @@ impl Peer {
|
|||
debug!("accept: handshaking from {:?}", conn.peer_addr());
|
||||
let info = hs.accept(capab, total_difficulty, &mut conn);
|
||||
match info {
|
||||
Ok(info) => Ok(Peer::new(info, conn, adapter)),
|
||||
Ok(info) => Ok(Peer::new(info, conn, adapter)?),
|
||||
Err(e) => {
|
||||
debug!(
|
||||
"accept: handshaking from {:?} failed with error: {:?}",
|
||||
|
@ -114,7 +124,7 @@ impl Peer {
|
|||
debug!("connect: handshaking with {:?}", conn.peer_addr());
|
||||
let info = hs.initiate(capab, total_difficulty, self_addr, &mut conn);
|
||||
match info {
|
||||
Ok(info) => Ok(Peer::new(info, conn, adapter)),
|
||||
Ok(info) => Ok(Peer::new(info, conn, adapter)?),
|
||||
Err(e) => {
|
||||
debug!(
|
||||
"connect: handshaking with {:?} failed with error: {:?}",
|
||||
|
@ -184,30 +194,26 @@ impl Peer {
|
|||
|
||||
/// Whether the peer is considered abusive, mostly for spammy nodes
|
||||
pub fn is_abusive(&self) -> bool {
|
||||
let conn = self.connection.lock();
|
||||
let rec = conn.received_bytes.read();
|
||||
let sent = conn.sent_bytes.read();
|
||||
let rec = self.tracker.received_bytes.read();
|
||||
let sent = self.tracker.sent_bytes.read();
|
||||
rec.count_per_min() > MAX_PEER_MSG_PER_MIN || sent.count_per_min() > MAX_PEER_MSG_PER_MIN
|
||||
}
|
||||
|
||||
/// Number of bytes sent to the peer
|
||||
pub fn last_min_sent_bytes(&self) -> Option<u64> {
|
||||
let conn = self.connection.lock();
|
||||
let sent_bytes = conn.sent_bytes.read();
|
||||
let sent_bytes = self.tracker.sent_bytes.read();
|
||||
Some(sent_bytes.bytes_per_min())
|
||||
}
|
||||
|
||||
/// Number of bytes received from the peer
|
||||
pub fn last_min_received_bytes(&self) -> Option<u64> {
|
||||
let conn = self.connection.lock();
|
||||
let received_bytes = conn.received_bytes.read();
|
||||
let received_bytes = self.tracker.received_bytes.read();
|
||||
Some(received_bytes.bytes_per_min())
|
||||
}
|
||||
|
||||
pub fn last_min_message_counts(&self) -> Option<(u64, u64)> {
|
||||
let conn = self.connection.lock();
|
||||
let received_bytes = conn.received_bytes.read();
|
||||
let sent_bytes = conn.sent_bytes.read();
|
||||
let received_bytes = self.tracker.received_bytes.read();
|
||||
let sent_bytes = self.tracker.sent_bytes.read();
|
||||
Some((sent_bytes.count_per_min(), received_bytes.count_per_min()))
|
||||
}
|
||||
|
||||
|
@ -218,7 +224,9 @@ impl Peer {
|
|||
|
||||
/// Send a msg with given msg_type to our peer via the connection.
|
||||
fn send<T: Writeable>(&self, msg: T, msg_type: Type) -> Result<(), Error> {
|
||||
self.connection.lock().send(msg, msg_type)
|
||||
let bytes = self.send_handle.lock().send(msg, msg_type)?;
|
||||
self.tracker.inc_sent(bytes);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Send a ping to the remote peer, providing our local difficulty and
|
||||
|
@ -384,14 +392,25 @@ impl Peer {
|
|||
|
||||
pub fn send_kernel_data_request(&self) -> Result<(), Error> {
|
||||
debug!("Asking {} for kernel data.", self.info.addr);
|
||||
self.connection
|
||||
.lock()
|
||||
.send(&KernelDataRequest {}, msg::Type::KernelDataRequest)
|
||||
self.send(&KernelDataRequest {}, msg::Type::KernelDataRequest)
|
||||
}
|
||||
|
||||
/// Stops the peer, closing its connection
|
||||
/// Stops the peer
|
||||
pub fn stop(&self) {
|
||||
self.connection.lock().close();
|
||||
debug!("Stopping peer without waiting {:?}", self.info.addr);
|
||||
match self.stop_handle.try_lock() {
|
||||
Some(handle) => handle.stop(),
|
||||
None => error!("can't get stop lock for peer"),
|
||||
}
|
||||
}
|
||||
|
||||
/// Stops the peer and waits until the peer's thread exits
|
||||
pub fn stop_and_wait(&self) {
|
||||
debug!("Stopping peer {:?}", self.info.addr);
|
||||
match self.stop_handle.try_lock() {
|
||||
Some(mut handle) => handle.stop_and_wait(),
|
||||
None => error!("can't get stop lock for peer"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
148
p2p/src/peers.rs
148
p2p/src/peers.rs
|
@ -27,15 +27,16 @@ use crate::core::core;
|
|||
use crate::core::core::hash::{Hash, Hashed};
|
||||
use crate::core::global;
|
||||
use crate::core::pow::Difficulty;
|
||||
use chrono::prelude::*;
|
||||
use chrono::Duration;
|
||||
|
||||
use crate::peer::Peer;
|
||||
use crate::store::{PeerData, PeerStore, State};
|
||||
use crate::types::{
|
||||
Capabilities, ChainAdapter, Error, NetAdapter, P2PConfig, PeerAddr, PeerInfo, ReasonForBan,
|
||||
TxHashSetRead, MAX_PEER_ADDRS,
|
||||
};
|
||||
use chrono::prelude::*;
|
||||
use chrono::Duration;
|
||||
|
||||
const LOCK_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(2);
|
||||
|
||||
pub struct Peers {
|
||||
pub adapter: Arc<dyn ChainAdapter>,
|
||||
|
@ -57,6 +58,13 @@ impl Peers {
|
|||
/// Adds the peer to our internal peer mapping. Note that the peer is still
|
||||
/// returned so the server can run it.
|
||||
pub fn add_connected(&self, peer: Arc<Peer>) -> Result<(), Error> {
|
||||
let mut peers = match self.peers.try_write_for(LOCK_TIMEOUT) {
|
||||
Some(peers) => peers,
|
||||
None => {
|
||||
error!("add_connected: failed to get peers lock");
|
||||
return Err(Error::Timeout);
|
||||
}
|
||||
};
|
||||
let peer_data = PeerData {
|
||||
addr: peer.info.addr,
|
||||
capabilities: peer.info.capabilities,
|
||||
|
@ -68,7 +76,7 @@ impl Peers {
|
|||
};
|
||||
debug!("Saving newly connected peer {}.", peer_data.addr);
|
||||
self.save_peer(&peer_data)?;
|
||||
self.peers.write().insert(peer_data.addr, peer.clone());
|
||||
peers.insert(peer_data.addr, peer.clone());
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
@ -90,14 +98,26 @@ impl Peers {
|
|||
}
|
||||
|
||||
pub fn is_known(&self, addr: PeerAddr) -> bool {
|
||||
self.peers.read().contains_key(&addr)
|
||||
let peers = match self.peers.try_read_for(LOCK_TIMEOUT) {
|
||||
Some(peers) => peers,
|
||||
None => {
|
||||
error!("is_known: failed to get peers lock");
|
||||
return false;
|
||||
}
|
||||
};
|
||||
peers.contains_key(&addr)
|
||||
}
|
||||
|
||||
/// Get vec of peers we are currently connected to.
|
||||
pub fn connected_peers(&self) -> Vec<Arc<Peer>> {
|
||||
let mut res = self
|
||||
.peers
|
||||
.read()
|
||||
let peers = match self.peers.try_read_for(LOCK_TIMEOUT) {
|
||||
Some(peers) => peers,
|
||||
None => {
|
||||
error!("connected_peers: failed to get peers lock");
|
||||
return vec![];
|
||||
}
|
||||
};
|
||||
let mut res = peers
|
||||
.values()
|
||||
.filter(|p| p.is_connected())
|
||||
.cloned()
|
||||
|
@ -115,7 +135,14 @@ impl Peers {
|
|||
|
||||
/// Get a peer we're connected to by address.
|
||||
pub fn get_connected_peer(&self, addr: PeerAddr) -> Option<Arc<Peer>> {
|
||||
self.peers.read().get(&addr).map(|p| p.clone())
|
||||
let peers = match self.peers.try_read_for(LOCK_TIMEOUT) {
|
||||
Some(peers) => peers,
|
||||
None => {
|
||||
error!("get_connected_peer: failed to get peers lock");
|
||||
return None;
|
||||
}
|
||||
};
|
||||
peers.get(&addr).map(|p| p.clone())
|
||||
}
|
||||
|
||||
/// Number of peers currently connected to.
|
||||
|
@ -204,9 +231,7 @@ impl Peers {
|
|||
|
||||
pub fn is_banned(&self, peer_addr: PeerAddr) -> bool {
|
||||
if let Ok(peer) = self.store.get_peer(peer_addr) {
|
||||
if peer.flags == State::Banned {
|
||||
return true;
|
||||
}
|
||||
return peer.flags == State::Banned;
|
||||
}
|
||||
false
|
||||
}
|
||||
|
@ -215,6 +240,7 @@ impl Peers {
|
|||
pub fn ban_peer(&self, peer_addr: PeerAddr, ban_reason: ReasonForBan) {
|
||||
if let Err(e) = self.update_state(peer_addr, State::Banned) {
|
||||
error!("Couldn't ban {}: {:?}", peer_addr, e);
|
||||
return;
|
||||
}
|
||||
|
||||
if let Some(peer) = self.get_connected_peer(peer_addr) {
|
||||
|
@ -226,7 +252,15 @@ impl Peers {
|
|||
};
|
||||
peer.set_banned();
|
||||
peer.stop();
|
||||
self.peers.write().remove(&peer.info.addr);
|
||||
|
||||
let mut peers = match self.peers.try_write_for(LOCK_TIMEOUT) {
|
||||
Some(peers) => peers,
|
||||
None => {
|
||||
error!("ban_peer: failed to get peers lock");
|
||||
return;
|
||||
}
|
||||
};
|
||||
peers.remove(&peer.info.addr);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -264,8 +298,16 @@ impl Peers {
|
|||
"Error sending {:?} to peer {:?}: {:?}",
|
||||
obj_name, &p.info.addr, e
|
||||
);
|
||||
|
||||
let mut peers = match self.peers.try_write_for(LOCK_TIMEOUT) {
|
||||
Some(peers) => peers,
|
||||
None => {
|
||||
error!("broadcast: failed to get peers lock");
|
||||
break;
|
||||
}
|
||||
};
|
||||
p.stop();
|
||||
self.peers.write().remove(&p.info.addr);
|
||||
peers.remove(&p.info.addr);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -331,8 +373,15 @@ impl Peers {
|
|||
for p in self.connected_peers().iter() {
|
||||
if let Err(e) = p.send_ping(total_difficulty, height) {
|
||||
debug!("Error pinging peer {:?}: {:?}", &p.info.addr, e);
|
||||
let mut peers = match self.peers.try_write_for(LOCK_TIMEOUT) {
|
||||
Some(peers) => peers,
|
||||
None => {
|
||||
error!("check_all: failed to get peers lock");
|
||||
break;
|
||||
}
|
||||
};
|
||||
p.stop();
|
||||
self.peers.write().remove(&p.info.addr);
|
||||
peers.remove(&p.info.addr);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -388,33 +437,42 @@ impl Peers {
|
|||
let mut rm = vec![];
|
||||
|
||||
// build a list of peers to be cleaned up
|
||||
for peer in self.peers.read().values() {
|
||||
if peer.is_banned() {
|
||||
debug!("clean_peers {:?}, peer banned", peer.info.addr);
|
||||
rm.push(peer.info.addr.clone());
|
||||
} else if !peer.is_connected() {
|
||||
debug!("clean_peers {:?}, not connected", peer.info.addr);
|
||||
rm.push(peer.info.addr.clone());
|
||||
} else if peer.is_abusive() {
|
||||
if let Some(counts) = peer.last_min_message_counts() {
|
||||
debug!(
|
||||
"clean_peers {:?}, abusive ({} sent, {} recv)",
|
||||
peer.info.addr, counts.0, counts.1,
|
||||
);
|
||||
{
|
||||
let peers = match self.peers.try_read_for(LOCK_TIMEOUT) {
|
||||
Some(peers) => peers,
|
||||
None => {
|
||||
error!("clean_peers: can't get peers lock");
|
||||
return;
|
||||
}
|
||||
let _ = self.update_state(peer.info.addr, State::Banned);
|
||||
rm.push(peer.info.addr.clone());
|
||||
} else {
|
||||
let (stuck, diff) = peer.is_stuck();
|
||||
match self.adapter.total_difficulty() {
|
||||
Ok(total_difficulty) => {
|
||||
if stuck && diff < total_difficulty {
|
||||
debug!("clean_peers {:?}, stuck peer", peer.info.addr);
|
||||
let _ = self.update_state(peer.info.addr, State::Defunct);
|
||||
rm.push(peer.info.addr.clone());
|
||||
}
|
||||
};
|
||||
for peer in peers.values() {
|
||||
if peer.is_banned() {
|
||||
debug!("clean_peers {:?}, peer banned", peer.info.addr);
|
||||
rm.push(peer.info.addr.clone());
|
||||
} else if !peer.is_connected() {
|
||||
debug!("clean_peers {:?}, not connected", peer.info.addr);
|
||||
rm.push(peer.info.addr.clone());
|
||||
} else if peer.is_abusive() {
|
||||
if let Some(counts) = peer.last_min_message_counts() {
|
||||
debug!(
|
||||
"clean_peers {:?}, abusive ({} sent, {} recv)",
|
||||
peer.info.addr, counts.0, counts.1,
|
||||
);
|
||||
}
|
||||
let _ = self.update_state(peer.info.addr, State::Banned);
|
||||
rm.push(peer.info.addr.clone());
|
||||
} else {
|
||||
let (stuck, diff) = peer.is_stuck();
|
||||
match self.adapter.total_difficulty() {
|
||||
Ok(total_difficulty) => {
|
||||
if stuck && diff < total_difficulty {
|
||||
debug!("clean_peers {:?}, stuck peer", peer.info.addr);
|
||||
let _ = self.update_state(peer.info.addr, State::Defunct);
|
||||
rm.push(peer.info.addr.clone());
|
||||
}
|
||||
}
|
||||
Err(e) => error!("failed to get total difficulty: {:?}", e),
|
||||
}
|
||||
Err(e) => error!("failed to get total difficulty: {:?}", e),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -436,7 +494,13 @@ impl Peers {
|
|||
|
||||
// now clean up peer map based on the list to remove
|
||||
{
|
||||
let mut peers = self.peers.write();
|
||||
let mut peers = match self.peers.try_write_for(LOCK_TIMEOUT) {
|
||||
Some(peers) => peers,
|
||||
None => {
|
||||
error!("clean_peers: failed to get peers lock");
|
||||
return;
|
||||
}
|
||||
};
|
||||
for addr in rm {
|
||||
let _ = peers.get(&addr).map(|peer| peer.stop());
|
||||
peers.remove(&addr);
|
||||
|
@ -447,7 +511,7 @@ impl Peers {
|
|||
pub fn stop(&self) {
|
||||
let mut peers = self.peers.write();
|
||||
for (_, peer) in peers.drain() {
|
||||
peer.stop();
|
||||
peer.stop_and_wait();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -12,24 +12,20 @@
|
|||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use rand::{thread_rng, Rng};
|
||||
use std::cmp;
|
||||
use std::fs::{self, File, OpenOptions};
|
||||
use std::io::{BufWriter, Seek, SeekFrom, Write};
|
||||
use std::sync::Arc;
|
||||
|
||||
use chrono::prelude::Utc;
|
||||
use tempfile::tempfile;
|
||||
|
||||
use crate::conn::{Message, MessageHandler, Response};
|
||||
use crate::conn::{Message, MessageHandler, Response, Tracker};
|
||||
use crate::core::core::{self, hash::Hash, CompactBlock};
|
||||
use crate::util::{RateCounter, RwLock};
|
||||
|
||||
use crate::msg::{
|
||||
BanReason, GetPeerAddrs, Headers, KernelDataResponse, Locator, PeerAddrs, Ping, Pong,
|
||||
TxHashSetArchive, TxHashSetRequest, Type,
|
||||
};
|
||||
use crate::types::{Error, NetAdapter, PeerInfo};
|
||||
use chrono::prelude::Utc;
|
||||
use rand::{thread_rng, Rng};
|
||||
use std::cmp;
|
||||
use std::fs::{self, File, OpenOptions};
|
||||
use std::io::{BufWriter, Seek, SeekFrom, Write};
|
||||
use std::sync::Arc;
|
||||
use tempfile::tempfile;
|
||||
|
||||
pub struct Protocol {
|
||||
adapter: Arc<dyn NetAdapter>,
|
||||
|
@ -47,7 +43,7 @@ impl MessageHandler for Protocol {
|
|||
&self,
|
||||
mut msg: Message<'a>,
|
||||
writer: &'a mut dyn Write,
|
||||
received_bytes: Arc<RwLock<RateCounter>>,
|
||||
tracker: Arc<Tracker>,
|
||||
) -> Result<Option<Response<'a>>, Error> {
|
||||
let adapter = &self.adapter;
|
||||
|
||||
|
@ -275,7 +271,7 @@ impl MessageHandler for Protocol {
|
|||
|
||||
// Increase received bytes quietly (without affecting the counters).
|
||||
// Otherwise we risk banning a peer as "abusive".
|
||||
received_bytes.write().inc_quiet(size as u64);
|
||||
tracker.inc_quiet_received(size as u64);
|
||||
}
|
||||
|
||||
// Remember to seek back to start of the file as the caller is likely
|
||||
|
@ -362,10 +358,7 @@ impl MessageHandler for Protocol {
|
|||
|
||||
// Increase received bytes quietly (without affecting the counters).
|
||||
// Otherwise we risk banning a peer as "abusive".
|
||||
{
|
||||
let mut received_bytes = received_bytes.write();
|
||||
received_bytes.inc_quiet(size as u64);
|
||||
}
|
||||
tracker.inc_quiet_received(size as u64)
|
||||
}
|
||||
tmp_zip
|
||||
.into_inner()
|
||||
|
|
|
@ -33,7 +33,7 @@ use crate::types::{
|
|||
Capabilities, ChainAdapter, Error, NetAdapter, P2PConfig, PeerAddr, PeerInfo, ReasonForBan,
|
||||
TxHashSetRead,
|
||||
};
|
||||
use crate::util::{Mutex, StopState};
|
||||
use crate::util::StopState;
|
||||
use chrono::prelude::{DateTime, Utc};
|
||||
|
||||
/// P2P server implementation, handling bootstrapping to find and connect to
|
||||
|
@ -43,7 +43,7 @@ pub struct Server {
|
|||
capabilities: Capabilities,
|
||||
handshake: Arc<Handshake>,
|
||||
pub peers: Arc<Peers>,
|
||||
stop_state: Arc<Mutex<StopState>>,
|
||||
stop_state: Arc<StopState>,
|
||||
}
|
||||
|
||||
// TODO TLS
|
||||
|
@ -55,7 +55,7 @@ impl Server {
|
|||
config: P2PConfig,
|
||||
adapter: Arc<dyn ChainAdapter>,
|
||||
genesis: Hash,
|
||||
stop_state: Arc<Mutex<StopState>>,
|
||||
stop_state: Arc<StopState>,
|
||||
) -> Result<Server, Error> {
|
||||
Ok(Server {
|
||||
config: config.clone(),
|
||||
|
@ -77,7 +77,7 @@ impl Server {
|
|||
let sleep_time = Duration::from_millis(5);
|
||||
loop {
|
||||
// Pause peer ingress connection request. Only for tests.
|
||||
if self.stop_state.lock().is_paused() {
|
||||
if self.stop_state.is_paused() {
|
||||
thread::sleep(Duration::from_secs(1));
|
||||
continue;
|
||||
}
|
||||
|
@ -89,9 +89,13 @@ impl Server {
|
|||
if self.check_undesirable(&stream) {
|
||||
continue;
|
||||
}
|
||||
if let Err(e) = self.handle_new_peer(stream) {
|
||||
debug!("Error accepting peer {}: {:?}", peer_addr.to_string(), e);
|
||||
let _ = self.peers.add_banned(peer_addr, ReasonForBan::BadHandshake);
|
||||
match self.handle_new_peer(stream) {
|
||||
Err(Error::ConnectionClose) => debug!("shutting down, ignoring a new peer"),
|
||||
Err(e) => {
|
||||
debug!("Error accepting peer {}: {:?}", peer_addr.to_string(), e);
|
||||
let _ = self.peers.add_banned(peer_addr, ReasonForBan::BadHandshake);
|
||||
}
|
||||
Ok(_) => {}
|
||||
}
|
||||
}
|
||||
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
|
||||
|
@ -101,7 +105,7 @@ impl Server {
|
|||
debug!("Couldn't establish new client connection: {:?}", e);
|
||||
}
|
||||
}
|
||||
if self.stop_state.lock().is_stopped() {
|
||||
if self.stop_state.is_stopped() {
|
||||
break;
|
||||
}
|
||||
thread::sleep(sleep_time);
|
||||
|
@ -112,6 +116,10 @@ impl Server {
|
|||
/// Asks the server to connect to a new peer. Directly returns the peer if
|
||||
/// we're already connected to the provided address.
|
||||
pub fn connect(&self, addr: PeerAddr) -> Result<Arc<Peer>, Error> {
|
||||
if self.stop_state.is_stopped() {
|
||||
return Err(Error::ConnectionClose);
|
||||
}
|
||||
|
||||
if Peer::is_denied(&self.config, addr) {
|
||||
debug!("connect_peer: peer {} denied, not connecting.", addr);
|
||||
return Err(Error::ConnectionClose);
|
||||
|
@ -169,6 +177,9 @@ impl Server {
|
|||
}
|
||||
|
||||
fn handle_new_peer(&self, stream: TcpStream) -> Result<(), Error> {
|
||||
if self.stop_state.is_stopped() {
|
||||
return Err(Error::ConnectionClose);
|
||||
}
|
||||
let total_diff = self.peers.total_difficulty()?;
|
||||
|
||||
// accept the peer and add it to the server map
|
||||
|
@ -214,7 +225,7 @@ impl Server {
|
|||
}
|
||||
|
||||
pub fn stop(&self) {
|
||||
self.stop_state.lock().stop();
|
||||
self.stop_state.stop();
|
||||
self.peers.stop();
|
||||
}
|
||||
|
||||
|
|
|
@ -16,7 +16,7 @@ use grin_core as core;
|
|||
use grin_p2p as p2p;
|
||||
|
||||
use grin_util as util;
|
||||
use grin_util::{Mutex, StopState};
|
||||
use grin_util::StopState;
|
||||
|
||||
use std::net::{SocketAddr, TcpListener, TcpStream};
|
||||
use std::sync::Arc;
|
||||
|
@ -56,7 +56,7 @@ fn peer_handshake() {
|
|||
p2p_config.clone(),
|
||||
net_adapter.clone(),
|
||||
Hash::from_vec(&vec![]),
|
||||
Arc::new(Mutex::new(StopState::new())),
|
||||
Arc::new(StopState::new()),
|
||||
)
|
||||
.unwrap(),
|
||||
);
|
||||
|
|
|
@ -344,6 +344,7 @@ pub enum SyncStatus {
|
|||
current_height: u64,
|
||||
highest_height: u64,
|
||||
},
|
||||
Shutdown,
|
||||
}
|
||||
|
||||
/// Current sync state. Encapsulates the current SyncStatus.
|
||||
|
|
|
@ -16,14 +16,14 @@ use chrono::prelude::Utc;
|
|||
use rand::{thread_rng, Rng};
|
||||
use std::sync::Arc;
|
||||
use std::thread;
|
||||
use std::time::Duration;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use crate::common::adapters::DandelionAdapter;
|
||||
use crate::core::core::hash::Hashed;
|
||||
use crate::core::core::transaction;
|
||||
use crate::core::core::verifier_cache::VerifierCache;
|
||||
use crate::pool::{DandelionConfig, Pool, PoolEntry, PoolError, TransactionPool, TxSource};
|
||||
use crate::util::{Mutex, RwLock, StopState};
|
||||
use crate::util::{RwLock, StopState};
|
||||
|
||||
/// A process to monitor transactions in the stempool.
|
||||
/// With Dandelion, transaction can be broadcasted in stem or fluff phase.
|
||||
|
@ -38,43 +38,54 @@ pub fn monitor_transactions(
|
|||
tx_pool: Arc<RwLock<TransactionPool>>,
|
||||
adapter: Arc<DandelionAdapter>,
|
||||
verifier_cache: Arc<RwLock<dyn VerifierCache>>,
|
||||
stop_state: Arc<Mutex<StopState>>,
|
||||
) {
|
||||
stop_state: Arc<StopState>,
|
||||
) -> std::io::Result<thread::JoinHandle<()>> {
|
||||
debug!("Started Dandelion transaction monitor.");
|
||||
|
||||
let _ = thread::Builder::new()
|
||||
thread::Builder::new()
|
||||
.name("dandelion".to_string())
|
||||
.spawn(move || {
|
||||
let run_interval = Duration::from_secs(10);
|
||||
let mut last_run = Instant::now()
|
||||
.checked_sub(Duration::from_secs(20))
|
||||
.unwrap_or_else(|| Instant::now());
|
||||
loop {
|
||||
// Halt Dandelion monitor if we have been notified that we are stopping.
|
||||
if stop_state.lock().is_stopped() {
|
||||
if stop_state.is_stopped() {
|
||||
break;
|
||||
}
|
||||
|
||||
if !adapter.is_stem() {
|
||||
let _ =
|
||||
process_fluff_phase(&dandelion_config, &tx_pool, &adapter, &verifier_cache)
|
||||
.map_err(|e| {
|
||||
error!("dand_mon: Problem processing fluff phase. {:?}", e);
|
||||
});
|
||||
if last_run.elapsed() > run_interval {
|
||||
if !adapter.is_stem() {
|
||||
let _ = process_fluff_phase(
|
||||
&dandelion_config,
|
||||
&tx_pool,
|
||||
&adapter,
|
||||
&verifier_cache,
|
||||
)
|
||||
.map_err(|e| {
|
||||
error!("dand_mon: Problem processing fluff phase. {:?}", e);
|
||||
});
|
||||
}
|
||||
|
||||
// Now find all expired entries based on embargo timer.
|
||||
let _ = process_expired_entries(&dandelion_config, &tx_pool).map_err(|e| {
|
||||
error!("dand_mon: Problem processing expired entries. {:?}", e);
|
||||
});
|
||||
|
||||
// Handle the tx above *before* we transition to next epoch.
|
||||
// This gives us an opportunity to do the final "fluff" before we start
|
||||
// stemming on the subsequent epoch.
|
||||
if adapter.is_expired() {
|
||||
adapter.next_epoch();
|
||||
}
|
||||
last_run = Instant::now();
|
||||
}
|
||||
|
||||
// Now find all expired entries based on embargo timer.
|
||||
let _ = process_expired_entries(&dandelion_config, &tx_pool).map_err(|e| {
|
||||
error!("dand_mon: Problem processing expired entries. {:?}", e);
|
||||
});
|
||||
|
||||
// Handle the tx above *before* we transition to next epoch.
|
||||
// This gives us an opportunity to do the final "fluff" before we start
|
||||
// stemming on the subsequent epoch.
|
||||
if adapter.is_expired() {
|
||||
adapter.next_epoch();
|
||||
}
|
||||
|
||||
// Monitor loops every 10s.
|
||||
thread::sleep(Duration::from_secs(10));
|
||||
// Monitor loops every 10s, but check stop flag every second.
|
||||
thread::sleep(Duration::from_secs(1));
|
||||
}
|
||||
});
|
||||
})
|
||||
}
|
||||
|
||||
// Query the pool for transactions older than the cutoff.
|
||||
|
|
|
@ -30,7 +30,7 @@ use crate::core::global;
|
|||
use crate::p2p;
|
||||
use crate::p2p::types::PeerAddr;
|
||||
use crate::p2p::ChainAdapter;
|
||||
use crate::util::{Mutex, StopState};
|
||||
use crate::util::StopState;
|
||||
|
||||
// DNS Seeds with contact email associated
|
||||
const MAINNET_DNS_SEEDS: &'static [&'static str] = &[
|
||||
|
@ -54,9 +54,9 @@ pub fn connect_and_monitor(
|
|||
capabilities: p2p::Capabilities,
|
||||
seed_list: Box<dyn Fn() -> Vec<PeerAddr> + Send>,
|
||||
preferred_peers: Option<Vec<PeerAddr>>,
|
||||
stop_state: Arc<Mutex<StopState>>,
|
||||
) {
|
||||
let _ = thread::Builder::new()
|
||||
stop_state: Arc<StopState>,
|
||||
) -> std::io::Result<thread::JoinHandle<()>> {
|
||||
thread::Builder::new()
|
||||
.name("seed".to_string())
|
||||
.spawn(move || {
|
||||
let peers = p2p_server.peers.clone();
|
||||
|
@ -77,16 +77,15 @@ pub fn connect_and_monitor(
|
|||
let mut prev_expire_check = MIN_DATE.and_hms(0, 0, 0);
|
||||
let mut prev_ping = Utc::now();
|
||||
let mut start_attempt = 0;
|
||||
|
||||
let mut connecting_history: HashMap<PeerAddr, DateTime<Utc>> = HashMap::new();
|
||||
|
||||
loop {
|
||||
if stop_state.lock().is_stopped() {
|
||||
if stop_state.is_stopped() {
|
||||
break;
|
||||
}
|
||||
|
||||
// Pause egress peer connection request. Only for tests.
|
||||
if stop_state.lock().is_paused() {
|
||||
if stop_state.is_paused() {
|
||||
thread::sleep(time::Duration::from_secs(1));
|
||||
continue;
|
||||
}
|
||||
|
@ -136,7 +135,7 @@ pub fn connect_and_monitor(
|
|||
|
||||
thread::sleep(time::Duration::from_secs(1));
|
||||
}
|
||||
});
|
||||
})
|
||||
}
|
||||
|
||||
fn monitor_peers(
|
||||
|
@ -326,17 +325,19 @@ fn listen_for_addrs(
|
|||
|
||||
let peers_c = peers.clone();
|
||||
let p2p_c = p2p.clone();
|
||||
let _ = thread::Builder::new()
|
||||
thread::Builder::new()
|
||||
.name("peer_connect".to_string())
|
||||
.spawn(move || match p2p_c.connect(addr) {
|
||||
Ok(p) => {
|
||||
let _ = p.send_peer_request(capab);
|
||||
let _ = peers_c.update_state(addr, p2p::State::Healthy);
|
||||
if p.send_peer_request(capab).is_ok() {
|
||||
let _ = peers_c.update_state(addr, p2p::State::Healthy);
|
||||
}
|
||||
}
|
||||
Err(_) => {
|
||||
let _ = peers_c.update_state(addr, p2p::State::Defunct);
|
||||
}
|
||||
});
|
||||
})
|
||||
.expect("failed to launch peer_connect thread");
|
||||
}
|
||||
|
||||
// shrink the connecting history.
|
||||
|
|
|
@ -21,7 +21,10 @@ use std::fs::File;
|
|||
use std::io::prelude::*;
|
||||
use std::path::Path;
|
||||
use std::sync::Arc;
|
||||
use std::{thread, time};
|
||||
use std::{
|
||||
thread::{self, JoinHandle},
|
||||
time,
|
||||
};
|
||||
|
||||
use fs2::FileExt;
|
||||
|
||||
|
@ -44,7 +47,7 @@ use crate::p2p;
|
|||
use crate::p2p::types::PeerAddr;
|
||||
use crate::pool;
|
||||
use crate::util::file::get_first_line;
|
||||
use crate::util::{Mutex, RwLock, StopState};
|
||||
use crate::util::{RwLock, StopState};
|
||||
|
||||
/// Grin server holding internal structures.
|
||||
pub struct Server {
|
||||
|
@ -64,9 +67,13 @@ pub struct Server {
|
|||
/// To be passed around to collect stats and info
|
||||
state_info: ServerStateInfo,
|
||||
/// Stop flag
|
||||
pub stop_state: Arc<Mutex<StopState>>,
|
||||
pub stop_state: Arc<StopState>,
|
||||
/// Maintain a lock_file so we do not run multiple Grin nodes from same dir.
|
||||
lock_file: Arc<File>,
|
||||
connect_thread: Option<JoinHandle<()>>,
|
||||
sync_thread: JoinHandle<()>,
|
||||
dandelion_thread: JoinHandle<()>,
|
||||
p2p_thread: JoinHandle<()>,
|
||||
}
|
||||
|
||||
impl Server {
|
||||
|
@ -142,7 +149,7 @@ impl Server {
|
|||
Some(b) => b,
|
||||
};
|
||||
|
||||
let stop_state = Arc::new(Mutex::new(StopState::new()));
|
||||
let stop_state = Arc::new(StopState::new());
|
||||
|
||||
// Shared cache for verification results.
|
||||
// We cache rangeproof verification and kernel signature verification.
|
||||
|
@ -180,7 +187,6 @@ impl Server {
|
|||
pow::verify_size,
|
||||
verifier_cache.clone(),
|
||||
archive_mode,
|
||||
stop_state.clone(),
|
||||
)?);
|
||||
|
||||
pool_adapter.set_chain(shared_chain.clone());
|
||||
|
@ -208,6 +214,8 @@ impl Server {
|
|||
pool_net_adapter.init(p2p_server.peers.clone());
|
||||
net_adapter.init(p2p_server.peers.clone());
|
||||
|
||||
let mut connect_thread = None;
|
||||
|
||||
if config.p2p_config.seeding_type != p2p::Seeding::Programmatic {
|
||||
let seeder = match config.p2p_config.seeding_type {
|
||||
p2p::Seeding::None => {
|
||||
|
@ -226,13 +234,13 @@ impl Server {
|
|||
_ => unreachable!(),
|
||||
};
|
||||
|
||||
seed::connect_and_monitor(
|
||||
connect_thread = Some(seed::connect_and_monitor(
|
||||
p2p_server.clone(),
|
||||
config.p2p_config.capabilities,
|
||||
seeder,
|
||||
config.p2p_config.peers_preferred.clone(),
|
||||
stop_state.clone(),
|
||||
);
|
||||
)?);
|
||||
}
|
||||
|
||||
// Defaults to None (optional) in config file.
|
||||
|
@ -240,17 +248,21 @@ impl Server {
|
|||
let skip_sync_wait = config.skip_sync_wait.unwrap_or(false);
|
||||
sync_state.update(SyncStatus::AwaitingPeers(!skip_sync_wait));
|
||||
|
||||
sync::run_sync(
|
||||
let sync_thread = sync::run_sync(
|
||||
sync_state.clone(),
|
||||
p2p_server.peers.clone(),
|
||||
shared_chain.clone(),
|
||||
stop_state.clone(),
|
||||
);
|
||||
)?;
|
||||
|
||||
let p2p_inner = p2p_server.clone();
|
||||
let _ = thread::Builder::new()
|
||||
let p2p_thread = thread::Builder::new()
|
||||
.name("p2p-server".to_string())
|
||||
.spawn(move || p2p_inner.listen());
|
||||
.spawn(move || {
|
||||
if let Err(e) = p2p_inner.listen() {
|
||||
error!("P2P server failed with erorr: {:?}", e);
|
||||
}
|
||||
})?;
|
||||
|
||||
info!("Starting rest apis at: {}", &config.api_http_addr);
|
||||
let api_secret = get_first_line(config.api_secret_path.clone());
|
||||
|
@ -269,6 +281,7 @@ impl Server {
|
|||
}
|
||||
};
|
||||
|
||||
// TODO fix API shutdown and join this thread
|
||||
api::start_rest_apis(
|
||||
config.api_http_addr.clone(),
|
||||
shared_chain.clone(),
|
||||
|
@ -279,13 +292,13 @@ impl Server {
|
|||
);
|
||||
|
||||
info!("Starting dandelion monitor: {}", &config.api_http_addr);
|
||||
dandelion_monitor::monitor_transactions(
|
||||
let dandelion_thread = dandelion_monitor::monitor_transactions(
|
||||
config.dandelion_config.clone(),
|
||||
tx_pool.clone(),
|
||||
pool_net_adapter.clone(),
|
||||
verifier_cache.clone(),
|
||||
stop_state.clone(),
|
||||
);
|
||||
)?;
|
||||
|
||||
warn!("Grin server started.");
|
||||
Ok(Server {
|
||||
|
@ -300,6 +313,10 @@ impl Server {
|
|||
},
|
||||
stop_state,
|
||||
lock_file,
|
||||
connect_thread,
|
||||
sync_thread,
|
||||
p2p_thread,
|
||||
dandelion_thread,
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -348,7 +365,7 @@ impl Server {
|
|||
pub fn start_test_miner(
|
||||
&self,
|
||||
wallet_listener_url: Option<String>,
|
||||
stop_state: Arc<Mutex<StopState>>,
|
||||
stop_state: Arc<StopState>,
|
||||
) {
|
||||
info!("start_test_miner - start",);
|
||||
let sync_state = self.sync_state.clone();
|
||||
|
@ -489,15 +506,41 @@ impl Server {
|
|||
}
|
||||
|
||||
/// Stop the server.
|
||||
pub fn stop(&self) {
|
||||
pub fn stop(self) {
|
||||
{
|
||||
self.sync_state.update(SyncStatus::Shutdown);
|
||||
self.stop_state.stop();
|
||||
|
||||
if let Some(connect_thread) = self.connect_thread {
|
||||
match connect_thread.join() {
|
||||
Err(e) => error!("failed to join to connect_and_monitor thread: {:?}", e),
|
||||
Ok(_) => info!("connect_and_monitor thread stopped"),
|
||||
}
|
||||
} else {
|
||||
info!("No active connect_and_monitor thread")
|
||||
}
|
||||
|
||||
match self.sync_thread.join() {
|
||||
Err(e) => error!("failed to join to sync thread: {:?}", e),
|
||||
Ok(_) => info!("sync thread stopped"),
|
||||
}
|
||||
|
||||
match self.dandelion_thread.join() {
|
||||
Err(e) => error!("failed to join to dandelion_monitor thread: {:?}", e),
|
||||
Ok(_) => info!("dandelion_monitor thread stopped"),
|
||||
}
|
||||
}
|
||||
self.p2p.stop();
|
||||
self.stop_state.lock().stop();
|
||||
match self.p2p_thread.join() {
|
||||
Err(e) => error!("failed to join to p2p thread: {:?}", e),
|
||||
Ok(_) => info!("p2p thread stopped"),
|
||||
}
|
||||
let _ = self.lock_file.unlock();
|
||||
}
|
||||
|
||||
/// Pause the p2p server.
|
||||
pub fn pause(&self) {
|
||||
self.stop_state.lock().pause();
|
||||
self.stop_state.pause();
|
||||
thread::sleep(time::Duration::from_secs(1));
|
||||
self.p2p.pause();
|
||||
}
|
||||
|
@ -505,12 +548,12 @@ impl Server {
|
|||
/// Resume p2p server.
|
||||
/// TODO - We appear not to resume the p2p server (peer connections) here?
|
||||
pub fn resume(&self) {
|
||||
self.stop_state.lock().resume();
|
||||
self.stop_state.resume();
|
||||
}
|
||||
|
||||
/// Stops the test miner without stopping the p2p layer
|
||||
pub fn stop_test_miner(&self, stop: Arc<Mutex<StopState>>) {
|
||||
stop.lock().stop();
|
||||
pub fn stop_test_miner(&self, stop: Arc<StopState>) {
|
||||
stop.stop();
|
||||
info!("stop_test_miner - stop",);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -141,6 +141,7 @@ impl BodySync {
|
|||
if let Some(peer) = peers_iter.next() {
|
||||
if let Err(e) = peer.send_block_request(*hash) {
|
||||
debug!("Skipped request to {}: {:?}", peer.info.addr, e);
|
||||
peer.stop();
|
||||
} else {
|
||||
self.blocks_requested += 1;
|
||||
}
|
||||
|
|
|
@ -24,27 +24,27 @@ use crate::grin::sync::body_sync::BodySync;
|
|||
use crate::grin::sync::header_sync::HeaderSync;
|
||||
use crate::grin::sync::state_sync::StateSync;
|
||||
use crate::p2p;
|
||||
use crate::util::{Mutex, StopState};
|
||||
use crate::util::StopState;
|
||||
|
||||
pub fn run_sync(
|
||||
sync_state: Arc<SyncState>,
|
||||
peers: Arc<p2p::Peers>,
|
||||
chain: Arc<chain::Chain>,
|
||||
stop_state: Arc<Mutex<StopState>>,
|
||||
) {
|
||||
let _ = thread::Builder::new()
|
||||
stop_state: Arc<StopState>,
|
||||
) -> std::io::Result<std::thread::JoinHandle<()>> {
|
||||
thread::Builder::new()
|
||||
.name("sync".to_string())
|
||||
.spawn(move || {
|
||||
let runner = SyncRunner::new(sync_state, peers, chain, stop_state);
|
||||
runner.sync_loop();
|
||||
});
|
||||
})
|
||||
}
|
||||
|
||||
pub struct SyncRunner {
|
||||
sync_state: Arc<SyncState>,
|
||||
peers: Arc<p2p::Peers>,
|
||||
chain: Arc<chain::Chain>,
|
||||
stop_state: Arc<Mutex<StopState>>,
|
||||
stop_state: Arc<StopState>,
|
||||
}
|
||||
|
||||
impl SyncRunner {
|
||||
|
@ -52,7 +52,7 @@ impl SyncRunner {
|
|||
sync_state: Arc<SyncState>,
|
||||
peers: Arc<p2p::Peers>,
|
||||
chain: Arc<chain::Chain>,
|
||||
stop_state: Arc<Mutex<StopState>>,
|
||||
stop_state: Arc<StopState>,
|
||||
) -> SyncRunner {
|
||||
SyncRunner {
|
||||
sync_state,
|
||||
|
@ -77,6 +77,9 @@ impl SyncRunner {
|
|||
let mut n = 0;
|
||||
const MIN_PEERS: usize = 3;
|
||||
loop {
|
||||
if self.stop_state.is_stopped() {
|
||||
break;
|
||||
}
|
||||
let wp = self.peers.more_or_same_work_peers()?;
|
||||
// exit loop when:
|
||||
// * we have more than MIN_PEERS more_or_same_work peers
|
||||
|
@ -140,7 +143,7 @@ impl SyncRunner {
|
|||
|
||||
// Main syncing loop
|
||||
loop {
|
||||
if self.stop_state.lock().is_stopped() {
|
||||
if self.stop_state.is_stopped() {
|
||||
break;
|
||||
}
|
||||
|
||||
|
@ -167,7 +170,13 @@ impl SyncRunner {
|
|||
unwrap_or_restart_loop!(self.chain.compact());
|
||||
}
|
||||
|
||||
thread::sleep(time::Duration::from_secs(10));
|
||||
// sleep for 10 secs but check stop signal every second
|
||||
for _ in 1..10 {
|
||||
thread::sleep(time::Duration::from_secs(1));
|
||||
if self.stop_state.is_stopped() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
|
|
|
@ -29,14 +29,14 @@ use crate::core::core::{Block, BlockHeader};
|
|||
use crate::core::global;
|
||||
use crate::mining::mine_block;
|
||||
use crate::pool;
|
||||
use crate::util::{Mutex, StopState};
|
||||
use crate::util::StopState;
|
||||
|
||||
pub struct Miner {
|
||||
config: StratumServerConfig,
|
||||
chain: Arc<chain::Chain>,
|
||||
tx_pool: Arc<RwLock<pool::TransactionPool>>,
|
||||
verifier_cache: Arc<RwLock<dyn VerifierCache>>,
|
||||
stop_state: Arc<Mutex<StopState>>,
|
||||
stop_state: Arc<StopState>,
|
||||
|
||||
// Just to hold the port we're on, so this miner can be identified
|
||||
// while watching debug output
|
||||
|
@ -51,7 +51,7 @@ impl Miner {
|
|||
chain: Arc<chain::Chain>,
|
||||
tx_pool: Arc<RwLock<pool::TransactionPool>>,
|
||||
verifier_cache: Arc<RwLock<dyn VerifierCache>>,
|
||||
stop_state: Arc<Mutex<StopState>>,
|
||||
stop_state: Arc<StopState>,
|
||||
) -> Miner {
|
||||
Miner {
|
||||
config,
|
||||
|
@ -136,7 +136,7 @@ impl Miner {
|
|||
let mut key_id = None;
|
||||
|
||||
loop {
|
||||
if self.stop_state.lock().is_stopped() {
|
||||
if self.stop_state.is_stopped() {
|
||||
break;
|
||||
}
|
||||
|
||||
|
|
|
@ -199,6 +199,7 @@ impl TUIStatusListener for TUIStatusView {
|
|||
};
|
||||
format!("Downloading blocks: {}%, step 4/4", percent)
|
||||
}
|
||||
SyncStatus::Shutdown => "Shutting down, closing connections".to_string(),
|
||||
}
|
||||
};
|
||||
/*let basic_mining_config_status = {
|
||||
|
|
|
@ -173,8 +173,10 @@ impl Controller {
|
|||
while let Some(message) = self.rx.try_iter().next() {
|
||||
match message {
|
||||
ControllerMessage::Shutdown => {
|
||||
server.stop();
|
||||
self.ui.stop();
|
||||
println!("Shutdown in progress, please wait");
|
||||
server.stop();
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -53,8 +53,8 @@ pub mod read_write;
|
|||
// other utils
|
||||
#[allow(unused_imports)]
|
||||
use std::ops::Deref;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::Arc;
|
||||
|
||||
mod hex;
|
||||
pub use crate::hex::*;
|
||||
|
||||
|
@ -112,49 +112,45 @@ pub fn to_base64(s: &str) -> String {
|
|||
|
||||
/// Global stopped/paused state shared across various subcomponents of Grin.
|
||||
///
|
||||
/// Arc<Mutex<StopState>> allows the chain to lock the stop_state during critical processing.
|
||||
/// Other subcomponents cannot abruptly shutdown the server during block/header processing.
|
||||
/// This should prevent the chain ever ending up in an inconsistent state on restart.
|
||||
///
|
||||
/// "Stopped" allows a clean shutdown of the Grin server.
|
||||
/// "Paused" is used in some tests to allow nodes to reach steady state etc.
|
||||
///
|
||||
pub struct StopState {
|
||||
stopped: bool,
|
||||
paused: bool,
|
||||
stopped: AtomicBool,
|
||||
paused: AtomicBool,
|
||||
}
|
||||
|
||||
impl StopState {
|
||||
/// Create a new stop_state in default "running" state.
|
||||
pub fn new() -> StopState {
|
||||
StopState {
|
||||
stopped: false,
|
||||
paused: false,
|
||||
stopped: AtomicBool::new(false),
|
||||
paused: AtomicBool::new(false),
|
||||
}
|
||||
}
|
||||
|
||||
/// Check if we are stopped.
|
||||
pub fn is_stopped(&self) -> bool {
|
||||
self.stopped
|
||||
self.stopped.load(Ordering::Relaxed)
|
||||
}
|
||||
|
||||
/// Check if we are paused.
|
||||
pub fn is_paused(&self) -> bool {
|
||||
self.paused
|
||||
self.paused.load(Ordering::Relaxed)
|
||||
}
|
||||
|
||||
/// Stop the server.
|
||||
pub fn stop(&mut self) {
|
||||
self.stopped = true;
|
||||
pub fn stop(&self) {
|
||||
self.stopped.store(true, Ordering::Relaxed)
|
||||
}
|
||||
|
||||
/// Pause the server (only used in tests).
|
||||
pub fn pause(&mut self) {
|
||||
self.paused = true;
|
||||
pub fn pause(&self) {
|
||||
self.paused.store(true, Ordering::Relaxed)
|
||||
}
|
||||
|
||||
/// Resume a paused server (only used in tests).
|
||||
pub fn resume(&mut self) {
|
||||
self.paused = false;
|
||||
pub fn resume(&self) {
|
||||
self.paused.store(false, Ordering::Relaxed)
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue