diff --git a/api/src/handlers/pool_api.rs b/api/src/handlers/pool_api.rs index 1b0733360..b458787bd 100644 --- a/api/src/handlers/pool_api.rs +++ b/api/src/handlers/pool_api.rs @@ -76,7 +76,7 @@ impl PoolPushHandler { }) .and_then(move |tx_bin| { // TODO - pass protocol version in via the api call? - let version = ProtocolVersion::default(); + let version = ProtocolVersion::local(); ser::deserialize(&mut &tx_bin[..], version) .map_err(|e| ErrorKind::RequestError(format!("Bad request: {}", e)).into()) diff --git a/api/src/types.rs b/api/src/types.rs index 3bb5b28c9..4a384a911 100644 --- a/api/src/types.rs +++ b/api/src/types.rs @@ -83,7 +83,7 @@ pub struct Status { impl Status { pub fn from_tip_and_peers(current_tip: chain::Tip, connections: u32) -> Status { Status { - protocol_version: ser::ProtocolVersion::default().into(), + protocol_version: ser::ProtocolVersion::local().into(), user_agent: p2p::msg::USER_AGENT.to_string(), connections: connections, tip: Tip::from_tip(current_tip), diff --git a/chain/src/chain.rs b/chain/src/chain.rs index ded052021..bc3294908 100644 --- a/chain/src/chain.rs +++ b/chain/src/chain.rs @@ -647,7 +647,8 @@ impl Chain { /// TODO - Write this data to disk and validate the rebuilt kernel MMR. pub fn kernel_data_write(&self, reader: &mut Read) -> Result<(), Error> { let mut count = 0; - let mut stream = StreamingReader::new(reader, ProtocolVersion::default(), Duration::from_secs(1)); + let mut stream = + StreamingReader::new(reader, ProtocolVersion::local(), Duration::from_secs(1)); while let Ok(_kernel) = TxKernelEntry::read(&mut stream) { count += 1; } diff --git a/core/src/core/block.rs b/core/src/core/block.rs index aad9b280f..1270d9bc7 100644 --- a/core/src/core/block.rs +++ b/core/src/core/block.rs @@ -359,7 +359,7 @@ impl BlockHeader { pub fn pre_pow(&self) -> Vec { let mut header_buf = vec![]; { - let mut writer = ser::BinWriter::new(&mut header_buf); + let mut writer = ser::BinWriter::default(&mut header_buf); self.write_pre_pow(&mut writer).unwrap(); self.pow.write_pre_pow(&mut writer).unwrap(); writer.write_u64(self.pow.nonce).unwrap(); diff --git a/core/src/core/hash.rs b/core/src/core/hash.rs index e0638762c..b2221e021 100644 --- a/core/src/core/hash.rs +++ b/core/src/core/hash.rs @@ -25,7 +25,9 @@ use std::{fmt, ops}; use crate::blake2::blake2b::Blake2b; -use crate::ser::{self, AsFixedBytes, Error, FixedLength, Readable, Reader, Writeable, Writer}; +use crate::ser::{ + self, AsFixedBytes, Error, FixedLength, ProtocolVersion, Readable, Reader, Writeable, Writer, +}; use crate::util; /// A hash consisting of all zeroes, used as a sentinel. No known preimage. @@ -219,6 +221,10 @@ impl ser::Writer for HashWriter { self.state.update(b32.as_ref()); Ok(()) } + + fn protocol_version(&self) -> ProtocolVersion { + ProtocolVersion::local() + } } /// A trait for types that have a canonical hash diff --git a/core/src/core/merkle_proof.rs b/core/src/core/merkle_proof.rs index 747819c2c..a4a21e524 100644 --- a/core/src/core/merkle_proof.rs +++ b/core/src/core/merkle_proof.rs @@ -17,7 +17,7 @@ use crate::core::hash::Hash; use crate::core::pmmr; use crate::ser; -use crate::ser::{PMMRIndexHashable, ProtocolVersion, Readable, Reader, Writeable, Writer}; +use crate::ser::{PMMRIndexHashable, Readable, Reader, Writeable, Writer}; use crate::util; /// Merkle proof errors. @@ -78,14 +78,14 @@ impl MerkleProof { /// Serialize the Merkle proof as a hex string (for api json endpoints) pub fn to_hex(&self) -> String { let mut vec = Vec::new(); - ser::serialize(&mut vec, &self).expect("serialization failed"); + ser::serialize_default(&mut vec, &self).expect("serialization failed"); util::to_hex(vec) } /// Convert hex string representation back to a Merkle proof instance pub fn from_hex(hex: &str) -> Result { let bytes = util::from_hex(hex.to_string()).unwrap(); - let res = ser::deserialize(&mut &bytes[..], ProtocolVersion::default()) + let res = ser::deserialize_default(&mut &bytes[..]) .map_err(|_| "failed to deserialize a Merkle Proof".to_string())?; Ok(res) } diff --git a/core/src/core/transaction.rs b/core/src/core/transaction.rs index 32fa308cb..8fd4172c6 100644 --- a/core/src/core/transaction.rs +++ b/core/src/core/transaction.rs @@ -185,13 +185,19 @@ hashable_ord!(TxKernel); impl ::std::hash::Hash for TxKernel { fn hash(&self, state: &mut H) { let mut vec = Vec::new(); - ser::serialize(&mut vec, &self).expect("serialization failed"); + ser::serialize_default(&mut vec, &self).expect("serialization failed"); ::std::hash::Hash::hash(&vec, state); } } impl Writeable for TxKernel { fn write(&self, writer: &mut W) -> Result<(), ser::Error> { + // We have access to the protocol version here. + // This may be a protocol version based on a peer connection + // or the version used locally for db storage. + // We can handle version specific serialization here. + let _version = writer.protocol_version(); + self.features.write(writer)?; ser_multiwrite!(writer, [write_u64, self.fee], [write_u64, self.lock_height]); self.excess.write(writer)?; @@ -1165,7 +1171,7 @@ hashable_ord!(Input); impl ::std::hash::Hash for Input { fn hash(&self, state: &mut H) { let mut vec = Vec::new(); - ser::serialize(&mut vec, &self).expect("serialization failed"); + ser::serialize_default(&mut vec, &self).expect("serialization failed"); ::std::hash::Hash::hash(&vec, state); } } @@ -1276,7 +1282,7 @@ hashable_ord!(Output); impl ::std::hash::Hash for Output { fn hash(&self, state: &mut H) { let mut vec = Vec::new(); - ser::serialize(&mut vec, &self).expect("serialization failed"); + ser::serialize_default(&mut vec, &self).expect("serialization failed"); ::std::hash::Hash::hash(&vec, state); } } @@ -1528,7 +1534,7 @@ mod test { }; let mut vec = vec![]; - ser::serialize(&mut vec, &kernel).expect("serialized failed"); + ser::serialize_default(&mut vec, &kernel).expect("serialized failed"); let kernel2: TxKernel = ser::deserialize_default(&mut &vec[..]).unwrap(); assert_eq!(kernel2.features, KernelFeatures::Plain); assert_eq!(kernel2.lock_height, 0); @@ -1546,7 +1552,7 @@ mod test { }; let mut vec = vec![]; - ser::serialize(&mut vec, &kernel).expect("serialized failed"); + ser::serialize_default(&mut vec, &kernel).expect("serialized failed"); let kernel2: TxKernel = ser::deserialize_default(&mut &vec[..]).unwrap(); assert_eq!(kernel2.features, KernelFeatures::HeightLocked); assert_eq!(kernel2.lock_height, 100); diff --git a/core/src/genesis.rs b/core/src/genesis.rs index 8dc37797e..be41a48db 100644 --- a/core/src/genesis.rs +++ b/core/src/genesis.rs @@ -288,13 +288,13 @@ pub fn genesis_main() -> core::Block { mod test { use super::*; use crate::core::hash::Hashed; - use crate::ser; + use crate::ser::{self, ProtocolVersion}; #[test] fn floonet_genesis_hash() { let gen_hash = genesis_floo().hash(); println!("floonet genesis hash: {}", gen_hash.to_hex()); - let gen_bin = ser::ser_vec(&genesis_floo()).unwrap(); + let gen_bin = ser::ser_vec(&genesis_floo(), ProtocolVersion(1)).unwrap(); println!("floonet genesis full hash: {}\n", gen_bin.hash().to_hex()); assert_eq!( gen_hash.to_hex(), @@ -310,7 +310,7 @@ mod test { fn mainnet_genesis_hash() { let gen_hash = genesis_main().hash(); println!("mainnet genesis hash: {}", gen_hash.to_hex()); - let gen_bin = ser::ser_vec(&genesis_main()).unwrap(); + let gen_bin = ser::ser_vec(&genesis_main(), ProtocolVersion(1)).unwrap(); println!("mainnet genesis full hash: {}\n", gen_bin.hash().to_hex()); assert_eq!( gen_hash.to_hex(), diff --git a/core/src/global.rs b/core/src/global.rs index f9bc334c2..719f0a7b6 100644 --- a/core/src/global.rs +++ b/core/src/global.rs @@ -33,6 +33,13 @@ use crate::util::RwLock; /// Define these here, as they should be developer-set, not really tweakable /// by users +/// The default "local" protocol version for this node. +/// We negotiate compatible versions with each peer via Hand/Shake. +/// Note: We also use a specific (possible different) protocol version +/// for both the backend database and MMR data files. +/// This one is p2p layer specific. +pub const PROTOCOL_VERSION: u32 = 1; + /// Automated testing edge_bits pub const AUTOMATED_TESTING_MIN_EDGE_BITS: u8 = 9; diff --git a/core/src/ser.rs b/core/src/ser.rs index 035ad7bcd..57a089816 100644 --- a/core/src/ser.rs +++ b/core/src/ser.rs @@ -20,6 +20,7 @@ //! `serialize` or `deserialize` functions on them as appropriate. use crate::core::hash::{DefaultHashable, Hash, Hashed}; +use crate::global::PROTOCOL_VERSION; use crate::keychain::{BlindingFactor, Identifier, IDENTIFIER_SIZE}; use crate::util::read_write::read_exact; use crate::util::secp::constants::{ @@ -135,6 +136,9 @@ pub trait Writer { /// The mode this serializer is writing in fn serialization_mode(&self) -> SerializationMode; + /// Protocol version for version specific serialization rules. + fn protocol_version(&self) -> ProtocolVersion; + /// Writes a u8 as bytes fn write_u8(&mut self, n: u8) -> Result<(), Error> { self.write_fixed_bytes(&[n]) @@ -286,9 +290,10 @@ where #[derive(Clone, Copy, Debug, Deserialize, Eq, Ord, PartialOrd, PartialEq, Serialize)] pub struct ProtocolVersion(pub u32); -impl Default for ProtocolVersion { - fn default() -> ProtocolVersion { - ProtocolVersion(1) +impl ProtocolVersion { + /// Our default "local" protocol version. + pub fn local() -> ProtocolVersion { + ProtocolVersion(PROTOCOL_VERSION) } } @@ -346,27 +351,31 @@ pub fn deserialize( T::read(&mut reader) } -/// Deserialize a Readable based on our local db version protocol. -pub fn deserialize_db(source: &mut dyn Read) -> Result { - deserialize(source, ProtocolVersion::local_db()) -} - -/// Deserialize a Readable based on our local "default" version protocol. +/// Deserialize a Readable based on our default "local" protocol version. pub fn deserialize_default(source: &mut dyn Read) -> Result { - deserialize(source, ProtocolVersion::default()) + deserialize(source, ProtocolVersion::local()) } /// Serializes a Writeable into any std::io::Write implementation. -pub fn serialize(sink: &mut dyn Write, thing: &W) -> Result<(), Error> { - let mut writer = BinWriter { sink }; +pub fn serialize( + sink: &mut dyn Write, + version: ProtocolVersion, + thing: &W, +) -> Result<(), Error> { + let mut writer = BinWriter::new(sink, version); thing.write(&mut writer) } +/// Serialize a Writeable according to our default "local" protocol version. +pub fn serialize_default(sink: &mut dyn Write, thing: &W) -> Result<(), Error> { + serialize(sink, ProtocolVersion::local(), thing) +} + /// Utility function to serialize a writeable directly in memory using a /// Vec. -pub fn ser_vec(thing: &W) -> Result, Error> { +pub fn ser_vec(thing: &W, version: ProtocolVersion) -> Result, Error> { let mut vec = vec![]; - serialize(&mut vec, thing)?; + serialize(&mut vec, version, thing)?; Ok(vec) } @@ -475,32 +484,28 @@ impl<'a> StreamingReader<'a> { } } +/// Note: We use read_fixed_bytes() here to ensure our "async" I/O behaves as expected. impl<'a> Reader for StreamingReader<'a> { fn read_u8(&mut self) -> Result { let buf = self.read_fixed_bytes(1)?; Ok(buf[0]) } - fn read_u16(&mut self) -> Result { let buf = self.read_fixed_bytes(2)?; Ok(BigEndian::read_u16(&buf[..])) } - fn read_u32(&mut self) -> Result { let buf = self.read_fixed_bytes(4)?; Ok(BigEndian::read_u32(&buf[..])) } - fn read_i32(&mut self) -> Result { let buf = self.read_fixed_bytes(4)?; Ok(BigEndian::read_i32(&buf[..])) } - fn read_u64(&mut self) -> Result { let buf = self.read_fixed_bytes(8)?; Ok(BigEndian::read_u64(&buf[..])) } - fn read_i64(&mut self) -> Result { let buf = self.read_fixed_bytes(8)?; Ok(BigEndian::read_i64(&buf[..])) @@ -683,12 +688,18 @@ impl VerifySortedAndUnique for Vec { /// to write numbers, byte vectors, hashes, etc. pub struct BinWriter<'a> { sink: &'a mut dyn Write, + version: ProtocolVersion, } impl<'a> BinWriter<'a> { /// Wraps a standard Write in a new BinWriter - pub fn new(write: &'a mut dyn Write) -> BinWriter<'a> { - BinWriter { sink: write } + pub fn new(sink: &'a mut dyn Write, version: ProtocolVersion) -> BinWriter<'a> { + BinWriter { sink, version } + } + + /// Constructor for BinWriter with default "local" protocol version. + pub fn default(sink: &'a mut dyn Write) -> BinWriter<'a> { + BinWriter::new(sink, ProtocolVersion::local()) } } @@ -702,6 +713,10 @@ impl<'a> Writer for BinWriter<'a> { self.sink.write_all(bs)?; Ok(()) } + + fn protocol_version(&self) -> ProtocolVersion { + self.version + } } macro_rules! impl_int { diff --git a/core/tests/block.rs b/core/tests/block.rs index f1b142e0f..1e3aa4f95 100644 --- a/core/tests/block.rs +++ b/core/tests/block.rs @@ -211,10 +211,10 @@ fn remove_coinbase_kernel_flag() { #[test] fn serialize_deserialize_header_version() { let mut vec1 = Vec::new(); - ser::serialize(&mut vec1, &1_u16).expect("serialization failed"); + ser::serialize_default(&mut vec1, &1_u16).expect("serialization failed"); let mut vec2 = Vec::new(); - ser::serialize(&mut vec2, &HeaderVersion::default()).expect("serialization failed"); + ser::serialize_default(&mut vec2, &HeaderVersion::default()).expect("serialization failed"); // Check that a header_version serializes to a // single u16 value with no extraneous bytes wrapping it. @@ -235,7 +235,7 @@ fn serialize_deserialize_block_header() { let header1 = b.header; let mut vec = Vec::new(); - ser::serialize(&mut vec, &header1).expect("serialization failed"); + ser::serialize_default(&mut vec, &header1).expect("serialization failed"); let header2: BlockHeader = ser::deserialize_default(&mut &vec[..]).unwrap(); assert_eq!(header1.hash(), header2.hash()); @@ -252,7 +252,7 @@ fn serialize_deserialize_block() { let b = new_block(vec![&tx1], &keychain, &builder, &prev, &key_id); let mut vec = Vec::new(); - ser::serialize(&mut vec, &b).expect("serialization failed"); + ser::serialize_default(&mut vec, &b).expect("serialization failed"); let b2: Block = ser::deserialize_default(&mut &vec[..]).unwrap(); assert_eq!(b.hash(), b2.hash()); @@ -270,7 +270,7 @@ fn empty_block_serialized_size() { let key_id = ExtKeychain::derive_key_id(1, 1, 0, 0, 0); let b = new_block(vec![], &keychain, &builder, &prev, &key_id); let mut vec = Vec::new(); - ser::serialize(&mut vec, &b).expect("serialization failed"); + ser::serialize_default(&mut vec, &b).expect("serialization failed"); let target_len = 1_265; assert_eq!(vec.len(), target_len); } @@ -284,7 +284,7 @@ fn block_single_tx_serialized_size() { let key_id = ExtKeychain::derive_key_id(1, 1, 0, 0, 0); let b = new_block(vec![&tx1], &keychain, &builder, &prev, &key_id); let mut vec = Vec::new(); - ser::serialize(&mut vec, &b).expect("serialization failed"); + ser::serialize_default(&mut vec, &b).expect("serialization failed"); let target_len = 2_847; assert_eq!(vec.len(), target_len); } @@ -298,7 +298,7 @@ fn empty_compact_block_serialized_size() { let b = new_block(vec![], &keychain, &builder, &prev, &key_id); let cb: CompactBlock = b.into(); let mut vec = Vec::new(); - ser::serialize(&mut vec, &cb).expect("serialization failed"); + ser::serialize_default(&mut vec, &cb).expect("serialization failed"); let target_len = 1_273; assert_eq!(vec.len(), target_len); } @@ -313,7 +313,7 @@ fn compact_block_single_tx_serialized_size() { let b = new_block(vec![&tx1], &keychain, &builder, &prev, &key_id); let cb: CompactBlock = b.into(); let mut vec = Vec::new(); - ser::serialize(&mut vec, &cb).expect("serialization failed"); + ser::serialize_default(&mut vec, &cb).expect("serialization failed"); let target_len = 1_279; assert_eq!(vec.len(), target_len); } @@ -333,7 +333,7 @@ fn block_10_tx_serialized_size() { let key_id = ExtKeychain::derive_key_id(1, 1, 0, 0, 0); let b = new_block(txs.iter().collect(), &keychain, &builder, &prev, &key_id); let mut vec = Vec::new(); - ser::serialize(&mut vec, &b).expect("serialization failed"); + ser::serialize_default(&mut vec, &b).expect("serialization failed"); let target_len = 17_085; assert_eq!(vec.len(), target_len,); } @@ -353,7 +353,7 @@ fn compact_block_10_tx_serialized_size() { let b = new_block(txs.iter().collect(), &keychain, &builder, &prev, &key_id); let cb: CompactBlock = b.into(); let mut vec = Vec::new(); - ser::serialize(&mut vec, &cb).expect("serialization failed"); + ser::serialize_default(&mut vec, &cb).expect("serialization failed"); let target_len = 1_333; assert_eq!(vec.len(), target_len,); } @@ -439,7 +439,7 @@ fn serialize_deserialize_compact_block() { let mut cb1: CompactBlock = b.into(); let mut vec = Vec::new(); - ser::serialize(&mut vec, &cb1).expect("serialization failed"); + ser::serialize_default(&mut vec, &cb1).expect("serialization failed"); // After header serialization, timestamp will lose 'nanos' info, that's the designed behavior. // To suppress 'nanos' difference caused assertion fail, we force b.header also lose 'nanos'. diff --git a/core/tests/core.rs b/core/tests/core.rs index d0f5dff17..a3da0037a 100644 --- a/core/tests/core.rs +++ b/core/tests/core.rs @@ -39,7 +39,7 @@ use std::sync::Arc; fn simple_tx_ser() { let tx = tx2i1o(); let mut vec = Vec::new(); - ser::serialize(&mut vec, &tx).expect("serialization failed"); + ser::serialize_default(&mut vec, &tx).expect("serialization failed"); let target_len = 955; assert_eq!(vec.len(), target_len,); } @@ -48,7 +48,7 @@ fn simple_tx_ser() { fn simple_tx_ser_deser() { let tx = tx2i1o(); let mut vec = Vec::new(); - ser::serialize(&mut vec, &tx).expect("serialization failed"); + ser::serialize_default(&mut vec, &tx).expect("serialization failed"); let dtx: Transaction = ser::deserialize_default(&mut &vec[..]).unwrap(); assert_eq!(dtx.fee(), 2); assert_eq!(dtx.inputs().len(), 2); @@ -62,11 +62,11 @@ fn tx_double_ser_deser() { let btx = tx2i1o(); let mut vec = Vec::new(); - assert!(ser::serialize(&mut vec, &btx).is_ok()); + assert!(ser::serialize_default(&mut vec, &btx).is_ok()); let dtx: Transaction = ser::deserialize_default(&mut &vec[..]).unwrap(); let mut vec2 = Vec::new(); - assert!(ser::serialize(&mut vec2, &btx).is_ok()); + assert!(ser::serialize_default(&mut vec2, &btx).is_ok()); let dtx2: Transaction = ser::deserialize_default(&mut &vec2[..]).unwrap(); assert_eq!(btx.hash(), dtx.hash()); diff --git a/core/tests/merkle_proof.rs b/core/tests/merkle_proof.rs index 56b945c6c..f25497398 100644 --- a/core/tests/merkle_proof.rs +++ b/core/tests/merkle_proof.rs @@ -37,7 +37,7 @@ fn merkle_proof_ser_deser() { let proof = pmmr.merkle_proof(9).unwrap(); let mut vec = Vec::new(); - ser::serialize(&mut vec, &proof).expect("serialization failed"); + ser::serialize_default(&mut vec, &proof).expect("serialization failed"); let proof_2: MerkleProof = ser::deserialize_default(&mut &vec[..]).unwrap(); assert_eq!(proof, proof_2); diff --git a/core/tests/transaction.rs b/core/tests/transaction.rs index 3e9548b49..65b7ff6f8 100644 --- a/core/tests/transaction.rs +++ b/core/tests/transaction.rs @@ -39,7 +39,7 @@ fn test_output_ser_deser() { }; let mut vec = vec![]; - ser::serialize(&mut vec, &out).expect("serialized failed"); + ser::serialize_default(&mut vec, &out).expect("serialized failed"); let dout: Output = ser::deserialize_default(&mut &vec[..]).unwrap(); assert_eq!(dout.features, OutputFeatures::Plain); diff --git a/p2p/src/conn.rs b/p2p/src/conn.rs index ad6fb7c8b..e916669b8 100644 --- a/p2p/src/conn.rs +++ b/p2p/src/conn.rs @@ -123,6 +123,7 @@ impl<'a> Message<'a> { pub struct Response<'a> { resp_type: Type, body: Vec, + version: ProtocolVersion, stream: &'a mut dyn Write, attachment: Option, } @@ -130,20 +131,25 @@ pub struct Response<'a> { impl<'a> Response<'a> { pub fn new( resp_type: Type, + version: ProtocolVersion, body: T, stream: &'a mut dyn Write, ) -> Result, Error> { - let body = ser::ser_vec(&body)?; + let body = ser::ser_vec(&body, version)?; Ok(Response { resp_type, body, + version, stream, attachment: None, }) } fn write(mut self, tracker: Arc) -> Result<(), Error> { - let mut msg = ser::ser_vec(&MsgHeader::new(self.resp_type, self.body.len() as u64))?; + let mut msg = ser::ser_vec( + &MsgHeader::new(self.resp_type, self.body.len() as u64), + self.version, + )?; msg.append(&mut self.body); write_all(&mut self.stream, &msg[..], time::Duration::from_secs(10))?; tracker.inc_sent(msg.len() as u64); @@ -213,11 +219,11 @@ pub struct ConnHandle { } impl ConnHandle { - pub fn send(&self, body: T, msg_type: Type) -> Result + pub fn send(&self, body: T, msg_type: Type, version: ProtocolVersion) -> Result where T: ser::Writeable, { - let buf = write_to_buf(body, msg_type)?; + let buf = write_to_buf(body, msg_type, version)?; let buf_len = buf.len(); self.send_channel.try_send(buf)?; Ok(buf_len as u64) diff --git a/p2p/src/handshake.rs b/p2p/src/handshake.rs index 94f5305f0..504ddb242 100644 --- a/p2p/src/handshake.rs +++ b/p2p/src/handshake.rs @@ -73,8 +73,8 @@ impl Handshake { Err(e) => return Err(Error::Connection(e)), }; - // Using our default version here. - let version = ProtocolVersion::default(); + // Using our default "local" protocol version. + let version = ProtocolVersion::local(); let hand = Hand { version, @@ -88,7 +88,7 @@ impl Handshake { }; // write and read the handshake response - write_message(conn, hand, Type::Hand)?; + write_message(conn, hand, Type::Hand, version)?; // Note: We have to read the Shake message *before* we know which protocol // version our peer supports (it is in the shake message itself). @@ -132,8 +132,9 @@ impl Handshake { conn: &mut TcpStream, ) -> Result { // Note: We read the Hand message *before* we know which protocol version - // is supported by our peer (it is in the Hand message). - let version = ProtocolVersion::default(); + // is supported by our peer (in the Hand message). + let version = ProtocolVersion::local(); + let hand: Hand = read_message(conn, version, Type::Hand)?; // all the reasons we could refuse this connection for @@ -177,17 +178,16 @@ impl Handshake { // send our reply with our info let shake = Shake { - version: ProtocolVersion::default(), + version, capabilities: capab, genesis: self.genesis, total_difficulty: total_difficulty, user_agent: USER_AGENT.to_string(), }; - write_message(conn, shake, Type::Shake)?; + write_message(conn, shake, Type::Shake, version)?; trace!("Success handshake with {}.", peer_info.addr); - // when more than one protocol version is supported, choosing should go here Ok(peer_info) } diff --git a/p2p/src/msg.rs b/p2p/src/msg.rs index 1ee0f37e4..67338d3c0 100644 --- a/p2p/src/msg.rs +++ b/p2p/src/msg.rs @@ -191,15 +191,19 @@ pub fn read_message( } } -pub fn write_to_buf(msg: T, msg_type: Type) -> Result, Error> { +pub fn write_to_buf( + msg: T, + msg_type: Type, + version: ProtocolVersion, +) -> Result, Error> { // prepare the body first so we know its serialized length let mut body_buf = vec![]; - ser::serialize(&mut body_buf, &msg)?; + ser::serialize(&mut body_buf, version, &msg)?; // build and serialize the header using the body size let mut msg_buf = vec![]; let blen = body_buf.len() as u64; - ser::serialize(&mut msg_buf, &MsgHeader::new(msg_type, blen))?; + ser::serialize(&mut msg_buf, version, &MsgHeader::new(msg_type, blen))?; msg_buf.append(&mut body_buf); Ok(msg_buf) @@ -209,8 +213,9 @@ pub fn write_message( stream: &mut dyn Write, msg: T, msg_type: Type, + version: ProtocolVersion, ) -> Result<(), Error> { - let buf = write_to_buf(msg, msg_type)?; + let buf = write_to_buf(msg, msg_type, version)?; stream.write_all(&buf[..])?; Ok(()) } diff --git a/p2p/src/peer.rs b/p2p/src/peer.rs index 88ae1b436..89b4e2b01 100644 --- a/p2p/src/peer.rs +++ b/p2p/src/peer.rs @@ -224,7 +224,10 @@ impl Peer { /// Send a msg with given msg_type to our peer via the connection. fn send(&self, msg: T, msg_type: Type) -> Result<(), Error> { - let bytes = self.send_handle.lock().send(msg, msg_type)?; + let bytes = self + .send_handle + .lock() + .send(msg, msg_type, self.info.version)?; self.tracker.inc_sent(bytes); Ok(()) } diff --git a/p2p/src/protocol.rs b/p2p/src/protocol.rs index 721c56848..33eeea479 100644 --- a/p2p/src/protocol.rs +++ b/p2p/src/protocol.rs @@ -66,6 +66,7 @@ impl MessageHandler for Protocol { Ok(Some(Response::new( Type::Pong, + self.peer_info.version, Pong { total_difficulty: adapter.total_difficulty()?, height: adapter.total_height()?, @@ -104,7 +105,12 @@ impl MessageHandler for Protocol { ); let tx = adapter.get_transaction(h); if let Some(tx) = tx { - Ok(Some(Response::new(Type::Transaction, tx, writer)?)) + Ok(Some(Response::new( + Type::Transaction, + self.peer_info.version, + tx, + writer, + )?)) } else { Ok(None) } @@ -140,7 +146,12 @@ impl MessageHandler for Protocol { let bo = adapter.get_block(h); if let Some(b) = bo { - return Ok(Some(Response::new(Type::Block, b, writer)?)); + return Ok(Some(Response::new( + Type::Block, + self.peer_info.version, + b, + writer, + )?)); } Ok(None) } @@ -162,7 +173,12 @@ impl MessageHandler for Protocol { let h: Hash = msg.body()?; if let Some(b) = adapter.get_block(h) { let cb: CompactBlock = b.into(); - Ok(Some(Response::new(Type::CompactBlock, cb, writer)?)) + Ok(Some(Response::new( + Type::CompactBlock, + self.peer_info.version, + cb, + writer, + )?)) } else { Ok(None) } @@ -187,6 +203,7 @@ impl MessageHandler for Protocol { // serialize and send all the headers over Ok(Some(Response::new( Type::Headers, + self.peer_info.version, Headers { headers }, writer, )?)) @@ -232,6 +249,7 @@ impl MessageHandler for Protocol { let peers = adapter.find_peer_addrs(get_peers.capabilities); Ok(Some(Response::new( Type::PeerAddrs, + self.peer_info.version, PeerAddrs { peers }, writer, )?)) @@ -248,8 +266,12 @@ impl MessageHandler for Protocol { let kernel_data = self.adapter.kernel_data_read()?; let bytes = kernel_data.metadata()?.len(); let kernel_data_response = KernelDataResponse { bytes }; - let mut response = - Response::new(Type::KernelDataResponse, &kernel_data_response, writer)?; + let mut response = Response::new( + Type::KernelDataResponse, + self.peer_info.version, + &kernel_data_response, + writer, + )?; response.add_attachment(kernel_data); Ok(Some(response)) } @@ -304,6 +326,7 @@ impl MessageHandler for Protocol { let file_sz = txhashset.reader.metadata()?.len(); let mut resp = Response::new( Type::TxHashSetArchive, + self.peer_info.version, &TxHashSetArchive { height: sm_req.height as u64, hash: sm_req.hash, diff --git a/servers/src/grin/server.rs b/servers/src/grin/server.rs index a3d9fb4b5..9eecad746 100644 --- a/servers/src/grin/server.rs +++ b/servers/src/grin/server.rs @@ -417,7 +417,7 @@ impl Server { /// The p2p layer protocol version for this node. pub fn protocol_version() -> ProtocolVersion { - ProtocolVersion::default() + ProtocolVersion::local() } /// Returns a set of stats about this server. This and the ServerStats diff --git a/servers/src/mining/stratumserver.rs b/servers/src/mining/stratumserver.rs index 6c8a0c1a7..fd5658ace 100644 --- a/servers/src/mining/stratumserver.rs +++ b/servers/src/mining/stratumserver.rs @@ -331,7 +331,7 @@ impl Handler { // Serialize the block header into pre and post nonce strings let mut header_buf = vec![]; { - let mut writer = ser::BinWriter::new(&mut header_buf); + let mut writer = ser::BinWriter::default(&mut header_buf); bh.write_pre_pow(&mut writer).unwrap(); bh.pow.write_pre_pow(&mut writer).unwrap(); } diff --git a/store/src/lmdb.rs b/store/src/lmdb.rs index e7f52b090..62c6aca23 100644 --- a/store/src/lmdb.rs +++ b/store/src/lmdb.rs @@ -22,7 +22,7 @@ use lmdb_zero as lmdb; use lmdb_zero::traits::CreateCursor; use lmdb_zero::LmdbResultExt; -use crate::core::ser; +use crate::core::ser::{self, ProtocolVersion}; use crate::util::{RwLock, RwLockReadGuard}; /// number of bytes to grow the database by when needed @@ -68,6 +68,7 @@ pub struct Store { env: Arc, db: RwLock>>>, name: String, + version: ProtocolVersion, } impl Store { @@ -111,6 +112,7 @@ impl Store { env: Arc::new(env), db: RwLock::new(None), name: db_name, + version: ProtocolVersion(1), }; { @@ -230,7 +232,7 @@ impl Store { ) -> Result, Error> { let res: lmdb::error::Result<&[u8]> = access.get(&db.as_ref().unwrap(), key); match res.to_opt() { - Ok(Some(mut res)) => match ser::deserialize_db(&mut res) { + Ok(Some(mut res)) => match ser::deserialize(&mut res, self.version) { Ok(res) => Ok(Some(res)), Err(e) => Err(Error::SerErr(format!("{}", e))), }, @@ -259,6 +261,7 @@ impl Store { cursor, seek: false, prefix: from.to_vec(), + version: self.version, _marker: marker::PhantomData, }) } @@ -296,7 +299,7 @@ impl<'a> Batch<'a> { /// Writes a single key and its `Writeable` value to the db. Encapsulates /// serialization. pub fn put_ser(&self, key: &[u8], value: &W) -> Result<(), Error> { - let ser_value = ser::ser_vec(value); + let ser_value = ser::ser_vec(value, self.store.version); match ser_value { Ok(data) => self.put(key, &data), Err(err) => Err(Error::SerErr(format!("{}", err))), @@ -360,6 +363,7 @@ where cursor: Arc>, seek: bool, prefix: Vec, + version: ProtocolVersion, _marker: marker::PhantomData, } @@ -393,7 +397,7 @@ where fn deser_if_prefix_match(&self, key: &[u8], value: &[u8]) -> Option<(Vec, T)> { let plen = self.prefix.len(); if plen == 0 || key[0..plen] == self.prefix[..] { - if let Ok(value) = ser::deserialize_db(&mut &value[..]) { + if let Ok(value) = ser::deserialize(&mut &value[..], self.version) { Some((key.to_vec(), value)) } else { None diff --git a/store/src/pmmr.rs b/store/src/pmmr.rs index ee01f2157..ceed2ef0d 100644 --- a/store/src/pmmr.rs +++ b/store/src/pmmr.rs @@ -19,7 +19,7 @@ use std::{io, time}; use crate::core::core::hash::{Hash, Hashed}; use crate::core::core::pmmr::{self, family, Backend}; use crate::core::core::BlockHeader; -use crate::core::ser::{FixedLength, PMMRable}; +use crate::core::ser::{FixedLength, PMMRable, ProtocolVersion}; use crate::leaf_set::LeafSet; use crate::prune_list::PruneList; use crate::types::{AppendOnlyFile, DataFile, SizeEntry, SizeInfo}; @@ -206,6 +206,11 @@ impl PMMRBackend { fixed_size: bool, header: Option<&BlockHeader>, ) -> io::Result> { + // Note: Explicit protocol version here. + // Regardless of our "default" protocol version we have existing MMR files + // and we need to be able to support these across upgrades. + let version = ProtocolVersion(1); + let data_dir = data_dir.as_ref(); // Are we dealing with "fixed size" data elements or "variable size" data elements @@ -216,14 +221,15 @@ impl PMMRBackend { SizeInfo::VariableSize(Box::new(AppendOnlyFile::open( data_dir.join(PMMR_SIZE_FILE), SizeInfo::FixedSize(SizeEntry::LEN as u16), + version, )?)) }; // Hash file is always "fixed size" and we use 32 bytes per hash. let hash_size_info = SizeInfo::FixedSize(Hash::LEN as u16); - let hash_file = DataFile::open(&data_dir.join(PMMR_HASH_FILE), hash_size_info)?; - let data_file = DataFile::open(&data_dir.join(PMMR_DATA_FILE), size_info)?; + let hash_file = DataFile::open(&data_dir.join(PMMR_HASH_FILE), hash_size_info, version)?; + let data_file = DataFile::open(&data_dir.join(PMMR_DATA_FILE), size_info, version)?; let leaf_set_path = data_dir.join(PMMR_LEAF_FILE); diff --git a/store/src/types.rs b/store/src/types.rs index 2f46f7621..2d4de2870 100644 --- a/store/src/types.rs +++ b/store/src/types.rs @@ -78,12 +78,16 @@ where T: Readable + Writeable + Debug, { /// Open (or create) a file at the provided path on disk. - pub fn open

(path: P, size_info: SizeInfo) -> io::Result> + pub fn open

( + path: P, + size_info: SizeInfo, + version: ProtocolVersion, + ) -> io::Result> where P: AsRef + Debug, { Ok(DataFile { - file: AppendOnlyFile::open(path, size_info)?, + file: AppendOnlyFile::open(path, size_info, version)?, }) } @@ -177,6 +181,7 @@ pub struct AppendOnlyFile { path: PathBuf, file: Option, size_info: SizeInfo, + version: ProtocolVersion, mmap: Option, // Buffer of unsync'd bytes. These bytes will be appended to the file when flushed. @@ -191,7 +196,11 @@ where T: Debug + Readable + Writeable, { /// Open a file (existing or not) as append-only, backed by a mmap. - pub fn open

(path: P, size_info: SizeInfo) -> io::Result> + pub fn open

( + path: P, + size_info: SizeInfo, + version: ProtocolVersion, + ) -> io::Result> where P: AsRef + Debug, { @@ -199,6 +208,7 @@ where file: None, path: path.as_ref().to_path_buf(), size_info, + version, mmap: None, buffer: vec![], buffer_start_pos: 0, @@ -268,7 +278,8 @@ where /// Append element to append-only file by serializing it to bytes and appending the bytes. fn append_elmt(&mut self, data: &T) -> io::Result<()> { - let mut bytes = ser::ser_vec(data).map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; + let mut bytes = ser::ser_vec(data, self.version) + .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; self.append(&mut bytes)?; Ok(()) } @@ -415,7 +426,8 @@ where fn read_as_elmt(&self, pos: u64) -> io::Result { let data = self.read(pos)?; - ser::deserialize_db(&mut &data[..]).map_err(|e| io::Error::new(io::ErrorKind::Other, e)) + ser::deserialize(&mut &data[..], self.version) + .map_err(|e| io::Error::new(io::ErrorKind::Other, e)) } // Read length bytes starting at offset from the buffer. @@ -471,10 +483,10 @@ where let reader = File::open(&self.path)?; let mut buf_reader = BufReader::new(reader); let mut streaming_reader = - StreamingReader::new(&mut buf_reader, ProtocolVersion::local_db(), time::Duration::from_secs(1)); + StreamingReader::new(&mut buf_reader, self.version, time::Duration::from_secs(1)); let mut buf_writer = BufWriter::new(File::create(&tmp_path)?); - let mut bin_writer = BinWriter::new(&mut buf_writer); + let mut bin_writer = BinWriter::new(&mut buf_writer, self.version); let mut current_pos = 0; let mut prune_pos = prune_pos; @@ -517,11 +529,14 @@ where { let reader = File::open(&self.path)?; let mut buf_reader = BufReader::new(reader); - let mut streaming_reader = - StreamingReader::new(&mut buf_reader, ProtocolVersion::local_db(), time::Duration::from_secs(1)); + let mut streaming_reader = StreamingReader::new( + &mut buf_reader, + self.version, + time::Duration::from_secs(1), + ); let mut buf_writer = BufWriter::new(File::create(&tmp_path)?); - let mut bin_writer = BinWriter::new(&mut buf_writer); + let mut bin_writer = BinWriter::new(&mut buf_writer, self.version); let mut current_offset = 0; while let Ok(_) = T::read(&mut streaming_reader) {