Refactor p2p reader (#3433)

* Refactor p2p reader

* Rename Output enum

* Consume enum takes owned values instead of references

* Deserialization in codec, remove Consume enum

* Calculate block header size

* Read headers in batches

* Remove headers type from deserializer
This commit is contained in:
jaspervdm 2020-09-28 15:43:37 +02:00 committed by GitHub
parent 4944679fb2
commit defc714f6e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
12 changed files with 701 additions and 341 deletions

2
Cargo.lock generated
View file

@ -1027,6 +1027,7 @@ version = "4.2.0-alpha.1"
dependencies = [
"blake2-rfc",
"byteorder",
"bytes",
"chrono",
"croaring-mw",
"enum_primitive",
@ -1073,6 +1074,7 @@ name = "grin_p2p"
version = "4.2.0-alpha.1"
dependencies = [
"bitflags 1.2.1",
"bytes",
"chrono",
"enum_primitive",
"grin_chain",

View file

@ -27,6 +27,7 @@ siphasher = "0.3"
log = "0.4"
chrono = { version = "0.4.11", features = ["serde"] }
zeroize = { version = "1.1", features =["zeroize_derive"] }
bytes = "0.5"
keychain = { package = "grin_keychain", path = "../keychain", version = "4.2.0-alpha.1" }
util = { package = "grin_util", path = "../util", version = "4.2.0-alpha.1" }

View file

@ -26,7 +26,7 @@ use crate::core::block::HeaderVersion;
use crate::{
pow::{
self, new_cuckaroo_ctx, new_cuckarood_ctx, new_cuckaroom_ctx, new_cuckarooz_ctx,
new_cuckatoo_ctx, PoWContext,
new_cuckatoo_ctx, BitVec, PoWContext,
},
ser::ProtocolVersion,
};
@ -411,3 +411,51 @@ where
last_n.reverse();
last_n
}
/// Calculates the size of a header (in bytes) given a number of edge bits in the PoW
#[inline]
pub fn header_size_bytes(edge_bits: u8) -> usize {
	// Fixed-width portion of the serialized header.
	// NOTE(review): must stay in sync with `BlockHeader::write` — the unit
	// tests below check this against the serialized genesis headers.
	let fixed = 2 + 2 * 8 + 5 * 32 + 32 + 2 * 8;
	// Proof-of-work portion, ending with the bit-packed proof nonces.
	let packed_bits = edge_bits as usize * proofsize();
	fixed + 8 + 4 + 8 + 1 + BitVec::bytes_len(packed_bits)
}
#[cfg(test)]
mod test {
	use super::*;
	use crate::core::Block;
	use crate::genesis::*;
	use crate::pow::mine_genesis_block;
	use crate::ser::{BinWriter, Writeable};

	// Serialize the given genesis block's header and assert the byte count
	// matches what `header_size_bytes` predicts for its edge bits.
	fn test_header_len(genesis: Block) {
		let mut raw = Vec::<u8>::with_capacity(1_024);
		let mut writer = BinWriter::new(&mut raw, ProtocolVersion::local());
		genesis.header.write(&mut writer).unwrap();
		assert_eq!(raw.len(), header_size_bytes(genesis.header.pow.edge_bits()));
	}

	// Each test pins the chain type first: the genesis block (and thus the
	// header's edge bits) differs per chain.
	#[test]
	fn automated_testing_header_len() {
		set_local_chain_type(ChainTypes::AutomatedTesting);
		test_header_len(mine_genesis_block().unwrap());
	}

	#[test]
	fn user_testing_header_len() {
		set_local_chain_type(ChainTypes::UserTesting);
		test_header_len(mine_genesis_block().unwrap());
	}

	#[test]
	fn floonet_header_len() {
		set_local_chain_type(ChainTypes::Floonet);
		test_header_len(genesis_floo());
	}

	#[test]
	fn mainnet_header_len() {
		set_local_chain_type(ChainTypes::Mainnet);
		test_header_len(genesis_main());
	}
}

View file

@ -471,15 +471,17 @@ impl Writeable for Proof {
}
}
/// A bit vector
// TODO this could likely be optimized by writing whole bytes (or even words)
// in the `BitVec` at once, dealing with the truncation, instead of bits by bits
struct BitVec {
pub struct BitVec {
bits: Vec<u8>,
}
impl BitVec {
/// Number of bytes required to store the provided number of bits
fn bytes_len(bits_len: usize) -> usize {
#[inline]
pub fn bytes_len(bits_len: usize) -> usize {
(bits_len + 7) / 8
}

View file

@ -22,6 +22,7 @@
use crate::core::hash::{DefaultHashable, Hash, Hashed};
use crate::global::PROTOCOL_VERSION;
use byteorder::{BigEndian, ByteOrder, ReadBytesExt};
use bytes::Buf;
use keychain::{BlindingFactor, Identifier, IDENTIFIER_SIZE};
use std::convert::TryInto;
use std::fmt::{self, Debug};
@ -79,6 +80,12 @@ impl From<io::Error> for Error {
}
}
impl From<io::ErrorKind> for Error {
fn from(e: io::ErrorKind) -> Error {
Error::IOErr(format!("{}", io::Error::from(e)), e)
}
}
impl fmt::Display for Error {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match *self {
@ -575,6 +582,109 @@ impl<'a> Reader for StreamingReader<'a> {
}
}
/// Protocol version-aware wrapper around a `Buf` impl
pub struct BufReader<'a, B: Buf> {
	// Underlying buffer all reads are delegated to.
	inner: &'a mut B,
	// Protocol version used when deserializing version-dependent structures.
	version: ProtocolVersion,
	// Running total of bytes consumed through this reader.
	bytes_read: usize,
}
impl<'a, B: Buf> BufReader<'a, B> {
	/// Construct a new BufReader
	pub fn new(buf: &'a mut B, version: ProtocolVersion) -> Self {
		Self {
			inner: buf,
			version,
			bytes_read: 0,
		}
	}

	/// Check whether the buffer has enough bytes remaining to perform a read
	fn has_remaining(&mut self, len: usize) -> Result<(), Error> {
		if self.inner.remaining() < len {
			return Err(io::ErrorKind::UnexpectedEof.into());
		}
		// Count the bytes as read up front; callers consume them right after.
		self.bytes_read += len;
		Ok(())
	}

	/// The total bytes read
	pub fn bytes_read(&self) -> u64 {
		self.bytes_read as u64
	}

	/// Convenience function to read from the buffer and deserialize
	pub fn body<T: Readable>(&mut self) -> Result<T, Error> {
		T::read(self)
	}
}
impl<'a, B: Buf> Reader for BufReader<'a, B> {
	fn read_u8(&mut self) -> Result<u8, Error> {
		self.has_remaining(1)?;
		Ok(self.inner.get_u8())
	}

	// Multi-byte integers are read big-endian (network order), the default
	// byte order of `Buf::get_u16`/`get_u32`/etc.
	fn read_u16(&mut self) -> Result<u16, Error> {
		self.has_remaining(2)?;
		Ok(self.inner.get_u16())
	}

	fn read_u32(&mut self) -> Result<u32, Error> {
		self.has_remaining(4)?;
		Ok(self.inner.get_u32())
	}

	fn read_u64(&mut self) -> Result<u64, Error> {
		self.has_remaining(8)?;
		Ok(self.inner.get_u64())
	}

	fn read_i32(&mut self) -> Result<i32, Error> {
		self.has_remaining(4)?;
		Ok(self.inner.get_i32())
	}

	fn read_i64(&mut self) -> Result<i64, Error> {
		self.has_remaining(8)?;
		Ok(self.inner.get_i64())
	}

	// Length-prefixed byte vector: a u64 length followed by that many bytes.
	fn read_bytes_len_prefix(&mut self) -> Result<Vec<u8>, Error> {
		let len = self.read_u64()?;
		self.read_fixed_bytes(len as usize)
	}

	fn read_fixed_bytes(&mut self, len: usize) -> Result<Vec<u8>, Error> {
		// not reading more than 100k bytes in a single read
		if len > 100_000 {
			return Err(Error::TooLargeReadErr);
		}
		self.has_remaining(len)?;
		let mut buf = vec![0; len];
		self.inner.copy_to_slice(&mut buf[..]);
		Ok(buf)
	}

	// Read a single byte and require it to equal `val`.
	fn expect_u8(&mut self, val: u8) -> Result<u8, Error> {
		let b = self.read_u8()?;
		if b == val {
			Ok(b)
		} else {
			Err(Error::UnexpectedData {
				expected: vec![val],
				received: vec![b],
			})
		}
	}

	fn protocol_version(&self) -> ProtocolVersion {
		self.version
	}
}
impl Readable for Commitment {
fn read<R: Reader>(reader: &mut R) -> Result<Commitment, Error> {
let a = reader.read_fixed_bytes(PEDERSEN_COMMITMENT_SIZE)?;

View file

@ -20,6 +20,7 @@ serde_derive = "1"
tempfile = "3.1"
log = "0.4"
chrono = { version = "0.4.11", features = ["serde"] }
bytes = "0.5"
grin_core = { path = "../core", version = "4.2.0-alpha.1" }
grin_store = { path = "../store", version = "4.2.0-alpha.1" }

245
p2p/src/codec.rs Normal file
View file

@ -0,0 +1,245 @@
use crate::core::core::block::{BlockHeader, UntrustedBlockHeader};
use crate::core::global::header_size_bytes;
use crate::core::ser::{BufReader, ProtocolVersion, Readable};
use crate::msg::{Message, MsgHeader, MsgHeaderWrapper, Type};
use crate::types::{AttachmentMeta, AttachmentUpdate, Error};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use core::ser::Reader;
use std::cmp::min;
use std::io::Read;
use std::mem;
use std::net::TcpStream;
use std::sync::Arc;
use std::time::{Duration, Instant};
use MsgHeaderWrapper::*;
use State::*;
const HEADER_IO_TIMEOUT: Duration = Duration::from_millis(2000);
pub const BODY_IO_TIMEOUT: Duration = Duration::from_millis(60000);
const HEADER_BATCH_SIZE: usize = 32;
/// Reader state machine: what the codec expects to pull off the wire next.
enum State {
	/// Between messages, expecting the next message header.
	None,
	/// Header received, expecting the message body (or, for a headers
	/// message, the item count).
	Header(MsgHeaderWrapper),
	/// In the middle of streaming a list of block headers.
	BlockHeaders {
		/// Body bytes still to be read off the stream.
		bytes_left: usize,
		/// Number of headers still to be read.
		items_left: usize,
		/// Headers accumulated for the current batch.
		headers: Vec<BlockHeader>,
	},
	/// Streaming an attachment: (bytes left, metadata, last progress-log time).
	Attachment(usize, Arc<AttachmentMeta>, Instant),
}
impl State {
	/// True when the codec is between messages, i.e. not in the middle of
	/// reading a body, header list or attachment.
	fn is_none(&self) -> bool {
		// `matches!` is the idiomatic form of a boolean single-pattern match.
		matches!(self, State::None)
	}
}
/// Blocking, protocol-version-aware reader that turns the raw TCP stream
/// into complete `Message`s.
pub struct Codec {
	/// Negotiated protocol version used for deserialization.
	pub version: ProtocolVersion,
	// Underlying TCP stream we read from.
	stream: TcpStream,
	// Read buffer; may hold a partially received item between calls.
	buffer: BytesMut,
	// What we expect to read next (see `State`).
	state: State,
	// Bytes read during the current `read` call.
	bytes_read: usize,
}
impl Codec {
	/// Create a codec reading from `stream`, deserializing at `version`.
	pub fn new(version: ProtocolVersion, stream: TcpStream) -> Self {
		Self {
			version,
			stream,
			buffer: BytesMut::with_capacity(8 * 1024),
			// `None` here is `State::None` (via `use State::*`), not `Option::None`.
			state: None,
			bytes_read: 0,
		}
	}

	/// Destroy the codec and return the reader
	pub fn stream(self) -> TcpStream {
		self.stream
	}

	/// Inform codec next `len` bytes are an attachment
	/// Panics if already reading a body
	pub fn expect_attachment(&mut self, meta: Arc<AttachmentMeta>) {
		assert!(self.state.is_none());
		self.state = Attachment(meta.size, meta, Instant::now());
	}

	/// Length of the next item we are expecting, could be msg header, body, block header or attachment chunk
	fn next_len(&self) -> usize {
		match &self.state {
			None => MsgHeader::LEN,
			Header(Known(h)) if h.msg_type == Type::Headers => {
				// If we are receiving a list of headers, read off the item count first
				min(h.msg_len as usize, 2)
			}
			Header(Known(header)) => header.msg_len as usize,
			Header(Unknown(len, _)) => *len as usize,
			BlockHeaders { bytes_left, .. } => {
				// The header length varies with the number of edge bits. Therefore we overestimate
				// its size and only actually read the bytes we need
				min(*bytes_left, header_size_bytes(63))
			}
			// Attachments are streamed in chunks of at most 48,000 bytes.
			Attachment(left, _, _) => min(*left, 48_000),
		}
	}

	/// Set stream timeout depending on the next expected item
	fn set_stream_timeout(&self) -> Result<(), Error> {
		let timeout = match &self.state {
			None => HEADER_IO_TIMEOUT,
			_ => BODY_IO_TIMEOUT,
		};
		self.stream.set_read_timeout(Some(timeout))?;
		Ok(())
	}

	// Drive the state machine until a complete `Message` can be returned (or
	// an error occurs). Blocks on the underlying stream.
	fn read_inner(&mut self) -> Result<Message, Error> {
		self.bytes_read = 0;
		loop {
			let next_len = self.next_len();
			let pre_len = self.buffer.len();
			// Buffer could already be partially filled, calculate additional bytes we need
			let to_read = next_len.saturating_sub(pre_len);
			if to_read > 0 {
				self.buffer.reserve(to_read);
				// Zero-fill so the slice handed to `read_exact` has the right length.
				for _ in 0..to_read {
					self.buffer.put_u8(0);
				}
				self.set_stream_timeout()?;
				if let Err(e) = self.stream.read_exact(&mut self.buffer[pre_len..]) {
					// Undo reserved bytes on a failed read
					self.buffer.truncate(pre_len);
					return Err(e.into());
				}
				self.bytes_read += to_read;
			}
			match &mut self.state {
				None => {
					// Parse header and keep reading
					let mut raw = self.buffer.split_to(next_len).freeze();
					let mut reader = BufReader::new(&mut raw, self.version);
					let header = MsgHeaderWrapper::read(&mut reader)?;
					self.state = Header(header);
				}
				Header(Known(header)) => {
					let mut raw = self.buffer.split_to(next_len).freeze();
					if header.msg_type == Type::Headers {
						// Special consideration for a list of headers, as we want to verify and process
						// them as they come in instead of only after the full list has been received
						let mut reader = BufReader::new(&mut raw, self.version);
						let items_left = reader.read_u16()? as usize;
						self.state = BlockHeaders {
							// 2 bytes of the body were consumed by the item count above.
							bytes_left: header.msg_len as usize - 2,
							items_left,
							headers: Vec::with_capacity(min(HEADER_BATCH_SIZE, items_left)),
						};
					} else {
						// Return full message
						let msg = decode_message(header, &mut raw, self.version);
						self.state = None;
						return msg;
					}
				}
				Header(Unknown(_, msg_type)) => {
					// Discard body and return
					let msg_type = *msg_type;
					self.buffer.advance(next_len);
					self.state = None;
					return Ok(Message::Unknown(msg_type));
				}
				BlockHeaders {
					bytes_left,
					items_left,
					headers,
				} => {
					if *bytes_left == 0 {
						// Incorrect item count
						self.state = None;
						return Err(Error::BadMessage);
					}
					// Deserialize one header straight out of the buffer; the reader
					// tracks how many of the (overestimated) bytes it actually used.
					let mut reader = BufReader::new(&mut self.buffer, self.version);
					let header: UntrustedBlockHeader = reader.body()?;
					let bytes_read = reader.bytes_read() as usize;
					headers.push(header.into());
					*bytes_left = bytes_left.saturating_sub(bytes_read);
					*items_left -= 1;
					// Hand off a batch as soon as it is full, or the list ends, so
					// headers can be processed while the rest are still in flight.
					if headers.len() == HEADER_BATCH_SIZE || *items_left == 0 {
						let mut h = Vec::with_capacity(min(HEADER_BATCH_SIZE, *items_left));
						mem::swap(headers, &mut h);
						if *items_left == 0 {
							let bytes_left = *bytes_left;
							self.state = None;
							// Leftover bytes mean the advertised count was wrong.
							if bytes_left > 0 {
								return Err(Error::BadMessage);
							}
						}
						return Ok(Message::Headers(h));
					}
				}
				Attachment(left, meta, now) => {
					let raw = self.buffer.split_to(next_len).freeze();
					// `next_len` is capped at `*left` in `next_len()`, so no underflow.
					*left -= next_len;
					// Log progress at most every 10 seconds.
					if now.elapsed().as_secs() > 10 {
						*now = Instant::now();
						debug!("attachment: {}/{}", meta.size - *left, meta.size);
					}
					let update = AttachmentUpdate {
						read: next_len,
						left: *left,
						meta: Arc::clone(meta),
					};
					if *left == 0 {
						self.state = None;
						debug!("attachment: DONE");
					}
					return Ok(Message::Attachment(update, Some(raw)));
				}
			}
		}
	}

	/// Blocking read of the next message
	pub fn read(&mut self) -> (Result<Message, Error>, u64) {
		let msg = self.read_inner();
		(msg, self.bytes_read as u64)
	}
}
// TODO: replace with a macro?
fn decode_message(
header: &MsgHeader,
body: &mut Bytes,
version: ProtocolVersion,
) -> Result<Message, Error> {
let mut msg = BufReader::new(body, version);
let c = match header.msg_type {
Type::Ping => Message::Ping(msg.body()?),
Type::Pong => Message::Pong(msg.body()?),
Type::BanReason => Message::BanReason(msg.body()?),
Type::TransactionKernel => Message::TransactionKernel(msg.body()?),
Type::GetTransaction => Message::GetTransaction(msg.body()?),
Type::Transaction => Message::Transaction(msg.body()?),
Type::StemTransaction => Message::StemTransaction(msg.body()?),
Type::GetBlock => Message::GetBlock(msg.body()?),
Type::Block => Message::Block(msg.body()?),
Type::GetCompactBlock => Message::GetCompactBlock(msg.body()?),
Type::CompactBlock => Message::CompactBlock(msg.body()?),
Type::GetHeaders => Message::GetHeaders(msg.body()?),
Type::Header => Message::Header(msg.body()?),
Type::GetPeerAddrs => Message::GetPeerAddrs(msg.body()?),
Type::PeerAddrs => Message::PeerAddrs(msg.body()?),
Type::TxHashSetRequest => Message::TxHashSetRequest(msg.body()?),
Type::TxHashSetArchive => Message::TxHashSetArchive(msg.body()?),
Type::Error | Type::Hand | Type::Shake | Type::Headers => {
return Err(Error::UnexpectedMessage)
}
};
Ok(c)
}

View file

@ -20,40 +20,28 @@
//! forces us to go through some additional gymnastic to loop over the async
//! stream and make sure we get the right number of bytes out.
use crate::core::ser;
use crate::codec::{Codec, BODY_IO_TIMEOUT};
use crate::core::ser::ProtocolVersion;
use crate::msg::{
read_body, read_discard, read_header, read_item, write_message, Msg, MsgHeader,
MsgHeaderWrapper,
};
use crate::msg::{write_message, Consumed, Message, Msg};
use crate::types::Error;
use crate::util::{RateCounter, RwLock};
use std::io::{self, Read, Write};
use std::fs::File;
use std::io::{self, Write};
use std::net::{Shutdown, TcpStream};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::RecvTimeoutError;
use std::sync::{mpsc, Arc};
use std::thread::{self, JoinHandle};
use std::time::Duration;
use std::{
cmp,
thread::{self, JoinHandle},
};
pub const SEND_CHANNEL_CAP: usize = 100;
const HEADER_IO_TIMEOUT: Duration = Duration::from_millis(2000);
const CHANNEL_TIMEOUT: Duration = Duration::from_millis(1000);
const BODY_IO_TIMEOUT: Duration = Duration::from_millis(60000);
/// A trait to be implemented in order to receive messages from the
/// connection. Allows providing an optional response.
pub trait MessageHandler: Send + 'static {
fn consume<'a, R: Read>(
&self,
msg: Message<'a, R>,
stopped: Arc<AtomicBool>,
tracker: Arc<Tracker>,
) -> Result<Option<Msg>, Error>;
fn consume(&self, message: Message) -> Result<Consumed, Error>;
}
// Macro to simplify the boilerplate around I/O and Grin error handling
@ -79,54 +67,6 @@ macro_rules! try_break {
};
}
macro_rules! try_header {
($res:expr, $conn: expr) => {{
let _ = $conn.set_read_timeout(Some(HEADER_IO_TIMEOUT));
try_break!($res)
}};
}
/// A message as received by the connection. Provides access to the message
/// header lazily consumes the message body, handling its deserialization.
pub struct Message<'a, R: Read> {
pub header: MsgHeader,
stream: &'a mut R,
version: ProtocolVersion,
}
impl<'a, R: Read> Message<'a, R> {
fn from_header(header: MsgHeader, stream: &'a mut R, version: ProtocolVersion) -> Self {
Message {
header,
stream,
version,
}
}
/// Read the message body from the underlying connection
pub fn body<T: ser::Readable>(&mut self) -> Result<T, Error> {
read_body(&self.header, self.stream, self.version)
}
/// Read a single "thing" from the underlying connection.
/// Return the thing and the total bytes read.
pub fn streaming_read<T: ser::Readable>(&mut self) -> Result<(T, u64), Error> {
read_item(self.stream, self.version)
}
pub fn copy_attachment(&mut self, len: usize, writer: &mut dyn Write) -> Result<usize, Error> {
let mut written = 0;
while written < len {
let read_len = cmp::min(8000, len - written);
let mut buf = vec![0u8; read_len];
self.stream.read_exact(&mut buf[..])?;
writer.write_all(&buf)?;
written += read_len;
}
Ok(written)
}
}
pub struct StopHandle {
/// Channel to close the connection
stopped: Arc<AtomicBool>,
@ -281,7 +221,7 @@ where
H: MessageHandler,
{
// Split out tcp stream out into separate reader/writer halves.
let mut reader = conn.try_clone().expect("clone conn for reader failed");
let reader = conn.try_clone().expect("clone conn for reader failed");
let mut writer = conn.try_clone().expect("clone conn for writer failed");
let reader_stopped = stopped.clone();
@ -291,58 +231,83 @@ where
let reader_thread = thread::Builder::new()
.name("peer_read".to_string())
.spawn(move || {
let peer_addr = reader
.peer_addr()
.map(|a| a.to_string())
.unwrap_or_else(|_| "?".to_owned());
let mut codec = Codec::new(version, reader);
let mut attachment: Option<File> = None;
loop {
// check the read end
match try_header!(read_header(&mut reader, version), &reader) {
Some(MsgHeaderWrapper::Known(header)) => {
let _ = reader.set_read_timeout(Some(BODY_IO_TIMEOUT));
let msg = Message::from_header(header, &mut reader, version);
trace!(
"Received message header, type {:?}, len {}.",
msg.header.msg_type,
msg.header.msg_len
);
// Increase received bytes counter
reader_tracker.inc_received(MsgHeader::LEN as u64 + msg.header.msg_len);
let resp_msg = try_break!(handler.consume(
msg,
reader_stopped.clone(),
reader_tracker.clone()
));
if let Some(Some(resp_msg)) = resp_msg {
try_break!(conn_handle.send(resp_msg));
}
}
Some(MsgHeaderWrapper::Unknown(msg_len, type_byte)) => {
debug!(
"Received unknown message header, type {:?}, len {}.",
type_byte, msg_len
);
// Increase received bytes counter
reader_tracker.inc_received(MsgHeader::LEN as u64 + msg_len);
try_break!(read_discard(msg_len, &mut reader));
}
None => {}
}
// check the close channel
if reader_stopped.load(Ordering::Relaxed) {
break;
}
// check the read end
let (next, bytes_read) = codec.read();
// increase the appropriate counter
match &next {
Ok(Message::Attachment(_, _)) => reader_tracker.inc_quiet_received(bytes_read),
_ => reader_tracker.inc_received(bytes_read),
}
let message = match try_break!(next) {
Some(Message::Unknown(type_byte)) => {
debug!(
"Received unknown message, type {:?}, len {}.",
type_byte, bytes_read
);
continue;
}
Some(Message::Attachment(update, bytes)) => {
let a = match &mut attachment {
Some(a) => a,
None => {
error!("Received unexpected attachment chunk");
break;
}
};
let bytes = bytes.unwrap();
if let Err(e) = a.write_all(&bytes) {
error!("Unable to write attachment file: {}", e);
break;
}
if update.left == 0 {
if let Err(e) = a.sync_all() {
error!("Unable to sync attachment file: {}", e);
break;
}
attachment.take();
}
Message::Attachment(update, None)
}
Some(message) => {
trace!("Received message, type {}, len {}.", message, bytes_read);
message
}
None => continue,
};
let consumed = try_break!(handler.consume(message)).unwrap_or(Consumed::None);
match consumed {
Consumed::Response(resp_msg) => {
try_break!(conn_handle.send(resp_msg));
}
Consumed::Attachment(meta, file) => {
// Start attachment
codec.expect_attachment(meta);
attachment = Some(file);
}
Consumed::Disconnect => break,
Consumed::None => {}
}
}
debug!(
"Shutting down reader connection with {}",
reader
.peer_addr()
.map(|a| a.to_string())
.unwrap_or_else(|_| "?".to_owned())
);
let _ = reader.shutdown(Shutdown::Both);
debug!("Shutting down reader connection with {}", peer_addr);
let _ = codec.stream().shutdown(Shutdown::Both);
})?;
let writer_thread = thread::Builder::new()

View file

@ -36,6 +36,7 @@ extern crate serde_derive;
#[macro_use]
extern crate log;
mod codec;
mod conn;
pub mod handshake;
pub mod msg;

View file

@ -16,16 +16,21 @@
use crate::conn::Tracker;
use crate::core::core::hash::Hash;
use crate::core::core::BlockHeader;
use crate::core::core::{
BlockHeader, Transaction, UntrustedBlock, UntrustedBlockHeader, UntrustedCompactBlock,
};
use crate::core::pow::Difficulty;
use crate::core::ser::{
self, ProtocolVersion, Readable, Reader, StreamingReader, Writeable, Writer,
};
use crate::core::{consensus, global};
use crate::types::{
Capabilities, Error, PeerAddr, ReasonForBan, MAX_BLOCK_HEADERS, MAX_LOCATORS, MAX_PEER_ADDRS,
AttachmentMeta, AttachmentUpdate, Capabilities, Error, PeerAddr, ReasonForBan,
MAX_BLOCK_HEADERS, MAX_LOCATORS, MAX_PEER_ADDRS,
};
use bytes::Bytes;
use num::FromPrimitive;
use std::fmt;
use std::fs::File;
use std::io::{Read, Write};
use std::sync::Arc;
@ -704,3 +709,77 @@ impl Readable for TxHashSetArchive {
})
}
}
/// A fully deserialized message received from a peer, produced by the codec.
pub enum Message {
	/// Message of an unrecognized type (body already discarded); carries the
	/// raw type byte.
	Unknown(u8),
	Ping(Ping),
	Pong(Pong),
	BanReason(BanReason),
	/// Broadcast of a transaction kernel hash.
	TransactionKernel(Hash),
	/// Request for a transaction by hash.
	GetTransaction(Hash),
	Transaction(Transaction),
	/// Transaction received via stem relay (handled with `stem = true`
	/// downstream).
	StemTransaction(Transaction),
	GetBlock(Hash),
	Block(UntrustedBlock),
	GetCompactBlock(Hash),
	CompactBlock(UntrustedCompactBlock),
	GetHeaders(Locator),
	Header(UntrustedBlockHeader),
	/// One batch of block headers; the codec splits a full headers message
	/// into several of these.
	Headers(Vec<BlockHeader>),
	GetPeerAddrs(GetPeerAddrs),
	PeerAddrs(PeerAddrs),
	TxHashSetRequest(TxHashSetRequest),
	TxHashSetArchive(TxHashSetArchive),
	/// A chunk of attachment data plus progress information.
	Attachment(AttachmentUpdate, Option<Bytes>),
}
impl fmt::Display for Message {
	/// Short human-readable name of the message variant, used in log output.
	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
		let name = match self {
			Message::Unknown(_) => "unknown",
			Message::Ping(_) => "ping",
			Message::Pong(_) => "pong",
			Message::BanReason(_) => "ban reason",
			Message::TransactionKernel(_) => "tx kernel",
			Message::GetTransaction(_) => "get tx",
			Message::Transaction(_) => "tx",
			Message::StemTransaction(_) => "stem tx",
			Message::GetBlock(_) => "get block",
			Message::Block(_) => "block",
			Message::GetCompactBlock(_) => "get compact block",
			Message::CompactBlock(_) => "compact block",
			Message::GetHeaders(_) => "get headers",
			Message::Header(_) => "header",
			Message::Headers(_) => "headers",
			Message::GetPeerAddrs(_) => "get peer addrs",
			Message::PeerAddrs(_) => "peer addrs",
			Message::TxHashSetRequest(_) => "tx hash set request",
			Message::TxHashSetArchive(_) => "tx hash set",
			Message::Attachment(_, _) => "attachment",
		};
		write!(f, "{}", name)
	}
}
impl fmt::Debug for Message {
	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
		// Delegate to Display for the variant name. Label with the actual type
		// name: "Consume" was the enum's pre-refactor name and no longer exists.
		write!(f, "Message({})", self)
	}
}
/// Outcome of handling a `Message`, telling the connection what to do next.
pub enum Consumed {
	/// Send a response message back to the peer.
	Response(Msg),
	/// Start receiving an attachment, writing it into the given file.
	Attachment(Arc<AttachmentMeta>, File),
	/// Nothing further to do.
	None,
	/// Close the connection.
	Disconnect,
}
impl fmt::Debug for Consumed {
	// Manual impl: print only the salient detail of each variant (message type
	// for a response, size for an attachment) instead of the full payload.
	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
		match self {
			Consumed::Response(msg) => write!(f, "Consumed::Response({:?})", msg.header.msg_type),
			Consumed::Attachment(meta, _) => write!(f, "Consumed::Attachment({:?})", meta.size),
			Consumed::None => write!(f, "Consumed::None"),
			Consumed::Disconnect => write!(f, "Consumed::Disconnect"),
		}
	}
}

