[2.x.x] Writeable protocol version aware (#2856)

* introduce protocol version to deserialize and read

* thread protocol version through our reader

* cleanup

* cleanup

* streaming_reader cleanup

* Pass protocol version into BinWriter to allow for version specific serialization rules.

* rustfmt

* read and write now protocol version specific
This commit is contained in:
Antioch Peverell 2019-07-06 15:51:03 +01:00 committed by GitHub
parent b6daf1edfd
commit d284d8f6de
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
24 changed files with 192 additions and 95 deletions

View file

@ -76,7 +76,7 @@ impl PoolPushHandler {
})
.and_then(move |tx_bin| {
// TODO - pass protocol version in via the api call?
let version = ProtocolVersion::default();
let version = ProtocolVersion::local();
ser::deserialize(&mut &tx_bin[..], version)
.map_err(|e| ErrorKind::RequestError(format!("Bad request: {}", e)).into())

View file

@ -83,7 +83,7 @@ pub struct Status {
impl Status {
pub fn from_tip_and_peers(current_tip: chain::Tip, connections: u32) -> Status {
Status {
protocol_version: ser::ProtocolVersion::default().into(),
protocol_version: ser::ProtocolVersion::local().into(),
user_agent: p2p::msg::USER_AGENT.to_string(),
connections: connections,
tip: Tip::from_tip(current_tip),

View file

@ -647,7 +647,8 @@ impl Chain {
/// TODO - Write this data to disk and validate the rebuilt kernel MMR.
pub fn kernel_data_write(&self, reader: &mut Read) -> Result<(), Error> {
let mut count = 0;
let mut stream = StreamingReader::new(reader, ProtocolVersion::default(), Duration::from_secs(1));
let mut stream =
StreamingReader::new(reader, ProtocolVersion::local(), Duration::from_secs(1));
while let Ok(_kernel) = TxKernelEntry::read(&mut stream) {
count += 1;
}

View file

@ -359,7 +359,7 @@ impl BlockHeader {
pub fn pre_pow(&self) -> Vec<u8> {
let mut header_buf = vec![];
{
let mut writer = ser::BinWriter::new(&mut header_buf);
let mut writer = ser::BinWriter::default(&mut header_buf);
self.write_pre_pow(&mut writer).unwrap();
self.pow.write_pre_pow(&mut writer).unwrap();
writer.write_u64(self.pow.nonce).unwrap();

View file

@ -25,7 +25,9 @@ use std::{fmt, ops};
use crate::blake2::blake2b::Blake2b;
use crate::ser::{self, AsFixedBytes, Error, FixedLength, Readable, Reader, Writeable, Writer};
use crate::ser::{
self, AsFixedBytes, Error, FixedLength, ProtocolVersion, Readable, Reader, Writeable, Writer,
};
use crate::util;
/// A hash consisting of all zeroes, used as a sentinel. No known preimage.
@ -219,6 +221,10 @@ impl ser::Writer for HashWriter {
self.state.update(b32.as_ref());
Ok(())
}
fn protocol_version(&self) -> ProtocolVersion {
ProtocolVersion::local()
}
}
/// A trait for types that have a canonical hash

View file

@ -17,7 +17,7 @@
use crate::core::hash::Hash;
use crate::core::pmmr;
use crate::ser;
use crate::ser::{PMMRIndexHashable, ProtocolVersion, Readable, Reader, Writeable, Writer};
use crate::ser::{PMMRIndexHashable, Readable, Reader, Writeable, Writer};
use crate::util;
/// Merkle proof errors.
@ -78,14 +78,14 @@ impl MerkleProof {
/// Serialize the Merkle proof as a hex string (for api json endpoints)
pub fn to_hex(&self) -> String {
let mut vec = Vec::new();
ser::serialize(&mut vec, &self).expect("serialization failed");
ser::serialize_default(&mut vec, &self).expect("serialization failed");
util::to_hex(vec)
}
/// Convert hex string representation back to a Merkle proof instance
pub fn from_hex(hex: &str) -> Result<MerkleProof, String> {
let bytes = util::from_hex(hex.to_string()).unwrap();
let res = ser::deserialize(&mut &bytes[..], ProtocolVersion::default())
let res = ser::deserialize_default(&mut &bytes[..])
.map_err(|_| "failed to deserialize a Merkle Proof".to_string())?;
Ok(res)
}

View file

@ -185,13 +185,19 @@ hashable_ord!(TxKernel);
impl ::std::hash::Hash for TxKernel {
fn hash<H: ::std::hash::Hasher>(&self, state: &mut H) {
let mut vec = Vec::new();
ser::serialize(&mut vec, &self).expect("serialization failed");
ser::serialize_default(&mut vec, &self).expect("serialization failed");
::std::hash::Hash::hash(&vec, state);
}
}
impl Writeable for TxKernel {
fn write<W: Writer>(&self, writer: &mut W) -> Result<(), ser::Error> {
// We have access to the protocol version here.
// This may be a protocol version based on a peer connection
// or the version used locally for db storage.
// We can handle version specific serialization here.
let _version = writer.protocol_version();
self.features.write(writer)?;
ser_multiwrite!(writer, [write_u64, self.fee], [write_u64, self.lock_height]);
self.excess.write(writer)?;
@ -1165,7 +1171,7 @@ hashable_ord!(Input);
impl ::std::hash::Hash for Input {
fn hash<H: ::std::hash::Hasher>(&self, state: &mut H) {
let mut vec = Vec::new();
ser::serialize(&mut vec, &self).expect("serialization failed");
ser::serialize_default(&mut vec, &self).expect("serialization failed");
::std::hash::Hash::hash(&vec, state);
}
}
@ -1276,7 +1282,7 @@ hashable_ord!(Output);
impl ::std::hash::Hash for Output {
fn hash<H: ::std::hash::Hasher>(&self, state: &mut H) {
let mut vec = Vec::new();
ser::serialize(&mut vec, &self).expect("serialization failed");
ser::serialize_default(&mut vec, &self).expect("serialization failed");
::std::hash::Hash::hash(&vec, state);
}
}
@ -1528,7 +1534,7 @@ mod test {
};
let mut vec = vec![];
ser::serialize(&mut vec, &kernel).expect("serialized failed");
ser::serialize_default(&mut vec, &kernel).expect("serialized failed");
let kernel2: TxKernel = ser::deserialize_default(&mut &vec[..]).unwrap();
assert_eq!(kernel2.features, KernelFeatures::Plain);
assert_eq!(kernel2.lock_height, 0);
@ -1546,7 +1552,7 @@ mod test {
};
let mut vec = vec![];
ser::serialize(&mut vec, &kernel).expect("serialized failed");
ser::serialize_default(&mut vec, &kernel).expect("serialized failed");
let kernel2: TxKernel = ser::deserialize_default(&mut &vec[..]).unwrap();
assert_eq!(kernel2.features, KernelFeatures::HeightLocked);
assert_eq!(kernel2.lock_height, 100);

View file

@ -288,13 +288,13 @@ pub fn genesis_main() -> core::Block {
mod test {
use super::*;
use crate::core::hash::Hashed;
use crate::ser;
use crate::ser::{self, ProtocolVersion};
#[test]
fn floonet_genesis_hash() {
let gen_hash = genesis_floo().hash();
println!("floonet genesis hash: {}", gen_hash.to_hex());
let gen_bin = ser::ser_vec(&genesis_floo()).unwrap();
let gen_bin = ser::ser_vec(&genesis_floo(), ProtocolVersion(1)).unwrap();
println!("floonet genesis full hash: {}\n", gen_bin.hash().to_hex());
assert_eq!(
gen_hash.to_hex(),
@ -310,7 +310,7 @@ mod test {
fn mainnet_genesis_hash() {
let gen_hash = genesis_main().hash();
println!("mainnet genesis hash: {}", gen_hash.to_hex());
let gen_bin = ser::ser_vec(&genesis_main()).unwrap();
let gen_bin = ser::ser_vec(&genesis_main(), ProtocolVersion(1)).unwrap();
println!("mainnet genesis full hash: {}\n", gen_bin.hash().to_hex());
assert_eq!(
gen_hash.to_hex(),

View file

@ -33,6 +33,13 @@ use crate::util::RwLock;
/// Define these here, as they should be developer-set, not really tweakable
/// by users
/// The default "local" protocol version for this node.
/// We negotiate compatible versions with each peer via Hand/Shake.
/// Note: We also use a specific (possible different) protocol version
/// for both the backend database and MMR data files.
/// This one is p2p layer specific.
pub const PROTOCOL_VERSION: u32 = 1;
/// Automated testing edge_bits
pub const AUTOMATED_TESTING_MIN_EDGE_BITS: u8 = 9;

View file

@ -20,6 +20,7 @@
//! `serialize` or `deserialize` functions on them as appropriate.
use crate::core::hash::{DefaultHashable, Hash, Hashed};
use crate::global::PROTOCOL_VERSION;
use crate::keychain::{BlindingFactor, Identifier, IDENTIFIER_SIZE};
use crate::util::read_write::read_exact;
use crate::util::secp::constants::{
@ -135,6 +136,9 @@ pub trait Writer {
/// The mode this serializer is writing in
fn serialization_mode(&self) -> SerializationMode;
/// Protocol version for version specific serialization rules.
fn protocol_version(&self) -> ProtocolVersion;
/// Writes a u8 as bytes
fn write_u8(&mut self, n: u8) -> Result<(), Error> {
self.write_fixed_bytes(&[n])
@ -286,9 +290,10 @@ where
#[derive(Clone, Copy, Debug, Deserialize, Eq, Ord, PartialOrd, PartialEq, Serialize)]
pub struct ProtocolVersion(pub u32);
impl Default for ProtocolVersion {
fn default() -> ProtocolVersion {
ProtocolVersion(1)
impl ProtocolVersion {
/// Our default "local" protocol version.
pub fn local() -> ProtocolVersion {
ProtocolVersion(PROTOCOL_VERSION)
}
}
@ -346,27 +351,31 @@ pub fn deserialize<T: Readable>(
T::read(&mut reader)
}
/// Deserialize a Readable based on our local db version protocol.
pub fn deserialize_db<T: Readable>(source: &mut dyn Read) -> Result<T, Error> {
deserialize(source, ProtocolVersion::local_db())
}
/// Deserialize a Readable based on our local "default" version protocol.
/// Deserialize a Readable based on our default "local" protocol version.
pub fn deserialize_default<T: Readable>(source: &mut dyn Read) -> Result<T, Error> {
deserialize(source, ProtocolVersion::default())
deserialize(source, ProtocolVersion::local())
}
/// Serializes a Writeable into any std::io::Write implementation.
pub fn serialize<W: Writeable>(sink: &mut dyn Write, thing: &W) -> Result<(), Error> {
let mut writer = BinWriter { sink };
pub fn serialize<W: Writeable>(
sink: &mut dyn Write,
version: ProtocolVersion,
thing: &W,
) -> Result<(), Error> {
let mut writer = BinWriter::new(sink, version);
thing.write(&mut writer)
}
/// Serialize a Writeable according to our default "local" protocol version.
pub fn serialize_default<W: Writeable>(sink: &mut dyn Write, thing: &W) -> Result<(), Error> {
serialize(sink, ProtocolVersion::local(), thing)
}
/// Utility function to serialize a writeable directly in memory using a
/// Vec<u8>.
pub fn ser_vec<W: Writeable>(thing: &W) -> Result<Vec<u8>, Error> {
pub fn ser_vec<W: Writeable>(thing: &W, version: ProtocolVersion) -> Result<Vec<u8>, Error> {
let mut vec = vec![];
serialize(&mut vec, thing)?;
serialize(&mut vec, version, thing)?;
Ok(vec)
}
@ -475,32 +484,28 @@ impl<'a> StreamingReader<'a> {
}
}
/// Note: We use read_fixed_bytes() here to ensure our "async" I/O behaves as expected.
impl<'a> Reader for StreamingReader<'a> {
fn read_u8(&mut self) -> Result<u8, Error> {
let buf = self.read_fixed_bytes(1)?;
Ok(buf[0])
}
fn read_u16(&mut self) -> Result<u16, Error> {
let buf = self.read_fixed_bytes(2)?;
Ok(BigEndian::read_u16(&buf[..]))
}
fn read_u32(&mut self) -> Result<u32, Error> {
let buf = self.read_fixed_bytes(4)?;
Ok(BigEndian::read_u32(&buf[..]))
}
fn read_i32(&mut self) -> Result<i32, Error> {
let buf = self.read_fixed_bytes(4)?;
Ok(BigEndian::read_i32(&buf[..]))
}
fn read_u64(&mut self) -> Result<u64, Error> {
let buf = self.read_fixed_bytes(8)?;
Ok(BigEndian::read_u64(&buf[..]))
}
fn read_i64(&mut self) -> Result<i64, Error> {
let buf = self.read_fixed_bytes(8)?;
Ok(BigEndian::read_i64(&buf[..]))
@ -683,12 +688,18 @@ impl<T: Hashed> VerifySortedAndUnique<T> for Vec<T> {
/// to write numbers, byte vectors, hashes, etc.
pub struct BinWriter<'a> {
sink: &'a mut dyn Write,
version: ProtocolVersion,
}
impl<'a> BinWriter<'a> {
/// Wraps a standard Write in a new BinWriter
pub fn new(write: &'a mut dyn Write) -> BinWriter<'a> {
BinWriter { sink: write }
pub fn new(sink: &'a mut dyn Write, version: ProtocolVersion) -> BinWriter<'a> {
BinWriter { sink, version }
}
/// Constructor for BinWriter with default "local" protocol version.
pub fn default(sink: &'a mut dyn Write) -> BinWriter<'a> {
BinWriter::new(sink, ProtocolVersion::local())
}
}
@ -702,6 +713,10 @@ impl<'a> Writer for BinWriter<'a> {
self.sink.write_all(bs)?;
Ok(())
}
fn protocol_version(&self) -> ProtocolVersion {
self.version
}
}
macro_rules! impl_int {

View file

@ -211,10 +211,10 @@ fn remove_coinbase_kernel_flag() {
#[test]
fn serialize_deserialize_header_version() {
let mut vec1 = Vec::new();
ser::serialize(&mut vec1, &1_u16).expect("serialization failed");
ser::serialize_default(&mut vec1, &1_u16).expect("serialization failed");
let mut vec2 = Vec::new();
ser::serialize(&mut vec2, &HeaderVersion::default()).expect("serialization failed");
ser::serialize_default(&mut vec2, &HeaderVersion::default()).expect("serialization failed");
// Check that a header_version serializes to a
// single u16 value with no extraneous bytes wrapping it.
@ -235,7 +235,7 @@ fn serialize_deserialize_block_header() {
let header1 = b.header;
let mut vec = Vec::new();
ser::serialize(&mut vec, &header1).expect("serialization failed");
ser::serialize_default(&mut vec, &header1).expect("serialization failed");
let header2: BlockHeader = ser::deserialize_default(&mut &vec[..]).unwrap();
assert_eq!(header1.hash(), header2.hash());
@ -252,7 +252,7 @@ fn serialize_deserialize_block() {
let b = new_block(vec![&tx1], &keychain, &builder, &prev, &key_id);
let mut vec = Vec::new();
ser::serialize(&mut vec, &b).expect("serialization failed");
ser::serialize_default(&mut vec, &b).expect("serialization failed");
let b2: Block = ser::deserialize_default(&mut &vec[..]).unwrap();
assert_eq!(b.hash(), b2.hash());
@ -270,7 +270,7 @@ fn empty_block_serialized_size() {
let key_id = ExtKeychain::derive_key_id(1, 1, 0, 0, 0);
let b = new_block(vec![], &keychain, &builder, &prev, &key_id);
let mut vec = Vec::new();
ser::serialize(&mut vec, &b).expect("serialization failed");
ser::serialize_default(&mut vec, &b).expect("serialization failed");
let target_len = 1_265;
assert_eq!(vec.len(), target_len);
}
@ -284,7 +284,7 @@ fn block_single_tx_serialized_size() {
let key_id = ExtKeychain::derive_key_id(1, 1, 0, 0, 0);
let b = new_block(vec![&tx1], &keychain, &builder, &prev, &key_id);
let mut vec = Vec::new();
ser::serialize(&mut vec, &b).expect("serialization failed");
ser::serialize_default(&mut vec, &b).expect("serialization failed");
let target_len = 2_847;
assert_eq!(vec.len(), target_len);
}
@ -298,7 +298,7 @@ fn empty_compact_block_serialized_size() {
let b = new_block(vec![], &keychain, &builder, &prev, &key_id);
let cb: CompactBlock = b.into();
let mut vec = Vec::new();
ser::serialize(&mut vec, &cb).expect("serialization failed");
ser::serialize_default(&mut vec, &cb).expect("serialization failed");
let target_len = 1_273;
assert_eq!(vec.len(), target_len);
}
@ -313,7 +313,7 @@ fn compact_block_single_tx_serialized_size() {
let b = new_block(vec![&tx1], &keychain, &builder, &prev, &key_id);
let cb: CompactBlock = b.into();
let mut vec = Vec::new();
ser::serialize(&mut vec, &cb).expect("serialization failed");
ser::serialize_default(&mut vec, &cb).expect("serialization failed");
let target_len = 1_279;
assert_eq!(vec.len(), target_len);
}
@ -333,7 +333,7 @@ fn block_10_tx_serialized_size() {
let key_id = ExtKeychain::derive_key_id(1, 1, 0, 0, 0);
let b = new_block(txs.iter().collect(), &keychain, &builder, &prev, &key_id);
let mut vec = Vec::new();
ser::serialize(&mut vec, &b).expect("serialization failed");
ser::serialize_default(&mut vec, &b).expect("serialization failed");
let target_len = 17_085;
assert_eq!(vec.len(), target_len,);
}
@ -353,7 +353,7 @@ fn compact_block_10_tx_serialized_size() {
let b = new_block(txs.iter().collect(), &keychain, &builder, &prev, &key_id);
let cb: CompactBlock = b.into();
let mut vec = Vec::new();
ser::serialize(&mut vec, &cb).expect("serialization failed");
ser::serialize_default(&mut vec, &cb).expect("serialization failed");
let target_len = 1_333;
assert_eq!(vec.len(), target_len,);
}
@ -439,7 +439,7 @@ fn serialize_deserialize_compact_block() {
let mut cb1: CompactBlock = b.into();
let mut vec = Vec::new();
ser::serialize(&mut vec, &cb1).expect("serialization failed");
ser::serialize_default(&mut vec, &cb1).expect("serialization failed");
// After header serialization, timestamp will lose 'nanos' info, that's the designed behavior.
// To suppress 'nanos' difference caused assertion fail, we force b.header also lose 'nanos'.

View file

@ -39,7 +39,7 @@ use std::sync::Arc;
fn simple_tx_ser() {
let tx = tx2i1o();
let mut vec = Vec::new();
ser::serialize(&mut vec, &tx).expect("serialization failed");
ser::serialize_default(&mut vec, &tx).expect("serialization failed");
let target_len = 955;
assert_eq!(vec.len(), target_len,);
}
@ -48,7 +48,7 @@ fn simple_tx_ser() {
fn simple_tx_ser_deser() {
let tx = tx2i1o();
let mut vec = Vec::new();
ser::serialize(&mut vec, &tx).expect("serialization failed");
ser::serialize_default(&mut vec, &tx).expect("serialization failed");
let dtx: Transaction = ser::deserialize_default(&mut &vec[..]).unwrap();
assert_eq!(dtx.fee(), 2);
assert_eq!(dtx.inputs().len(), 2);
@ -62,11 +62,11 @@ fn tx_double_ser_deser() {
let btx = tx2i1o();
let mut vec = Vec::new();
assert!(ser::serialize(&mut vec, &btx).is_ok());
assert!(ser::serialize_default(&mut vec, &btx).is_ok());
let dtx: Transaction = ser::deserialize_default(&mut &vec[..]).unwrap();
let mut vec2 = Vec::new();
assert!(ser::serialize(&mut vec2, &btx).is_ok());
assert!(ser::serialize_default(&mut vec2, &btx).is_ok());
let dtx2: Transaction = ser::deserialize_default(&mut &vec2[..]).unwrap();
assert_eq!(btx.hash(), dtx.hash());

View file

@ -37,7 +37,7 @@ fn merkle_proof_ser_deser() {
let proof = pmmr.merkle_proof(9).unwrap();
let mut vec = Vec::new();
ser::serialize(&mut vec, &proof).expect("serialization failed");
ser::serialize_default(&mut vec, &proof).expect("serialization failed");
let proof_2: MerkleProof = ser::deserialize_default(&mut &vec[..]).unwrap();
assert_eq!(proof, proof_2);

View file

@ -39,7 +39,7 @@ fn test_output_ser_deser() {
};
let mut vec = vec![];
ser::serialize(&mut vec, &out).expect("serialized failed");
ser::serialize_default(&mut vec, &out).expect("serialized failed");
let dout: Output = ser::deserialize_default(&mut &vec[..]).unwrap();
assert_eq!(dout.features, OutputFeatures::Plain);

View file

@ -123,6 +123,7 @@ impl<'a> Message<'a> {
pub struct Response<'a> {
resp_type: Type,
body: Vec<u8>,
version: ProtocolVersion,
stream: &'a mut dyn Write,
attachment: Option<File>,
}
@ -130,20 +131,25 @@ pub struct Response<'a> {
impl<'a> Response<'a> {
pub fn new<T: ser::Writeable>(
resp_type: Type,
version: ProtocolVersion,
body: T,
stream: &'a mut dyn Write,
) -> Result<Response<'a>, Error> {
let body = ser::ser_vec(&body)?;
let body = ser::ser_vec(&body, version)?;
Ok(Response {
resp_type,
body,
version,
stream,
attachment: None,
})
}
fn write(mut self, tracker: Arc<Tracker>) -> Result<(), Error> {
let mut msg = ser::ser_vec(&MsgHeader::new(self.resp_type, self.body.len() as u64))?;
let mut msg = ser::ser_vec(
&MsgHeader::new(self.resp_type, self.body.len() as u64),
self.version,
)?;
msg.append(&mut self.body);
write_all(&mut self.stream, &msg[..], time::Duration::from_secs(10))?;
tracker.inc_sent(msg.len() as u64);
@ -213,11 +219,11 @@ pub struct ConnHandle {
}
impl ConnHandle {
pub fn send<T>(&self, body: T, msg_type: Type) -> Result<u64, Error>
pub fn send<T>(&self, body: T, msg_type: Type, version: ProtocolVersion) -> Result<u64, Error>
where
T: ser::Writeable,
{
let buf = write_to_buf(body, msg_type)?;
let buf = write_to_buf(body, msg_type, version)?;
let buf_len = buf.len();
self.send_channel.try_send(buf)?;
Ok(buf_len as u64)

View file

@ -73,8 +73,8 @@ impl Handshake {
Err(e) => return Err(Error::Connection(e)),
};
// Using our default version here.
let version = ProtocolVersion::default();
// Using our default "local" protocol version.
let version = ProtocolVersion::local();
let hand = Hand {
version,
@ -88,7 +88,7 @@ impl Handshake {
};
// write and read the handshake response
write_message(conn, hand, Type::Hand)?;
write_message(conn, hand, Type::Hand, version)?;
// Note: We have to read the Shake message *before* we know which protocol
// version our peer supports (it is in the shake message itself).
@ -132,8 +132,9 @@ impl Handshake {
conn: &mut TcpStream,
) -> Result<PeerInfo, Error> {
// Note: We read the Hand message *before* we know which protocol version
// is supported by our peer (it is in the Hand message).
let version = ProtocolVersion::default();
// is supported by our peer (in the Hand message).
let version = ProtocolVersion::local();
let hand: Hand = read_message(conn, version, Type::Hand)?;
// all the reasons we could refuse this connection for
@ -177,17 +178,16 @@ impl Handshake {
// send our reply with our info
let shake = Shake {
version: ProtocolVersion::default(),
version,
capabilities: capab,
genesis: self.genesis,
total_difficulty: total_difficulty,
user_agent: USER_AGENT.to_string(),
};
write_message(conn, shake, Type::Shake)?;
write_message(conn, shake, Type::Shake, version)?;
trace!("Success handshake with {}.", peer_info.addr);
// when more than one protocol version is supported, choosing should go here
Ok(peer_info)
}

View file

@ -191,15 +191,19 @@ pub fn read_message<T: Readable>(
}
}
pub fn write_to_buf<T: Writeable>(msg: T, msg_type: Type) -> Result<Vec<u8>, Error> {
pub fn write_to_buf<T: Writeable>(
msg: T,
msg_type: Type,
version: ProtocolVersion,
) -> Result<Vec<u8>, Error> {
// prepare the body first so we know its serialized length
let mut body_buf = vec![];
ser::serialize(&mut body_buf, &msg)?;
ser::serialize(&mut body_buf, version, &msg)?;
// build and serialize the header using the body size
let mut msg_buf = vec![];
let blen = body_buf.len() as u64;
ser::serialize(&mut msg_buf, &MsgHeader::new(msg_type, blen))?;
ser::serialize(&mut msg_buf, version, &MsgHeader::new(msg_type, blen))?;
msg_buf.append(&mut body_buf);
Ok(msg_buf)
@ -209,8 +213,9 @@ pub fn write_message<T: Writeable>(
stream: &mut dyn Write,
msg: T,
msg_type: Type,
version: ProtocolVersion,
) -> Result<(), Error> {
let buf = write_to_buf(msg, msg_type)?;
let buf = write_to_buf(msg, msg_type, version)?;
stream.write_all(&buf[..])?;
Ok(())
}

View file

@ -224,7 +224,10 @@ impl Peer {
/// Send a msg with given msg_type to our peer via the connection.
fn send<T: Writeable>(&self, msg: T, msg_type: Type) -> Result<(), Error> {
let bytes = self.send_handle.lock().send(msg, msg_type)?;
let bytes = self
.send_handle
.lock()
.send(msg, msg_type, self.info.version)?;
self.tracker.inc_sent(bytes);
Ok(())
}

View file

@ -66,6 +66,7 @@ impl MessageHandler for Protocol {
Ok(Some(Response::new(
Type::Pong,
self.peer_info.version,
Pong {
total_difficulty: adapter.total_difficulty()?,
height: adapter.total_height()?,
@ -104,7 +105,12 @@ impl MessageHandler for Protocol {
);
let tx = adapter.get_transaction(h);
if let Some(tx) = tx {
Ok(Some(Response::new(Type::Transaction, tx, writer)?))
Ok(Some(Response::new(
Type::Transaction,
self.peer_info.version,
tx,
writer,
)?))
} else {
Ok(None)
}
@ -140,7 +146,12 @@ impl MessageHandler for Protocol {
let bo = adapter.get_block(h);
if let Some(b) = bo {
return Ok(Some(Response::new(Type::Block, b, writer)?));
return Ok(Some(Response::new(
Type::Block,
self.peer_info.version,
b,
writer,
)?));
}
Ok(None)
}
@ -162,7 +173,12 @@ impl MessageHandler for Protocol {
let h: Hash = msg.body()?;
if let Some(b) = adapter.get_block(h) {
let cb: CompactBlock = b.into();
Ok(Some(Response::new(Type::CompactBlock, cb, writer)?))
Ok(Some(Response::new(
Type::CompactBlock,
self.peer_info.version,
cb,
writer,
)?))
} else {
Ok(None)
}
@ -187,6 +203,7 @@ impl MessageHandler for Protocol {
// serialize and send all the headers over
Ok(Some(Response::new(
Type::Headers,
self.peer_info.version,
Headers { headers },
writer,
)?))
@ -232,6 +249,7 @@ impl MessageHandler for Protocol {
let peers = adapter.find_peer_addrs(get_peers.capabilities);
Ok(Some(Response::new(
Type::PeerAddrs,
self.peer_info.version,
PeerAddrs { peers },
writer,
)?))
@ -248,8 +266,12 @@ impl MessageHandler for Protocol {
let kernel_data = self.adapter.kernel_data_read()?;
let bytes = kernel_data.metadata()?.len();
let kernel_data_response = KernelDataResponse { bytes };
let mut response =
Response::new(Type::KernelDataResponse, &kernel_data_response, writer)?;
let mut response = Response::new(
Type::KernelDataResponse,
self.peer_info.version,
&kernel_data_response,
writer,
)?;
response.add_attachment(kernel_data);
Ok(Some(response))
}
@ -304,6 +326,7 @@ impl MessageHandler for Protocol {
let file_sz = txhashset.reader.metadata()?.len();
let mut resp = Response::new(
Type::TxHashSetArchive,
self.peer_info.version,
&TxHashSetArchive {
height: sm_req.height as u64,
hash: sm_req.hash,

View file

@ -417,7 +417,7 @@ impl Server {
/// The p2p layer protocol version for this node.
pub fn protocol_version() -> ProtocolVersion {
ProtocolVersion::default()
ProtocolVersion::local()
}
/// Returns a set of stats about this server. This and the ServerStats

View file

@ -331,7 +331,7 @@ impl Handler {
// Serialize the block header into pre and post nonce strings
let mut header_buf = vec![];
{
let mut writer = ser::BinWriter::new(&mut header_buf);
let mut writer = ser::BinWriter::default(&mut header_buf);
bh.write_pre_pow(&mut writer).unwrap();
bh.pow.write_pre_pow(&mut writer).unwrap();
}

View file

@ -22,7 +22,7 @@ use lmdb_zero as lmdb;
use lmdb_zero::traits::CreateCursor;
use lmdb_zero::LmdbResultExt;
use crate::core::ser;
use crate::core::ser::{self, ProtocolVersion};
use crate::util::{RwLock, RwLockReadGuard};
/// number of bytes to grow the database by when needed
@ -68,6 +68,7 @@ pub struct Store {
env: Arc<lmdb::Environment>,
db: RwLock<Option<Arc<lmdb::Database<'static>>>>,
name: String,
version: ProtocolVersion,
}
impl Store {
@ -111,6 +112,7 @@ impl Store {
env: Arc::new(env),
db: RwLock::new(None),
name: db_name,
version: ProtocolVersion(1),
};
{
@ -230,7 +232,7 @@ impl Store {
) -> Result<Option<T>, Error> {
let res: lmdb::error::Result<&[u8]> = access.get(&db.as_ref().unwrap(), key);
match res.to_opt() {
Ok(Some(mut res)) => match ser::deserialize_db(&mut res) {
Ok(Some(mut res)) => match ser::deserialize(&mut res, self.version) {
Ok(res) => Ok(Some(res)),
Err(e) => Err(Error::SerErr(format!("{}", e))),
},
@ -259,6 +261,7 @@ impl Store {
cursor,
seek: false,
prefix: from.to_vec(),
version: self.version,
_marker: marker::PhantomData,
})
}
@ -296,7 +299,7 @@ impl<'a> Batch<'a> {
/// Writes a single key and its `Writeable` value to the db. Encapsulates
/// serialization.
pub fn put_ser<W: ser::Writeable>(&self, key: &[u8], value: &W) -> Result<(), Error> {
let ser_value = ser::ser_vec(value);
let ser_value = ser::ser_vec(value, self.store.version);
match ser_value {
Ok(data) => self.put(key, &data),
Err(err) => Err(Error::SerErr(format!("{}", err))),
@ -360,6 +363,7 @@ where
cursor: Arc<lmdb::Cursor<'static, 'static>>,
seek: bool,
prefix: Vec<u8>,
version: ProtocolVersion,
_marker: marker::PhantomData<T>,
}
@ -393,7 +397,7 @@ where
fn deser_if_prefix_match(&self, key: &[u8], value: &[u8]) -> Option<(Vec<u8>, T)> {
let plen = self.prefix.len();
if plen == 0 || key[0..plen] == self.prefix[..] {
if let Ok(value) = ser::deserialize_db(&mut &value[..]) {
if let Ok(value) = ser::deserialize(&mut &value[..], self.version) {
Some((key.to_vec(), value))
} else {
None

View file

@ -19,7 +19,7 @@ use std::{io, time};
use crate::core::core::hash::{Hash, Hashed};
use crate::core::core::pmmr::{self, family, Backend};
use crate::core::core::BlockHeader;
use crate::core::ser::{FixedLength, PMMRable};
use crate::core::ser::{FixedLength, PMMRable, ProtocolVersion};
use crate::leaf_set::LeafSet;
use crate::prune_list::PruneList;
use crate::types::{AppendOnlyFile, DataFile, SizeEntry, SizeInfo};
@ -206,6 +206,11 @@ impl<T: PMMRable> PMMRBackend<T> {
fixed_size: bool,
header: Option<&BlockHeader>,
) -> io::Result<PMMRBackend<T>> {
// Note: Explicit protocol version here.
// Regardless of our "default" protocol version we have existing MMR files
// and we need to be able to support these across upgrades.
let version = ProtocolVersion(1);
let data_dir = data_dir.as_ref();
// Are we dealing with "fixed size" data elements or "variable size" data elements
@ -216,14 +221,15 @@ impl<T: PMMRable> PMMRBackend<T> {
SizeInfo::VariableSize(Box::new(AppendOnlyFile::open(
data_dir.join(PMMR_SIZE_FILE),
SizeInfo::FixedSize(SizeEntry::LEN as u16),
version,
)?))
};
// Hash file is always "fixed size" and we use 32 bytes per hash.
let hash_size_info = SizeInfo::FixedSize(Hash::LEN as u16);
let hash_file = DataFile::open(&data_dir.join(PMMR_HASH_FILE), hash_size_info)?;
let data_file = DataFile::open(&data_dir.join(PMMR_DATA_FILE), size_info)?;
let hash_file = DataFile::open(&data_dir.join(PMMR_HASH_FILE), hash_size_info, version)?;
let data_file = DataFile::open(&data_dir.join(PMMR_DATA_FILE), size_info, version)?;
let leaf_set_path = data_dir.join(PMMR_LEAF_FILE);

View file

@ -78,12 +78,16 @@ where
T: Readable + Writeable + Debug,
{
/// Open (or create) a file at the provided path on disk.
pub fn open<P>(path: P, size_info: SizeInfo) -> io::Result<DataFile<T>>
pub fn open<P>(
path: P,
size_info: SizeInfo,
version: ProtocolVersion,
) -> io::Result<DataFile<T>>
where
P: AsRef<Path> + Debug,
{
Ok(DataFile {
file: AppendOnlyFile::open(path, size_info)?,
file: AppendOnlyFile::open(path, size_info, version)?,
})
}
@ -177,6 +181,7 @@ pub struct AppendOnlyFile<T> {
path: PathBuf,
file: Option<File>,
size_info: SizeInfo,
version: ProtocolVersion,
mmap: Option<memmap::Mmap>,
// Buffer of unsync'd bytes. These bytes will be appended to the file when flushed.
@ -191,7 +196,11 @@ where
T: Debug + Readable + Writeable,
{
/// Open a file (existing or not) as append-only, backed by a mmap.
pub fn open<P>(path: P, size_info: SizeInfo) -> io::Result<AppendOnlyFile<T>>
pub fn open<P>(
path: P,
size_info: SizeInfo,
version: ProtocolVersion,
) -> io::Result<AppendOnlyFile<T>>
where
P: AsRef<Path> + Debug,
{
@ -199,6 +208,7 @@ where
file: None,
path: path.as_ref().to_path_buf(),
size_info,
version,
mmap: None,
buffer: vec![],
buffer_start_pos: 0,
@ -268,7 +278,8 @@ where
/// Append element to append-only file by serializing it to bytes and appending the bytes.
fn append_elmt(&mut self, data: &T) -> io::Result<()> {
let mut bytes = ser::ser_vec(data).map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
let mut bytes = ser::ser_vec(data, self.version)
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
self.append(&mut bytes)?;
Ok(())
}
@ -415,7 +426,8 @@ where
fn read_as_elmt(&self, pos: u64) -> io::Result<T> {
let data = self.read(pos)?;
ser::deserialize_db(&mut &data[..]).map_err(|e| io::Error::new(io::ErrorKind::Other, e))
ser::deserialize(&mut &data[..], self.version)
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))
}
// Read length bytes starting at offset from the buffer.
@ -471,10 +483,10 @@ where
let reader = File::open(&self.path)?;
let mut buf_reader = BufReader::new(reader);
let mut streaming_reader =
StreamingReader::new(&mut buf_reader, ProtocolVersion::local_db(), time::Duration::from_secs(1));
StreamingReader::new(&mut buf_reader, self.version, time::Duration::from_secs(1));
let mut buf_writer = BufWriter::new(File::create(&tmp_path)?);
let mut bin_writer = BinWriter::new(&mut buf_writer);
let mut bin_writer = BinWriter::new(&mut buf_writer, self.version);
let mut current_pos = 0;
let mut prune_pos = prune_pos;
@ -517,11 +529,14 @@ where
{
let reader = File::open(&self.path)?;
let mut buf_reader = BufReader::new(reader);
let mut streaming_reader =
StreamingReader::new(&mut buf_reader, ProtocolVersion::local_db(), time::Duration::from_secs(1));
let mut streaming_reader = StreamingReader::new(
&mut buf_reader,
self.version,
time::Duration::from_secs(1),
);
let mut buf_writer = BufWriter::new(File::create(&tmp_path)?);
let mut bin_writer = BinWriter::new(&mut buf_writer);
let mut bin_writer = BinWriter::new(&mut buf_writer, self.version);
let mut current_offset = 0;
while let Ok(_) = T::read(&mut streaming_reader) {