diff --git a/p2p/src/conn.rs b/p2p/src/conn.rs index fa36aec44..69a143451 100644 --- a/p2p/src/conn.rs +++ b/p2p/src/conn.rs @@ -35,7 +35,11 @@ use util::{RateCounter, RwLock}; /// A trait to be implemented in order to receive messages from the /// connection. Allows providing an optional response. pub trait MessageHandler: Send + 'static { - fn consume<'a>(&self, msg: Message<'a>) -> Result>, Error>; + fn consume<'a>( + &self, + msg: Message<'a>, + received_bytes: Arc>, + ) -> Result>, Error>; } // Macro to simplify the boilerplate around async I/O error handling, @@ -119,17 +123,27 @@ pub struct Response<'a> { } impl<'a> Response<'a> { - fn write(mut self) -> Result<(), Error> { + fn write(mut self, sent_bytes: Arc>) -> Result<(), Error> { let mut msg = ser::ser_vec(&MsgHeader::new(self.resp_type, self.body.len() as u64)).unwrap(); msg.append(&mut self.body); write_all(&mut self.conn, &msg[..], time::Duration::from_secs(10))?; + // Increase sent bytes counter + { + let mut sent_bytes = sent_bytes.write(); + sent_bytes.inc(msg.len() as u64); + } if let Some(mut file) = self.attachment { let mut buf = [0u8; 8000]; loop { match file.read(&mut buf[..]) { Ok(0) => break, - Ok(n) => write_all(&mut self.conn, &buf[..n], time::Duration::from_secs(10))?, + Ok(n) => { + write_all(&mut self.conn, &buf[..n], time::Duration::from_secs(10))?; + // Increase sent bytes counter + let mut sent_bytes = sent_bytes.write(); + sent_bytes.inc(n as u64); + } Err(e) => return Err(From::from(e)), } } @@ -187,6 +201,8 @@ where // Counter of number of bytes received let received_bytes = Arc::new(RwLock::new(RateCounter::new())); + // Counter of number of bytes sent + let sent_bytes = Arc::new(RwLock::new(RateCounter::new())); stream .set_nonblocking(true) @@ -198,10 +214,11 @@ where error_tx, close_rx, received_bytes.clone(), + sent_bytes.clone(), ); Tracker { - sent_bytes: Arc::new(RwLock::new(RateCounter::new())), + sent_bytes: sent_bytes.clone(), received_bytes: received_bytes.clone(), send_channel: send_tx, close_channel: close_tx, @@ -216,6 +233,7 @@ fn poll( error_tx: mpsc::Sender, close_rx: mpsc::Receiver<()>, received_bytes: Arc>, + sent_bytes: Arc>, ) where H: MessageHandler, { @@ -238,13 +256,14 @@ fn poll( ); // Increase received bytes counter + let received = received_bytes.clone(); { let mut received_bytes = received_bytes.write(); received_bytes.inc(size_of::() as u64 + msg.header.msg_len); } - if let Some(Some(resp)) = try_break!(error_tx, handler.consume(msg)) { - try_break!(error_tx, resp.write()); + if let Some(Some(resp)) = try_break!(error_tx, handler.consume(msg, received)) { + try_break!(error_tx, resp.write(sent_bytes.clone())); } } diff --git a/p2p/src/protocol.rs b/p2p/src/protocol.rs index 8d3095ce7..b94e1f3f0 100644 --- a/p2p/src/protocol.rs +++ b/p2p/src/protocol.rs @@ -24,6 +24,7 @@ use chrono::prelude::Utc; use conn::{Message, MessageHandler, Response}; use core::core::{self, hash::Hash, CompactBlock}; use core::{global, ser}; +use util::{RateCounter, RwLock}; use msg::{ read_exact, BanReason, GetPeerAddrs, Headers, Locator, PeerAddrs, Ping, Pong, SockAddr, @@ -43,7 +44,11 @@ impl Protocol { } impl MessageHandler for Protocol { - fn consume<'a>(&self, mut msg: Message<'a>) -> Result>, Error> { + fn consume<'a>( + &self, + mut msg: Message<'a>, + received_bytes: Arc>, + ) -> Result>, Error> { let adapter = &self.adapter; // If we received a msg from a banned peer then log and drop it. @@ -286,13 +291,20 @@ impl MessageHandler for Protocol { let mut downloaded_size: usize = 0; let mut request_size = cmp::min(48_000, total_size); while request_size > 0 { - downloaded_size += msg.copy_attachment(request_size, &mut tmp_zip)?; + let size = msg.copy_attachment(request_size, &mut tmp_zip)?; + downloaded_size += size; request_size = cmp::min(48_000, total_size - downloaded_size); self.adapter.txhashset_download_update( download_start_time, downloaded_size as u64, total_size as u64, ); + + // Increase received bytes counter + { + let mut received_bytes = received_bytes.write(); + received_bytes.inc(size as u64); + } } tmp_zip.into_inner().unwrap().sync_all()?; Ok(()) diff --git a/src/bin/tui/peers.rs b/src/bin/tui/peers.rs index 686545927..683c64360 100644 --- a/src/bin/tui/peers.rs +++ b/src/bin/tui/peers.rs @@ -124,8 +124,8 @@ impl TUIStatusListener for TUIPeerView { }).column(PeerColumn::Direction, "Direction", |c| c.width_percent(8)) .column(PeerColumn::TotalDifficulty, "Total Difficulty", |c| { c.width_percent(24) - }).column(PeerColumn::Version, "Version", |c| c.width_percent(8)) - .column(PeerColumn::UserAgent, "User Agent", |c| c.width_percent(16)); + }).column(PeerColumn::Version, "Ver", |c| c.width_percent(6)) + .column(PeerColumn::UserAgent, "User Agent", |c| c.width_percent(18)); let peer_status_view = BoxView::with_full_screen( LinearLayout::new(Orientation::Vertical) .child(