Peer send receive bytes statistics missed response messages and attachment (#1953)

* fix: peer send receive bytes statistics missed response message and attachment

* rustfmt
This commit is contained in:
Gary Yu 2018-11-11 07:30:57 +08:00 committed by GitHub
parent 2352275dff
commit 7ff1ee5fde
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 41 additions and 10 deletions

View file

@ -35,7 +35,11 @@ use util::{RateCounter, RwLock};
/// A trait to be implemented in order to receive messages from the
/// connection. Allows providing an optional response.
pub trait MessageHandler: Send + 'static {
fn consume<'a>(&self, msg: Message<'a>) -> Result<Option<Response<'a>>, Error>;
fn consume<'a>(
&self,
msg: Message<'a>,
received_bytes: Arc<RwLock<RateCounter>>,
) -> Result<Option<Response<'a>>, Error>;
}
// Macro to simplify the boilerplate around async I/O error handling,
@ -119,17 +123,27 @@ pub struct Response<'a> {
}
impl<'a> Response<'a> {
fn write(mut self) -> Result<(), Error> {
fn write(mut self, sent_bytes: Arc<RwLock<RateCounter>>) -> Result<(), Error> {
let mut msg =
ser::ser_vec(&MsgHeader::new(self.resp_type, self.body.len() as u64)).unwrap();
msg.append(&mut self.body);
write_all(&mut self.conn, &msg[..], time::Duration::from_secs(10))?;
// Increase sent bytes counter
{
let mut sent_bytes = sent_bytes.write();
sent_bytes.inc(msg.len() as u64);
}
if let Some(mut file) = self.attachment {
let mut buf = [0u8; 8000];
loop {
match file.read(&mut buf[..]) {
Ok(0) => break,
Ok(n) => write_all(&mut self.conn, &buf[..n], time::Duration::from_secs(10))?,
Ok(n) => {
write_all(&mut self.conn, &buf[..n], time::Duration::from_secs(10))?;
// Increase sent bytes counter
let mut sent_bytes = sent_bytes.write();
sent_bytes.inc(n as u64);
}
Err(e) => return Err(From::from(e)),
}
}
@ -187,6 +201,8 @@ where
// Counter of number of bytes received
let received_bytes = Arc::new(RwLock::new(RateCounter::new()));
// Counter of number of bytes sent
let sent_bytes = Arc::new(RwLock::new(RateCounter::new()));
stream
.set_nonblocking(true)
@ -198,10 +214,11 @@ where
error_tx,
close_rx,
received_bytes.clone(),
sent_bytes.clone(),
);
Tracker {
sent_bytes: Arc::new(RwLock::new(RateCounter::new())),
sent_bytes: sent_bytes.clone(),
received_bytes: received_bytes.clone(),
send_channel: send_tx,
close_channel: close_tx,
@ -216,6 +233,7 @@ fn poll<H>(
error_tx: mpsc::Sender<Error>,
close_rx: mpsc::Receiver<()>,
received_bytes: Arc<RwLock<RateCounter>>,
sent_bytes: Arc<RwLock<RateCounter>>,
) where
H: MessageHandler,
{
@ -238,13 +256,14 @@ fn poll<H>(
);
// Increase received bytes counter
let received = received_bytes.clone();
{
let mut received_bytes = received_bytes.write();
received_bytes.inc(size_of::<MsgHeader>() as u64 + msg.header.msg_len);
}
if let Some(Some(resp)) = try_break!(error_tx, handler.consume(msg)) {
try_break!(error_tx, resp.write());
if let Some(Some(resp)) = try_break!(error_tx, handler.consume(msg, received)) {
try_break!(error_tx, resp.write(sent_bytes.clone()));
}
}

View file

@ -24,6 +24,7 @@ use chrono::prelude::Utc;
use conn::{Message, MessageHandler, Response};
use core::core::{self, hash::Hash, CompactBlock};
use core::{global, ser};
use util::{RateCounter, RwLock};
use msg::{
read_exact, BanReason, GetPeerAddrs, Headers, Locator, PeerAddrs, Ping, Pong, SockAddr,
@ -43,7 +44,11 @@ impl Protocol {
}
impl MessageHandler for Protocol {
fn consume<'a>(&self, mut msg: Message<'a>) -> Result<Option<Response<'a>>, Error> {
fn consume<'a>(
&self,
mut msg: Message<'a>,
received_bytes: Arc<RwLock<RateCounter>>,
) -> Result<Option<Response<'a>>, Error> {
let adapter = &self.adapter;
// If we received a msg from a banned peer then log and drop it.
@ -286,13 +291,20 @@ impl MessageHandler for Protocol {
let mut downloaded_size: usize = 0;
let mut request_size = cmp::min(48_000, total_size);
while request_size > 0 {
downloaded_size += msg.copy_attachment(request_size, &mut tmp_zip)?;
let size = msg.copy_attachment(request_size, &mut tmp_zip)?;
downloaded_size += size;
request_size = cmp::min(48_000, total_size - downloaded_size);
self.adapter.txhashset_download_update(
download_start_time,
downloaded_size as u64,
total_size as u64,
);
// Increase received bytes counter
{
let mut received_bytes = received_bytes.write();
received_bytes.inc(size as u64);
}
}
tmp_zip.into_inner().unwrap().sync_all()?;
Ok(())

View file

@ -124,8 +124,8 @@ impl TUIStatusListener for TUIPeerView {
}).column(PeerColumn::Direction, "Direction", |c| c.width_percent(8))
.column(PeerColumn::TotalDifficulty, "Total Difficulty", |c| {
c.width_percent(24)
}).column(PeerColumn::Version, "Version", |c| c.width_percent(8))
.column(PeerColumn::UserAgent, "User Agent", |c| c.width_percent(16));
}).column(PeerColumn::Version, "Ver", |c| c.width_percent(6))
.column(PeerColumn::UserAgent, "User Agent", |c| c.width_percent(18));
let peer_status_view = BoxView::with_full_screen(
LinearLayout::new(Orientation::Vertical)
.child(