From defc714f6e5920f6ee5f13f5af379735fce1a9e6 Mon Sep 17 00:00:00 2001 From: jaspervdm Date: Mon, 28 Sep 2020 15:43:37 +0200 Subject: [PATCH] Refactor p2p reader (#3433) * Refactor p2p reader * Rename Output enum * Consume enum takes owned values instead of references * Deserialization in codec, remove Consume enum * Calculate block header size * Read headers in batches * Remove headers type from deserializer --- Cargo.lock | 2 + core/Cargo.toml | 1 + core/src/global.rs | 50 ++++++- core/src/pow/types.rs | 6 +- core/src/ser.rs | 110 ++++++++++++++ p2p/Cargo.toml | 1 + p2p/src/codec.rs | 245 ++++++++++++++++++++++++++++++ p2p/src/conn.rs | 189 ++++++++++------------- p2p/src/lib.rs | 1 + p2p/src/msg.rs | 83 ++++++++++- p2p/src/protocol.rs | 337 ++++++++++++++---------------------------- p2p/src/types.rs | 17 +++ 12 files changed, 701 insertions(+), 341 deletions(-) create mode 100644 p2p/src/codec.rs diff --git a/Cargo.lock b/Cargo.lock index 2cecdad35..437057df0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1027,6 +1027,7 @@ version = "4.2.0-alpha.1" dependencies = [ "blake2-rfc", "byteorder", + "bytes", "chrono", "croaring-mw", "enum_primitive", @@ -1073,6 +1074,7 @@ name = "grin_p2p" version = "4.2.0-alpha.1" dependencies = [ "bitflags 1.2.1", + "bytes", "chrono", "enum_primitive", "grin_chain", diff --git a/core/Cargo.toml b/core/Cargo.toml index 849dc3ec8..21ac4868b 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -27,6 +27,7 @@ siphasher = "0.3" log = "0.4" chrono = { version = "0.4.11", features = ["serde"] } zeroize = { version = "1.1", features =["zeroize_derive"] } +bytes = "0.5" keychain = { package = "grin_keychain", path = "../keychain", version = "4.2.0-alpha.1" } util = { package = "grin_util", path = "../util", version = "4.2.0-alpha.1" } diff --git a/core/src/global.rs b/core/src/global.rs index 5d2732b0d..6678b0f36 100644 --- a/core/src/global.rs +++ b/core/src/global.rs @@ -26,7 +26,7 @@ use crate::core::block::HeaderVersion; use crate::{ pow::{ self, new_cuckaroo_ctx, new_cuckarood_ctx, new_cuckaroom_ctx, new_cuckarooz_ctx, - new_cuckatoo_ctx, PoWContext, + new_cuckatoo_ctx, BitVec, PoWContext, }, ser::ProtocolVersion, }; @@ -411,3 +411,51 @@ where last_n.reverse(); last_n } + +/// Calculates the size of a header (in bytes) given a number of edge bits in the PoW +#[inline] +pub fn header_size_bytes(edge_bits: u8) -> usize { + let size = 2 + 2 * 8 + 5 * 32 + 32 + 2 * 8; + let proof_size = 8 + 4 + 8 + 1 + BitVec::bytes_len(edge_bits as usize * proofsize()); + size + proof_size +} + +#[cfg(test)] +mod test { + use super::*; + use crate::core::Block; + use crate::genesis::*; + use crate::pow::mine_genesis_block; + use crate::ser::{BinWriter, Writeable}; + + fn test_header_len(genesis: Block) { + let mut raw = Vec::::with_capacity(1_024); + let mut writer = BinWriter::new(&mut raw, ProtocolVersion::local()); + genesis.header.write(&mut writer).unwrap(); + assert_eq!(raw.len(), header_size_bytes(genesis.header.pow.edge_bits())); + } + + #[test] + fn automated_testing_header_len() { + set_local_chain_type(ChainTypes::AutomatedTesting); + test_header_len(mine_genesis_block().unwrap()); + } + + #[test] + fn user_testing_header_len() { + set_local_chain_type(ChainTypes::UserTesting); + test_header_len(mine_genesis_block().unwrap()); + } + + #[test] + fn floonet_header_len() { + set_local_chain_type(ChainTypes::Floonet); + test_header_len(genesis_floo()); + } + + #[test] + fn mainnet_header_len() { + set_local_chain_type(ChainTypes::Mainnet); + test_header_len(genesis_main()); + } +} diff --git a/core/src/pow/types.rs b/core/src/pow/types.rs index 0f75c33bd..07824359e 100644 --- a/core/src/pow/types.rs +++ b/core/src/pow/types.rs @@ -471,15 +471,17 @@ impl Writeable for Proof { } } +/// A bit vector // TODO this could likely be optimized by writing whole bytes (or even words) // in the `BitVec` at once, dealing with the truncation, instead of bits by bits -struct BitVec { +pub struct BitVec { bits: Vec, } impl BitVec { /// Number of bytes required to store the provided number of bits - fn bytes_len(bits_len: usize) -> usize { + #[inline] + pub fn bytes_len(bits_len: usize) -> usize { (bits_len + 7) / 8 } diff --git a/core/src/ser.rs b/core/src/ser.rs index b6dcbdaad..eb5af150d 100644 --- a/core/src/ser.rs +++ b/core/src/ser.rs @@ -22,6 +22,7 @@ use crate::core::hash::{DefaultHashable, Hash, Hashed}; use crate::global::PROTOCOL_VERSION; use byteorder::{BigEndian, ByteOrder, ReadBytesExt}; +use bytes::Buf; use keychain::{BlindingFactor, Identifier, IDENTIFIER_SIZE}; use std::convert::TryInto; use std::fmt::{self, Debug}; @@ -79,6 +80,12 @@ impl From for Error { } } +impl From for Error { + fn from(e: io::ErrorKind) -> Error { + Error::IOErr(format!("{}", io::Error::from(e)), e) + } +} + impl fmt::Display for Error { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match *self { @@ -575,6 +582,109 @@ impl<'a> Reader for StreamingReader<'a> { } } +/// Protocol version-aware wrapper around a `Buf` impl +pub struct BufReader<'a, B: Buf> { + inner: &'a mut B, + version: ProtocolVersion, + bytes_read: usize, +} + +impl<'a, B: Buf> BufReader<'a, B> { + /// Construct a new BufReader + pub fn new(buf: &'a mut B, version: ProtocolVersion) -> Self { + Self { + inner: buf, + version, + bytes_read: 0, + } + } + + /// Check whether the buffer has enough bytes remaining to perform a read + fn has_remaining(&mut self, len: usize) -> Result<(), Error> { + if self.inner.remaining() >= len { + self.bytes_read += len; + Ok(()) + } else { + Err(io::ErrorKind::UnexpectedEof.into()) + } + } + + /// The total bytes read + pub fn bytes_read(&self) -> u64 { + self.bytes_read as u64 + } + + /// Convenience function to read from the buffer and deserialize + pub fn body(&mut self) -> Result { + T::read(self) + } +} + +impl<'a, B: Buf> Reader for BufReader<'a, B> { + fn read_u8(&mut self) -> Result { + self.has_remaining(1)?; + Ok(self.inner.get_u8()) + } + + fn read_u16(&mut self) -> Result { + self.has_remaining(2)?; + Ok(self.inner.get_u16()) + } + + fn read_u32(&mut self) -> Result { + self.has_remaining(4)?; + Ok(self.inner.get_u32()) + } + + fn read_u64(&mut self) -> Result { + self.has_remaining(8)?; + Ok(self.inner.get_u64()) + } + + fn read_i32(&mut self) -> Result { + self.has_remaining(4)?; + Ok(self.inner.get_i32()) + } + + fn read_i64(&mut self) -> Result { + self.has_remaining(8)?; + Ok(self.inner.get_i64()) + } + + fn read_bytes_len_prefix(&mut self) -> Result, Error> { + let len = self.read_u64()?; + self.read_fixed_bytes(len as usize) + } + + fn read_fixed_bytes(&mut self, len: usize) -> Result, Error> { + // not reading more than 100k bytes in a single read + if len > 100_000 { + return Err(Error::TooLargeReadErr); + } + self.has_remaining(len)?; + + let mut buf = vec![0; len]; + self.inner.copy_to_slice(&mut buf[..]); + Ok(buf) + } + + fn expect_u8(&mut self, val: u8) -> Result { + let b = self.read_u8()?; + if b == val { + Ok(b) + } else { + Err(Error::UnexpectedData { + expected: vec![val], + received: vec![b], + }) + } + } + + fn protocol_version(&self) -> ProtocolVersion { + self.version + } +} + impl Readable for Commitment { fn read(reader: &mut R) -> Result { let a = reader.read_fixed_bytes(PEDERSEN_COMMITMENT_SIZE)?; diff --git a/p2p/Cargo.toml b/p2p/Cargo.toml index 8429d8c3a..ce9cb3774 100644 --- a/p2p/Cargo.toml +++ b/p2p/Cargo.toml @@ -20,6 +20,7 @@ serde_derive = "1" tempfile = "3.1" log = "0.4" chrono = { version = "0.4.11", features = ["serde"] } +bytes = "0.5" grin_core = { path = "../core", version = "4.2.0-alpha.1" } grin_store = { path = "../store", version = "4.2.0-alpha.1" } diff --git a/p2p/src/codec.rs b/p2p/src/codec.rs new file mode 100644 index 000000000..9fd6043c4 --- /dev/null +++ b/p2p/src/codec.rs @@ -0,0 +1,245 @@ +use crate::core::core::block::{BlockHeader, UntrustedBlockHeader}; +use crate::core::global::header_size_bytes; +use crate::core::ser::{BufReader, ProtocolVersion, Readable}; +use crate::msg::{Message, MsgHeader, MsgHeaderWrapper, Type}; +use crate::types::{AttachmentMeta, AttachmentUpdate, Error}; +use bytes::{Buf, BufMut, Bytes, BytesMut}; +use core::ser::Reader; +use std::cmp::min; +use std::io::Read; +use std::mem; +use std::net::TcpStream; +use std::sync::Arc; +use std::time::{Duration, Instant}; +use MsgHeaderWrapper::*; +use State::*; + +const HEADER_IO_TIMEOUT: Duration = Duration::from_millis(2000); +pub const BODY_IO_TIMEOUT: Duration = Duration::from_millis(60000); +const HEADER_BATCH_SIZE: usize = 32; + +enum State { + None, + Header(MsgHeaderWrapper), + BlockHeaders { + bytes_left: usize, + items_left: usize, + headers: Vec, + }, + Attachment(usize, Arc, Instant), +} + +impl State { + fn is_none(&self) -> bool { + match self { + State::None => true, + _ => false, + } + } +} + +pub struct Codec { + pub version: ProtocolVersion, + stream: TcpStream, + buffer: BytesMut, + state: State, + bytes_read: usize, +} + +impl Codec { + pub fn new(version: ProtocolVersion, stream: TcpStream) -> Self { + Self { + version, + stream, + buffer: BytesMut::with_capacity(8 * 1024), + state: None, + bytes_read: 0, + } + } + + /// Destroy the codec and return the reader + pub fn stream(self) -> TcpStream { + self.stream + } + + /// Inform codec next `len` bytes are an attachment + /// Panics if already reading a body + pub fn expect_attachment(&mut self, meta: Arc) { + assert!(self.state.is_none()); + self.state = Attachment(meta.size, meta, Instant::now()); + } + + /// Length of the next item we are expecting, could be msg header, body, block header or attachment chunk + fn next_len(&self) -> usize { + match &self.state { + None => MsgHeader::LEN, + Header(Known(h)) if h.msg_type == Type::Headers => { + // If we are receiving a list of headers, read off the item count first + min(h.msg_len as usize, 2) + } + Header(Known(header)) => header.msg_len as usize, + Header(Unknown(len, _)) => *len as usize, + BlockHeaders { bytes_left, .. } => { + // The header length varies with the number of edge bits. Therefore we overestimate + // its size and only actually read the bytes we need + min(*bytes_left, header_size_bytes(63)) + } + Attachment(left, _, _) => min(*left, 48_000), + } + } + + /// Set stream timeout depending on the next expected item + fn set_stream_timeout(&self) -> Result<(), Error> { + let timeout = match &self.state { + None => HEADER_IO_TIMEOUT, + _ => BODY_IO_TIMEOUT, + }; + self.stream.set_read_timeout(Some(timeout))?; + Ok(()) + } + + fn read_inner(&mut self) -> Result { + self.bytes_read = 0; + loop { + let next_len = self.next_len(); + let pre_len = self.buffer.len(); + // Buffer could already be partially filled, calculate additional bytes we need + let to_read = next_len.saturating_sub(pre_len); + if to_read > 0 { + self.buffer.reserve(to_read); + for _ in 0..to_read { + self.buffer.put_u8(0); + } + self.set_stream_timeout()?; + if let Err(e) = self.stream.read_exact(&mut self.buffer[pre_len..]) { + // Undo reserved bytes on a failed read + self.buffer.truncate(pre_len); + return Err(e.into()); + } + self.bytes_read += to_read; + } + match &mut self.state { + None => { + // Parse header and keep reading + let mut raw = self.buffer.split_to(next_len).freeze(); + let mut reader = BufReader::new(&mut raw, self.version); + let header = MsgHeaderWrapper::read(&mut reader)?; + self.state = Header(header); + } + Header(Known(header)) => { + let mut raw = self.buffer.split_to(next_len).freeze(); + if header.msg_type == Type::Headers { + // Special consideration for a list of headers, as we want to verify and process + // them as they come in instead of only after the full list has been received + let mut reader = BufReader::new(&mut raw, self.version); + let items_left = reader.read_u16()? as usize; + self.state = BlockHeaders { + bytes_left: header.msg_len as usize - 2, + items_left, + headers: Vec::with_capacity(min(HEADER_BATCH_SIZE, items_left)), + }; + } else { + // Return full message + let msg = decode_message(header, &mut raw, self.version); + self.state = None; + return msg; + } + } + Header(Unknown(_, msg_type)) => { + // Discard body and return + let msg_type = *msg_type; + self.buffer.advance(next_len); + self.state = None; + return Ok(Message::Unknown(msg_type)); + } + BlockHeaders { + bytes_left, + items_left, + headers, + } => { + if *bytes_left == 0 { + // Incorrect item count + self.state = None; + return Err(Error::BadMessage); + } + + let mut reader = BufReader::new(&mut self.buffer, self.version); + let header: UntrustedBlockHeader = reader.body()?; + let bytes_read = reader.bytes_read() as usize; + headers.push(header.into()); + *bytes_left = bytes_left.saturating_sub(bytes_read); + *items_left -= 1; + + if headers.len() == HEADER_BATCH_SIZE || *items_left == 0 { + let mut h = Vec::with_capacity(min(HEADER_BATCH_SIZE, *items_left)); + mem::swap(headers, &mut h); + if *items_left == 0 { + let bytes_left = *bytes_left; + self.state = None; + if bytes_left > 0 { + return Err(Error::BadMessage); + } + } + return Ok(Message::Headers(h)); + } + } + Attachment(left, meta, now) => { + let raw = self.buffer.split_to(next_len).freeze(); + *left -= next_len; + if now.elapsed().as_secs() > 10 { + *now = Instant::now(); + debug!("attachment: {}/{}", meta.size - *left, meta.size); + } + let update = AttachmentUpdate { + read: next_len, + left: *left, + meta: Arc::clone(meta), + }; + if *left == 0 { + self.state = None; + debug!("attachment: DONE"); + } + return Ok(Message::Attachment(update, Some(raw))); + } + } + } + } + + /// Blocking read of the next message + pub fn read(&mut self) -> (Result, u64) { + let msg = self.read_inner(); + (msg, self.bytes_read as u64) + } +} + +// TODO: replace with a macro? +fn decode_message( + header: &MsgHeader, + body: &mut Bytes, + version: ProtocolVersion, +) -> Result { + let mut msg = BufReader::new(body, version); + let c = match header.msg_type { + Type::Ping => Message::Ping(msg.body()?), + Type::Pong => Message::Pong(msg.body()?), + Type::BanReason => Message::BanReason(msg.body()?), + Type::TransactionKernel => Message::TransactionKernel(msg.body()?), + Type::GetTransaction => Message::GetTransaction(msg.body()?), + Type::Transaction => Message::Transaction(msg.body()?), + Type::StemTransaction => Message::StemTransaction(msg.body()?), + Type::GetBlock => Message::GetBlock(msg.body()?), + Type::Block => Message::Block(msg.body()?), + Type::GetCompactBlock => Message::GetCompactBlock(msg.body()?), + Type::CompactBlock => Message::CompactBlock(msg.body()?), + Type::GetHeaders => Message::GetHeaders(msg.body()?), + Type::Header => Message::Header(msg.body()?), + Type::GetPeerAddrs => Message::GetPeerAddrs(msg.body()?), + Type::PeerAddrs => Message::PeerAddrs(msg.body()?), + Type::TxHashSetRequest => Message::TxHashSetRequest(msg.body()?), + Type::TxHashSetArchive => Message::TxHashSetArchive(msg.body()?), + Type::Error | Type::Hand | Type::Shake | Type::Headers => { + return Err(Error::UnexpectedMessage) + } + }; + Ok(c) +} diff --git a/p2p/src/conn.rs b/p2p/src/conn.rs index f1a7c9b35..6be82190d 100644 --- a/p2p/src/conn.rs +++ b/p2p/src/conn.rs @@ -20,40 +20,28 @@ //! forces us to go through some additional gymnastic to loop over the async //! stream and make sure we get the right number of bytes out. -use crate::core::ser; +use crate::codec::{Codec, BODY_IO_TIMEOUT}; use crate::core::ser::ProtocolVersion; -use crate::msg::{ - read_body, read_discard, read_header, read_item, write_message, Msg, MsgHeader, - MsgHeaderWrapper, -}; +use crate::msg::{write_message, Consumed, Message, Msg}; use crate::types::Error; use crate::util::{RateCounter, RwLock}; -use std::io::{self, Read, Write}; +use std::fs::File; +use std::io::{self, Write}; use std::net::{Shutdown, TcpStream}; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::RecvTimeoutError; use std::sync::{mpsc, Arc}; +use std::thread::{self, JoinHandle}; use std::time::Duration; -use std::{ - cmp, - thread::{self, JoinHandle}, -}; pub const SEND_CHANNEL_CAP: usize = 100; -const HEADER_IO_TIMEOUT: Duration = Duration::from_millis(2000); const CHANNEL_TIMEOUT: Duration = Duration::from_millis(1000); -const BODY_IO_TIMEOUT: Duration = Duration::from_millis(60000); /// A trait to be implemented in order to receive messages from the /// connection. Allows providing an optional response. pub trait MessageHandler: Send + 'static { - fn consume<'a, R: Read>( - &self, - msg: Message<'a, R>, - stopped: Arc, - tracker: Arc, - ) -> Result, Error>; + fn consume(&self, message: Message) -> Result; } // Macro to simplify the boilerplate around I/O and Grin error handling @@ -79,54 +67,6 @@ macro_rules! try_break { }; } -macro_rules! try_header { - ($res:expr, $conn: expr) => {{ - let _ = $conn.set_read_timeout(Some(HEADER_IO_TIMEOUT)); - try_break!($res) - }}; -} - -/// A message as received by the connection. Provides access to the message -/// header lazily consumes the message body, handling its deserialization. -pub struct Message<'a, R: Read> { - pub header: MsgHeader, - stream: &'a mut R, - version: ProtocolVersion, -} - -impl<'a, R: Read> Message<'a, R> { - fn from_header(header: MsgHeader, stream: &'a mut R, version: ProtocolVersion) -> Self { - Message { - header, - stream, - version, - } - } - - /// Read the message body from the underlying connection - pub fn body(&mut self) -> Result { - read_body(&self.header, self.stream, self.version) - } - - /// Read a single "thing" from the underlying connection. - /// Return the thing and the total bytes read. - pub fn streaming_read(&mut self) -> Result<(T, u64), Error> { - read_item(self.stream, self.version) - } - - pub fn copy_attachment(&mut self, len: usize, writer: &mut dyn Write) -> Result { - let mut written = 0; - while written < len { - let read_len = cmp::min(8000, len - written); - let mut buf = vec![0u8; read_len]; - self.stream.read_exact(&mut buf[..])?; - writer.write_all(&buf)?; - written += read_len; - } - Ok(written) - } -} - pub struct StopHandle { /// Channel to close the connection stopped: Arc, @@ -281,7 +221,7 @@ where H: MessageHandler, { // Split out tcp stream out into separate reader/writer halves. - let mut reader = conn.try_clone().expect("clone conn for reader failed"); + let reader = conn.try_clone().expect("clone conn for reader failed"); let mut writer = conn.try_clone().expect("clone conn for writer failed"); let reader_stopped = stopped.clone(); @@ -291,58 +231,83 @@ where let reader_thread = thread::Builder::new() .name("peer_read".to_string()) .spawn(move || { + let peer_addr = reader + .peer_addr() + .map(|a| a.to_string()) + .unwrap_or_else(|_| "?".to_owned()); + let mut codec = Codec::new(version, reader); + let mut attachment: Option = None; loop { - // check the read end - match try_header!(read_header(&mut reader, version), &reader) { - Some(MsgHeaderWrapper::Known(header)) => { - let _ = reader.set_read_timeout(Some(BODY_IO_TIMEOUT)); - let msg = Message::from_header(header, &mut reader, version); - - trace!( - "Received message header, type {:?}, len {}.", - msg.header.msg_type, - msg.header.msg_len - ); - - // Increase received bytes counter - reader_tracker.inc_received(MsgHeader::LEN as u64 + msg.header.msg_len); - - let resp_msg = try_break!(handler.consume( - msg, - reader_stopped.clone(), - reader_tracker.clone() - )); - if let Some(Some(resp_msg)) = resp_msg { - try_break!(conn_handle.send(resp_msg)); - } - } - Some(MsgHeaderWrapper::Unknown(msg_len, type_byte)) => { - debug!( - "Received unknown message header, type {:?}, len {}.", - type_byte, msg_len - ); - // Increase received bytes counter - reader_tracker.inc_received(MsgHeader::LEN as u64 + msg_len); - - try_break!(read_discard(msg_len, &mut reader)); - } - None => {} - } - // check the close channel if reader_stopped.load(Ordering::Relaxed) { break; } + + // check the read end + let (next, bytes_read) = codec.read(); + + // increase the appropriate counter + match &next { + Ok(Message::Attachment(_, _)) => reader_tracker.inc_quiet_received(bytes_read), + _ => reader_tracker.inc_received(bytes_read), + } + + let message = match try_break!(next) { + Some(Message::Unknown(type_byte)) => { + debug!( + "Received unknown message, type {:?}, len {}.", + type_byte, bytes_read + ); + continue; + } + Some(Message::Attachment(update, bytes)) => { + let a = match &mut attachment { + Some(a) => a, + None => { + error!("Received unexpected attachment chunk"); + break; + } + }; + + let bytes = bytes.unwrap(); + if let Err(e) = a.write_all(&bytes) { + error!("Unable to write attachment file: {}", e); + break; + } + if update.left == 0 { + if let Err(e) = a.sync_all() { + error!("Unable to sync attachment file: {}", e); + break; + } + attachment.take(); + } + + Message::Attachment(update, None) + } + Some(message) => { + trace!("Received message, type {}, len {}.", message, bytes_read); + message + } + None => continue, + }; + + let consumed = try_break!(handler.consume(message)).unwrap_or(Consumed::None); + match consumed { + Consumed::Response(resp_msg) => { + try_break!(conn_handle.send(resp_msg)); + } + Consumed::Attachment(meta, file) => { + // Start attachment + codec.expect_attachment(meta); + attachment = Some(file); + } + Consumed::Disconnect => break, + Consumed::None => {} + } } - debug!( - "Shutting down reader connection with {}", - reader - .peer_addr() - .map(|a| a.to_string()) - .unwrap_or_else(|_| "?".to_owned()) - ); - let _ = reader.shutdown(Shutdown::Both); + debug!("Shutting down reader connection with {}", peer_addr); + let _ = codec.stream().shutdown(Shutdown::Both); })?; let writer_thread = thread::Builder::new() diff --git a/p2p/src/lib.rs b/p2p/src/lib.rs index 5cc1b2168..203001a2e 100644 --- a/p2p/src/lib.rs +++ b/p2p/src/lib.rs @@ -36,6 +36,7 @@ extern crate serde_derive; #[macro_use] extern crate log; +mod codec; mod conn; pub mod handshake; pub mod msg; diff --git a/p2p/src/msg.rs b/p2p/src/msg.rs index d9a5cef8e..11a201359 100644 --- a/p2p/src/msg.rs +++ b/p2p/src/msg.rs @@ -16,16 +16,21 @@ use crate::conn::Tracker; use crate::core::core::hash::Hash; -use crate::core::core::BlockHeader; +use crate::core::core::{ + BlockHeader, Transaction, UntrustedBlock, UntrustedBlockHeader, UntrustedCompactBlock, +}; use crate::core::pow::Difficulty; use crate::core::ser::{ self, ProtocolVersion, Readable, Reader, StreamingReader, Writeable, Writer, }; use crate::core::{consensus, global}; use crate::types::{ - Capabilities, Error, PeerAddr, ReasonForBan, MAX_BLOCK_HEADERS, MAX_LOCATORS, MAX_PEER_ADDRS, + AttachmentMeta, AttachmentUpdate, Capabilities, Error, PeerAddr, ReasonForBan, + MAX_BLOCK_HEADERS, MAX_LOCATORS, MAX_PEER_ADDRS, }; +use bytes::Bytes; use num::FromPrimitive; +use std::fmt; use std::fs::File; use std::io::{Read, Write}; use std::sync::Arc; @@ -704,3 +709,77 @@ impl Readable for TxHashSetArchive { }) } } + +pub enum Message { + Unknown(u8), + Ping(Ping), + Pong(Pong), + BanReason(BanReason), + TransactionKernel(Hash), + GetTransaction(Hash), + Transaction(Transaction), + StemTransaction(Transaction), + GetBlock(Hash), + Block(UntrustedBlock), + GetCompactBlock(Hash), + CompactBlock(UntrustedCompactBlock), + GetHeaders(Locator), + Header(UntrustedBlockHeader), + Headers(Vec), + GetPeerAddrs(GetPeerAddrs), + PeerAddrs(PeerAddrs), + TxHashSetRequest(TxHashSetRequest), + TxHashSetArchive(TxHashSetArchive), + Attachment(AttachmentUpdate, Option), +} + +impl fmt::Display for Message { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Message::Unknown(_) => write!(f, "unknown"), + Message::Ping(_) => write!(f, "ping"), + Message::Pong(_) => write!(f, "pong"), + Message::BanReason(_) => write!(f, "ban reason"), + Message::TransactionKernel(_) => write!(f, "tx kernel"), + Message::GetTransaction(_) => write!(f, "get tx"), + Message::Transaction(_) => write!(f, "tx"), + Message::StemTransaction(_) => write!(f, "stem tx"), + Message::GetBlock(_) => write!(f, "get block"), + Message::Block(_) => write!(f, "block"), + Message::GetCompactBlock(_) => write!(f, "get compact block"), + Message::CompactBlock(_) => write!(f, "compact block"), + Message::GetHeaders(_) => write!(f, "get headers"), + Message::Header(_) => write!(f, "header"), + Message::Headers(_) => write!(f, "headers"), + Message::GetPeerAddrs(_) => write!(f, "get peer addrs"), + Message::PeerAddrs(_) => write!(f, "peer addrs"), + Message::TxHashSetRequest(_) => write!(f, "tx hash set request"), + Message::TxHashSetArchive(_) => write!(f, "tx hash set"), + Message::Attachment(_, _) => write!(f, "attachment"), + } + } +} + +impl fmt::Debug for Message { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "Consume({})", self) + } +} + +pub enum Consumed { + Response(Msg), + Attachment(Arc, File), + None, + Disconnect, +} + +impl fmt::Debug for Consumed { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Consumed::Response(msg) => write!(f, "Consumed::Response({:?})", msg.header.msg_type), + Consumed::Attachment(meta, _) => write!(f, "Consumed::Attachment({:?})", meta.size), + Consumed::None => write!(f, "Consumed::None"), + Consumed::Disconnect => write!(f, "Consumed::Disconnect"), + } + } +} diff --git a/p2p/src/protocol.rs b/p2p/src/protocol.rs index 1112b3078..15f869497 100644 --- a/p2p/src/protocol.rs +++ b/p2p/src/protocol.rs @@ -13,21 +13,16 @@ // limitations under the License. use crate::chain; -use crate::conn::{Message, MessageHandler, Tracker}; -use crate::core::core::{self, hash::Hash, hash::Hashed, CompactBlock}; -use crate::msg::{ - BanReason, GetPeerAddrs, Headers, Locator, Msg, PeerAddrs, Ping, Pong, TxHashSetArchive, - TxHashSetRequest, Type, -}; -use crate::types::{Error, NetAdapter, PeerInfo}; +use crate::conn::MessageHandler; +use crate::core::core::{hash::Hashed, CompactBlock}; + +use crate::msg::{Consumed, Headers, Message, Msg, PeerAddrs, Pong, TxHashSetArchive, Type}; +use crate::types::{AttachmentMeta, Error, NetAdapter, PeerInfo}; use chrono::prelude::Utc; use rand::{thread_rng, Rng}; -use std::cmp; -use std::fs::{self, File, OpenOptions}; -use std::io::{BufWriter, Read}; +use std::fs::{self, File}; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; -use std::time::Instant; pub struct Protocol { adapter: Arc, @@ -50,12 +45,7 @@ impl Protocol { } impl MessageHandler for Protocol { - fn consume( - &self, - mut msg: Message, - stopped: Arc, - tracker: Arc, - ) -> Result, Error> { + fn consume(&self, message: Message) -> Result { let adapter = &self.adapter; // If we received a msg from a banned peer then log and drop it. @@ -63,210 +53,169 @@ impl MessageHandler for Protocol { // banned peers up correctly? if adapter.is_banned(self.peer_info.addr) { debug!( - "handler: consume: peer {:?} banned, received: {:?}, dropping.", - self.peer_info.addr, msg.header.msg_type, + "handler: consume: peer {:?} banned, received: {}, dropping.", + self.peer_info.addr, message, ); - return Ok(None); + return Ok(Consumed::Disconnect); } - match msg.header.msg_type { - Type::Ping => { - let ping: Ping = msg.body()?; - adapter.peer_difficulty(self.peer_info.addr, ping.total_difficulty, ping.height); + let consumed = match message { + Message::Attachment(update, _) => { + self.adapter.txhashset_download_update( + update.meta.start_time, + (update.meta.size - update.left) as u64, + update.meta.size as u64, + ); - Ok(Some(Msg::new( + if update.left == 0 { + let meta = update.meta; + trace!( + "handle_payload: txhashset archive save to file {:?} success", + meta.path, + ); + + let zip = File::open(meta.path.clone())?; + let res = + self.adapter + .txhashset_write(meta.hash.clone(), zip, &self.peer_info)?; + + debug!( + "handle_payload: txhashset archive for {} at {}, DONE. Data Ok: {}", + meta.hash, meta.height, !res + ); + + if let Err(e) = fs::remove_file(meta.path.clone()) { + warn!("fail to remove tmp file: {:?}. err: {}", meta.path, e); + } + } + + Consumed::None + } + + Message::Ping(ping) => { + adapter.peer_difficulty(self.peer_info.addr, ping.total_difficulty, ping.height); + Consumed::Response(Msg::new( Type::Pong, Pong { total_difficulty: adapter.total_difficulty()?, height: adapter.total_height()?, }, self.peer_info.version, - )?)) + )?) } - Type::Pong => { - let pong: Pong = msg.body()?; + Message::Pong(pong) => { adapter.peer_difficulty(self.peer_info.addr, pong.total_difficulty, pong.height); - Ok(None) + Consumed::None } - Type::BanReason => { - let ban_reason: BanReason = msg.body()?; + Message::BanReason(ban_reason) => { error!("handle_payload: BanReason {:?}", ban_reason); - Ok(None) + Consumed::Disconnect } - Type::TransactionKernel => { - let h: Hash = msg.body()?; - debug!( - "handle_payload: received tx kernel: {}, msg_len: {}", - h, msg.header.msg_len - ); + Message::TransactionKernel(h) => { + debug!("handle_payload: received tx kernel: {}", h); adapter.tx_kernel_received(h, &self.peer_info)?; - Ok(None) + Consumed::None } - Type::GetTransaction => { - let h: Hash = msg.body()?; - debug!( - "handle_payload: GetTransaction: {}, msg_len: {}", - h, msg.header.msg_len, - ); + Message::GetTransaction(h) => { + debug!("handle_payload: GetTransaction: {}", h); let tx = adapter.get_transaction(h); if let Some(tx) = tx { - Ok(Some(Msg::new( - Type::Transaction, - tx, - self.peer_info.version, - )?)) + Consumed::Response(Msg::new(Type::Transaction, tx, self.peer_info.version)?) } else { - Ok(None) + Consumed::None } } - Type::Transaction => { - debug!( - "handle_payload: received tx: msg_len: {}", - msg.header.msg_len - ); - let tx: core::Transaction = msg.body()?; + Message::Transaction(tx) => { + debug!("handle_payload: received tx"); adapter.transaction_received(tx, false)?; - Ok(None) + Consumed::None } - Type::StemTransaction => { - debug!( - "handle_payload: received stem tx: msg_len: {}", - msg.header.msg_len - ); - let tx: core::Transaction = msg.body()?; + Message::StemTransaction(tx) => { + debug!("handle_payload: received stem tx"); adapter.transaction_received(tx, true)?; - Ok(None) + Consumed::None } - Type::GetBlock => { - let h: Hash = msg.body()?; - trace!( - "handle_payload: GetBlock: {}, msg_len: {}", - h, - msg.header.msg_len, - ); - + Message::GetBlock(h) => { + trace!("handle_payload: GetBlock: {}", h); let bo = adapter.get_block(h, &self.peer_info); if let Some(b) = bo { - return Ok(Some(Msg::new(Type::Block, b, self.peer_info.version)?)); + Consumed::Response(Msg::new(Type::Block, b, self.peer_info.version)?) + } else { + Consumed::None } - Ok(None) } - Type::Block => { - debug!( - "handle_payload: received block: msg_len: {}", - msg.header.msg_len - ); - let b: core::UntrustedBlock = msg.body()?; - + Message::Block(b) => { + debug!("handle_payload: received block"); // We default to NONE opts here as we do not know know yet why this block was // received. // If we requested this block from a peer due to our node syncing then // the peer adapter will override opts to reflect this. adapter.block_received(b.into(), &self.peer_info, chain::Options::NONE)?; - Ok(None) + Consumed::None } - Type::GetCompactBlock => { - let h: Hash = msg.body()?; + Message::GetCompactBlock(h) => { if let Some(b) = adapter.get_block(h, &self.peer_info) { let cb: CompactBlock = b.into(); - Ok(Some(Msg::new( - Type::CompactBlock, - cb, - self.peer_info.version, - )?)) + Consumed::Response(Msg::new(Type::CompactBlock, cb, self.peer_info.version)?) } else { - Ok(None) + Consumed::None } } - Type::CompactBlock => { - debug!( - "handle_payload: received compact block: msg_len: {}", - msg.header.msg_len - ); - let b: core::UntrustedCompactBlock = msg.body()?; - + Message::CompactBlock(b) => { + debug!("handle_payload: received compact block"); adapter.compact_block_received(b.into(), &self.peer_info)?; - Ok(None) + Consumed::None } - Type::GetHeaders => { + Message::GetHeaders(loc) => { // load headers from the locator - let loc: Locator = msg.body()?; let headers = adapter.locate_headers(&loc.hashes)?; // serialize and send all the headers over - Ok(Some(Msg::new( + Consumed::Response(Msg::new( Type::Headers, Headers { headers }, self.peer_info.version, - )?)) + )?) } // "header first" block propagation - if we have not yet seen this block // we can go request it from some of our peers - Type::Header => { - let header: core::UntrustedBlockHeader = msg.body()?; + Message::Header(header) => { adapter.header_received(header.into(), &self.peer_info)?; - Ok(None) + Consumed::None } - Type::Headers => { - let mut total_bytes_read = 0; - - // Read the count (u16) so we now how many headers to read. - let (count, bytes_read): (u16, _) = msg.streaming_read()?; - total_bytes_read += bytes_read; - - // Read chunks of headers off the stream and pass them off to the adapter. - let chunk_size = 32u16; - let mut headers = Vec::with_capacity(chunk_size as usize); - for i in 1..=count { - let (header, bytes_read) = - msg.streaming_read::()?; - headers.push(header.into()); - total_bytes_read += bytes_read; - if i % chunk_size == 0 || i == count { - adapter.headers_received(&headers, &self.peer_info)?; - headers.clear(); - } - } - - // Now check we read the correct total number of bytes off the stream. - if total_bytes_read != msg.header.msg_len { - return Err(Error::MsgLen); - } - - Ok(None) + Message::Headers(headers) => { + adapter.headers_received(&headers, &self.peer_info)?; + Consumed::None } - Type::GetPeerAddrs => { - let get_peers: GetPeerAddrs = msg.body()?; + Message::GetPeerAddrs(get_peers) => { let peers = adapter.find_peer_addrs(get_peers.capabilities); - Ok(Some(Msg::new( + Consumed::Response(Msg::new( Type::PeerAddrs, PeerAddrs { peers }, self.peer_info.version, - )?)) + )?) } - Type::PeerAddrs => { - let peer_addrs: PeerAddrs = msg.body()?; + Message::PeerAddrs(peer_addrs) => { adapter.peer_addrs_received(peer_addrs.peers); - Ok(None) + Consumed::None } - Type::TxHashSetRequest => { - let sm_req: TxHashSetRequest = msg.body()?; + Message::TxHashSetRequest(sm_req) => { debug!( "handle_payload: txhashset req for {} at {}", sm_req.hash, sm_req.height @@ -288,14 +237,13 @@ impl MessageHandler for Protocol { self.peer_info.version, )?; resp.add_attachment(txhashset.reader); - Ok(Some(resp)) + Consumed::Response(resp) } else { - Ok(None) + Consumed::None } } - Type::TxHashSetArchive => { - let sm_arch: TxHashSetArchive = msg.body()?; + Message::TxHashSetArchive(sm_arch) => { debug!( "handle_payload: txhashset archive for {} at {}. size={}", sm_arch.hash, sm_arch.height, sm_arch.bytes, @@ -313,93 +261,34 @@ impl MessageHandler for Protocol { // Update the sync state requested status self.state_sync_requested.store(false, Ordering::Relaxed); - let download_start_time = Utc::now(); + let start_time = Utc::now(); self.adapter - .txhashset_download_update(download_start_time, 0, sm_arch.bytes); + .txhashset_download_update(start_time, 0, sm_arch.bytes); let nonce: u32 = thread_rng().gen_range(0, 1_000_000); - let tmp = self.adapter.get_tmpfile_pathname(format!( + let path = self.adapter.get_tmpfile_pathname(format!( "txhashset-{}-{}.zip", - download_start_time.timestamp(), + start_time.timestamp(), nonce )); - let mut now = Instant::now(); - let mut save_txhashset_to_file = |file| -> Result<(), Error> { - let mut tmp_zip = - BufWriter::new(OpenOptions::new().write(true).create_new(true).open(file)?); - let total_size = sm_arch.bytes as usize; - let mut downloaded_size: usize = 0; - let mut request_size = cmp::min(48_000, total_size); - while request_size > 0 { - let size = msg.copy_attachment(request_size, &mut tmp_zip)?; - downloaded_size += size; - request_size = cmp::min(48_000, total_size - downloaded_size); - self.adapter.txhashset_download_update( - download_start_time, - downloaded_size as u64, - total_size as u64, - ); - if now.elapsed().as_secs() > 10 { - now = Instant::now(); - debug!( - "handle_payload: txhashset archive: {}/{}", - downloaded_size, total_size - ); - } - // Increase received bytes quietly (without affecting the counters). - // Otherwise we risk banning a peer as "abusive". - tracker.inc_quiet_received(size as u64); - // check the close channel - if stopped.load(Ordering::Relaxed) { - debug!("stopping txhashset download early"); - return Err(Error::ConnectionClose); - } - } - debug!( - "handle_payload: txhashset archive: {}/{} ... DONE", - downloaded_size, total_size - ); - tmp_zip - .into_inner() - .map_err(|_| Error::Internal)? - .sync_all()?; - Ok(()) + let file = fs::OpenOptions::new() + .write(true) + .create_new(true) + .open(path.clone())?; + + let meta = AttachmentMeta { + size: sm_arch.bytes as usize, + hash: sm_arch.hash, + height: sm_arch.height, + start_time, + path, }; - if let Err(e) = save_txhashset_to_file(tmp.clone()) { - error!( - "handle_payload: txhashset archive save to file fail. err={:?}", - e - ); - return Err(e); - } - - trace!( - "handle_payload: txhashset archive save to file {:?} success", - tmp, - ); - - let tmp_zip = File::open(tmp.clone())?; - let res = self - .adapter - .txhashset_write(sm_arch.hash, tmp_zip, &self.peer_info)?; - - debug!( - "handle_payload: txhashset archive for {} at {}, DONE. Data Ok: {}", - sm_arch.hash, sm_arch.height, !res - ); - - if let Err(e) = fs::remove_file(tmp.clone()) { - warn!("fail to remove tmp file: {:?}. err: {}", tmp, e); - } - - Ok(None) + Consumed::Attachment(Arc::new(meta), file) } - Type::Error | Type::Hand | Type::Shake => { - debug!("Received an unexpected msg: {:?}", msg.header.msg_type); - Ok(None) - } - } + Message::Unknown(_) => Consumed::None, + }; + Ok(consumed) } } diff --git a/p2p/src/types.rs b/p2p/src/types.rs index a019adbe6..2f93929eb 100644 --- a/p2p/src/types.rs +++ b/p2p/src/types.rs @@ -71,6 +71,7 @@ pub enum Error { Connection(io::Error), /// Header type does not match the expected message type BadMessage, + UnexpectedMessage, MsgLen, Banned, ConnectionClose, @@ -659,3 +660,19 @@ pub trait NetAdapter: ChainAdapter { /// Is this peer currently banned? fn is_banned(&self, addr: PeerAddr) -> bool; } + +#[derive(Clone, Debug)] +pub struct AttachmentMeta { + pub size: usize, + pub hash: Hash, + pub height: u64, + pub start_time: DateTime, + pub path: PathBuf, +} + +#[derive(Clone, Debug)] +pub struct AttachmentUpdate { + pub read: usize, + pub left: usize, + pub meta: Arc, +}