inc quietly on small batches of headers (#3564)

This commit is contained in:
Antioch Peverell 2021-02-16 14:36:28 +00:00 committed by GitHub
parent 524dbd0170
commit 5b9664bccf
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 56 additions and 8 deletions

View file

@ -1,8 +1,33 @@
use crate::core::core::block::{BlockHeader, UntrustedBlockHeader};
// Copyright 2020 The Grin Developers
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Provides a connection wrapper that handles the lower level tasks in sending
//! or receiving data from the TCP socket, as well as dealing with timeouts.
//!
//! Because of a few idiosyncracies in the Rust `TcpStream`, this has to use
//! async I/O to be able to both read *and* write on the connection. Which
//! forces us to go through some additional gymnastic to loop over the async
//! stream and make sure we get the right number of bytes out.
use crate::core::global::header_size_bytes;
use crate::core::ser::{BufReader, ProtocolVersion, Readable};
use crate::msg::{Message, MsgHeader, MsgHeaderWrapper, Type};
use crate::types::{AttachmentMeta, AttachmentUpdate, Error};
use crate::{
core::core::block::{BlockHeader, UntrustedBlockHeader},
msg::HeadersData,
};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use core::ser::Reader;
use std::cmp::min;
@ -169,18 +194,21 @@ impl Codec {
headers.push(header.into());
*bytes_left = bytes_left.saturating_sub(bytes_read);
*items_left -= 1;
if headers.len() == HEADER_BATCH_SIZE || *items_left == 0 {
let remaining = *items_left as u64;
if headers.len() == HEADER_BATCH_SIZE || remaining == 0 {
let mut h = Vec::with_capacity(min(HEADER_BATCH_SIZE, *items_left));
mem::swap(headers, &mut h);
if *items_left == 0 {
if remaining == 0 {
let bytes_left = *bytes_left;
self.state = None;
if bytes_left > 0 {
return Err(Error::BadMessage);
}
}
return Ok(Message::Headers(h));
return Ok(Message::Headers(HeadersData {
headers: h,
remaining,
}));
}
}
Attachment(left, meta, now) => {

View file

@ -249,6 +249,15 @@ where
// increase the appropriate counter
match &next {
Ok(Message::Attachment(_, _)) => reader_tracker.inc_quiet_received(bytes_read),
Ok(Message::Headers(data)) => {
// We process a full 512 headers locally in smaller 32 header batches.
// We only want to increment the msg count once for the full 512 headers.
if data.remaining == 0 {
reader_tracker.inc_received(bytes_read);
} else {
reader_tracker.inc_quiet_received(bytes_read);
}
}
_ => reader_tracker.inc_received(bytes_read),
}

View file

@ -865,7 +865,7 @@ pub enum Message {
CompactBlock(UntrustedCompactBlock),
GetHeaders(Locator),
Header(UntrustedBlockHeader),
Headers(Vec<BlockHeader>),
Headers(HeadersData),
GetPeerAddrs(GetPeerAddrs),
PeerAddrs(PeerAddrs),
TxHashSetRequest(TxHashSetRequest),
@ -881,6 +881,17 @@ pub enum Message {
KernelSegment(SegmentResponse<TxKernel>),
}
/// We receive 512 headers from a peer.
/// But we process them in smaller batches of 32 headers.
/// HeadersData wraps the current batch and a count of the headers remaining after this batch.
pub struct HeadersData {
/// Batch of headers currently being processed.
pub headers: Vec<BlockHeader>,
/// Number of headers stil to be processed after this current batch.
/// 0 indicates this is the final batch from the larger set of headers received from the peer.
pub remaining: u64,
}
impl fmt::Display for Message {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {

View file

@ -199,8 +199,8 @@ impl MessageHandler for Protocol {
Consumed::None
}
Message::Headers(headers) => {
adapter.headers_received(&headers, &self.peer_info)?;
Message::Headers(data) => {
adapter.headers_received(&data.headers, &self.peer_info)?;
Consumed::None
}