diff --git a/api/src/handlers/pool_api.rs b/api/src/handlers/pool_api.rs index da26e25a7..b4e49bd1e 100644 --- a/api/src/handlers/pool_api.rs +++ b/api/src/handlers/pool_api.rs @@ -15,7 +15,7 @@ use super::utils::w; use crate::core::core::hash::Hashed; use crate::core::core::Transaction; -use crate::core::ser; +use crate::core::ser::{self, ProtocolVersion}; use crate::pool; use crate::rest::*; use crate::router::{Handler, ResponseFuture}; @@ -64,7 +64,6 @@ impl PoolPushHandler { let fluff = params.get("fluff").is_some(); let pool_arc = match w(&self.tx_pool) { - //w(&self.tx_pool).clone(); Ok(p) => p, Err(e) => return Box::new(err(e)), }; @@ -76,7 +75,10 @@ impl PoolPushHandler { .map_err(|e| ErrorKind::RequestError(format!("Bad request: {}", e)).into()) }) .and_then(move |tx_bin| { - ser::deserialize(&mut &tx_bin[..]) + // TODO - pass protocol version in via the api call? + let version = ProtocolVersion::default(); + + ser::deserialize(&mut &tx_bin[..], version) .map_err(|e| ErrorKind::RequestError(format!("Bad request: {}", e)).into()) }) .and_then(move |tx: Transaction| { diff --git a/api/src/types.rs b/api/src/types.rs index b374f4757..3bb5b28c9 100644 --- a/api/src/types.rs +++ b/api/src/types.rs @@ -83,7 +83,7 @@ pub struct Status { impl Status { pub fn from_tip_and_peers(current_tip: chain::Tip, connections: u32) -> Status { Status { - protocol_version: p2p::msg::ProtocolVersion::default().into(), + protocol_version: ser::ProtocolVersion::default().into(), user_agent: p2p::msg::USER_AGENT.to_string(), connections: connections, tip: Tip::from_tip(current_tip), diff --git a/chain/src/chain.rs b/chain/src/chain.rs index ca5311595..ded052021 100644 --- a/chain/src/chain.rs +++ b/chain/src/chain.rs @@ -23,7 +23,7 @@ use crate::core::core::{ }; use crate::core::global; use crate::core::pow; -use crate::core::ser::{Readable, StreamingReader}; +use crate::core::ser::{ProtocolVersion, Readable, StreamingReader}; use crate::error::{Error, ErrorKind}; use crate::pipe; use crate::store; @@ -647,7 +647,7 @@ impl Chain { /// TODO - Write this data to disk and validate the rebuilt kernel MMR. pub fn kernel_data_write(&self, reader: &mut Read) -> Result<(), Error> { let mut count = 0; - let mut stream = StreamingReader::new(reader, Duration::from_secs(1)); + let mut stream = StreamingReader::new(reader, ProtocolVersion::default(), Duration::from_secs(1)); while let Ok(_kernel) = TxKernelEntry::read(&mut stream) { count += 1; } diff --git a/core/src/core/merkle_proof.rs b/core/src/core/merkle_proof.rs index c66c13c2e..747819c2c 100644 --- a/core/src/core/merkle_proof.rs +++ b/core/src/core/merkle_proof.rs @@ -17,7 +17,7 @@ use crate::core::hash::Hash; use crate::core::pmmr; use crate::ser; -use crate::ser::{PMMRIndexHashable, Readable, Reader, Writeable, Writer}; +use crate::ser::{PMMRIndexHashable, ProtocolVersion, Readable, Reader, Writeable, Writer}; use crate::util; /// Merkle proof errors. @@ -85,7 +85,7 @@ impl MerkleProof { /// Convert hex string representation back to a Merkle proof instance pub fn from_hex(hex: &str) -> Result { let bytes = util::from_hex(hex.to_string()).unwrap(); - let res = ser::deserialize(&mut &bytes[..]) + let res = ser::deserialize(&mut &bytes[..], ProtocolVersion::default()) .map_err(|_| "failed to deserialize a Merkle Proof".to_string())?; Ok(res) } diff --git a/core/src/core/transaction.rs b/core/src/core/transaction.rs index 7b8910c7e..32fa308cb 100644 --- a/core/src/core/transaction.rs +++ b/core/src/core/transaction.rs @@ -202,6 +202,12 @@ impl Writeable for TxKernel { impl Readable for TxKernel { fn read(reader: &mut dyn Reader) -> Result { + // We have access to the protocol version here. + // This may be a protocol version based on a peer connection + // or the version used locally for db storage. + // We can handle version specific deserialization here. + let _version = reader.protocol_version(); + Ok(TxKernel { features: KernelFeatures::read(reader)?, fee: reader.read_u64()?, @@ -338,7 +344,7 @@ impl Writeable for TxKernelEntry { } impl Readable for TxKernelEntry { - fn read(reader: &mut Reader) -> Result { + fn read(reader: &mut dyn Reader) -> Result { let kernel = TxKernel::read(reader)?; Ok(TxKernelEntry { kernel }) } @@ -1523,7 +1529,7 @@ mod test { let mut vec = vec![]; ser::serialize(&mut vec, &kernel).expect("serialized failed"); - let kernel2: TxKernel = ser::deserialize(&mut &vec[..]).unwrap(); + let kernel2: TxKernel = ser::deserialize_default(&mut &vec[..]).unwrap(); assert_eq!(kernel2.features, KernelFeatures::Plain); assert_eq!(kernel2.lock_height, 0); assert_eq!(kernel2.excess, commit); @@ -1541,7 +1547,7 @@ mod test { let mut vec = vec![]; ser::serialize(&mut vec, &kernel).expect("serialized failed"); - let kernel2: TxKernel = ser::deserialize(&mut &vec[..]).unwrap(); + let kernel2: TxKernel = ser::deserialize_default(&mut &vec[..]).unwrap(); assert_eq!(kernel2.features, KernelFeatures::HeightLocked); assert_eq!(kernel2.lock_height, 100); assert_eq!(kernel2.excess, commit); diff --git a/core/src/ser.rs b/core/src/ser.rs index 9b0a2fac3..035ad7bcd 100644 --- a/core/src/ser.rs +++ b/core/src/ser.rs @@ -31,11 +31,11 @@ use crate::util::secp::pedersen::{Commitment, RangeProof}; use crate::util::secp::Signature; use crate::util::secp::{ContextFlag, Secp256k1}; use byteorder::{BigEndian, ByteOrder, ReadBytesExt}; -use std::fmt::Debug; +use std::fmt::{self, Debug}; use std::io::{self, Read, Write}; use std::marker; use std::time::Duration; -use std::{cmp, error, fmt}; +use std::{cmp, error}; /// Possible errors deriving from serializing or deserializing. #[derive(Clone, Eq, PartialEq, Debug, Serialize, Deserialize)] @@ -209,6 +209,9 @@ pub trait Reader { /// Consumes a byte from the reader, producing an error if it doesn't have /// the expected value fn expect_u8(&mut self, val: u8) -> Result; + /// Access to underlying protocol version to support + /// version specific deserialization logic. + fn protocol_version(&self) -> ProtocolVersion; } /// Trait that every type that can be serialized as binary must implement. @@ -275,6 +278,54 @@ where Ok(res) } +/// Protocol version for serialization/deserialization. +/// Note: This is used in various places including but limited to +/// the p2p layer and our local db storage layer. +/// We may speak multiple versions to various peers and a potentially *different* +/// version for our local db. +#[derive(Clone, Copy, Debug, Deserialize, Eq, Ord, PartialOrd, PartialEq, Serialize)] +pub struct ProtocolVersion(pub u32); + +impl Default for ProtocolVersion { + fn default() -> ProtocolVersion { + ProtocolVersion(1) + } +} + +impl fmt::Display for ProtocolVersion { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "{}", self.0) + } +} + +impl ProtocolVersion { + /// We need to specify a protocol version for our local database. + /// Regardless of specific version used when sending/receiving data between peers + /// we need to take care with serialization/deserialization of data locally in the db. + pub fn local_db() -> ProtocolVersion { + ProtocolVersion(1) + } +} + +impl From for u32 { + fn from(v: ProtocolVersion) -> u32 { + v.0 + } +} + +impl Writeable for ProtocolVersion { + fn write(&self, writer: &mut W) -> Result<(), Error> { + writer.write_u32(self.0) + } +} + +impl Readable for ProtocolVersion { + fn read(reader: &mut dyn Reader) -> Result { + let version = reader.read_u32()?; + Ok(ProtocolVersion(version)) + } +} + /// Trait that every type that can be deserialized from binary must implement. /// Reads directly to a Reader, a utility type thinly wrapping an /// underlying Read implementation. @@ -287,11 +338,24 @@ where } /// Deserializes a Readable from any std::io::Read implementation. -pub fn deserialize(source: &mut dyn Read) -> Result { - let mut reader = BinReader { source }; +pub fn deserialize( + source: &mut dyn Read, + version: ProtocolVersion, +) -> Result { + let mut reader = BinReader::new(source, version); T::read(&mut reader) } +/// Deserialize a Readable based on our local db version protocol. +pub fn deserialize_db(source: &mut dyn Read) -> Result { + deserialize(source, ProtocolVersion::local_db()) +} + +/// Deserialize a Readable based on our local "default" version protocol. +pub fn deserialize_default(source: &mut dyn Read) -> Result { + deserialize(source, ProtocolVersion::default()) +} + /// Serializes a Writeable into any std::io::Write implementation. pub fn serialize(sink: &mut dyn Write, thing: &W) -> Result<(), Error> { let mut writer = BinWriter { sink }; @@ -309,6 +373,14 @@ pub fn ser_vec(thing: &W) -> Result, Error> { /// Utility to read from a binary source pub struct BinReader<'a> { source: &'a mut dyn Read, + version: ProtocolVersion, +} + +impl<'a> BinReader<'a> { + /// Constructor for a new BinReader for the provided source and protocol version. + pub fn new(source: &'a mut dyn Read, version: ProtocolVersion) -> BinReader<'a> { + BinReader { source, version } + } } fn map_io_err(err: io::Error) -> Error { @@ -366,12 +438,17 @@ impl<'a> Reader for BinReader<'a> { }) } } + + fn protocol_version(&self) -> ProtocolVersion { + self.version + } } /// A reader that reads straight off a stream. /// Tracks total bytes read so we can verify we read the right number afterwards. pub struct StreamingReader<'a> { total_bytes_read: u64, + version: ProtocolVersion, stream: &'a mut dyn Read, timeout: Duration, } @@ -379,9 +456,14 @@ pub struct StreamingReader<'a> { impl<'a> StreamingReader<'a> { /// Create a new streaming reader with the provided underlying stream. /// Also takes a duration to be used for each individual read_exact call. - pub fn new(stream: &'a mut dyn Read, timeout: Duration) -> StreamingReader<'a> { + pub fn new( + stream: &'a mut dyn Read, + version: ProtocolVersion, + timeout: Duration, + ) -> StreamingReader<'a> { StreamingReader { total_bytes_read: 0, + version, stream, timeout, } @@ -450,6 +532,10 @@ impl<'a> Reader for StreamingReader<'a> { }) } } + + fn protocol_version(&self) -> ProtocolVersion { + self.version + } } impl Readable for Commitment { diff --git a/core/tests/block.rs b/core/tests/block.rs index e561f2bcf..f1b142e0f 100644 --- a/core/tests/block.rs +++ b/core/tests/block.rs @@ -221,7 +221,7 @@ fn serialize_deserialize_header_version() { assert_eq!(vec1, vec2); // Check we can successfully deserialize a header_version. - let version: HeaderVersion = ser::deserialize(&mut &vec2[..]).unwrap(); + let version: HeaderVersion = ser::deserialize_default(&mut &vec2[..]).unwrap(); assert_eq!(version.0, 1) } @@ -236,7 +236,7 @@ fn serialize_deserialize_block_header() { let mut vec = Vec::new(); ser::serialize(&mut vec, &header1).expect("serialization failed"); - let header2: BlockHeader = ser::deserialize(&mut &vec[..]).unwrap(); + let header2: BlockHeader = ser::deserialize_default(&mut &vec[..]).unwrap(); assert_eq!(header1.hash(), header2.hash()); assert_eq!(header1, header2); @@ -253,7 +253,7 @@ fn serialize_deserialize_block() { let mut vec = Vec::new(); ser::serialize(&mut vec, &b).expect("serialization failed"); - let b2: Block = ser::deserialize(&mut &vec[..]).unwrap(); + let b2: Block = ser::deserialize_default(&mut &vec[..]).unwrap(); assert_eq!(b.hash(), b2.hash()); assert_eq!(b.header, b2.header); @@ -447,7 +447,7 @@ fn serialize_deserialize_compact_block() { cb1.header.timestamp = origin_ts - Duration::nanoseconds(origin_ts.timestamp_subsec_nanos() as i64); - let cb2: CompactBlock = ser::deserialize(&mut &vec[..]).unwrap(); + let cb2: CompactBlock = ser::deserialize_default(&mut &vec[..]).unwrap(); assert_eq!(cb1.header, cb2.header); assert_eq!(cb1.kern_ids(), cb2.kern_ids()); diff --git a/core/tests/core.rs b/core/tests/core.rs index 711436df9..d0f5dff17 100644 --- a/core/tests/core.rs +++ b/core/tests/core.rs @@ -49,7 +49,7 @@ fn simple_tx_ser_deser() { let tx = tx2i1o(); let mut vec = Vec::new(); ser::serialize(&mut vec, &tx).expect("serialization failed"); - let dtx: Transaction = ser::deserialize(&mut &vec[..]).unwrap(); + let dtx: Transaction = ser::deserialize_default(&mut &vec[..]).unwrap(); assert_eq!(dtx.fee(), 2); assert_eq!(dtx.inputs().len(), 2); assert_eq!(dtx.outputs().len(), 1); @@ -63,11 +63,11 @@ fn tx_double_ser_deser() { let mut vec = Vec::new(); assert!(ser::serialize(&mut vec, &btx).is_ok()); - let dtx: Transaction = ser::deserialize(&mut &vec[..]).unwrap(); + let dtx: Transaction = ser::deserialize_default(&mut &vec[..]).unwrap(); let mut vec2 = Vec::new(); assert!(ser::serialize(&mut vec2, &btx).is_ok()); - let dtx2: Transaction = ser::deserialize(&mut &vec2[..]).unwrap(); + let dtx2: Transaction = ser::deserialize_default(&mut &vec2[..]).unwrap(); assert_eq!(btx.hash(), dtx.hash()); assert_eq!(dtx.hash(), dtx2.hash()); diff --git a/core/tests/merkle_proof.rs b/core/tests/merkle_proof.rs index 731626e69..56b945c6c 100644 --- a/core/tests/merkle_proof.rs +++ b/core/tests/merkle_proof.rs @@ -16,8 +16,7 @@ mod vec_backend; use self::core::core::merkle_proof::MerkleProof; use self::core::core::pmmr::PMMR; -use self::core::ser; -use self::core::ser::PMMRIndexHashable; +use self::core::ser::{self, PMMRIndexHashable}; use crate::vec_backend::{TestElem, VecBackend}; use grin_core as core; @@ -39,7 +38,7 @@ fn merkle_proof_ser_deser() { let mut vec = Vec::new(); ser::serialize(&mut vec, &proof).expect("serialization failed"); - let proof_2: MerkleProof = ser::deserialize(&mut &vec[..]).unwrap(); + let proof_2: MerkleProof = ser::deserialize_default(&mut &vec[..]).unwrap(); assert_eq!(proof, proof_2); } diff --git a/core/tests/transaction.rs b/core/tests/transaction.rs index 14a9a40db..3e9548b49 100644 --- a/core/tests/transaction.rs +++ b/core/tests/transaction.rs @@ -40,7 +40,7 @@ fn test_output_ser_deser() { let mut vec = vec![]; ser::serialize(&mut vec, &out).expect("serialized failed"); - let dout: Output = ser::deserialize(&mut &vec[..]).unwrap(); + let dout: Output = ser::deserialize_default(&mut &vec[..]).unwrap(); assert_eq!(dout.features, OutputFeatures::Plain); assert_eq!(dout.commit, out.commit); diff --git a/p2p/src/conn.rs b/p2p/src/conn.rs index 394902fd3..ad6fb7c8b 100644 --- a/p2p/src/conn.rs +++ b/p2p/src/conn.rs @@ -30,8 +30,7 @@ use std::{ time, }; -use crate::core::ser; -use crate::core::ser::FixedLength; +use crate::core::ser::{self, FixedLength, ProtocolVersion}; use crate::msg::{ read_body, read_discard, read_header, read_item, write_to_buf, MsgHeader, MsgHeaderWrapper, Type, @@ -75,22 +74,31 @@ macro_rules! try_break { pub struct Message<'a> { pub header: MsgHeader, stream: &'a mut dyn Read, + version: ProtocolVersion, } impl<'a> Message<'a> { - fn from_header(header: MsgHeader, stream: &'a mut dyn Read) -> Message<'a> { - Message { header, stream } + fn from_header( + header: MsgHeader, + stream: &'a mut dyn Read, + version: ProtocolVersion, + ) -> Message<'a> { + Message { + header, + stream, + version, + } } /// Read the message body from the underlying connection pub fn body(&mut self) -> Result { - read_body(&self.header, self.stream) + read_body(&self.header, self.stream, self.version) } /// Read a single "thing" from the underlying connection. /// Return the thing and the total bytes read. pub fn streaming_read(&mut self) -> Result<(T, u64), Error> { - read_item(self.stream) + read_item(self.stream, self.version) } pub fn copy_attachment(&mut self, len: usize, writer: &mut dyn Write) -> Result { @@ -255,6 +263,7 @@ impl Tracker { /// itself. pub fn listen( stream: TcpStream, + version: ProtocolVersion, tracker: Arc, handler: H, ) -> io::Result<(ConnHandle, StopHandle)> @@ -267,7 +276,7 @@ where stream .set_nonblocking(true) .expect("Non-blocking IO not available."); - let peer_thread = poll(stream, handler, send_rx, close_rx, tracker)?; + let peer_thread = poll(stream, version, handler, send_rx, close_rx, tracker)?; Ok(( ConnHandle { @@ -282,6 +291,7 @@ where fn poll( conn: TcpStream, + version: ProtocolVersion, handler: H, send_rx: mpsc::Receiver>, close_rx: mpsc::Receiver<()>, @@ -301,9 +311,9 @@ where let mut retry_send = Err(()); loop { // check the read end - match try_break!(read_header(&mut reader, None)) { + match try_break!(read_header(&mut reader, version, None)) { Some(MsgHeaderWrapper::Known(header)) => { - let msg = Message::from_header(header, &mut reader); + let msg = Message::from_header(header, &mut reader, version); trace!( "Received message header, type {:?}, len {}.", diff --git a/p2p/src/handshake.rs b/p2p/src/handshake.rs index 820e717f9..94f5305f0 100644 --- a/p2p/src/handshake.rs +++ b/p2p/src/handshake.rs @@ -14,7 +14,8 @@ use crate::core::core::hash::Hash; use crate::core::pow::Difficulty; -use crate::msg::{read_message, write_message, Hand, ProtocolVersion, Shake, Type, USER_AGENT}; +use crate::core::ser::ProtocolVersion; +use crate::msg::{read_message, write_message, Hand, Shake, Type, USER_AGENT}; use crate::peer::Peer; use crate::types::{Capabilities, Direction, Error, P2PConfig, PeerAddr, PeerInfo, PeerLiveInfo}; use crate::util::RwLock; @@ -60,7 +61,7 @@ impl Handshake { pub fn initiate( &self, - capab: Capabilities, + capabilities: Capabilities, total_difficulty: Difficulty, self_addr: PeerAddr, conn: &mut TcpStream, @@ -72,12 +73,15 @@ impl Handshake { Err(e) => return Err(Error::Connection(e)), }; + // Using our default version here. + let version = ProtocolVersion::default(); + let hand = Hand { - version: ProtocolVersion::default(), - capabilities: capab, - nonce: nonce, + version, + capabilities, + nonce, genesis: self.genesis, - total_difficulty: total_difficulty, + total_difficulty, sender_addr: self_addr, receiver_addr: peer_addr, user_agent: USER_AGENT.to_string(), @@ -85,7 +89,10 @@ impl Handshake { // write and read the handshake response write_message(conn, hand, Type::Hand)?; - let shake: Shake = read_message(conn, Type::Shake)?; + + // Note: We have to read the Shake message *before* we know which protocol + // version our peer supports (it is in the shake message itself). + let shake: Shake = read_message(conn, version, Type::Shake)?; if shake.genesis != self.genesis { return Err(Error::GenesisMismatch { us: self.genesis, @@ -124,7 +131,10 @@ impl Handshake { total_difficulty: Difficulty, conn: &mut TcpStream, ) -> Result { - let hand: Hand = read_message(conn, Type::Hand)?; + // Note: We read the Hand message *before* we know which protocol version + // is supported by our peer (it is in the Hand message). + let version = ProtocolVersion::default(); + let hand: Hand = read_message(conn, version, Type::Hand)?; // all the reasons we could refuse this connection for if hand.genesis != self.genesis { diff --git a/p2p/src/msg.rs b/p2p/src/msg.rs index 29a7fc719..1ee0f37e4 100644 --- a/p2p/src/msg.rs +++ b/p2p/src/msg.rs @@ -15,30 +15,21 @@ //! Message types that transit over the network and related serialization code. use num::FromPrimitive; -use std::fmt; use std::io::{Read, Write}; use std::time; use crate::core::core::hash::Hash; use crate::core::core::BlockHeader; use crate::core::pow::Difficulty; -use crate::core::ser::{self, FixedLength, Readable, Reader, StreamingReader, Writeable, Writer}; +use crate::core::ser::{ + self, FixedLength, ProtocolVersion, Readable, Reader, StreamingReader, Writeable, Writer, +}; use crate::core::{consensus, global}; use crate::types::{ Capabilities, Error, PeerAddr, ReasonForBan, MAX_BLOCK_HEADERS, MAX_LOCATORS, MAX_PEER_ADDRS, }; use crate::util::read_write::read_exact; -/// Our local node protocol version. -/// We will increment the protocol version with every change to p2p msg serialization -/// so we will likely connect with peers with both higher and lower protocol versions. -/// We need to be aware that some msg formats will be potentially incompatible and handle -/// this for each individual peer connection. -/// Note: A peer may disconnect and reconnect with an updated protocol version. Normally -/// the protocol version will increase but we need to handle decreasing values also -/// as a peer may rollback to previous version of the code. -const PROTOCOL_VERSION: u32 = 1; - /// Grin's user agent with current version pub const USER_AGENT: &'static str = concat!("MW/Grin ", env!("CARGO_PKG_VERSION")); @@ -134,6 +125,7 @@ fn magic() -> [u8; 2] { /// pub fn read_header( stream: &mut dyn Read, + version: ProtocolVersion, msg_type: Option, ) -> Result { let mut head = vec![0u8; MsgHeader::LEN]; @@ -142,26 +134,33 @@ pub fn read_header( } else { read_exact(stream, &mut head, time::Duration::from_secs(10), false)?; } - let header = ser::deserialize::(&mut &head[..])?; + let header = ser::deserialize::(&mut &head[..], version)?; Ok(header) } /// Read a single item from the provided stream, always blocking until we /// have a result (or timeout). /// Returns the item and the total bytes read. -pub fn read_item(stream: &mut dyn Read) -> Result<(T, u64), Error> { +pub fn read_item( + stream: &mut dyn Read, + version: ProtocolVersion, +) -> Result<(T, u64), Error> { let timeout = time::Duration::from_secs(20); - let mut reader = StreamingReader::new(stream, timeout); + let mut reader = StreamingReader::new(stream, version, timeout); let res = T::read(&mut reader)?; Ok((res, reader.total_bytes_read())) } /// Read a message body from the provided stream, always blocking /// until we have a result (or timeout). -pub fn read_body(h: &MsgHeader, stream: &mut dyn Read) -> Result { +pub fn read_body( + h: &MsgHeader, + stream: &mut dyn Read, + version: ProtocolVersion, +) -> Result { let mut body = vec![0u8; h.msg_len as usize]; read_exact(stream, &mut body, time::Duration::from_secs(20), true)?; - ser::deserialize(&mut &body[..]).map_err(From::from) + ser::deserialize(&mut &body[..], version).map_err(From::from) } /// Read (an unknown) message from the provided stream and discard it. @@ -172,11 +171,15 @@ pub fn read_discard(msg_len: u64, stream: &mut dyn Read) -> Result<(), Error> { } /// Reads a full message from the underlying stream. -pub fn read_message(stream: &mut dyn Read, msg_type: Type) -> Result { - match read_header(stream, Some(msg_type))? { +pub fn read_message( + stream: &mut dyn Read, + version: ProtocolVersion, + msg_type: Type, +) -> Result { + match read_header(stream, version, Some(msg_type))? { MsgHeaderWrapper::Known(header) => { if header.msg_type == msg_type { - read_body(&header, stream) + read_body(&header, stream, version) } else { Err(Error::BadMessage) } @@ -309,40 +312,6 @@ impl Readable for MsgHeaderWrapper { } } -#[derive(Clone, Copy, Debug, Deserialize, Eq, Ord, PartialOrd, PartialEq, Serialize)] -pub struct ProtocolVersion(pub u32); - -impl Default for ProtocolVersion { - fn default() -> ProtocolVersion { - ProtocolVersion(PROTOCOL_VERSION) - } -} - -impl From for u32 { - fn from(v: ProtocolVersion) -> u32 { - v.0 - } -} - -impl fmt::Display for ProtocolVersion { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "{}", self.0) - } -} - -impl Writeable for ProtocolVersion { - fn write(&self, writer: &mut W) -> Result<(), ser::Error> { - writer.write_u32(self.0) - } -} - -impl Readable for ProtocolVersion { - fn read(reader: &mut dyn Reader) -> Result { - let version = reader.read_u32()?; - Ok(ProtocolVersion(version)) - } -} - /// First part of a handshake, sender advertises its version and /// characteristics. pub struct Hand { @@ -436,10 +405,8 @@ impl Writeable for Shake { impl Readable for Shake { fn read(reader: &mut dyn Reader) -> Result { let version = ProtocolVersion::read(reader)?; - let capab = reader.read_u32()?; let capabilities = Capabilities::from_bits_truncate(capab); - let total_difficulty = Difficulty::read(reader)?; let ua = reader.read_bytes_len_prefix()?; let user_agent = String::from_utf8(ua).map_err(|_| ser::Error::CorruptedData)?; diff --git a/p2p/src/peer.rs b/p2p/src/peer.rs index f1d5fc85c..88ae1b436 100644 --- a/p2p/src/peer.rs +++ b/p2p/src/peer.rs @@ -75,7 +75,7 @@ impl Peer { let tracking_adapter = TrackingAdapter::new(adapter); let handler = Protocol::new(Arc::new(tracking_adapter.clone()), info.clone()); let tracker = Arc::new(conn::Tracker::new()); - let (sendh, stoph) = conn::listen(conn, tracker.clone(), handler)?; + let (sendh, stoph) = conn::listen(conn, info.version, tracker.clone(), handler)?; let send_handle = Mutex::new(sendh); let stop_handle = Mutex::new(stoph); Ok(Peer { diff --git a/p2p/src/protocol.rs b/p2p/src/protocol.rs index 9c186a985..721c56848 100644 --- a/p2p/src/protocol.rs +++ b/p2p/src/protocol.rs @@ -25,6 +25,7 @@ use std::cmp; use std::fs::{self, File, OpenOptions}; use std::io::{BufWriter, Seek, SeekFrom, Write}; use std::sync::Arc; +use std::time::Instant; use tempfile::tempfile; pub struct Protocol { @@ -340,6 +341,7 @@ impl MessageHandler for Protocol { download_start_time.timestamp(), nonce )); + let mut now = Instant::now(); let mut save_txhashset_to_file = |file| -> Result<(), Error> { let mut tmp_zip = BufWriter::new(OpenOptions::new().write(true).create_new(true).open(file)?); @@ -355,11 +357,21 @@ impl MessageHandler for Protocol { downloaded_size as u64, total_size as u64, ); - + if now.elapsed().as_secs() > 10 { + now = Instant::now(); + debug!( + "handle_payload: txhashset archive: {}/{}", + downloaded_size, total_size + ); + } // Increase received bytes quietly (without affecting the counters). // Otherwise we risk banning a peer as "abusive". tracker.inc_quiet_received(size as u64) } + debug!( + "handle_payload: txhashset archive: {}/{} ... DONE", + downloaded_size, total_size + ); tmp_zip .into_inner() .map_err(|_| Error::Internal)? diff --git a/p2p/src/types.rs b/p2p/src/types.rs index 1afa9a136..3924b7bb5 100644 --- a/p2p/src/types.rs +++ b/p2p/src/types.rs @@ -29,8 +29,7 @@ use crate::core::core; use crate::core::core::hash::Hash; use crate::core::global; use crate::core::pow::Difficulty; -use crate::core::ser::{self, Readable, Reader, Writeable, Writer}; -use crate::msg::ProtocolVersion; +use crate::core::ser::{self, ProtocolVersion, Readable, Reader, Writeable, Writer}; use grin_store; /// Maximum number of block headers a peer should ever send diff --git a/servers/src/common/stats.rs b/servers/src/common/stats.rs index ed5a70573..9fd6d62e0 100644 --- a/servers/src/common/stats.rs +++ b/servers/src/common/stats.rs @@ -21,6 +21,7 @@ use std::time::SystemTime; use crate::core::consensus::graph_weight; use crate::core::core::hash::Hash; +use crate::core::ser::ProtocolVersion; use chrono::prelude::*; @@ -147,7 +148,7 @@ pub struct PeerStats { /// Address pub addr: String, /// version running - pub version: p2p::msg::ProtocolVersion, + pub version: ProtocolVersion, /// Peer user agent string. pub user_agent: String, /// difficulty reported by peer diff --git a/servers/src/grin/server.rs b/servers/src/grin/server.rs index 9a9e4c398..a3d9fb4b5 100644 --- a/servers/src/grin/server.rs +++ b/servers/src/grin/server.rs @@ -39,6 +39,7 @@ use crate::common::stats::{DiffBlock, DiffStats, PeerStats, ServerStateInfo, Ser use crate::common::types::{Error, ServerConfig, StratumServerConfig, SyncState, SyncStatus}; use crate::core::core::hash::{Hashed, ZERO_HASH}; use crate::core::core::verifier_cache::{LruVerifierCache, VerifierCache}; +use crate::core::ser::ProtocolVersion; use crate::core::{consensus, genesis, global, pow}; use crate::grin::{dandelion_monitor, seed, sync}; use crate::mining::stratumserver; @@ -414,9 +415,9 @@ impl Server { self.chain.header_head().map_err(|e| e.into()) } - /// Current p2p layer protocol version. - pub fn protocol_version() -> p2p::msg::ProtocolVersion { - p2p::msg::ProtocolVersion::default() + /// The p2p layer protocol version for this node. + pub fn protocol_version() -> ProtocolVersion { + ProtocolVersion::default() } /// Returns a set of stats about this server. This and the ServerStats diff --git a/store/src/lmdb.rs b/store/src/lmdb.rs index ba47101f1..e7f52b090 100644 --- a/store/src/lmdb.rs +++ b/store/src/lmdb.rs @@ -230,7 +230,7 @@ impl Store { ) -> Result, Error> { let res: lmdb::error::Result<&[u8]> = access.get(&db.as_ref().unwrap(), key); match res.to_opt() { - Ok(Some(mut res)) => match ser::deserialize(&mut res) { + Ok(Some(mut res)) => match ser::deserialize_db(&mut res) { Ok(res) => Ok(Some(res)), Err(e) => Err(Error::SerErr(format!("{}", e))), }, @@ -393,7 +393,7 @@ where fn deser_if_prefix_match(&self, key: &[u8], value: &[u8]) -> Option<(Vec, T)> { let plen = self.prefix.len(); if plen == 0 || key[0..plen] == self.prefix[..] { - if let Ok(value) = ser::deserialize(&mut &value[..]) { + if let Ok(value) = ser::deserialize_db(&mut &value[..]) { Some((key.to_vec(), value)) } else { None diff --git a/store/src/types.rs b/store/src/types.rs index 6629896a3..2f46f7621 100644 --- a/store/src/types.rs +++ b/store/src/types.rs @@ -16,7 +16,7 @@ use memmap; use tempfile::tempfile; use crate::core::ser::{ - self, BinWriter, FixedLength, Readable, Reader, StreamingReader, Writeable, Writer, + self, BinWriter, FixedLength, ProtocolVersion, Readable, Reader, StreamingReader, Writeable, Writer, }; use std::fmt::Debug; use std::fs::{self, File, OpenOptions}; @@ -415,7 +415,7 @@ where fn read_as_elmt(&self, pos: u64) -> io::Result { let data = self.read(pos)?; - ser::deserialize(&mut &data[..]).map_err(|e| io::Error::new(io::ErrorKind::Other, e)) + ser::deserialize_db(&mut &data[..]).map_err(|e| io::Error::new(io::ErrorKind::Other, e)) } // Read length bytes starting at offset from the buffer. @@ -471,7 +471,7 @@ where let reader = File::open(&self.path)?; let mut buf_reader = BufReader::new(reader); let mut streaming_reader = - StreamingReader::new(&mut buf_reader, time::Duration::from_secs(1)); + StreamingReader::new(&mut buf_reader, ProtocolVersion::local_db(), time::Duration::from_secs(1)); let mut buf_writer = BufWriter::new(File::create(&tmp_path)?); let mut bin_writer = BinWriter::new(&mut buf_writer); @@ -518,7 +518,7 @@ where let reader = File::open(&self.path)?; let mut buf_reader = BufReader::new(reader); let mut streaming_reader = - StreamingReader::new(&mut buf_reader, time::Duration::from_secs(1)); + StreamingReader::new(&mut buf_reader, ProtocolVersion::local_db(), time::Duration::from_secs(1)); let mut buf_writer = BufWriter::new(File::create(&tmp_path)?); let mut bin_writer = BinWriter::new(&mut buf_writer);