[2.x.x] deserialization now protocol version aware (#2824)

* introduce protocol version to deserialize and read

* thread protocol version through our reader

* example protocol version access in kernel read

* fix our StreamingReader impl (WouldBlock woes)

* debug log progress of txhashset download
This commit is contained in:
Antioch Peverell 2019-06-27 17:19:41 +01:00 committed by GitHub
parent 9398578947
commit 5aaf2d058d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
20 changed files with 207 additions and 114 deletions

View file

@ -15,7 +15,7 @@
use super::utils::w;
use crate::core::core::hash::Hashed;
use crate::core::core::Transaction;
use crate::core::ser;
use crate::core::ser::{self, ProtocolVersion};
use crate::pool;
use crate::rest::*;
use crate::router::{Handler, ResponseFuture};
@ -64,7 +64,6 @@ impl PoolPushHandler {
let fluff = params.get("fluff").is_some();
let pool_arc = match w(&self.tx_pool) {
//w(&self.tx_pool).clone();
Ok(p) => p,
Err(e) => return Box::new(err(e)),
};
@ -76,7 +75,10 @@ impl PoolPushHandler {
.map_err(|e| ErrorKind::RequestError(format!("Bad request: {}", e)).into())
})
.and_then(move |tx_bin| {
ser::deserialize(&mut &tx_bin[..])
// TODO - pass protocol version in via the api call?
let version = ProtocolVersion::default();
ser::deserialize(&mut &tx_bin[..], version)
.map_err(|e| ErrorKind::RequestError(format!("Bad request: {}", e)).into())
})
.and_then(move |tx: Transaction| {

View file

@ -83,7 +83,7 @@ pub struct Status {
impl Status {
pub fn from_tip_and_peers(current_tip: chain::Tip, connections: u32) -> Status {
Status {
protocol_version: p2p::msg::ProtocolVersion::default().into(),
protocol_version: ser::ProtocolVersion::default().into(),
user_agent: p2p::msg::USER_AGENT.to_string(),
connections: connections,
tip: Tip::from_tip(current_tip),

View file

@ -23,7 +23,7 @@ use crate::core::core::{
};
use crate::core::global;
use crate::core::pow;
use crate::core::ser::{Readable, StreamingReader};
use crate::core::ser::{ProtocolVersion, Readable, StreamingReader};
use crate::error::{Error, ErrorKind};
use crate::pipe;
use crate::store;
@ -647,7 +647,7 @@ impl Chain {
/// TODO - Write this data to disk and validate the rebuilt kernel MMR.
pub fn kernel_data_write(&self, reader: &mut Read) -> Result<(), Error> {
let mut count = 0;
let mut stream = StreamingReader::new(reader, Duration::from_secs(1));
let mut stream = StreamingReader::new(reader, ProtocolVersion::default(), Duration::from_secs(1));
while let Ok(_kernel) = TxKernelEntry::read(&mut stream) {
count += 1;
}

View file

@ -17,7 +17,7 @@
use crate::core::hash::Hash;
use crate::core::pmmr;
use crate::ser;
use crate::ser::{PMMRIndexHashable, Readable, Reader, Writeable, Writer};
use crate::ser::{PMMRIndexHashable, ProtocolVersion, Readable, Reader, Writeable, Writer};
use crate::util;
/// Merkle proof errors.
@ -85,7 +85,7 @@ impl MerkleProof {
/// Convert hex string representation back to a Merkle proof instance
pub fn from_hex(hex: &str) -> Result<MerkleProof, String> {
let bytes = util::from_hex(hex.to_string()).unwrap();
let res = ser::deserialize(&mut &bytes[..])
let res = ser::deserialize(&mut &bytes[..], ProtocolVersion::default())
.map_err(|_| "failed to deserialize a Merkle Proof".to_string())?;
Ok(res)
}

View file

@ -202,6 +202,12 @@ impl Writeable for TxKernel {
impl Readable for TxKernel {
fn read(reader: &mut dyn Reader) -> Result<TxKernel, ser::Error> {
// We have access to the protocol version here.
// This may be a protocol version based on a peer connection
// or the version used locally for db storage.
// We can handle version specific deserialization here.
let _version = reader.protocol_version();
Ok(TxKernel {
features: KernelFeatures::read(reader)?,
fee: reader.read_u64()?,
@ -338,7 +344,7 @@ impl Writeable for TxKernelEntry {
}
impl Readable for TxKernelEntry {
fn read(reader: &mut Reader) -> Result<TxKernelEntry, ser::Error> {
fn read(reader: &mut dyn Reader) -> Result<TxKernelEntry, ser::Error> {
let kernel = TxKernel::read(reader)?;
Ok(TxKernelEntry { kernel })
}
@ -1523,7 +1529,7 @@ mod test {
let mut vec = vec![];
ser::serialize(&mut vec, &kernel).expect("serialized failed");
let kernel2: TxKernel = ser::deserialize(&mut &vec[..]).unwrap();
let kernel2: TxKernel = ser::deserialize_default(&mut &vec[..]).unwrap();
assert_eq!(kernel2.features, KernelFeatures::Plain);
assert_eq!(kernel2.lock_height, 0);
assert_eq!(kernel2.excess, commit);
@ -1541,7 +1547,7 @@ mod test {
let mut vec = vec![];
ser::serialize(&mut vec, &kernel).expect("serialized failed");
let kernel2: TxKernel = ser::deserialize(&mut &vec[..]).unwrap();
let kernel2: TxKernel = ser::deserialize_default(&mut &vec[..]).unwrap();
assert_eq!(kernel2.features, KernelFeatures::HeightLocked);
assert_eq!(kernel2.lock_height, 100);
assert_eq!(kernel2.excess, commit);

View file

@ -31,11 +31,11 @@ use crate::util::secp::pedersen::{Commitment, RangeProof};
use crate::util::secp::Signature;
use crate::util::secp::{ContextFlag, Secp256k1};
use byteorder::{BigEndian, ByteOrder, ReadBytesExt};
use std::fmt::Debug;
use std::fmt::{self, Debug};
use std::io::{self, Read, Write};
use std::marker;
use std::time::Duration;
use std::{cmp, error, fmt};
use std::{cmp, error};
/// Possible errors deriving from serializing or deserializing.
#[derive(Clone, Eq, PartialEq, Debug, Serialize, Deserialize)]
@ -209,6 +209,9 @@ pub trait Reader {
/// Consumes a byte from the reader, producing an error if it doesn't have
/// the expected value
fn expect_u8(&mut self, val: u8) -> Result<u8, Error>;
/// Access to underlying protocol version to support
/// version specific deserialization logic.
fn protocol_version(&self) -> ProtocolVersion;
}
/// Trait that every type that can be serialized as binary must implement.
@ -275,6 +278,54 @@ where
Ok(res)
}
/// Protocol version for serialization/deserialization.
/// Note: This is used in various places including but limited to
/// the p2p layer and our local db storage layer.
/// We may speak multiple versions to various peers and a potentially *different*
/// version for our local db.
#[derive(Clone, Copy, Debug, Deserialize, Eq, Ord, PartialOrd, PartialEq, Serialize)]
pub struct ProtocolVersion(pub u32);
impl Default for ProtocolVersion {
fn default() -> ProtocolVersion {
ProtocolVersion(1)
}
}
impl fmt::Display for ProtocolVersion {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{}", self.0)
}
}
impl ProtocolVersion {
/// We need to specify a protocol version for our local database.
/// Regardless of specific version used when sending/receiving data between peers
/// we need to take care with serialization/deserialization of data locally in the db.
pub fn local_db() -> ProtocolVersion {
ProtocolVersion(1)
}
}
impl From<ProtocolVersion> for u32 {
fn from(v: ProtocolVersion) -> u32 {
v.0
}
}
impl Writeable for ProtocolVersion {
fn write<W: Writer>(&self, writer: &mut W) -> Result<(), Error> {
writer.write_u32(self.0)
}
}
impl Readable for ProtocolVersion {
fn read(reader: &mut dyn Reader) -> Result<ProtocolVersion, Error> {
let version = reader.read_u32()?;
Ok(ProtocolVersion(version))
}
}
/// Trait that every type that can be deserialized from binary must implement.
/// Reads directly to a Reader, a utility type thinly wrapping an
/// underlying Read implementation.
@ -287,11 +338,24 @@ where
}
/// Deserializes a Readable from any std::io::Read implementation.
pub fn deserialize<T: Readable>(source: &mut dyn Read) -> Result<T, Error> {
let mut reader = BinReader { source };
pub fn deserialize<T: Readable>(
source: &mut dyn Read,
version: ProtocolVersion,
) -> Result<T, Error> {
let mut reader = BinReader::new(source, version);
T::read(&mut reader)
}
/// Deserialize a Readable based on our local db version protocol.
pub fn deserialize_db<T: Readable>(source: &mut dyn Read) -> Result<T, Error> {
deserialize(source, ProtocolVersion::local_db())
}
/// Deserialize a Readable based on our local "default" version protocol.
pub fn deserialize_default<T: Readable>(source: &mut dyn Read) -> Result<T, Error> {
deserialize(source, ProtocolVersion::default())
}
/// Serializes a Writeable into any std::io::Write implementation.
pub fn serialize<W: Writeable>(sink: &mut dyn Write, thing: &W) -> Result<(), Error> {
let mut writer = BinWriter { sink };
@ -309,6 +373,14 @@ pub fn ser_vec<W: Writeable>(thing: &W) -> Result<Vec<u8>, Error> {
/// Utility to read from a binary source
pub struct BinReader<'a> {
source: &'a mut dyn Read,
version: ProtocolVersion,
}
impl<'a> BinReader<'a> {
/// Constructor for a new BinReader for the provided source and protocol version.
pub fn new(source: &'a mut dyn Read, version: ProtocolVersion) -> BinReader<'a> {
BinReader { source, version }
}
}
fn map_io_err(err: io::Error) -> Error {
@ -366,12 +438,17 @@ impl<'a> Reader for BinReader<'a> {
})
}
}
fn protocol_version(&self) -> ProtocolVersion {
self.version
}
}
/// A reader that reads straight off a stream.
/// Tracks total bytes read so we can verify we read the right number afterwards.
pub struct StreamingReader<'a> {
total_bytes_read: u64,
version: ProtocolVersion,
stream: &'a mut dyn Read,
timeout: Duration,
}
@ -379,9 +456,14 @@ pub struct StreamingReader<'a> {
impl<'a> StreamingReader<'a> {
/// Create a new streaming reader with the provided underlying stream.
/// Also takes a duration to be used for each individual read_exact call.
pub fn new(stream: &'a mut dyn Read, timeout: Duration) -> StreamingReader<'a> {
pub fn new(
stream: &'a mut dyn Read,
version: ProtocolVersion,
timeout: Duration,
) -> StreamingReader<'a> {
StreamingReader {
total_bytes_read: 0,
version,
stream,
timeout,
}
@ -450,6 +532,10 @@ impl<'a> Reader for StreamingReader<'a> {
})
}
}
fn protocol_version(&self) -> ProtocolVersion {
self.version
}
}
impl Readable for Commitment {

View file

@ -221,7 +221,7 @@ fn serialize_deserialize_header_version() {
assert_eq!(vec1, vec2);
// Check we can successfully deserialize a header_version.
let version: HeaderVersion = ser::deserialize(&mut &vec2[..]).unwrap();
let version: HeaderVersion = ser::deserialize_default(&mut &vec2[..]).unwrap();
assert_eq!(version.0, 1)
}
@ -236,7 +236,7 @@ fn serialize_deserialize_block_header() {
let mut vec = Vec::new();
ser::serialize(&mut vec, &header1).expect("serialization failed");
let header2: BlockHeader = ser::deserialize(&mut &vec[..]).unwrap();
let header2: BlockHeader = ser::deserialize_default(&mut &vec[..]).unwrap();
assert_eq!(header1.hash(), header2.hash());
assert_eq!(header1, header2);
@ -253,7 +253,7 @@ fn serialize_deserialize_block() {
let mut vec = Vec::new();
ser::serialize(&mut vec, &b).expect("serialization failed");
let b2: Block = ser::deserialize(&mut &vec[..]).unwrap();
let b2: Block = ser::deserialize_default(&mut &vec[..]).unwrap();
assert_eq!(b.hash(), b2.hash());
assert_eq!(b.header, b2.header);
@ -447,7 +447,7 @@ fn serialize_deserialize_compact_block() {
cb1.header.timestamp =
origin_ts - Duration::nanoseconds(origin_ts.timestamp_subsec_nanos() as i64);
let cb2: CompactBlock = ser::deserialize(&mut &vec[..]).unwrap();
let cb2: CompactBlock = ser::deserialize_default(&mut &vec[..]).unwrap();
assert_eq!(cb1.header, cb2.header);
assert_eq!(cb1.kern_ids(), cb2.kern_ids());

View file

@ -49,7 +49,7 @@ fn simple_tx_ser_deser() {
let tx = tx2i1o();
let mut vec = Vec::new();
ser::serialize(&mut vec, &tx).expect("serialization failed");
let dtx: Transaction = ser::deserialize(&mut &vec[..]).unwrap();
let dtx: Transaction = ser::deserialize_default(&mut &vec[..]).unwrap();
assert_eq!(dtx.fee(), 2);
assert_eq!(dtx.inputs().len(), 2);
assert_eq!(dtx.outputs().len(), 1);
@ -63,11 +63,11 @@ fn tx_double_ser_deser() {
let mut vec = Vec::new();
assert!(ser::serialize(&mut vec, &btx).is_ok());
let dtx: Transaction = ser::deserialize(&mut &vec[..]).unwrap();
let dtx: Transaction = ser::deserialize_default(&mut &vec[..]).unwrap();
let mut vec2 = Vec::new();
assert!(ser::serialize(&mut vec2, &btx).is_ok());
let dtx2: Transaction = ser::deserialize(&mut &vec2[..]).unwrap();
let dtx2: Transaction = ser::deserialize_default(&mut &vec2[..]).unwrap();
assert_eq!(btx.hash(), dtx.hash());
assert_eq!(dtx.hash(), dtx2.hash());

View file

@ -16,8 +16,7 @@ mod vec_backend;
use self::core::core::merkle_proof::MerkleProof;
use self::core::core::pmmr::PMMR;
use self::core::ser;
use self::core::ser::PMMRIndexHashable;
use self::core::ser::{self, PMMRIndexHashable};
use crate::vec_backend::{TestElem, VecBackend};
use grin_core as core;
@ -39,7 +38,7 @@ fn merkle_proof_ser_deser() {
let mut vec = Vec::new();
ser::serialize(&mut vec, &proof).expect("serialization failed");
let proof_2: MerkleProof = ser::deserialize(&mut &vec[..]).unwrap();
let proof_2: MerkleProof = ser::deserialize_default(&mut &vec[..]).unwrap();
assert_eq!(proof, proof_2);
}

View file

@ -40,7 +40,7 @@ fn test_output_ser_deser() {
let mut vec = vec![];
ser::serialize(&mut vec, &out).expect("serialized failed");
let dout: Output = ser::deserialize(&mut &vec[..]).unwrap();
let dout: Output = ser::deserialize_default(&mut &vec[..]).unwrap();
assert_eq!(dout.features, OutputFeatures::Plain);
assert_eq!(dout.commit, out.commit);

View file

@ -30,8 +30,7 @@ use std::{
time,
};
use crate::core::ser;
use crate::core::ser::FixedLength;
use crate::core::ser::{self, FixedLength, ProtocolVersion};
use crate::msg::{
read_body, read_discard, read_header, read_item, write_to_buf, MsgHeader, MsgHeaderWrapper,
Type,
@ -75,22 +74,31 @@ macro_rules! try_break {
pub struct Message<'a> {
pub header: MsgHeader,
stream: &'a mut dyn Read,
version: ProtocolVersion,
}
impl<'a> Message<'a> {
fn from_header(header: MsgHeader, stream: &'a mut dyn Read) -> Message<'a> {
Message { header, stream }
fn from_header(
header: MsgHeader,
stream: &'a mut dyn Read,
version: ProtocolVersion,
) -> Message<'a> {
Message {
header,
stream,
version,
}
}
/// Read the message body from the underlying connection
pub fn body<T: ser::Readable>(&mut self) -> Result<T, Error> {
read_body(&self.header, self.stream)
read_body(&self.header, self.stream, self.version)
}
/// Read a single "thing" from the underlying connection.
/// Return the thing and the total bytes read.
pub fn streaming_read<T: ser::Readable>(&mut self) -> Result<(T, u64), Error> {
read_item(self.stream)
read_item(self.stream, self.version)
}
pub fn copy_attachment(&mut self, len: usize, writer: &mut dyn Write) -> Result<usize, Error> {
@ -255,6 +263,7 @@ impl Tracker {
/// itself.
pub fn listen<H>(
stream: TcpStream,
version: ProtocolVersion,
tracker: Arc<Tracker>,
handler: H,
) -> io::Result<(ConnHandle, StopHandle)>
@ -267,7 +276,7 @@ where
stream
.set_nonblocking(true)
.expect("Non-blocking IO not available.");
let peer_thread = poll(stream, handler, send_rx, close_rx, tracker)?;
let peer_thread = poll(stream, version, handler, send_rx, close_rx, tracker)?;
Ok((
ConnHandle {
@ -282,6 +291,7 @@ where
fn poll<H>(
conn: TcpStream,
version: ProtocolVersion,
handler: H,
send_rx: mpsc::Receiver<Vec<u8>>,
close_rx: mpsc::Receiver<()>,
@ -301,9 +311,9 @@ where
let mut retry_send = Err(());
loop {
// check the read end
match try_break!(read_header(&mut reader, None)) {
match try_break!(read_header(&mut reader, version, None)) {
Some(MsgHeaderWrapper::Known(header)) => {
let msg = Message::from_header(header, &mut reader);
let msg = Message::from_header(header, &mut reader, version);
trace!(
"Received message header, type {:?}, len {}.",

View file

@ -14,7 +14,8 @@
use crate::core::core::hash::Hash;
use crate::core::pow::Difficulty;
use crate::msg::{read_message, write_message, Hand, ProtocolVersion, Shake, Type, USER_AGENT};
use crate::core::ser::ProtocolVersion;
use crate::msg::{read_message, write_message, Hand, Shake, Type, USER_AGENT};
use crate::peer::Peer;
use crate::types::{Capabilities, Direction, Error, P2PConfig, PeerAddr, PeerInfo, PeerLiveInfo};
use crate::util::RwLock;
@ -60,7 +61,7 @@ impl Handshake {
pub fn initiate(
&self,
capab: Capabilities,
capabilities: Capabilities,
total_difficulty: Difficulty,
self_addr: PeerAddr,
conn: &mut TcpStream,
@ -72,12 +73,15 @@ impl Handshake {
Err(e) => return Err(Error::Connection(e)),
};
// Using our default version here.
let version = ProtocolVersion::default();
let hand = Hand {
version: ProtocolVersion::default(),
capabilities: capab,
nonce: nonce,
version,
capabilities,
nonce,
genesis: self.genesis,
total_difficulty: total_difficulty,
total_difficulty,
sender_addr: self_addr,
receiver_addr: peer_addr,
user_agent: USER_AGENT.to_string(),
@ -85,7 +89,10 @@ impl Handshake {
// write and read the handshake response
write_message(conn, hand, Type::Hand)?;
let shake: Shake = read_message(conn, Type::Shake)?;
// Note: We have to read the Shake message *before* we know which protocol
// version our peer supports (it is in the shake message itself).
let shake: Shake = read_message(conn, version, Type::Shake)?;
if shake.genesis != self.genesis {
return Err(Error::GenesisMismatch {
us: self.genesis,
@ -124,7 +131,10 @@ impl Handshake {
total_difficulty: Difficulty,
conn: &mut TcpStream,
) -> Result<PeerInfo, Error> {
let hand: Hand = read_message(conn, Type::Hand)?;
// Note: We read the Hand message *before* we know which protocol version
// is supported by our peer (it is in the Hand message).
let version = ProtocolVersion::default();
let hand: Hand = read_message(conn, version, Type::Hand)?;
// all the reasons we could refuse this connection for
if hand.genesis != self.genesis {

View file

@ -15,30 +15,21 @@
//! Message types that transit over the network and related serialization code.
use num::FromPrimitive;
use std::fmt;
use std::io::{Read, Write};
use std::time;
use crate::core::core::hash::Hash;
use crate::core::core::BlockHeader;
use crate::core::pow::Difficulty;
use crate::core::ser::{self, FixedLength, Readable, Reader, StreamingReader, Writeable, Writer};
use crate::core::ser::{
self, FixedLength, ProtocolVersion, Readable, Reader, StreamingReader, Writeable, Writer,
};
use crate::core::{consensus, global};
use crate::types::{
Capabilities, Error, PeerAddr, ReasonForBan, MAX_BLOCK_HEADERS, MAX_LOCATORS, MAX_PEER_ADDRS,
};
use crate::util::read_write::read_exact;
/// Our local node protocol version.
/// We will increment the protocol version with every change to p2p msg serialization
/// so we will likely connect with peers with both higher and lower protocol versions.
/// We need to be aware that some msg formats will be potentially incompatible and handle
/// this for each individual peer connection.
/// Note: A peer may disconnect and reconnect with an updated protocol version. Normally
/// the protocol version will increase but we need to handle decreasing values also
/// as a peer may rollback to previous version of the code.
const PROTOCOL_VERSION: u32 = 1;
/// Grin's user agent with current version
pub const USER_AGENT: &'static str = concat!("MW/Grin ", env!("CARGO_PKG_VERSION"));
@ -134,6 +125,7 @@ fn magic() -> [u8; 2] {
///
pub fn read_header(
stream: &mut dyn Read,
version: ProtocolVersion,
msg_type: Option<Type>,
) -> Result<MsgHeaderWrapper, Error> {
let mut head = vec![0u8; MsgHeader::LEN];
@ -142,26 +134,33 @@ pub fn read_header(
} else {
read_exact(stream, &mut head, time::Duration::from_secs(10), false)?;
}
let header = ser::deserialize::<MsgHeaderWrapper>(&mut &head[..])?;
let header = ser::deserialize::<MsgHeaderWrapper>(&mut &head[..], version)?;
Ok(header)
}
/// Read a single item from the provided stream, always blocking until we
/// have a result (or timeout).
/// Returns the item and the total bytes read.
pub fn read_item<T: Readable>(stream: &mut dyn Read) -> Result<(T, u64), Error> {
pub fn read_item<T: Readable>(
stream: &mut dyn Read,
version: ProtocolVersion,
) -> Result<(T, u64), Error> {
let timeout = time::Duration::from_secs(20);
let mut reader = StreamingReader::new(stream, timeout);
let mut reader = StreamingReader::new(stream, version, timeout);
let res = T::read(&mut reader)?;
Ok((res, reader.total_bytes_read()))
}
/// Read a message body from the provided stream, always blocking
/// until we have a result (or timeout).
pub fn read_body<T: Readable>(h: &MsgHeader, stream: &mut dyn Read) -> Result<T, Error> {
pub fn read_body<T: Readable>(
h: &MsgHeader,
stream: &mut dyn Read,
version: ProtocolVersion,
) -> Result<T, Error> {
let mut body = vec![0u8; h.msg_len as usize];
read_exact(stream, &mut body, time::Duration::from_secs(20), true)?;
ser::deserialize(&mut &body[..]).map_err(From::from)
ser::deserialize(&mut &body[..], version).map_err(From::from)
}
/// Read (an unknown) message from the provided stream and discard it.
@ -172,11 +171,15 @@ pub fn read_discard(msg_len: u64, stream: &mut dyn Read) -> Result<(), Error> {
}
/// Reads a full message from the underlying stream.
pub fn read_message<T: Readable>(stream: &mut dyn Read, msg_type: Type) -> Result<T, Error> {
match read_header(stream, Some(msg_type))? {
pub fn read_message<T: Readable>(
stream: &mut dyn Read,
version: ProtocolVersion,
msg_type: Type,
) -> Result<T, Error> {
match read_header(stream, version, Some(msg_type))? {
MsgHeaderWrapper::Known(header) => {
if header.msg_type == msg_type {
read_body(&header, stream)
read_body(&header, stream, version)
} else {
Err(Error::BadMessage)
}
@ -309,40 +312,6 @@ impl Readable for MsgHeaderWrapper {
}
}
#[derive(Clone, Copy, Debug, Deserialize, Eq, Ord, PartialOrd, PartialEq, Serialize)]
pub struct ProtocolVersion(pub u32);
impl Default for ProtocolVersion {
fn default() -> ProtocolVersion {
ProtocolVersion(PROTOCOL_VERSION)
}
}
impl From<ProtocolVersion> for u32 {
fn from(v: ProtocolVersion) -> u32 {
v.0
}
}
impl fmt::Display for ProtocolVersion {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{}", self.0)
}
}
impl Writeable for ProtocolVersion {
fn write<W: Writer>(&self, writer: &mut W) -> Result<(), ser::Error> {
writer.write_u32(self.0)
}
}
impl Readable for ProtocolVersion {
fn read(reader: &mut dyn Reader) -> Result<ProtocolVersion, ser::Error> {
let version = reader.read_u32()?;
Ok(ProtocolVersion(version))
}
}
/// First part of a handshake, sender advertises its version and
/// characteristics.
pub struct Hand {
@ -436,10 +405,8 @@ impl Writeable for Shake {
impl Readable for Shake {
fn read(reader: &mut dyn Reader) -> Result<Shake, ser::Error> {
let version = ProtocolVersion::read(reader)?;
let capab = reader.read_u32()?;
let capabilities = Capabilities::from_bits_truncate(capab);
let total_difficulty = Difficulty::read(reader)?;
let ua = reader.read_bytes_len_prefix()?;
let user_agent = String::from_utf8(ua).map_err(|_| ser::Error::CorruptedData)?;

View file

@ -75,7 +75,7 @@ impl Peer {
let tracking_adapter = TrackingAdapter::new(adapter);
let handler = Protocol::new(Arc::new(tracking_adapter.clone()), info.clone());
let tracker = Arc::new(conn::Tracker::new());
let (sendh, stoph) = conn::listen(conn, tracker.clone(), handler)?;
let (sendh, stoph) = conn::listen(conn, info.version, tracker.clone(), handler)?;
let send_handle = Mutex::new(sendh);
let stop_handle = Mutex::new(stoph);
Ok(Peer {

View file

@ -25,6 +25,7 @@ use std::cmp;
use std::fs::{self, File, OpenOptions};
use std::io::{BufWriter, Seek, SeekFrom, Write};
use std::sync::Arc;
use std::time::Instant;
use tempfile::tempfile;
pub struct Protocol {
@ -340,6 +341,7 @@ impl MessageHandler for Protocol {
download_start_time.timestamp(),
nonce
));
let mut now = Instant::now();
let mut save_txhashset_to_file = |file| -> Result<(), Error> {
let mut tmp_zip =
BufWriter::new(OpenOptions::new().write(true).create_new(true).open(file)?);
@ -355,11 +357,21 @@ impl MessageHandler for Protocol {
downloaded_size as u64,
total_size as u64,
);
if now.elapsed().as_secs() > 10 {
now = Instant::now();
debug!(
"handle_payload: txhashset archive: {}/{}",
downloaded_size, total_size
);
}
// Increase received bytes quietly (without affecting the counters).
// Otherwise we risk banning a peer as "abusive".
tracker.inc_quiet_received(size as u64)
}
debug!(
"handle_payload: txhashset archive: {}/{} ... DONE",
downloaded_size, total_size
);
tmp_zip
.into_inner()
.map_err(|_| Error::Internal)?

View file

@ -29,8 +29,7 @@ use crate::core::core;
use crate::core::core::hash::Hash;
use crate::core::global;
use crate::core::pow::Difficulty;
use crate::core::ser::{self, Readable, Reader, Writeable, Writer};
use crate::msg::ProtocolVersion;
use crate::core::ser::{self, ProtocolVersion, Readable, Reader, Writeable, Writer};
use grin_store;
/// Maximum number of block headers a peer should ever send

View file

@ -21,6 +21,7 @@ use std::time::SystemTime;
use crate::core::consensus::graph_weight;
use crate::core::core::hash::Hash;
use crate::core::ser::ProtocolVersion;
use chrono::prelude::*;
@ -147,7 +148,7 @@ pub struct PeerStats {
/// Address
pub addr: String,
/// version running
pub version: p2p::msg::ProtocolVersion,
pub version: ProtocolVersion,
/// Peer user agent string.
pub user_agent: String,
/// difficulty reported by peer

View file

@ -39,6 +39,7 @@ use crate::common::stats::{DiffBlock, DiffStats, PeerStats, ServerStateInfo, Ser
use crate::common::types::{Error, ServerConfig, StratumServerConfig, SyncState, SyncStatus};
use crate::core::core::hash::{Hashed, ZERO_HASH};
use crate::core::core::verifier_cache::{LruVerifierCache, VerifierCache};
use crate::core::ser::ProtocolVersion;
use crate::core::{consensus, genesis, global, pow};
use crate::grin::{dandelion_monitor, seed, sync};
use crate::mining::stratumserver;
@ -414,9 +415,9 @@ impl Server {
self.chain.header_head().map_err(|e| e.into())
}
/// Current p2p layer protocol version.
pub fn protocol_version() -> p2p::msg::ProtocolVersion {
p2p::msg::ProtocolVersion::default()
/// The p2p layer protocol version for this node.
pub fn protocol_version() -> ProtocolVersion {
ProtocolVersion::default()
}
/// Returns a set of stats about this server. This and the ServerStats

View file

@ -230,7 +230,7 @@ impl Store {
) -> Result<Option<T>, Error> {
let res: lmdb::error::Result<&[u8]> = access.get(&db.as_ref().unwrap(), key);
match res.to_opt() {
Ok(Some(mut res)) => match ser::deserialize(&mut res) {
Ok(Some(mut res)) => match ser::deserialize_db(&mut res) {
Ok(res) => Ok(Some(res)),
Err(e) => Err(Error::SerErr(format!("{}", e))),
},
@ -393,7 +393,7 @@ where
fn deser_if_prefix_match(&self, key: &[u8], value: &[u8]) -> Option<(Vec<u8>, T)> {
let plen = self.prefix.len();
if plen == 0 || key[0..plen] == self.prefix[..] {
if let Ok(value) = ser::deserialize(&mut &value[..]) {
if let Ok(value) = ser::deserialize_db(&mut &value[..]) {
Some((key.to_vec(), value))
} else {
None

View file

@ -16,7 +16,7 @@ use memmap;
use tempfile::tempfile;
use crate::core::ser::{
self, BinWriter, FixedLength, Readable, Reader, StreamingReader, Writeable, Writer,
self, BinWriter, FixedLength, ProtocolVersion, Readable, Reader, StreamingReader, Writeable, Writer,
};
use std::fmt::Debug;
use std::fs::{self, File, OpenOptions};
@ -415,7 +415,7 @@ where
fn read_as_elmt(&self, pos: u64) -> io::Result<T> {
let data = self.read(pos)?;
ser::deserialize(&mut &data[..]).map_err(|e| io::Error::new(io::ErrorKind::Other, e))
ser::deserialize_db(&mut &data[..]).map_err(|e| io::Error::new(io::ErrorKind::Other, e))
}
// Read length bytes starting at offset from the buffer.
@ -471,7 +471,7 @@ where
let reader = File::open(&self.path)?;
let mut buf_reader = BufReader::new(reader);
let mut streaming_reader =
StreamingReader::new(&mut buf_reader, time::Duration::from_secs(1));
StreamingReader::new(&mut buf_reader, ProtocolVersion::local_db(), time::Duration::from_secs(1));
let mut buf_writer = BufWriter::new(File::create(&tmp_path)?);
let mut bin_writer = BinWriter::new(&mut buf_writer);
@ -518,7 +518,7 @@ where
let reader = File::open(&self.path)?;
let mut buf_reader = BufReader::new(reader);
let mut streaming_reader =
StreamingReader::new(&mut buf_reader, time::Duration::from_secs(1));
StreamingReader::new(&mut buf_reader, ProtocolVersion::local_db(), time::Duration::from_secs(1));
let mut buf_writer = BufWriter::new(File::create(&tmp_path)?);
let mut bin_writer = BinWriter::new(&mut buf_writer);