diff --git a/p2p/src/codec.rs b/p2p/src/codec.rs index 293613f47..ae0322bc6 100644 --- a/p2p/src/codec.rs +++ b/p2p/src/codec.rs @@ -1,8 +1,33 @@ -use crate::core::core::block::{BlockHeader, UntrustedBlockHeader}; +// Copyright 2020 The Grin Developers +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Provides a connection wrapper that handles the lower level tasks in sending +//! or receiving data from the TCP socket, as well as dealing with timeouts. +//! +//! Because of a few idiosyncracies in the Rust `TcpStream`, this has to use +//! async I/O to be able to both read *and* write on the connection. Which +//! forces us to go through some additional gymnastic to loop over the async +//! stream and make sure we get the right number of bytes out. + use crate::core::global::header_size_bytes; use crate::core::ser::{BufReader, ProtocolVersion, Readable}; use crate::msg::{Message, MsgHeader, MsgHeaderWrapper, Type}; use crate::types::{AttachmentMeta, AttachmentUpdate, Error}; +use crate::{ + core::core::block::{BlockHeader, UntrustedBlockHeader}, + msg::HeadersData, +}; use bytes::{Buf, BufMut, Bytes, BytesMut}; use core::ser::Reader; use std::cmp::min; @@ -169,18 +194,21 @@ impl Codec { headers.push(header.into()); *bytes_left = bytes_left.saturating_sub(bytes_read); *items_left -= 1; - - if headers.len() == HEADER_BATCH_SIZE || *items_left == 0 { + let remaining = *items_left as u64; + if headers.len() == HEADER_BATCH_SIZE || remaining == 0 { let mut h = Vec::with_capacity(min(HEADER_BATCH_SIZE, *items_left)); mem::swap(headers, &mut h); - if *items_left == 0 { + if remaining == 0 { let bytes_left = *bytes_left; self.state = None; if bytes_left > 0 { return Err(Error::BadMessage); } } - return Ok(Message::Headers(h)); + return Ok(Message::Headers(HeadersData { + headers: h, + remaining, + })); } } Attachment(left, meta, now) => { diff --git a/p2p/src/conn.rs b/p2p/src/conn.rs index 6be82190d..dfbbfddce 100644 --- a/p2p/src/conn.rs +++ b/p2p/src/conn.rs @@ -249,6 +249,15 @@ where // increase the appropriate counter match &next { Ok(Message::Attachment(_, _)) => reader_tracker.inc_quiet_received(bytes_read), + Ok(Message::Headers(data)) => { + // We process a full 512 headers locally in smaller 32 header batches. + // We only want to increment the msg count once for the full 512 headers. + if data.remaining == 0 { + reader_tracker.inc_received(bytes_read); + } else { + reader_tracker.inc_quiet_received(bytes_read); + } + } _ => reader_tracker.inc_received(bytes_read), } diff --git a/p2p/src/msg.rs b/p2p/src/msg.rs index 93de6dca2..b47d39c0d 100644 --- a/p2p/src/msg.rs +++ b/p2p/src/msg.rs @@ -865,7 +865,7 @@ pub enum Message { CompactBlock(UntrustedCompactBlock), GetHeaders(Locator), Header(UntrustedBlockHeader), - Headers(Vec), + Headers(HeadersData), GetPeerAddrs(GetPeerAddrs), PeerAddrs(PeerAddrs), TxHashSetRequest(TxHashSetRequest), @@ -881,6 +881,17 @@ pub enum Message { KernelSegment(SegmentResponse), } +/// We receive 512 headers from a peer. +/// But we process them in smaller batches of 32 headers. +/// HeadersData wraps the current batch and a count of the headers remaining after this batch. +pub struct HeadersData { + /// Batch of headers currently being processed. + pub headers: Vec, + /// Number of headers stil to be processed after this current batch. + /// 0 indicates this is the final batch from the larger set of headers received from the peer. + pub remaining: u64, +} + impl fmt::Display for Message { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { diff --git a/p2p/src/protocol.rs b/p2p/src/protocol.rs index 0b35ec4a6..9a2cf8b4c 100644 --- a/p2p/src/protocol.rs +++ b/p2p/src/protocol.rs @@ -199,8 +199,8 @@ impl MessageHandler for Protocol { Consumed::None } - Message::Headers(headers) => { - adapter.headers_received(&headers, &self.peer_info)?; + Message::Headers(data) => { + adapter.headers_received(&data.headers, &self.peer_info)?; Consumed::None }