From 3ffc2f5d8c9fc5c47fae13491e03c7029010c1bf Mon Sep 17 00:00:00 2001 From: Ignotus Peverell Date: Mon, 31 Oct 2016 12:29:08 -0700 Subject: [PATCH] Tests improvement. Protocol also measures bytes received (at least for pings). --- p2p/src/msg.rs | 3 +++ p2p/src/peer.rs | 4 ++-- p2p/src/protocol.rs | 47 +++++++++++++++++++++++---------------- p2p/src/server.rs | 9 ++++++-- p2p/src/types.rs | 4 ++-- p2p/tests/network_conn.rs | 22 +++++++++++++----- 6 files changed, 58 insertions(+), 31 deletions(-) diff --git a/p2p/src/msg.rs b/p2p/src/msg.rs index c8ad81e56..f247eddcc 100644 --- a/p2p/src/msg.rs +++ b/p2p/src/msg.rs @@ -66,6 +66,9 @@ impl MsgHeader { pub fn acceptable(&self) -> bool { Type::from_u8(self.msg_type as u8).is_some() } + + /// Serialized length of the header in bytes + pub fn serialized_len(&self) -> u64 { 3 } } impl Writeable for MsgHeader { diff --git a/p2p/src/peer.rs b/p2p/src/peer.rs index 69440d4d8..bd9f97349 100644 --- a/p2p/src/peer.rs +++ b/p2p/src/peer.rs @@ -51,8 +51,8 @@ impl Peer { self.proto.send_ping() } - pub fn sent_bytes(&self) -> u64 { - self.proto.sent_bytes() + pub fn transmitted_bytes(&self) -> (u64, u64) { + self.proto.transmitted_bytes() } pub fn stop(&self) { diff --git a/p2p/src/protocol.rs b/p2p/src/protocol.rs index 8f5d9652c..41651c433 100644 --- a/p2p/src/protocol.rs +++ b/p2p/src/protocol.rs @@ -39,6 +39,21 @@ pub struct ProtocolV1 { // Used both to count the amount of data sent and lock writing to the conn. We can't wrap conn with // the lock as we're always listening to receive. sent_bytes: Mutex, + // Bytes we've received. + received_bytes: Mutex, +} + +impl ProtocolV1 { + /// Creates a new protocol v1 + pub fn new(conn: TcpStream) -> ProtocolV1 { + ProtocolV1 { + conn: RefCell::new(conn), + msg_send: RefCell::new(None), + stop_send: RefCell::new(None), + sent_bytes: Mutex::new(0), + received_bytes: Mutex::new(0), + } + } } impl Protocol for ProtocolV1 { @@ -60,18 +75,19 @@ impl Protocol for ProtocolV1 { let mut conn = self.conn.borrow_mut(); loop { - // main select loop, switches between listening, sending or stopping + // main select loop, switches between listening, sending or stopping select!( r:conn => { - // deser the header ot get the message type + // deser the header ot get the message type let header = try_to_o!(ser::deserialize::(conn.deref_mut())); if !header.acceptable() { continue; } - // check the message and hopefully do what's expected + let recv = header.serialized_len(); + // check the message and hopefully do what's expected match header.msg_type { Type::Ping => { - // respond with pong + // respond with pong let data = try_to_o!(ser::ser_vec(&MsgHeader::new(Type::Pong))); let mut sent_bytes = self.sent_bytes.lock().unwrap(); *sent_bytes += data.len() as u64; @@ -80,16 +96,18 @@ impl Protocol for ProtocolV1 { Type::Pong => {}, _ => error!("uncaught unknown"), } + let mut received = self.received_bytes.lock().unwrap(); + *received += recv; }, r:msg_recv => { - // relay a message originated from the rest of the local system + // relay a message originated from the rest of the local system let data = &msg_recv.recv().unwrap()[..]; let mut sent_bytes = self.sent_bytes.lock().unwrap(); *sent_bytes += data.len() as u64; try_to_o!(conn.deref_mut().write_all(data).map_err(&ser::Error::IOErr)); }, r:stop_recv => { - // shuts the connection don and end the loop + // shuts the connection don and end the loop stop_recv.recv(); conn.shutdown(Shutdown::Both); return None; @@ -107,8 +125,10 @@ impl Protocol for ProtocolV1 { None } - fn sent_bytes(&self) -> u64 { - *self.sent_bytes.lock().unwrap().deref() + fn transmitted_bytes(&self) -> (u64, u64) { + let sent = *self.sent_bytes.lock().unwrap().deref(); + let received = *self.received_bytes.lock().unwrap().deref(); + (sent, received) } fn close(&self) { @@ -116,14 +136,3 @@ impl Protocol for ProtocolV1 { stop_send.as_ref().unwrap().send(0); } } - -impl ProtocolV1 { - pub fn new(conn: TcpStream) -> ProtocolV1 { - ProtocolV1 { - conn: RefCell::new(conn), - msg_send: RefCell::new(None), - stop_send: RefCell::new(None), - sent_bytes: Mutex::new(0), - } - } -} diff --git a/p2p/src/server.rs b/p2p/src/server.rs index 65458e0cb..3bd93286e 100644 --- a/p2p/src/server.rs +++ b/p2p/src/server.rs @@ -53,7 +53,7 @@ unsafe impl Send for Server {} // TODO TLS impl Server { - /// Creates a new idle p2p server with no peers + /// Creates a new idle p2p server with no peers pub fn new() -> Server { Server { peers: RwLock::new(Vec::new()), @@ -85,6 +85,7 @@ impl Server { { let mut peers = self.peers.write().unwrap(); peers.push(wpeer.clone()); + println!("len {}", peers.len()) } mioco::spawn(move || -> io::Result<()> { @@ -102,7 +103,7 @@ impl Server { } } - /// Stops the server. Disconnect from all peers at the same time. + /// Stops the server. Disconnect from all peers at the same time. pub fn stop(&self) { let peers = self.peers.write().unwrap(); for p in peers.deref() { @@ -118,4 +119,8 @@ impl Server { let tcp_client = TcpStream::connect(&addr).unwrap(); Peer::accept(tcp_client, &Handshake::new()) } + + pub fn peers_count(&self) -> u32 { + self.peers.read().unwrap().len() as u32 + } } diff --git a/p2p/src/types.rs b/p2p/src/types.rs index 6c350a00c..0ba8522b1 100644 --- a/p2p/src/types.rs +++ b/p2p/src/types.rs @@ -47,8 +47,8 @@ pub trait Protocol { /// Sends a ping message to the remote peer. fn send_ping(&self) -> Option; - /// How many bytes have been sent to the remote peer. - fn sent_bytes(&self) -> u64; + /// How many bytes have been sent/received to/from the remote peer. + fn transmitted_bytes(&self) -> (u64, u64); /// Close the connection to the remote peer. fn close(&self); diff --git a/p2p/tests/network_conn.rs b/p2p/tests/network_conn.rs index 17929763f..5006aaaa4 100644 --- a/p2p/tests/network_conn.rs +++ b/p2p/tests/network_conn.rs @@ -25,28 +25,38 @@ fn peer_handshake() { env_logger::init().unwrap(); mioco::start(|| -> io::Result<()> { + // start a server in its own coroutine let server = Arc::new(p2p::Server::new()); let in_server = server.clone(); mioco::spawn(move || -> io::Result<()> { - try!(in_server.start()); + try!(in_server.start().map_err(|_| io::Error::last_os_error())); Ok(()) }); - // given server a little time to start - mioco::sleep(time::Duration::from_millis(200)); + // giving server a little time to start + mioco::sleep(time::Duration::from_millis(50)); + // connect a client peer to the server let addr = p2p::DEFAULT_LISTEN_ADDR.parse().unwrap(); let peer = try!(p2p::Server::connect_as_client(addr).map_err(|_| io::Error::last_os_error())); + mioco::sleep(time::Duration::from_millis(50)); + assert_eq!(server.peers_count(), 1); + + // spawn our client peer to its own coroutine so it can poll for replies let peer = Arc::new(peer); let in_peer = peer.clone(); mioco::spawn(move || -> io::Result<()> { in_peer.run(&p2p::DummyAdapter{}); Ok(()) }); - mioco::sleep(time::Duration::from_millis(100)); + mioco::sleep(time::Duration::from_millis(50)); + + // send a ping and check we got ponged peer.send_ping(); - mioco::sleep(time::Duration::from_millis(100)); - assert!(peer.sent_bytes() > 0); + mioco::sleep(time::Duration::from_millis(50)); + let (sent, recv) = peer.transmitted_bytes(); + assert!(sent > 0); + assert!(recv > 0); server.stop(); Ok(())