Tests improvement. Protocol also measures bytes received (at least for pings).

This commit is contained in:
Ignotus Peverell 2016-10-31 12:29:08 -07:00
parent d1bb114b18
commit 3ffc2f5d8c
No known key found for this signature in database
GPG key ID: 99CD25F39F8F8211
6 changed files with 58 additions and 31 deletions

View file

@ -66,6 +66,9 @@ impl MsgHeader {
pub fn acceptable(&self) -> bool { pub fn acceptable(&self) -> bool {
Type::from_u8(self.msg_type as u8).is_some() Type::from_u8(self.msg_type as u8).is_some()
} }
/// Serialized length of the header in bytes
pub fn serialized_len(&self) -> u64 { 3 }
} }
impl Writeable for MsgHeader { impl Writeable for MsgHeader {

View file

@ -51,8 +51,8 @@ impl Peer {
self.proto.send_ping() self.proto.send_ping()
} }
pub fn sent_bytes(&self) -> u64 { pub fn transmitted_bytes(&self) -> (u64, u64) {
self.proto.sent_bytes() self.proto.transmitted_bytes()
} }
pub fn stop(&self) { pub fn stop(&self) {

View file

@ -39,6 +39,21 @@ pub struct ProtocolV1 {
// Used both to count the amount of data sent and lock writing to the conn. We can't wrap conn with // Used both to count the amount of data sent and lock writing to the conn. We can't wrap conn with
// the lock as we're always listening to receive. // the lock as we're always listening to receive.
sent_bytes: Mutex<u64>, sent_bytes: Mutex<u64>,
// Bytes we've received.
received_bytes: Mutex<u64>,
}
impl ProtocolV1 {
/// Creates a new protocol v1
pub fn new(conn: TcpStream) -> ProtocolV1 {
ProtocolV1 {
conn: RefCell::new(conn),
msg_send: RefCell::new(None),
stop_send: RefCell::new(None),
sent_bytes: Mutex::new(0),
received_bytes: Mutex::new(0),
}
}
} }
impl Protocol for ProtocolV1 { impl Protocol for ProtocolV1 {
@ -60,18 +75,19 @@ impl Protocol for ProtocolV1 {
let mut conn = self.conn.borrow_mut(); let mut conn = self.conn.borrow_mut();
loop { loop {
// main select loop, switches between listening, sending or stopping // main select loop, switches between listening, sending or stopping
select!( select!(
r:conn => { r:conn => {
// deser the header ot get the message type // deser the header ot get the message type
let header = try_to_o!(ser::deserialize::<MsgHeader>(conn.deref_mut())); let header = try_to_o!(ser::deserialize::<MsgHeader>(conn.deref_mut()));
if !header.acceptable() { if !header.acceptable() {
continue; continue;
} }
// check the message and hopefully do what's expected let recv = header.serialized_len();
// check the message and hopefully do what's expected
match header.msg_type { match header.msg_type {
Type::Ping => { Type::Ping => {
// respond with pong // respond with pong
let data = try_to_o!(ser::ser_vec(&MsgHeader::new(Type::Pong))); let data = try_to_o!(ser::ser_vec(&MsgHeader::new(Type::Pong)));
let mut sent_bytes = self.sent_bytes.lock().unwrap(); let mut sent_bytes = self.sent_bytes.lock().unwrap();
*sent_bytes += data.len() as u64; *sent_bytes += data.len() as u64;
@ -80,16 +96,18 @@ impl Protocol for ProtocolV1 {
Type::Pong => {}, Type::Pong => {},
_ => error!("uncaught unknown"), _ => error!("uncaught unknown"),
} }
let mut received = self.received_bytes.lock().unwrap();
*received += recv;
}, },
r:msg_recv => { r:msg_recv => {
// relay a message originated from the rest of the local system // relay a message originated from the rest of the local system
let data = &msg_recv.recv().unwrap()[..]; let data = &msg_recv.recv().unwrap()[..];
let mut sent_bytes = self.sent_bytes.lock().unwrap(); let mut sent_bytes = self.sent_bytes.lock().unwrap();
*sent_bytes += data.len() as u64; *sent_bytes += data.len() as u64;
try_to_o!(conn.deref_mut().write_all(data).map_err(&ser::Error::IOErr)); try_to_o!(conn.deref_mut().write_all(data).map_err(&ser::Error::IOErr));
}, },
r:stop_recv => { r:stop_recv => {
// shuts the connection don and end the loop // shuts the connection don and end the loop
stop_recv.recv(); stop_recv.recv();
conn.shutdown(Shutdown::Both); conn.shutdown(Shutdown::Both);
return None; return None;
@ -107,8 +125,10 @@ impl Protocol for ProtocolV1 {
None None
} }
fn sent_bytes(&self) -> u64 { fn transmitted_bytes(&self) -> (u64, u64) {
*self.sent_bytes.lock().unwrap().deref() let sent = *self.sent_bytes.lock().unwrap().deref();
let received = *self.received_bytes.lock().unwrap().deref();
(sent, received)
} }
fn close(&self) { fn close(&self) {
@ -116,14 +136,3 @@ impl Protocol for ProtocolV1 {
stop_send.as_ref().unwrap().send(0); stop_send.as_ref().unwrap().send(0);
} }
} }
impl ProtocolV1 {
pub fn new(conn: TcpStream) -> ProtocolV1 {
ProtocolV1 {
conn: RefCell::new(conn),
msg_send: RefCell::new(None),
stop_send: RefCell::new(None),
sent_bytes: Mutex::new(0),
}
}
}

View file

@ -53,7 +53,7 @@ unsafe impl Send for Server {}
// TODO TLS // TODO TLS
impl Server { impl Server {
/// Creates a new idle p2p server with no peers /// Creates a new idle p2p server with no peers
pub fn new() -> Server { pub fn new() -> Server {
Server { Server {
peers: RwLock::new(Vec::new()), peers: RwLock::new(Vec::new()),
@ -85,6 +85,7 @@ impl Server {
{ {
let mut peers = self.peers.write().unwrap(); let mut peers = self.peers.write().unwrap();
peers.push(wpeer.clone()); peers.push(wpeer.clone());
println!("len {}", peers.len())
} }
mioco::spawn(move || -> io::Result<()> { mioco::spawn(move || -> io::Result<()> {
@ -102,7 +103,7 @@ impl Server {
} }
} }
/// Stops the server. Disconnect from all peers at the same time. /// Stops the server. Disconnect from all peers at the same time.
pub fn stop(&self) { pub fn stop(&self) {
let peers = self.peers.write().unwrap(); let peers = self.peers.write().unwrap();
for p in peers.deref() { for p in peers.deref() {
@ -118,4 +119,8 @@ impl Server {
let tcp_client = TcpStream::connect(&addr).unwrap(); let tcp_client = TcpStream::connect(&addr).unwrap();
Peer::accept(tcp_client, &Handshake::new()) Peer::accept(tcp_client, &Handshake::new())
} }
pub fn peers_count(&self) -> u32 {
self.peers.read().unwrap().len() as u32
}
} }

View file

@ -47,8 +47,8 @@ pub trait Protocol {
/// Sends a ping message to the remote peer. /// Sends a ping message to the remote peer.
fn send_ping(&self) -> Option<Error>; fn send_ping(&self) -> Option<Error>;
/// How many bytes have been sent to the remote peer. /// How many bytes have been sent/received to/from the remote peer.
fn sent_bytes(&self) -> u64; fn transmitted_bytes(&self) -> (u64, u64);
/// Close the connection to the remote peer. /// Close the connection to the remote peer.
fn close(&self); fn close(&self);

View file

@ -25,28 +25,38 @@ fn peer_handshake() {
env_logger::init().unwrap(); env_logger::init().unwrap();
mioco::start(|| -> io::Result<()> { mioco::start(|| -> io::Result<()> {
// start a server in its own coroutine
let server = Arc::new(p2p::Server::new()); let server = Arc::new(p2p::Server::new());
let in_server = server.clone(); let in_server = server.clone();
mioco::spawn(move || -> io::Result<()> { mioco::spawn(move || -> io::Result<()> {
try!(in_server.start()); try!(in_server.start().map_err(|_| io::Error::last_os_error()));
Ok(()) Ok(())
}); });
// given server a little time to start // giving server a little time to start
mioco::sleep(time::Duration::from_millis(200)); mioco::sleep(time::Duration::from_millis(50));
// connect a client peer to the server
let addr = p2p::DEFAULT_LISTEN_ADDR.parse().unwrap(); let addr = p2p::DEFAULT_LISTEN_ADDR.parse().unwrap();
let peer = try!(p2p::Server::connect_as_client(addr).map_err(|_| io::Error::last_os_error())); let peer = try!(p2p::Server::connect_as_client(addr).map_err(|_| io::Error::last_os_error()));
mioco::sleep(time::Duration::from_millis(50));
assert_eq!(server.peers_count(), 1);
// spawn our client peer to its own coroutine so it can poll for replies
let peer = Arc::new(peer); let peer = Arc::new(peer);
let in_peer = peer.clone(); let in_peer = peer.clone();
mioco::spawn(move || -> io::Result<()> { mioco::spawn(move || -> io::Result<()> {
in_peer.run(&p2p::DummyAdapter{}); in_peer.run(&p2p::DummyAdapter{});
Ok(()) Ok(())
}); });
mioco::sleep(time::Duration::from_millis(100)); mioco::sleep(time::Duration::from_millis(50));
// send a ping and check we got ponged
peer.send_ping(); peer.send_ping();
mioco::sleep(time::Duration::from_millis(100)); mioco::sleep(time::Duration::from_millis(50));
assert!(peer.sent_bytes() > 0); let (sent, recv) = peer.transmitted_bytes();
assert!(sent > 0);
assert!(recv > 0);
server.stop(); server.stop();
Ok(()) Ok(())