diff --git a/p2p/src/lib.rs b/p2p/src/lib.rs index 924685207..7277c59cf 100644 --- a/p2p/src/lib.rs +++ b/p2p/src/lib.rs @@ -19,7 +19,6 @@ #![deny(non_camel_case_types)] #![deny(non_snake_case)] #![deny(unused_mut)] -#![warn(missing_docs)] #[macro_use] extern crate bitflags; @@ -38,6 +37,7 @@ extern crate num; mod types; mod msg; mod handshake; +mod rw; mod protocol; mod server; mod peer; diff --git a/p2p/src/protocol.rs b/p2p/src/protocol.rs index fb6d803f9..95d73934d 100644 --- a/p2p/src/protocol.rs +++ b/p2p/src/protocol.rs @@ -18,28 +18,43 @@ use std::ops::{Deref, DerefMut}; use std::sync::Mutex; use mioco; -use mioco::sync::mpsc::{sync_channel, SyncSender}; +use mioco::sync::mpsc::{sync_channel, SyncSender, Receiver}; use mioco::tcp::{TcpStream, Shutdown}; use core::core; use core::ser; use msg::*; +use rw; use types::*; +/// In normal peer operation we don't want to be sent more than 100Mb in a +/// single message. +const MAX_DATA_BYTES: usize = 100 * 1000 * 1000; + +/// Number of errors before we disconnect from a peer. +const MAX_ERRORS: u64 = 5; + /// First version of our communication protocol. Manages the underlying /// connection, listening to incoming messages and transmitting outgoing ones. pub struct ProtocolV1 { // The underlying tcp connection. conn: RefCell, + // Send channel for the rest of the local system to send messages to the peer we're connected to. msg_send: RefCell>>>, + // Stop channel to exit the send/listen loop. stop_send: RefCell>>, + // Used both to count the amount of data sent and lock writing to the conn. We can't wrap conn with // the lock as we're always listening to receive. sent_bytes: Mutex, + // Bytes we've received. received_bytes: Mutex, + + // Counter for read errors. + error_count: Mutex, } impl ProtocolV1 { @@ -51,6 +66,7 @@ impl ProtocolV1 { stop_send: RefCell::new(None), sent_bytes: Mutex::new(0), received_bytes: Mutex::new(0), + error_count: Mutex::new(0), } } } @@ -61,42 +77,23 @@ impl Protocol for ProtocolV1 { /// to send. Must be called before any interaction with a protocol instance /// and should only be called once. Will block so also needs to be called /// within a coroutine. - fn handle(&self, server: &NetAdapter) -> Result<(), ser::Error> { + fn handle(&self, adapter: &NetAdapter) -> Result<(), ser::Error> { // setup channels so we can switch between reads, writes and close - let (msg_send, msg_recv) = sync_channel(10); - let (stop_send, stop_recv) = sync_channel(1); - { - let mut msg_mut = self.msg_send.borrow_mut(); - *msg_mut = Some(msg_send); - let mut stop_mut = self.stop_send.borrow_mut(); - *stop_mut = Some(stop_send); - } + let (msg_recv, stop_recv) = self.setup_channels(); let mut conn = self.conn.borrow_mut(); loop { // main select loop, switches between listening, sending or stopping select!( r:conn => { - // deser the header ot get the message type - let header = try!(ser::deserialize::(conn.deref_mut())); - if !header.acceptable() { - continue; + let res = self.read_msg(&mut conn, adapter); + if let Err(_) = res { + let mut cnt = self.error_count.lock().unwrap(); + *cnt += 1; + if *cnt > MAX_ERRORS { + return res.map(|_| ()); + } } - let recv = header.serialized_len(); - // check the message and hopefully do what's expected - match header.msg_type { - Type::Ping => { - // respond with pong - let data = try!(ser::ser_vec(&MsgHeader::new(Type::Pong))); - let mut sent_bytes = self.sent_bytes.lock().unwrap(); - *sent_bytes += data.len() as u64; - try!(conn.deref_mut().write_all(&data[..]).map_err(&ser::Error::IOErr)); - }, - Type::Pong => {}, - _ => error!("uncaught unknown"), - } - let mut received = self.received_bytes.lock().unwrap(); - *received += recv; }, r:msg_recv => { // relay a message originated from the rest of the local system @@ -124,20 +121,24 @@ impl Protocol for ProtocolV1 { Ok(()) } + /// Serializes and sends a block to our remote peer fn send_block(&self, b: &core::Block) -> Result<(), ser::Error> { self.send_msg(Type::Block, b) } + /// Serializes and sends a transaction to our remote peer fn send_transaction(&self, tx: &core::Transaction) -> Result<(), ser::Error> { self.send_msg(Type::Transaction, tx) } + /// Bytes sent and received by this peer to the remote peer. fn transmitted_bytes(&self) -> (u64, u64) { - let sent = *self.sent_bytes.lock().unwrap().deref(); - let received = *self.received_bytes.lock().unwrap().deref(); + let sent = *self.sent_bytes.lock().unwrap().deref(); + let received = *self.received_bytes.lock().unwrap().deref(); (sent, received) } + /// Close the connection to the remote peer fn close(&self) { let stop_send = self.stop_send.borrow(); stop_send.as_ref().unwrap().send(0); @@ -145,8 +146,45 @@ impl Protocol for ProtocolV1 { } impl ProtocolV1 { + fn read_msg(&self, mut conn: &mut TcpStream, adapter: &NetAdapter) -> Result<(), ser::Error> { + // deser the header to get the message type + let header = try!(ser::deserialize::(conn.deref_mut())); + if !header.acceptable() { + return Err(ser::Error::CorruptedData); + } + + // wrap our connection with limited byte-counting readers + let mut limit_conn = rw::LimitedRead::new(conn.deref_mut(), MAX_DATA_BYTES); + let mut read_conn = rw::CountingRead::new(&mut limit_conn); + + // check the message type and hopefully do what's expected with it + match header.msg_type { + Type::Ping => { + // respond with pong + try!(self.send_pong()); + }, + Type::Pong => {}, + Type::Transaction => { + let tx = try!(ser::deserialize(&mut read_conn)); + adapter.transaction_received(tx); + }, + Type::Block => { + let b = try!(ser::deserialize(&mut read_conn)); + adapter.block_received(b); + } + _ => error!("uncaught unknown"), + } + + // update total of bytes sent + let mut sent_bytes = self.sent_bytes.lock().unwrap(); + *sent_bytes += header.serialized_len() + (read_conn.bytes_read() as u64); + + Ok(()) + } + /// Helper function to avoid boilerplate, builds a header followed by the /// Writeable body and send the whole thing. + // TODO serialize straight to the connection fn send_msg(&self, t: Type, body: &ser::Writeable) -> Result<(), ser::Error> { let mut data = Vec::new(); try!(ser::serialize(&mut data, &MsgHeader::new(t))); @@ -155,4 +193,24 @@ impl ProtocolV1 { msg_send.as_ref().unwrap().send(data); Ok(()) } + + fn send_pong(&self) -> Result<(), ser::Error> { + let data = try!(ser::ser_vec(&MsgHeader::new(Type::Pong))); + let msg_send = self.msg_send.borrow(); + msg_send.as_ref().unwrap().send(data); + Ok(()) + } + + /// Setup internal communication channels to select over + fn setup_channels(&self) -> (Receiver>, Receiver) { + let (msg_send, msg_recv) = sync_channel(10); + let (stop_send, stop_recv) = sync_channel(1); + { + let mut msg_mut = self.msg_send.borrow_mut(); + *msg_mut = Some(msg_send); + let mut stop_mut = self.stop_send.borrow_mut(); + *stop_mut = Some(stop_send); + } + (msg_recv, stop_recv) + } } diff --git a/p2p/src/server.rs b/p2p/src/server.rs index 5b32ff469..f542e4e0b 100644 --- a/p2p/src/server.rs +++ b/p2p/src/server.rs @@ -42,7 +42,10 @@ fn listen_addr() -> SocketAddr { } pub struct DummyAdapter {} -impl NetAdapter for DummyAdapter {} +impl NetAdapter for DummyAdapter { + fn transaction_received(&self, tx: core::Transaction) {} + fn block_received(&self, b: core::Block) {} +} /// P2P server implementation, handling bootstrapping to find and connect to /// peers, receiving connections from other peers and keep track of all of them. diff --git a/p2p/src/types.rs b/p2p/src/types.rs index 893260ce5..9864608c4 100644 --- a/p2p/src/types.rs +++ b/p2p/src/types.rs @@ -63,4 +63,10 @@ pub trait Protocol { /// Bridge between the networking layer and the rest of the system. Handles the /// forwarding or querying of blocks and transactions among other things. -pub trait NetAdapter {} +pub trait NetAdapter { + /// A transaction has been received from one of our peers + fn transaction_received(&self, tx: core::Transaction); + + /// A block has been received from one of our peers + fn block_received(&self, b: core::Block); +}