View file

@ -13,21 +13,16 @@
// limitations under the License.
use crate::chain;
use crate::conn::{Message, MessageHandler, Tracker};
use crate::core::core::{self, hash::Hash, hash::Hashed, CompactBlock};
use crate::msg::{
BanReason, GetPeerAddrs, Headers, Locator, Msg, PeerAddrs, Ping, Pong, TxHashSetArchive,
TxHashSetRequest, Type,
};
use crate::types::{Error, NetAdapter, PeerInfo};
use crate::conn::MessageHandler;
use crate::core::core::{hash::Hashed, CompactBlock};
use crate::msg::{Consumed, Headers, Message, Msg, PeerAddrs, Pong, TxHashSetArchive, Type};
use crate::types::{AttachmentMeta, Error, NetAdapter, PeerInfo};
use chrono::prelude::Utc;
use rand::{thread_rng, Rng};
use std::cmp;
use std::fs::{self, File, OpenOptions};
use std::io::{BufWriter, Read};
use std::fs::{self, File};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Instant;
pub struct Protocol {
adapter: Arc<dyn NetAdapter>,
@ -50,12 +45,7 @@ impl Protocol {
}
impl MessageHandler for Protocol {
fn consume<R: Read>(
&self,
mut msg: Message<R>,
stopped: Arc<AtomicBool>,
tracker: Arc<Tracker>,
) -> Result<Option<Msg>, Error> {
fn consume(&self, message: Message) -> Result<Consumed, Error> {
let adapter = &self.adapter;
// If we received a msg from a banned peer then log and drop it.
@ -63,210 +53,169 @@ impl MessageHandler for Protocol {
// banned peers up correctly?
if adapter.is_banned(self.peer_info.addr) {
debug!(
"handler: consume: peer {:?} banned, received: {:?}, dropping.",
self.peer_info.addr, msg.header.msg_type,
"handler: consume: peer {:?} banned, received: {}, dropping.",
self.peer_info.addr, message,
);
return Ok(None);
return Ok(Consumed::Disconnect);
}
match msg.header.msg_type {
Type::Ping => {
let ping: Ping = msg.body()?;
adapter.peer_difficulty(self.peer_info.addr, ping.total_difficulty, ping.height);
let consumed = match message {
Message::Attachment(update, _) => {
self.adapter.txhashset_download_update(
update.meta.start_time,
(update.meta.size - update.left) as u64,
update.meta.size as u64,
);
Ok(Some(Msg::new(
if update.left == 0 {
let meta = update.meta;
trace!(
"handle_payload: txhashset archive save to file {:?} success",
meta.path,
);
let zip = File::open(meta.path.clone())?;
let res =
self.adapter
.txhashset_write(meta.hash.clone(), zip, &self.peer_info)?;
debug!(
"handle_payload: txhashset archive for {} at {}, DONE. Data Ok: {}",
meta.hash, meta.height, !res
);
if let Err(e) = fs::remove_file(meta.path.clone()) {
warn!("fail to remove tmp file: {:?}. err: {}", meta.path, e);
}
}
Consumed::None
}
Message::Ping(ping) => {
adapter.peer_difficulty(self.peer_info.addr, ping.total_difficulty, ping.height);
Consumed::Response(Msg::new(
Type::Pong,
Pong {
total_difficulty: adapter.total_difficulty()?,
height: adapter.total_height()?,
},
self.peer_info.version,
)?))
)?)
}
Type::Pong => {
let pong: Pong = msg.body()?;
Message::Pong(pong) => {
adapter.peer_difficulty(self.peer_info.addr, pong.total_difficulty, pong.height);
Ok(None)
Consumed::None
}
Type::BanReason => {
let ban_reason: BanReason = msg.body()?;
Message::BanReason(ban_reason) => {
error!("handle_payload: BanReason {:?}", ban_reason);
Ok(None)
Consumed::Disconnect
}
Type::TransactionKernel => {
let h: Hash = msg.body()?;
debug!(
"handle_payload: received tx kernel: {}, msg_len: {}",
h, msg.header.msg_len
);
Message::TransactionKernel(h) => {
debug!("handle_payload: received tx kernel: {}", h);
adapter.tx_kernel_received(h, &self.peer_info)?;
Ok(None)
Consumed::None
}
Type::GetTransaction => {
let h: Hash = msg.body()?;
debug!(
"handle_payload: GetTransaction: {}, msg_len: {}",
h, msg.header.msg_len,
);
Message::GetTransaction(h) => {
debug!("handle_payload: GetTransaction: {}", h);
let tx = adapter.get_transaction(h);
if let Some(tx) = tx {
Ok(Some(Msg::new(
Type::Transaction,
tx,
self.peer_info.version,
)?))
Consumed::Response(Msg::new(Type::Transaction, tx, self.peer_info.version)?)
} else {
Ok(None)
Consumed::None
}
}
Type::Transaction => {
debug!(
"handle_payload: received tx: msg_len: {}",
msg.header.msg_len
);
let tx: core::Transaction = msg.body()?;
Message::Transaction(tx) => {
debug!("handle_payload: received tx");
adapter.transaction_received(tx, false)?;
Ok(None)
Consumed::None
}
Type::StemTransaction => {
debug!(
"handle_payload: received stem tx: msg_len: {}",
msg.header.msg_len
);
let tx: core::Transaction = msg.body()?;
Message::StemTransaction(tx) => {
debug!("handle_payload: received stem tx");
adapter.transaction_received(tx, true)?;
Ok(None)
Consumed::None
}
Type::GetBlock => {
let h: Hash = msg.body()?;
trace!(
"handle_payload: GetBlock: {}, msg_len: {}",
h,
msg.header.msg_len,
);
Message::GetBlock(h) => {
trace!("handle_payload: GetBlock: {}", h);
let bo = adapter.get_block(h, &self.peer_info);
if let Some(b) = bo {
return Ok(Some(Msg::new(Type::Block, b, self.peer_info.version)?));
Consumed::Response(Msg::new(Type::Block, b, self.peer_info.version)?)
} else {
Consumed::None
}
Ok(None)
}
Type::Block => {
debug!(
"handle_payload: received block: msg_len: {}",
msg.header.msg_len
);
let b: core::UntrustedBlock = msg.body()?;
Message::Block(b) => {
debug!("handle_payload: received block");
// We default to NONE opts here as we do not know know yet why this block was
// received.
// If we requested this block from a peer due to our node syncing then
// the peer adapter will override opts to reflect this.
adapter.block_received(b.into(), &self.peer_info, chain::Options::NONE)?;
Ok(None)
Consumed::None
}
Type::GetCompactBlock => {
let h: Hash = msg.body()?;
Message::GetCompactBlock(h) => {
if let Some(b) = adapter.get_block(h, &self.peer_info) {
let cb: CompactBlock = b.into();
Ok(Some(Msg::new(
Type::CompactBlock,
cb,
self.peer_info.version,
)?))
Consumed::Response(Msg::new(Type::CompactBlock, cb, self.peer_info.version)?)
} else {
Ok(None)
Consumed::None
}
}
Type::CompactBlock => {
debug!(
"handle_payload: received compact block: msg_len: {}",
msg.header.msg_len
);
let b: core::UntrustedCompactBlock = msg.body()?;
Message::CompactBlock(b) => {
debug!("handle_payload: received compact block");
adapter.compact_block_received(b.into(), &self.peer_info)?;
Ok(None)
Consumed::None
}
Type::GetHeaders => {
Message::GetHeaders(loc) => {
// load headers from the locator
let loc: Locator = msg.body()?;
let headers = adapter.locate_headers(&loc.hashes)?;
// serialize and send all the headers over
Ok(Some(Msg::new(
Consumed::Response(Msg::new(
Type::Headers,
Headers { headers },
self.peer_info.version,
)?))
)?)
}
// "header first" block propagation - if we have not yet seen this block
// we can go request it from some of our peers
Type::Header => {
let header: core::UntrustedBlockHeader = msg.body()?;
Message::Header(header) => {
adapter.header_received(header.into(), &self.peer_info)?;
Ok(None)
Consumed::None
}
Type::Headers => {
let mut total_bytes_read = 0;
// Read the count (u16) so we now how many headers to read.
let (count, bytes_read): (u16, _) = msg.streaming_read()?;
total_bytes_read += bytes_read;
// Read chunks of headers off the stream and pass them off to the adapter.
let chunk_size = 32u16;
let mut headers = Vec::with_capacity(chunk_size as usize);
for i in 1..=count {
let (header, bytes_read) =
msg.streaming_read::<core::UntrustedBlockHeader>()?;
headers.push(header.into());
total_bytes_read += bytes_read;
if i % chunk_size == 0 || i == count {
adapter.headers_received(&headers, &self.peer_info)?;
headers.clear();
}
}
// Now check we read the correct total number of bytes off the stream.
if total_bytes_read != msg.header.msg_len {
return Err(Error::MsgLen);
}
Ok(None)
Message::Headers(headers) => {
adapter.headers_received(&headers, &self.peer_info)?;
Consumed::None
}
Type::GetPeerAddrs => {
let get_peers: GetPeerAddrs = msg.body()?;
Message::GetPeerAddrs(get_peers) => {
let peers = adapter.find_peer_addrs(get_peers.capabilities);
Ok(Some(Msg::new(
Consumed::Response(Msg::new(
Type::PeerAddrs,
PeerAddrs { peers },
self.peer_info.version,
)?))
)?)
}
Type::PeerAddrs => {
let peer_addrs: PeerAddrs = msg.body()?;
Message::PeerAddrs(peer_addrs) => {
adapter.peer_addrs_received(peer_addrs.peers);
Ok(None)
Consumed::None
}
Type::TxHashSetRequest => {
let sm_req: TxHashSetRequest = msg.body()?;
Message::TxHashSetRequest(sm_req) => {
debug!(
"handle_payload: txhashset req for {} at {}",
sm_req.hash, sm_req.height
@ -288,14 +237,13 @@ impl MessageHandler for Protocol {
self.peer_info.version,
)?;
resp.add_attachment(txhashset.reader);
Ok(Some(resp))
Consumed::Response(resp)
} else {
Ok(None)
Consumed::None
}
}
Type::TxHashSetArchive => {
let sm_arch: TxHashSetArchive = msg.body()?;
Message::TxHashSetArchive(sm_arch) => {
debug!(
"handle_payload: txhashset archive for {} at {}. size={}",
sm_arch.hash, sm_arch.height, sm_arch.bytes,
@ -313,93 +261,34 @@ impl MessageHandler for Protocol {
// Update the sync state requested status
self.state_sync_requested.store(false, Ordering::Relaxed);
let download_start_time = Utc::now();
let start_time = Utc::now();
self.adapter
.txhashset_download_update(download_start_time, 0, sm_arch.bytes);
.txhashset_download_update(start_time, 0, sm_arch.bytes);
let nonce: u32 = thread_rng().gen_range(0, 1_000_000);
let tmp = self.adapter.get_tmpfile_pathname(format!(
let path = self.adapter.get_tmpfile_pathname(format!(
"txhashset-{}-{}.zip",
download_start_time.timestamp(),
start_time.timestamp(),
nonce
));
let mut now = Instant::now();
let mut save_txhashset_to_file = |file| -> Result<(), Error> {
let mut tmp_zip =
BufWriter::new(OpenOptions::new().write(true).create_new(true).open(file)?);
let total_size = sm_arch.bytes as usize;
let mut downloaded_size: usize = 0;
let mut request_size = cmp::min(48_000, total_size);
while request_size > 0 {
let size = msg.copy_attachment(request_size, &mut tmp_zip)?;
downloaded_size += size;
request_size = cmp::min(48_000, total_size - downloaded_size);
self.adapter.txhashset_download_update(
download_start_time,
downloaded_size as u64,
total_size as u64,
);
if now.elapsed().as_secs() > 10 {
now = Instant::now();
debug!(
"handle_payload: txhashset archive: {}/{}",
downloaded_size, total_size
);
}
// Increase received bytes quietly (without affecting the counters).
// Otherwise we risk banning a peer as "abusive".
tracker.inc_quiet_received(size as u64);
// check the close channel
if stopped.load(Ordering::Relaxed) {
debug!("stopping txhashset download early");
return Err(Error::ConnectionClose);
}
}
debug!(
"handle_payload: txhashset archive: {}/{} ... DONE",
downloaded_size, total_size
);
tmp_zip
.into_inner()
.map_err(|_| Error::Internal)?
.sync_all()?;
Ok(())
let file = fs::OpenOptions::new()
.write(true)
.create_new(true)
.open(path.clone())?;
let meta = AttachmentMeta {
size: sm_arch.bytes as usize,
hash: sm_arch.hash,
height: sm_arch.height,
start_time,
path,
};
if let Err(e) = save_txhashset_to_file(tmp.clone()) {
error!(
"handle_payload: txhashset archive save to file fail. err={:?}",
e
);
return Err(e);
}
trace!(
"handle_payload: txhashset archive save to file {:?} success",
tmp,
);
let tmp_zip = File::open(tmp.clone())?;
let res = self
.adapter
.txhashset_write(sm_arch.hash, tmp_zip, &self.peer_info)?;
debug!(
"handle_payload: txhashset archive for {} at {}, DONE. Data Ok: {}",
sm_arch.hash, sm_arch.height, !res
);
if let Err(e) = fs::remove_file(tmp.clone()) {
warn!("fail to remove tmp file: {:?}. err: {}", tmp, e);
}
Ok(None)
Consumed::Attachment(Arc::new(meta), file)
}
Type::Error | Type::Hand | Type::Shake => {
debug!("Received an unexpected msg: {:?}", msg.header.msg_type);
Ok(None)
}
}
Message::Unknown(_) => Consumed::None,
};
Ok(consumed)
}
}

View file

@ -71,6 +71,7 @@ pub enum Error {
Connection(io::Error),
/// Header type does not match the expected message type
BadMessage,
UnexpectedMessage,
MsgLen,
Banned,
ConnectionClose,
@ -659,3 +660,19 @@ pub trait NetAdapter: ChainAdapter {
/// Is this peer currently banned?
fn is_banned(&self, addr: PeerAddr) -> bool;
}
/// Metadata about an attachment (e.g. a txhashset archive) being received.
#[derive(Clone, Debug)]
pub struct AttachmentMeta {
	/// Total size of the attachment in bytes.
	pub size: usize,
	/// Hash associated with the attachment (set from `TxHashSetArchive::hash`).
	pub hash: Hash,
	/// Block height the attachment corresponds to.
	pub height: u64,
	/// When the download started.
	pub start_time: DateTime<Utc>,
	/// Temporary file the attachment is written to.
	pub path: PathBuf,
}
/// Progress update emitted for each received chunk of an attachment.
#[derive(Clone, Debug)]
pub struct AttachmentUpdate {
	/// Bytes read in this chunk.
	pub read: usize,
	/// Bytes still left to receive; 0 means the attachment is complete.
	pub left: usize,
	/// Metadata of the attachment being received.
	pub meta: Arc<AttachmentMeta>,
}