Added limited and counting readers for protocol to maintain its stats. Added primitive error count. Cleaned up protocol.

This commit is contained in:
Ignotus Peverell 2016-11-02 14:19:40 -07:00
parent edc6c62577
commit 098d17ee42
No known key found for this signature in database
GPG key ID: 99CD25F39F8F8211
4 changed files with 101 additions and 34 deletions

View file

@ -19,7 +19,6 @@
#![deny(non_camel_case_types)] #![deny(non_camel_case_types)]
#![deny(non_snake_case)] #![deny(non_snake_case)]
#![deny(unused_mut)] #![deny(unused_mut)]
#![warn(missing_docs)]
#[macro_use] #[macro_use]
extern crate bitflags; extern crate bitflags;
@ -38,6 +37,7 @@ extern crate num;
mod types; mod types;
mod msg; mod msg;
mod handshake; mod handshake;
mod rw;
mod protocol; mod protocol;
mod server; mod server;
mod peer; mod peer;

View file

@ -18,28 +18,43 @@ use std::ops::{Deref, DerefMut};
use std::sync::Mutex; use std::sync::Mutex;
use mioco; use mioco;
use mioco::sync::mpsc::{sync_channel, SyncSender}; use mioco::sync::mpsc::{sync_channel, SyncSender, Receiver};
use mioco::tcp::{TcpStream, Shutdown}; use mioco::tcp::{TcpStream, Shutdown};
use core::core; use core::core;
use core::ser; use core::ser;
use msg::*; use msg::*;
use rw;
use types::*; use types::*;
/// In normal peer operation we don't want to be sent more than 100Mb in a
/// single message.
const MAX_DATA_BYTES: usize = 100 * 1000 * 1000;
/// Number of errors before we disconnect from a peer.
const MAX_ERRORS: u64 = 5;
/// First version of our communication protocol. Manages the underlying /// First version of our communication protocol. Manages the underlying
/// connection, listening to incoming messages and transmitting outgoing ones. /// connection, listening to incoming messages and transmitting outgoing ones.
pub struct ProtocolV1 { pub struct ProtocolV1 {
// The underlying tcp connection. // The underlying tcp connection.
conn: RefCell<TcpStream>, conn: RefCell<TcpStream>,
// Send channel for the rest of the local system to send messages to the peer we're connected to. // Send channel for the rest of the local system to send messages to the peer we're connected to.
msg_send: RefCell<Option<SyncSender<Vec<u8>>>>, msg_send: RefCell<Option<SyncSender<Vec<u8>>>>,
// Stop channel to exit the send/listen loop. // Stop channel to exit the send/listen loop.
stop_send: RefCell<Option<SyncSender<u8>>>, stop_send: RefCell<Option<SyncSender<u8>>>,
// Used both to count the amount of data sent and lock writing to the conn. We can't wrap conn with // Used both to count the amount of data sent and lock writing to the conn. We can't wrap conn with
// the lock as we're always listening to receive. // the lock as we're always listening to receive.
sent_bytes: Mutex<u64>, sent_bytes: Mutex<u64>,
// Bytes we've received. // Bytes we've received.
received_bytes: Mutex<u64>, received_bytes: Mutex<u64>,
// Counter for read errors.
error_count: Mutex<u64>,
} }
impl ProtocolV1 { impl ProtocolV1 {
@ -51,6 +66,7 @@ impl ProtocolV1 {
stop_send: RefCell::new(None), stop_send: RefCell::new(None),
sent_bytes: Mutex::new(0), sent_bytes: Mutex::new(0),
received_bytes: Mutex::new(0), received_bytes: Mutex::new(0),
error_count: Mutex::new(0),
} }
} }
} }
@ -61,42 +77,23 @@ impl Protocol for ProtocolV1 {
/// to send. Must be called before any interaction with a protocol instance /// to send. Must be called before any interaction with a protocol instance
/// and should only be called once. Will block so also needs to be called /// and should only be called once. Will block so also needs to be called
/// within a coroutine. /// within a coroutine.
fn handle(&self, server: &NetAdapter) -> Result<(), ser::Error> { fn handle(&self, adapter: &NetAdapter) -> Result<(), ser::Error> {
// setup channels so we can switch between reads, writes and close // setup channels so we can switch between reads, writes and close
let (msg_send, msg_recv) = sync_channel(10); let (msg_recv, stop_recv) = self.setup_channels();
let (stop_send, stop_recv) = sync_channel(1);
{
let mut msg_mut = self.msg_send.borrow_mut();
*msg_mut = Some(msg_send);
let mut stop_mut = self.stop_send.borrow_mut();
*stop_mut = Some(stop_send);
}
let mut conn = self.conn.borrow_mut(); let mut conn = self.conn.borrow_mut();
loop { loop {
// main select loop, switches between listening, sending or stopping // main select loop, switches between listening, sending or stopping
select!( select!(
r:conn => { r:conn => {
// deser the header ot get the message type let res = self.read_msg(&mut conn, adapter);
let header = try!(ser::deserialize::<MsgHeader>(conn.deref_mut())); if let Err(_) = res {
if !header.acceptable() { let mut cnt = self.error_count.lock().unwrap();
continue; *cnt += 1;
if *cnt > MAX_ERRORS {
return res.map(|_| ());
} }
let recv = header.serialized_len();
// check the message and hopefully do what's expected
match header.msg_type {
Type::Ping => {
// respond with pong
let data = try!(ser::ser_vec(&MsgHeader::new(Type::Pong)));
let mut sent_bytes = self.sent_bytes.lock().unwrap();
*sent_bytes += data.len() as u64;
try!(conn.deref_mut().write_all(&data[..]).map_err(&ser::Error::IOErr));
},
Type::Pong => {},
_ => error!("uncaught unknown"),
} }
let mut received = self.received_bytes.lock().unwrap();
*received += recv;
}, },
r:msg_recv => { r:msg_recv => {
// relay a message originated from the rest of the local system // relay a message originated from the rest of the local system
@ -124,20 +121,24 @@ impl Protocol for ProtocolV1 {
Ok(()) Ok(())
} }
/// Serializes and sends a block to our remote peer
fn send_block(&self, b: &core::Block) -> Result<(), ser::Error> { fn send_block(&self, b: &core::Block) -> Result<(), ser::Error> {
self.send_msg(Type::Block, b) self.send_msg(Type::Block, b)
} }
/// Serializes and sends a transaction to our remote peer
fn send_transaction(&self, tx: &core::Transaction) -> Result<(), ser::Error> { fn send_transaction(&self, tx: &core::Transaction) -> Result<(), ser::Error> {
self.send_msg(Type::Transaction, tx) self.send_msg(Type::Transaction, tx)
} }
/// Bytes sent and received by this peer to the remote peer.
fn transmitted_bytes(&self) -> (u64, u64) { fn transmitted_bytes(&self) -> (u64, u64) {
let sent = *self.sent_bytes.lock().unwrap().deref(); let sent = *self.sent_bytes.lock().unwrap().deref();
let received = *self.received_bytes.lock().unwrap().deref(); let received = *self.received_bytes.lock().unwrap().deref();
(sent, received) (sent, received)
} }
/// Close the connection to the remote peer
fn close(&self) { fn close(&self) {
let stop_send = self.stop_send.borrow(); let stop_send = self.stop_send.borrow();
stop_send.as_ref().unwrap().send(0); stop_send.as_ref().unwrap().send(0);
@ -145,8 +146,45 @@ impl Protocol for ProtocolV1 {
} }
impl ProtocolV1 { impl ProtocolV1 {
fn read_msg(&self, mut conn: &mut TcpStream, adapter: &NetAdapter) -> Result<(), ser::Error> {
// deser the header to get the message type
let header = try!(ser::deserialize::<MsgHeader>(conn.deref_mut()));
if !header.acceptable() {
return Err(ser::Error::CorruptedData);
}
// wrap our connection with limited byte-counting readers
let mut limit_conn = rw::LimitedRead::new(conn.deref_mut(), MAX_DATA_BYTES);
let mut read_conn = rw::CountingRead::new(&mut limit_conn);
// check the message type and hopefully do what's expected with it
match header.msg_type {
Type::Ping => {
// respond with pong
try!(self.send_pong());
},
Type::Pong => {},
Type::Transaction => {
let tx = try!(ser::deserialize(&mut read_conn));
adapter.transaction_received(tx);
},
Type::Block => {
let b = try!(ser::deserialize(&mut read_conn));
adapter.block_received(b);
}
_ => error!("uncaught unknown"),
}
// update total of bytes sent
let mut sent_bytes = self.sent_bytes.lock().unwrap();
*sent_bytes += header.serialized_len() + (read_conn.bytes_read() as u64);
Ok(())
}
/// Helper function to avoid boilerplate, builds a header followed by the /// Helper function to avoid boilerplate, builds a header followed by the
/// Writeable body and send the whole thing. /// Writeable body and send the whole thing.
// TODO serialize straight to the connection
fn send_msg(&self, t: Type, body: &ser::Writeable) -> Result<(), ser::Error> { fn send_msg(&self, t: Type, body: &ser::Writeable) -> Result<(), ser::Error> {
let mut data = Vec::new(); let mut data = Vec::new();
try!(ser::serialize(&mut data, &MsgHeader::new(t))); try!(ser::serialize(&mut data, &MsgHeader::new(t)));
@ -155,4 +193,24 @@ impl ProtocolV1 {
msg_send.as_ref().unwrap().send(data); msg_send.as_ref().unwrap().send(data);
Ok(()) Ok(())
} }
fn send_pong(&self) -> Result<(), ser::Error> {
let data = try!(ser::ser_vec(&MsgHeader::new(Type::Pong)));
let msg_send = self.msg_send.borrow();
msg_send.as_ref().unwrap().send(data);
Ok(())
}
/// Setup internal communication channels to select over
fn setup_channels(&self) -> (Receiver<Vec<u8>>, Receiver<u8>) {
let (msg_send, msg_recv) = sync_channel(10);
let (stop_send, stop_recv) = sync_channel(1);
{
let mut msg_mut = self.msg_send.borrow_mut();
*msg_mut = Some(msg_send);
let mut stop_mut = self.stop_send.borrow_mut();
*stop_mut = Some(stop_send);
}
(msg_recv, stop_recv)
}
} }

View file

@ -42,7 +42,10 @@ fn listen_addr() -> SocketAddr {
} }
pub struct DummyAdapter {} pub struct DummyAdapter {}
impl NetAdapter for DummyAdapter {} impl NetAdapter for DummyAdapter {
fn transaction_received(&self, tx: core::Transaction) {}
fn block_received(&self, b: core::Block) {}
}
/// P2P server implementation, handling bootstrapping to find and connect to /// P2P server implementation, handling bootstrapping to find and connect to
/// peers, receiving connections from other peers and keep track of all of them. /// peers, receiving connections from other peers and keep track of all of them.

View file

@ -63,4 +63,10 @@ pub trait Protocol {
/// Bridge between the networking layer and the rest of the system. Handles the /// Bridge between the networking layer and the rest of the system. Handles the
/// forwarding or querying of blocks and transactions among other things. /// forwarding or querying of blocks and transactions among other things.
pub trait NetAdapter {} pub trait NetAdapter {
/// A transaction has been received from one of our peers
fn transaction_received(&self, tx: core::Transaction);
/// A block has been received from one of our peers
fn block_received(&self, b: core::Block);
}