mirror of
https://github.com/mimblewimble/grin.git
synced 2025-01-21 11:31:08 +03:00
[2.x.x] Use blocking IO in P2P to reduce CPU load (#2855)
* Use blocking IO in P2P to reduce CPU load
This commit is contained in:
parent
1395074a25
commit
d3dbafa80b
11 changed files with 138 additions and 240 deletions
|
@ -646,8 +646,7 @@ impl Chain {
|
||||||
/// TODO - Write this data to disk and validate the rebuilt kernel MMR.
|
/// TODO - Write this data to disk and validate the rebuilt kernel MMR.
|
||||||
pub fn kernel_data_write(&self, reader: &mut Read) -> Result<(), Error> {
|
pub fn kernel_data_write(&self, reader: &mut Read) -> Result<(), Error> {
|
||||||
let mut count = 0;
|
let mut count = 0;
|
||||||
let mut stream =
|
let mut stream = StreamingReader::new(reader, ProtocolVersion::local());
|
||||||
StreamingReader::new(reader, ProtocolVersion::local(), Duration::from_secs(1));
|
|
||||||
while let Ok(_kernel) = TxKernelEntry::read(&mut stream) {
|
while let Ok(_kernel) = TxKernelEntry::read(&mut stream) {
|
||||||
count += 1;
|
count += 1;
|
||||||
}
|
}
|
||||||
|
|
|
@ -180,9 +180,9 @@ impl Default for HeaderVersion {
|
||||||
|
|
||||||
// self-conscious increment function courtesy of Jasper
|
// self-conscious increment function courtesy of Jasper
|
||||||
impl HeaderVersion {
|
impl HeaderVersion {
|
||||||
fn next(&self) -> Self {
|
fn next(&self) -> Self {
|
||||||
Self(self.0+1)
|
Self(self.0 + 1)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl HeaderVersion {
|
impl HeaderVersion {
|
||||||
|
|
|
@ -17,13 +17,15 @@
|
||||||
//! should be used sparingly.
|
//! should be used sparingly.
|
||||||
|
|
||||||
use crate::consensus::{
|
use crate::consensus::{
|
||||||
HeaderInfo, valid_header_version, graph_weight, BASE_EDGE_BITS, BLOCK_TIME_SEC,
|
graph_weight, valid_header_version, HeaderInfo, BASE_EDGE_BITS, BLOCK_TIME_SEC,
|
||||||
COINBASE_MATURITY, CUT_THROUGH_HORIZON, DAY_HEIGHT, DEFAULT_MIN_EDGE_BITS,
|
COINBASE_MATURITY, CUT_THROUGH_HORIZON, DAY_HEIGHT, DEFAULT_MIN_EDGE_BITS,
|
||||||
DIFFICULTY_ADJUST_WINDOW, INITIAL_DIFFICULTY, MAX_BLOCK_WEIGHT, PROOFSIZE,
|
DIFFICULTY_ADJUST_WINDOW, INITIAL_DIFFICULTY, MAX_BLOCK_WEIGHT, PROOFSIZE,
|
||||||
SECOND_POW_EDGE_BITS, STATE_SYNC_THRESHOLD,
|
SECOND_POW_EDGE_BITS, STATE_SYNC_THRESHOLD,
|
||||||
};
|
};
|
||||||
use crate::core::block::HeaderVersion;
|
use crate::core::block::HeaderVersion;
|
||||||
use crate::pow::{self, new_cuckatoo_ctx, new_cuckaroo_ctx, new_cuckarood_ctx, EdgeType, PoWContext};
|
use crate::pow::{
|
||||||
|
self, new_cuckaroo_ctx, new_cuckarood_ctx, new_cuckatoo_ctx, EdgeType, PoWContext,
|
||||||
|
};
|
||||||
/// An enum collecting sets of parameters used throughout the
|
/// An enum collecting sets of parameters used throughout the
|
||||||
/// code wherever mining is needed. This should allow for
|
/// code wherever mining is needed. This should allow for
|
||||||
/// different sets of parameters for different purposes,
|
/// different sets of parameters for different purposes,
|
||||||
|
@ -164,14 +166,16 @@ where
|
||||||
match chain_type {
|
match chain_type {
|
||||||
// Mainnet has Cuckaroo(d)29 for AR and Cuckatoo31+ for AF
|
// Mainnet has Cuckaroo(d)29 for AR and Cuckatoo31+ for AF
|
||||||
ChainTypes::Mainnet if edge_bits > 29 => new_cuckatoo_ctx(edge_bits, proof_size, max_sols),
|
ChainTypes::Mainnet if edge_bits > 29 => new_cuckatoo_ctx(edge_bits, proof_size, max_sols),
|
||||||
ChainTypes::Mainnet if valid_header_version(height, HeaderVersion::new(2))
|
ChainTypes::Mainnet if valid_header_version(height, HeaderVersion::new(2)) => {
|
||||||
=> new_cuckarood_ctx(edge_bits, proof_size),
|
new_cuckarood_ctx(edge_bits, proof_size)
|
||||||
|
}
|
||||||
ChainTypes::Mainnet => new_cuckaroo_ctx(edge_bits, proof_size),
|
ChainTypes::Mainnet => new_cuckaroo_ctx(edge_bits, proof_size),
|
||||||
|
|
||||||
// Same for Floonet
|
// Same for Floonet
|
||||||
ChainTypes::Floonet if edge_bits > 29 => new_cuckatoo_ctx(edge_bits, proof_size, max_sols),
|
ChainTypes::Floonet if edge_bits > 29 => new_cuckatoo_ctx(edge_bits, proof_size, max_sols),
|
||||||
ChainTypes::Floonet if valid_header_version(height, HeaderVersion::new(2))
|
ChainTypes::Floonet if valid_header_version(height, HeaderVersion::new(2)) => {
|
||||||
=> new_cuckarood_ctx(edge_bits, proof_size),
|
new_cuckarood_ctx(edge_bits, proof_size)
|
||||||
|
}
|
||||||
ChainTypes::Floonet => new_cuckaroo_ctx(edge_bits, proof_size),
|
ChainTypes::Floonet => new_cuckaroo_ctx(edge_bits, proof_size),
|
||||||
|
|
||||||
// Everything else is Cuckatoo only
|
// Everything else is Cuckatoo only
|
||||||
|
|
|
@ -33,9 +33,9 @@ use num;
|
||||||
|
|
||||||
#[macro_use]
|
#[macro_use]
|
||||||
mod common;
|
mod common;
|
||||||
pub mod cuckatoo;
|
|
||||||
pub mod cuckaroo;
|
pub mod cuckaroo;
|
||||||
pub mod cuckarood;
|
pub mod cuckarood;
|
||||||
|
pub mod cuckatoo;
|
||||||
mod error;
|
mod error;
|
||||||
#[allow(dead_code)]
|
#[allow(dead_code)]
|
||||||
pub mod lean;
|
pub mod lean;
|
||||||
|
@ -49,9 +49,9 @@ use chrono::prelude::{DateTime, NaiveDateTime, Utc};
|
||||||
|
|
||||||
pub use self::common::EdgeType;
|
pub use self::common::EdgeType;
|
||||||
pub use self::types::*;
|
pub use self::types::*;
|
||||||
pub use crate::pow::cuckatoo::{new_cuckatoo_ctx, CuckatooContext};
|
|
||||||
pub use crate::pow::cuckaroo::{new_cuckaroo_ctx, CuckarooContext};
|
pub use crate::pow::cuckaroo::{new_cuckaroo_ctx, CuckarooContext};
|
||||||
pub use crate::pow::cuckarood::{new_cuckarood_ctx, CuckaroodContext};
|
pub use crate::pow::cuckarood::{new_cuckarood_ctx, CuckaroodContext};
|
||||||
|
pub use crate::pow::cuckatoo::{new_cuckatoo_ctx, CuckatooContext};
|
||||||
pub use crate::pow::error::Error;
|
pub use crate::pow::error::Error;
|
||||||
|
|
||||||
const MAX_SOLS: u32 = 10;
|
const MAX_SOLS: u32 = 10;
|
||||||
|
|
|
@ -22,11 +22,11 @@
|
||||||
//! a rotation by 25, halves the number of graph nodes in each partition,
|
//! a rotation by 25, halves the number of graph nodes in each partition,
|
||||||
//! and requires cycles to alternate between even- and odd-indexed edges.
|
//! and requires cycles to alternate between even- and odd-indexed edges.
|
||||||
|
|
||||||
|
use crate::global;
|
||||||
use crate::pow::common::{CuckooParams, EdgeType};
|
use crate::pow::common::{CuckooParams, EdgeType};
|
||||||
use crate::pow::error::{Error, ErrorKind};
|
use crate::pow::error::{Error, ErrorKind};
|
||||||
use crate::pow::siphash::siphash_block;
|
use crate::pow::siphash::siphash_block;
|
||||||
use crate::pow::{PoWContext, Proof};
|
use crate::pow::{PoWContext, Proof};
|
||||||
use crate::global;
|
|
||||||
|
|
||||||
/// Instantiate a new CuckaroodContext as a PowContext. Note that this can't
|
/// Instantiate a new CuckaroodContext as a PowContext. Note that this can't
|
||||||
/// be moved in the PoWContext trait as this particular trait needs to be
|
/// be moved in the PoWContext trait as this particular trait needs to be
|
||||||
|
@ -69,8 +69,7 @@ where
|
||||||
|
|
||||||
fn verify(&self, proof: &Proof) -> Result<(), Error> {
|
fn verify(&self, proof: &Proof) -> Result<(), Error> {
|
||||||
if proof.proof_size() != global::proofsize() {
|
if proof.proof_size() != global::proofsize() {
|
||||||
return Err(ErrorKind::Verification(
|
return Err(ErrorKind::Verification("wrong cycle length".to_owned()))?;
|
||||||
"wrong cycle length".to_owned(),))?;
|
|
||||||
}
|
}
|
||||||
let nonces = &proof.nonces;
|
let nonces = &proof.nonces;
|
||||||
let mut uvs = vec![0u64; 2 * proof.proof_size()];
|
let mut uvs = vec![0u64; 2 * proof.proof_size()];
|
||||||
|
@ -92,10 +91,10 @@ where
|
||||||
}
|
}
|
||||||
let edge = to_edge!(T, siphash_block(&self.params.siphash_keys, nonces[n], 25));
|
let edge = to_edge!(T, siphash_block(&self.params.siphash_keys, nonces[n], 25));
|
||||||
let idx = 4 * ndir[dir] + 2 * dir;
|
let idx = 4 * ndir[dir] + 2 * dir;
|
||||||
uvs[idx ] = to_u64!( edge & nodemask);
|
uvs[idx] = to_u64!(edge & nodemask);
|
||||||
uvs[idx+1] = to_u64!((edge >> 32) & nodemask);
|
uvs[idx + 1] = to_u64!((edge >> 32) & nodemask);
|
||||||
xor0 ^= uvs[idx ];
|
xor0 ^= uvs[idx];
|
||||||
xor1 ^= uvs[idx+1];
|
xor1 ^= uvs[idx + 1];
|
||||||
ndir[dir] += 1;
|
ndir[dir] += 1;
|
||||||
}
|
}
|
||||||
if xor0 | xor1 != 0 {
|
if xor0 | xor1 != 0 {
|
||||||
|
@ -110,7 +109,8 @@ where
|
||||||
// follow cycle
|
// follow cycle
|
||||||
j = i;
|
j = i;
|
||||||
for k in (((i % 4) ^ 2)..(2 * self.params.proof_size)).step_by(4) {
|
for k in (((i % 4) ^ 2)..(2 * self.params.proof_size)).step_by(4) {
|
||||||
if uvs[k] == uvs[i] { // find reverse edge endpoint identical to one at i
|
if uvs[k] == uvs[i] {
|
||||||
|
// find reverse edge endpoint identical to one at i
|
||||||
if j != i {
|
if j != i {
|
||||||
return Err(ErrorKind::Verification("branch in cycle".to_owned()))?;
|
return Err(ErrorKind::Verification("branch in cycle".to_owned()))?;
|
||||||
}
|
}
|
||||||
|
@ -173,11 +173,15 @@ mod test {
|
||||||
fn cuckarood19_29_vectors() {
|
fn cuckarood19_29_vectors() {
|
||||||
let mut ctx19 = new_impl::<u64>(19, 42);
|
let mut ctx19 = new_impl::<u64>(19, 42);
|
||||||
ctx19.params.siphash_keys = V1_19_HASH.clone();
|
ctx19.params.siphash_keys = V1_19_HASH.clone();
|
||||||
assert!(ctx19.verify(&Proof::new(V1_19_SOL.to_vec().clone())).is_ok());
|
assert!(ctx19
|
||||||
|
.verify(&Proof::new(V1_19_SOL.to_vec().clone()))
|
||||||
|
.is_ok());
|
||||||
assert!(ctx19.verify(&Proof::zero(42)).is_err());
|
assert!(ctx19.verify(&Proof::zero(42)).is_err());
|
||||||
let mut ctx29 = new_impl::<u64>(29, 42);
|
let mut ctx29 = new_impl::<u64>(29, 42);
|
||||||
ctx29.params.siphash_keys = V2_29_HASH.clone();
|
ctx29.params.siphash_keys = V2_29_HASH.clone();
|
||||||
assert!(ctx29.verify(&Proof::new(V2_29_SOL.to_vec().clone())).is_ok());
|
assert!(ctx29
|
||||||
|
.verify(&Proof::new(V2_29_SOL.to_vec().clone()))
|
||||||
|
.is_ok());
|
||||||
assert!(ctx29.verify(&Proof::zero(42)).is_err());
|
assert!(ctx29.verify(&Proof::zero(42)).is_err());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -22,7 +22,6 @@
|
||||||
use crate::core::hash::{DefaultHashable, Hash, Hashed};
|
use crate::core::hash::{DefaultHashable, Hash, Hashed};
|
||||||
use crate::global::PROTOCOL_VERSION;
|
use crate::global::PROTOCOL_VERSION;
|
||||||
use crate::keychain::{BlindingFactor, Identifier, IDENTIFIER_SIZE};
|
use crate::keychain::{BlindingFactor, Identifier, IDENTIFIER_SIZE};
|
||||||
use crate::util::read_write::read_exact;
|
|
||||||
use crate::util::secp::constants::{
|
use crate::util::secp::constants::{
|
||||||
AGG_SIGNATURE_SIZE, COMPRESSED_PUBLIC_KEY_SIZE, MAX_PROOF_SIZE, PEDERSEN_COMMITMENT_SIZE,
|
AGG_SIGNATURE_SIZE, COMPRESSED_PUBLIC_KEY_SIZE, MAX_PROOF_SIZE, PEDERSEN_COMMITMENT_SIZE,
|
||||||
SECRET_KEY_SIZE,
|
SECRET_KEY_SIZE,
|
||||||
|
@ -35,7 +34,6 @@ use byteorder::{BigEndian, ByteOrder, ReadBytesExt};
|
||||||
use std::fmt::{self, Debug};
|
use std::fmt::{self, Debug};
|
||||||
use std::io::{self, Read, Write};
|
use std::io::{self, Read, Write};
|
||||||
use std::marker;
|
use std::marker;
|
||||||
use std::time::Duration;
|
|
||||||
use std::{cmp, error};
|
use std::{cmp, error};
|
||||||
|
|
||||||
/// Possible errors deriving from serializing or deserializing.
|
/// Possible errors deriving from serializing or deserializing.
|
||||||
|
@ -459,22 +457,16 @@ pub struct StreamingReader<'a> {
|
||||||
total_bytes_read: u64,
|
total_bytes_read: u64,
|
||||||
version: ProtocolVersion,
|
version: ProtocolVersion,
|
||||||
stream: &'a mut dyn Read,
|
stream: &'a mut dyn Read,
|
||||||
timeout: Duration,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a> StreamingReader<'a> {
|
impl<'a> StreamingReader<'a> {
|
||||||
/// Create a new streaming reader with the provided underlying stream.
|
/// Create a new streaming reader with the provided underlying stream.
|
||||||
/// Also takes a duration to be used for each individual read_exact call.
|
/// Also takes a duration to be used for each individual read_exact call.
|
||||||
pub fn new(
|
pub fn new(stream: &'a mut dyn Read, version: ProtocolVersion) -> StreamingReader<'a> {
|
||||||
stream: &'a mut dyn Read,
|
|
||||||
version: ProtocolVersion,
|
|
||||||
timeout: Duration,
|
|
||||||
) -> StreamingReader<'a> {
|
|
||||||
StreamingReader {
|
StreamingReader {
|
||||||
total_bytes_read: 0,
|
total_bytes_read: 0,
|
||||||
version,
|
version,
|
||||||
stream,
|
stream,
|
||||||
timeout,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -521,7 +513,7 @@ impl<'a> Reader for StreamingReader<'a> {
|
||||||
/// Read a fixed number of bytes.
|
/// Read a fixed number of bytes.
|
||||||
fn read_fixed_bytes(&mut self, len: usize) -> Result<Vec<u8>, Error> {
|
fn read_fixed_bytes(&mut self, len: usize) -> Result<Vec<u8>, Error> {
|
||||||
let mut buf = vec![0u8; len];
|
let mut buf = vec![0u8; len];
|
||||||
read_exact(&mut self.stream, &mut buf, self.timeout, true)?;
|
self.stream.read_exact(&mut buf)?;
|
||||||
self.total_bytes_read += len as u64;
|
self.total_bytes_read += len as u64;
|
||||||
Ok(buf)
|
Ok(buf)
|
||||||
}
|
}
|
||||||
|
|
164
p2p/src/conn.rs
164
p2p/src/conn.rs
|
@ -20,24 +20,26 @@
|
||||||
//! forces us to go through some additional gymnastic to loop over the async
|
//! forces us to go through some additional gymnastic to loop over the async
|
||||||
//! stream and make sure we get the right number of bytes out.
|
//! stream and make sure we get the right number of bytes out.
|
||||||
|
|
||||||
use std::fs::File;
|
use crate::core::ser;
|
||||||
use std::io::{self, Read, Write};
|
use crate::core::ser::{FixedLength, ProtocolVersion};
|
||||||
use std::net::{Shutdown, TcpStream};
|
|
||||||
use std::sync::{mpsc, Arc};
|
|
||||||
use std::{
|
|
||||||
cmp,
|
|
||||||
thread::{self, JoinHandle},
|
|
||||||
time,
|
|
||||||
};
|
|
||||||
|
|
||||||
use crate::core::ser::{self, FixedLength, ProtocolVersion};
|
|
||||||
use crate::msg::{
|
use crate::msg::{
|
||||||
read_body, read_discard, read_header, read_item, write_to_buf, MsgHeader, MsgHeaderWrapper,
|
read_body, read_discard, read_header, read_item, write_to_buf, MsgHeader, MsgHeaderWrapper,
|
||||||
Type,
|
Type,
|
||||||
};
|
};
|
||||||
use crate::types::Error;
|
use crate::types::Error;
|
||||||
use crate::util::read_write::{read_exact, write_all};
|
|
||||||
use crate::util::{RateCounter, RwLock};
|
use crate::util::{RateCounter, RwLock};
|
||||||
|
use std::fs::File;
|
||||||
|
use std::io::{self, Read, Write};
|
||||||
|
use std::net::{Shutdown, TcpStream};
|
||||||
|
use std::sync::atomic::{AtomicBool, Ordering};
|
||||||
|
use std::sync::{mpsc, Arc};
|
||||||
|
use std::time::Duration;
|
||||||
|
use std::{
|
||||||
|
cmp,
|
||||||
|
thread::{self, JoinHandle},
|
||||||
|
};
|
||||||
|
|
||||||
|
const IO_TIMEOUT: Duration = Duration::from_millis(1000);
|
||||||
|
|
||||||
/// A trait to be implemented in order to receive messages from the
|
/// A trait to be implemented in order to receive messages from the
|
||||||
/// connection. Allows providing an optional response.
|
/// connection. Allows providing an optional response.
|
||||||
|
@ -56,7 +58,11 @@ macro_rules! try_break {
|
||||||
($inner:expr) => {
|
($inner:expr) => {
|
||||||
match $inner {
|
match $inner {
|
||||||
Ok(v) => Some(v),
|
Ok(v) => Some(v),
|
||||||
Err(Error::Connection(ref e)) if e.kind() == io::ErrorKind::WouldBlock => None,
|
Err(Error::Connection(ref e))
|
||||||
|
if e.kind() == io::ErrorKind::WouldBlock || e.kind() == io::ErrorKind::TimedOut =>
|
||||||
|
{
|
||||||
|
None
|
||||||
|
}
|
||||||
Err(Error::Store(_))
|
Err(Error::Store(_))
|
||||||
| Err(Error::Chain(_))
|
| Err(Error::Chain(_))
|
||||||
| Err(Error::Internal)
|
| Err(Error::Internal)
|
||||||
|
@ -106,12 +112,7 @@ impl<'a> Message<'a> {
|
||||||
while written < len {
|
while written < len {
|
||||||
let read_len = cmp::min(8000, len - written);
|
let read_len = cmp::min(8000, len - written);
|
||||||
let mut buf = vec![0u8; read_len];
|
let mut buf = vec![0u8; read_len];
|
||||||
read_exact(
|
self.stream.read_exact(&mut buf[..])?;
|
||||||
&mut self.stream,
|
|
||||||
&mut buf[..],
|
|
||||||
time::Duration::from_secs(10),
|
|
||||||
true,
|
|
||||||
)?;
|
|
||||||
writer.write_all(&mut buf)?;
|
writer.write_all(&mut buf)?;
|
||||||
written += read_len;
|
written += read_len;
|
||||||
}
|
}
|
||||||
|
@ -151,7 +152,7 @@ impl<'a> Response<'a> {
|
||||||
self.version,
|
self.version,
|
||||||
)?;
|
)?;
|
||||||
msg.append(&mut self.body);
|
msg.append(&mut self.body);
|
||||||
write_all(&mut self.stream, &msg[..], time::Duration::from_secs(10))?;
|
self.stream.write_all(&msg[..])?;
|
||||||
tracker.inc_sent(msg.len() as u64);
|
tracker.inc_sent(msg.len() as u64);
|
||||||
|
|
||||||
if let Some(mut file) = self.attachment {
|
if let Some(mut file) = self.attachment {
|
||||||
|
@ -160,7 +161,7 @@ impl<'a> Response<'a> {
|
||||||
match file.read(&mut buf[..]) {
|
match file.read(&mut buf[..]) {
|
||||||
Ok(0) => break,
|
Ok(0) => break,
|
||||||
Ok(n) => {
|
Ok(n) => {
|
||||||
write_all(&mut self.stream, &buf[..n], time::Duration::from_secs(10))?;
|
self.stream.write_all(&buf[..n])?;
|
||||||
// Increase sent bytes "quietly" without incrementing the counter.
|
// Increase sent bytes "quietly" without incrementing the counter.
|
||||||
// (In a loop here for the single attachment).
|
// (In a loop here for the single attachment).
|
||||||
tracker.inc_quiet_sent(n as u64);
|
tracker.inc_quiet_sent(n as u64);
|
||||||
|
@ -181,34 +182,39 @@ pub const SEND_CHANNEL_CAP: usize = 10;
|
||||||
|
|
||||||
pub struct StopHandle {
|
pub struct StopHandle {
|
||||||
/// Channel to close the connection
|
/// Channel to close the connection
|
||||||
pub close_channel: mpsc::Sender<()>,
|
stopped: Arc<AtomicBool>,
|
||||||
// we need Option to take ownhership of the handle in stop()
|
// we need Option to take ownhership of the handle in stop()
|
||||||
peer_thread: Option<JoinHandle<()>>,
|
reader_thread: Option<JoinHandle<()>>,
|
||||||
|
writer_thread: Option<JoinHandle<()>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl StopHandle {
|
impl StopHandle {
|
||||||
/// Schedule this connection to safely close via the async close_channel.
|
/// Schedule this connection to safely close via the async close_channel.
|
||||||
pub fn stop(&self) {
|
pub fn stop(&self) {
|
||||||
if self.close_channel.send(()).is_err() {
|
self.stopped.store(true, Ordering::Relaxed);
|
||||||
debug!("peer's close_channel is disconnected, must be stopped already");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn wait(&mut self) {
|
pub fn wait(&mut self) {
|
||||||
if let Some(peer_thread) = self.peer_thread.take() {
|
if let Some(reader_thread) = self.reader_thread.take() {
|
||||||
// wait only if other thread is calling us, eg shutdown
|
self.join_thread(reader_thread);
|
||||||
if thread::current().id() != peer_thread.thread().id() {
|
}
|
||||||
debug!("waiting for thread {:?} exit", peer_thread.thread().id());
|
if let Some(writer_thread) = self.writer_thread.take() {
|
||||||
if let Err(e) = peer_thread.join() {
|
self.join_thread(writer_thread);
|
||||||
error!("failed to wait for peer thread to stop: {:?}", e);
|
}
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
debug!(
|
fn join_thread(&self, peer_thread: JoinHandle<()>) {
|
||||||
"attempt to wait for thread {:?} from itself",
|
// wait only if other thread is calling us, eg shutdown
|
||||||
peer_thread.thread().id()
|
if thread::current().id() != peer_thread.thread().id() {
|
||||||
);
|
debug!("waiting for thread {:?} exit", peer_thread.thread().id());
|
||||||
|
if let Err(e) = peer_thread.join() {
|
||||||
|
error!("failed to stop peer thread: {:?}", e);
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
debug!(
|
||||||
|
"attempt to stop thread {:?} from itself",
|
||||||
|
peer_thread.thread().id()
|
||||||
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -277,20 +283,27 @@ where
|
||||||
H: MessageHandler,
|
H: MessageHandler,
|
||||||
{
|
{
|
||||||
let (send_tx, send_rx) = mpsc::sync_channel(SEND_CHANNEL_CAP);
|
let (send_tx, send_rx) = mpsc::sync_channel(SEND_CHANNEL_CAP);
|
||||||
let (close_tx, close_rx) = mpsc::channel();
|
|
||||||
|
|
||||||
stream
|
stream
|
||||||
.set_nonblocking(true)
|
.set_read_timeout(Some(IO_TIMEOUT))
|
||||||
.expect("Non-blocking IO not available.");
|
.expect("can't set read timeout");
|
||||||
let peer_thread = poll(stream, version, handler, send_rx, close_rx, tracker)?;
|
stream
|
||||||
|
.set_write_timeout(Some(IO_TIMEOUT))
|
||||||
|
.expect("can't set read timeout");
|
||||||
|
|
||||||
|
let stopped = Arc::new(AtomicBool::new(false));
|
||||||
|
|
||||||
|
let (reader_thread, writer_thread) =
|
||||||
|
poll(stream, version, handler, send_rx, stopped.clone(), tracker)?;
|
||||||
|
|
||||||
Ok((
|
Ok((
|
||||||
ConnHandle {
|
ConnHandle {
|
||||||
send_channel: send_tx,
|
send_channel: send_tx,
|
||||||
},
|
},
|
||||||
StopHandle {
|
StopHandle {
|
||||||
close_channel: close_tx,
|
stopped,
|
||||||
peer_thread: Some(peer_thread),
|
reader_thread: Some(reader_thread),
|
||||||
|
writer_thread: Some(writer_thread),
|
||||||
},
|
},
|
||||||
))
|
))
|
||||||
}
|
}
|
||||||
|
@ -300,24 +313,24 @@ fn poll<H>(
|
||||||
version: ProtocolVersion,
|
version: ProtocolVersion,
|
||||||
handler: H,
|
handler: H,
|
||||||
send_rx: mpsc::Receiver<Vec<u8>>,
|
send_rx: mpsc::Receiver<Vec<u8>>,
|
||||||
close_rx: mpsc::Receiver<()>,
|
stopped: Arc<AtomicBool>,
|
||||||
tracker: Arc<Tracker>,
|
tracker: Arc<Tracker>,
|
||||||
) -> io::Result<JoinHandle<()>>
|
) -> io::Result<(JoinHandle<()>, JoinHandle<()>)>
|
||||||
where
|
where
|
||||||
H: MessageHandler,
|
H: MessageHandler,
|
||||||
{
|
{
|
||||||
// Split out tcp stream out into separate reader/writer halves.
|
// Split out tcp stream out into separate reader/writer halves.
|
||||||
let mut reader = conn.try_clone().expect("clone conn for reader failed");
|
let mut reader = conn.try_clone().expect("clone conn for reader failed");
|
||||||
let mut writer = conn.try_clone().expect("clone conn for writer failed");
|
let mut writer = conn.try_clone().expect("clone conn for writer failed");
|
||||||
|
let mut responder = conn.try_clone().expect("clone conn for writer failed");
|
||||||
|
let reader_stopped = stopped.clone();
|
||||||
|
|
||||||
thread::Builder::new()
|
let reader_thread = thread::Builder::new()
|
||||||
.name("peer".to_string())
|
.name("peer_read".to_string())
|
||||||
.spawn(move || {
|
.spawn(move || {
|
||||||
let sleep_time = time::Duration::from_millis(5);
|
|
||||||
let mut retry_send = Err(());
|
|
||||||
loop {
|
loop {
|
||||||
// check the read end
|
// check the read end
|
||||||
match try_break!(read_header(&mut reader, version, None)) {
|
match try_break!(read_header(&mut reader, version)) {
|
||||||
Some(MsgHeaderWrapper::Known(header)) => {
|
Some(MsgHeaderWrapper::Known(header)) => {
|
||||||
let msg = Message::from_header(header, &mut reader, version);
|
let msg = Message::from_header(header, &mut reader, version);
|
||||||
|
|
||||||
|
@ -331,7 +344,7 @@ where
|
||||||
tracker.inc_received(MsgHeader::LEN as u64 + msg.header.msg_len);
|
tracker.inc_received(MsgHeader::LEN as u64 + msg.header.msg_len);
|
||||||
|
|
||||||
if let Some(Some(resp)) =
|
if let Some(Some(resp)) =
|
||||||
try_break!(handler.consume(msg, &mut writer, tracker.clone()))
|
try_break!(handler.consume(msg, &mut responder, tracker.clone()))
|
||||||
{
|
{
|
||||||
try_break!(resp.write(tracker.clone()));
|
try_break!(resp.write(tracker.clone()));
|
||||||
}
|
}
|
||||||
|
@ -345,35 +358,48 @@ where
|
||||||
None => {}
|
None => {}
|
||||||
}
|
}
|
||||||
|
|
||||||
// check the write end, use or_else so try_recv is lazily eval'd
|
// check the close channel
|
||||||
let maybe_data = retry_send.or_else(|_| send_rx.try_recv());
|
if reader_stopped.load(Ordering::Relaxed) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
debug!(
|
||||||
|
"Shutting down reader connection with {}",
|
||||||
|
reader
|
||||||
|
.peer_addr()
|
||||||
|
.map(|a| a.to_string())
|
||||||
|
.unwrap_or("?".to_owned())
|
||||||
|
);
|
||||||
|
let _ = reader.shutdown(Shutdown::Both);
|
||||||
|
})?;
|
||||||
|
|
||||||
|
let writer_thread = thread::Builder::new()
|
||||||
|
.name("peer_read".to_string())
|
||||||
|
.spawn(move || {
|
||||||
|
let mut retry_send = Err(());
|
||||||
|
loop {
|
||||||
|
let maybe_data = retry_send.or_else(|_| send_rx.recv_timeout(IO_TIMEOUT));
|
||||||
retry_send = Err(());
|
retry_send = Err(());
|
||||||
if let Ok(data) = maybe_data {
|
if let Ok(data) = maybe_data {
|
||||||
let written = try_break!(write_all(
|
let written = try_break!(writer.write_all(&data[..]).map_err(&From::from));
|
||||||
&mut writer,
|
|
||||||
&data[..],
|
|
||||||
std::time::Duration::from_secs(10)
|
|
||||||
)
|
|
||||||
.map_err(&From::from));
|
|
||||||
if written.is_none() {
|
if written.is_none() {
|
||||||
retry_send = Ok(data);
|
retry_send = Ok(data);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// check the close channel
|
// check the close channel
|
||||||
if let Ok(_) = close_rx.try_recv() {
|
if stopped.load(Ordering::Relaxed) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
thread::sleep(sleep_time);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
debug!(
|
debug!(
|
||||||
"Shutting down connection with {}",
|
"Shutting down reader connection with {}",
|
||||||
conn.peer_addr()
|
writer
|
||||||
|
.peer_addr()
|
||||||
.map(|a| a.to_string())
|
.map(|a| a.to_string())
|
||||||
.unwrap_or("?".to_owned())
|
.unwrap_or("?".to_owned())
|
||||||
);
|
);
|
||||||
let _ = conn.shutdown(Shutdown::Both);
|
})?;
|
||||||
})
|
Ok((reader_thread, writer_thread))
|
||||||
}
|
}
|
||||||
|
|
|
@ -14,10 +14,6 @@
|
||||||
|
|
||||||
//! Message types that transit over the network and related serialization code.
|
//! Message types that transit over the network and related serialization code.
|
||||||
|
|
||||||
use num::FromPrimitive;
|
|
||||||
use std::io::{Read, Write};
|
|
||||||
use std::time;
|
|
||||||
|
|
||||||
use crate::core::core::hash::Hash;
|
use crate::core::core::hash::Hash;
|
||||||
use crate::core::core::BlockHeader;
|
use crate::core::core::BlockHeader;
|
||||||
use crate::core::pow::Difficulty;
|
use crate::core::pow::Difficulty;
|
||||||
|
@ -28,7 +24,8 @@ use crate::core::{consensus, global};
|
||||||
use crate::types::{
|
use crate::types::{
|
||||||
Capabilities, Error, PeerAddr, ReasonForBan, MAX_BLOCK_HEADERS, MAX_LOCATORS, MAX_PEER_ADDRS,
|
Capabilities, Error, PeerAddr, ReasonForBan, MAX_BLOCK_HEADERS, MAX_LOCATORS, MAX_PEER_ADDRS,
|
||||||
};
|
};
|
||||||
use crate::util::read_write::read_exact;
|
use num::FromPrimitive;
|
||||||
|
use std::io::{Read, Write};
|
||||||
|
|
||||||
/// Grin's user agent with current version
|
/// Grin's user agent with current version
|
||||||
pub const USER_AGENT: &'static str = concat!("MW/Grin ", env!("CARGO_PKG_VERSION"));
|
pub const USER_AGENT: &'static str = concat!("MW/Grin ", env!("CARGO_PKG_VERSION"));
|
||||||
|
@ -126,14 +123,9 @@ fn magic() -> [u8; 2] {
|
||||||
pub fn read_header(
|
pub fn read_header(
|
||||||
stream: &mut dyn Read,
|
stream: &mut dyn Read,
|
||||||
version: ProtocolVersion,
|
version: ProtocolVersion,
|
||||||
msg_type: Option<Type>,
|
|
||||||
) -> Result<MsgHeaderWrapper, Error> {
|
) -> Result<MsgHeaderWrapper, Error> {
|
||||||
let mut head = vec![0u8; MsgHeader::LEN];
|
let mut head = vec![0u8; MsgHeader::LEN];
|
||||||
if Some(Type::Hand) == msg_type {
|
stream.read_exact(&mut head)?;
|
||||||
read_exact(stream, &mut head, time::Duration::from_millis(10), true)?;
|
|
||||||
} else {
|
|
||||||
read_exact(stream, &mut head, time::Duration::from_secs(10), false)?;
|
|
||||||
}
|
|
||||||
let header = ser::deserialize::<MsgHeaderWrapper>(&mut &head[..], version)?;
|
let header = ser::deserialize::<MsgHeaderWrapper>(&mut &head[..], version)?;
|
||||||
Ok(header)
|
Ok(header)
|
||||||
}
|
}
|
||||||
|
@ -145,8 +137,7 @@ pub fn read_item<T: Readable>(
|
||||||
stream: &mut dyn Read,
|
stream: &mut dyn Read,
|
||||||
version: ProtocolVersion,
|
version: ProtocolVersion,
|
||||||
) -> Result<(T, u64), Error> {
|
) -> Result<(T, u64), Error> {
|
||||||
let timeout = time::Duration::from_secs(20);
|
let mut reader = StreamingReader::new(stream, version);
|
||||||
let mut reader = StreamingReader::new(stream, version, timeout);
|
|
||||||
let res = T::read(&mut reader)?;
|
let res = T::read(&mut reader)?;
|
||||||
Ok((res, reader.total_bytes_read()))
|
Ok((res, reader.total_bytes_read()))
|
||||||
}
|
}
|
||||||
|
@ -159,14 +150,14 @@ pub fn read_body<T: Readable>(
|
||||||
version: ProtocolVersion,
|
version: ProtocolVersion,
|
||||||
) -> Result<T, Error> {
|
) -> Result<T, Error> {
|
||||||
let mut body = vec![0u8; h.msg_len as usize];
|
let mut body = vec![0u8; h.msg_len as usize];
|
||||||
read_exact(stream, &mut body, time::Duration::from_secs(20), true)?;
|
stream.read_exact(&mut body)?;
|
||||||
ser::deserialize(&mut &body[..], version).map_err(From::from)
|
ser::deserialize(&mut &body[..], version).map_err(From::from)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Read (an unknown) message from the provided stream and discard it.
|
/// Read (an unknown) message from the provided stream and discard it.
|
||||||
pub fn read_discard(msg_len: u64, stream: &mut dyn Read) -> Result<(), Error> {
|
pub fn read_discard(msg_len: u64, stream: &mut dyn Read) -> Result<(), Error> {
|
||||||
let mut buffer = vec![0u8; msg_len as usize];
|
let mut buffer = vec![0u8; msg_len as usize];
|
||||||
read_exact(stream, &mut buffer, time::Duration::from_secs(20), true)?;
|
stream.read_exact(&mut buffer)?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -176,7 +167,7 @@ pub fn read_message<T: Readable>(
|
||||||
version: ProtocolVersion,
|
version: ProtocolVersion,
|
||||||
msg_type: Type,
|
msg_type: Type,
|
||||||
) -> Result<T, Error> {
|
) -> Result<T, Error> {
|
||||||
match read_header(stream, version, Some(msg_type))? {
|
match read_header(stream, version)? {
|
||||||
MsgHeaderWrapper::Known(header) => {
|
MsgHeaderWrapper::Known(header) => {
|
||||||
if header.msg_type == msg_type {
|
if header.msg_type == msg_type {
|
||||||
read_body(&header, stream, version)
|
read_body(&header, stream, version)
|
||||||
|
|
|
@ -16,14 +16,14 @@ use memmap;
|
||||||
use tempfile::tempfile;
|
use tempfile::tempfile;
|
||||||
|
|
||||||
use crate::core::ser::{
|
use crate::core::ser::{
|
||||||
self, BinWriter, FixedLength, ProtocolVersion, Readable, Reader, StreamingReader, Writeable, Writer,
|
self, BinWriter, FixedLength, ProtocolVersion, Readable, Reader, StreamingReader, Writeable,
|
||||||
|
Writer,
|
||||||
};
|
};
|
||||||
use std::fmt::Debug;
|
use std::fmt::Debug;
|
||||||
use std::fs::{self, File, OpenOptions};
|
use std::fs::{self, File, OpenOptions};
|
||||||
use std::io::{self, BufReader, BufWriter, Seek, SeekFrom, Write};
|
use std::io::{self, BufReader, BufWriter, Seek, SeekFrom, Write};
|
||||||
use std::marker;
|
use std::marker;
|
||||||
use std::path::{Path, PathBuf};
|
use std::path::{Path, PathBuf};
|
||||||
use std::time;
|
|
||||||
|
|
||||||
/// Represents a single entry in the size_file.
|
/// Represents a single entry in the size_file.
|
||||||
/// Offset (in bytes) and size (in bytes) of a variable sized entry
|
/// Offset (in bytes) and size (in bytes) of a variable sized entry
|
||||||
|
@ -482,8 +482,7 @@ where
|
||||||
{
|
{
|
||||||
let reader = File::open(&self.path)?;
|
let reader = File::open(&self.path)?;
|
||||||
let mut buf_reader = BufReader::new(reader);
|
let mut buf_reader = BufReader::new(reader);
|
||||||
let mut streaming_reader =
|
let mut streaming_reader = StreamingReader::new(&mut buf_reader, self.version);
|
||||||
StreamingReader::new(&mut buf_reader, self.version, time::Duration::from_secs(1));
|
|
||||||
|
|
||||||
let mut buf_writer = BufWriter::new(File::create(&tmp_path)?);
|
let mut buf_writer = BufWriter::new(File::create(&tmp_path)?);
|
||||||
let mut bin_writer = BinWriter::new(&mut buf_writer, self.version);
|
let mut bin_writer = BinWriter::new(&mut buf_writer, self.version);
|
||||||
|
@ -529,11 +528,7 @@ where
|
||||||
{
|
{
|
||||||
let reader = File::open(&self.path)?;
|
let reader = File::open(&self.path)?;
|
||||||
let mut buf_reader = BufReader::new(reader);
|
let mut buf_reader = BufReader::new(reader);
|
||||||
let mut streaming_reader = StreamingReader::new(
|
let mut streaming_reader = StreamingReader::new(&mut buf_reader, self.version);
|
||||||
&mut buf_reader,
|
|
||||||
self.version,
|
|
||||||
time::Duration::from_secs(1),
|
|
||||||
);
|
|
||||||
|
|
||||||
let mut buf_writer = BufWriter::new(File::create(&tmp_path)?);
|
let mut buf_writer = BufWriter::new(File::create(&tmp_path)?);
|
||||||
let mut bin_writer = BinWriter::new(&mut buf_writer, self.version);
|
let mut bin_writer = BinWriter::new(&mut buf_writer, self.version);
|
||||||
|
|
|
@ -47,9 +47,6 @@ pub use crate::types::{LogLevel, LoggingConfig, ZeroingString};
|
||||||
|
|
||||||
pub mod macros;
|
pub mod macros;
|
||||||
|
|
||||||
// read_exact and write_all impls
|
|
||||||
pub mod read_write;
|
|
||||||
|
|
||||||
// other utils
|
// other utils
|
||||||
#[allow(unused_imports)]
|
#[allow(unused_imports)]
|
||||||
use std::ops::Deref;
|
use std::ops::Deref;
|
||||||
|
|
|
@ -1,110 +0,0 @@
|
||||||
// Copyright 2018 The Grin Developers
|
|
||||||
//
|
|
||||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
// you may not use this file except in compliance with the License.
|
|
||||||
// You may obtain a copy of the License at
|
|
||||||
//
|
|
||||||
// http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
//
|
|
||||||
// Unless required by applicable law or agreed to in writing, software
|
|
||||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
// See the License for the specific language governing permissions and
|
|
||||||
// limitations under the License.
|
|
||||||
|
|
||||||
//! Custom impls of read_exact and write_all to work around async stream restrictions.
|
|
||||||
|
|
||||||
use std::io;
|
|
||||||
use std::io::prelude::*;
|
|
||||||
use std::thread;
|
|
||||||
use std::time::Duration;
|
|
||||||
|
|
||||||
/// The default implementation of read_exact is useless with an async stream (TcpStream) as
|
|
||||||
/// it will return as soon as something has been read, regardless of
|
|
||||||
/// whether the buffer has been filled (and then errors). This implementation
|
|
||||||
/// will block until it has read exactly `len` bytes and returns them as a
|
|
||||||
/// `vec<u8>`. Except for a timeout, this implementation will never return a
|
|
||||||
/// partially filled buffer.
|
|
||||||
///
|
|
||||||
/// The timeout in milliseconds aborts the read when it's met. Note that the
|
|
||||||
/// time is not guaranteed to be exact. To support cases where we want to poll
|
|
||||||
/// instead of blocking, a `block_on_empty` boolean, when false, ensures
|
|
||||||
/// `read_exact` returns early with a `io::ErrorKind::WouldBlock` if nothing
|
|
||||||
/// has been read from the socket.
|
|
||||||
pub fn read_exact(
|
|
||||||
stream: &mut dyn Read,
|
|
||||||
mut buf: &mut [u8],
|
|
||||||
timeout: Duration,
|
|
||||||
block_on_empty: bool,
|
|
||||||
) -> io::Result<()> {
|
|
||||||
let sleep_time = Duration::from_micros(10);
|
|
||||||
let mut count = Duration::new(0, 0);
|
|
||||||
|
|
||||||
let mut read = 0;
|
|
||||||
loop {
|
|
||||||
match stream.read(buf) {
|
|
||||||
Ok(0) => {
|
|
||||||
return Err(io::Error::new(
|
|
||||||
io::ErrorKind::ConnectionAborted,
|
|
||||||
"read_exact",
|
|
||||||
));
|
|
||||||
}
|
|
||||||
Ok(n) => {
|
|
||||||
let tmp = buf;
|
|
||||||
buf = &mut tmp[n..];
|
|
||||||
read += n;
|
|
||||||
}
|
|
||||||
Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
|
|
||||||
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
|
|
||||||
if read == 0 && !block_on_empty {
|
|
||||||
return Err(io::Error::new(io::ErrorKind::WouldBlock, "read_exact"));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Err(e) => return Err(e),
|
|
||||||
}
|
|
||||||
if !buf.is_empty() {
|
|
||||||
thread::sleep(sleep_time);
|
|
||||||
count += sleep_time;
|
|
||||||
} else {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
if count > timeout {
|
|
||||||
return Err(io::Error::new(
|
|
||||||
io::ErrorKind::TimedOut,
|
|
||||||
"reading from stream",
|
|
||||||
));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Same as `read_exact` but for writing.
|
|
||||||
pub fn write_all(stream: &mut dyn Write, mut buf: &[u8], timeout: Duration) -> io::Result<()> {
|
|
||||||
let sleep_time = Duration::from_micros(10);
|
|
||||||
let mut count = Duration::new(0, 0);
|
|
||||||
|
|
||||||
while !buf.is_empty() {
|
|
||||||
match stream.write(buf) {
|
|
||||||
Ok(0) => {
|
|
||||||
return Err(io::Error::new(
|
|
||||||
io::ErrorKind::WriteZero,
|
|
||||||
"failed to write whole buffer",
|
|
||||||
));
|
|
||||||
}
|
|
||||||
Ok(n) => buf = &buf[n..],
|
|
||||||
Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
|
|
||||||
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {}
|
|
||||||
Err(e) => return Err(e),
|
|
||||||
}
|
|
||||||
if !buf.is_empty() {
|
|
||||||
thread::sleep(sleep_time);
|
|
||||||
count += sleep_time;
|
|
||||||
} else {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
if count > timeout {
|
|
||||||
return Err(io::Error::new(io::ErrorKind::TimedOut, "writing to stream"));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Ok(())
|
|
||||||
}
|
|
Loading…
Reference in a new issue