diff --git a/p2p/src/peer.rs b/p2p/src/peer.rs index 0d1c572d4..8456ef1b0 100644 --- a/p2p/src/peer.rs +++ b/p2p/src/peer.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::sync::Arc; + use futures::Future; use tokio_core::net::TcpStream; @@ -55,7 +57,7 @@ impl Peer { Box::new(hs_peer) } - pub fn run(&self, conn: TcpStream, na: &NetAdapter) -> Box> { + pub fn run(&self, conn: TcpStream, na: Arc) -> Box> { self.proto.handle(conn, na) } diff --git a/p2p/src/protocol.rs b/p2p/src/protocol.rs index af8b58850..cbd20babf 100644 --- a/p2p/src/protocol.rs +++ b/p2p/src/protocol.rs @@ -21,7 +21,7 @@ use futures; use futures::{Stream, Future}; use futures::stream; use futures::sync::mpsc::{UnboundedSender, UnboundedReceiver}; -use tokio_core::io::{Io, WriteHalf, ReadHalf, write_all, read_exact, read_to_end}; +use tokio_core::io::{Io, WriteHalf, ReadHalf, write_all, read_exact}; use tokio_core::net::TcpStream; use core::core; @@ -56,7 +56,7 @@ impl ProtocolV1 { impl Protocol for ProtocolV1 { fn handle(&self, conn: TcpStream, - adapter: &NetAdapter) + adapter: Arc) -> Box> { let (reader, writer) = conn.split(); @@ -68,7 +68,7 @@ impl Protocol for ProtocolV1 { } // setup the reading future, getting messages from the peer and processing them - let read_msg = self.read_msg(tx, reader).map(|_| ()); + let read_msg = self.read_msg(tx, reader, adapter).map(|_| ()); // setting the writing future, getting messages from our system and sending // them out @@ -111,8 +111,9 @@ impl ProtocolV1 { /// Prepares the future reading from the peer connection, parsing each /// message and forwarding them appropriately based on their type fn read_msg(&self, - tx: UnboundedSender>, - reader: ReadHalf) + sender: UnboundedSender>, + reader: ReadHalf, + adapter: Arc) -> Box, Error = ser::Error>> { // infinite iterator stream so we repeat the message reading logic until the @@ -122,8 +123,9 @@ impl ProtocolV1 { // setup the reading future, getting messages from the peer and processing them let recv_bytes = self.received_bytes.clone(); let read_msg = iter.fold(reader, move |reader, _| { - let mut tx_inner = tx.clone(); + let mut sender_inner = sender.clone(); let recv_bytes = recv_bytes.clone(); + let adapter = adapter.clone(); // first read the message header read_exact(reader, vec![0u8; HEADER_LEN as usize]) @@ -132,24 +134,20 @@ impl ProtocolV1 { let header = try!(ser::deserialize::(&mut &buf[..])); Ok((reader, header)) }) - .map(move |(reader, header)| { - // add the count of bytes sent + .and_then(move |(reader, header)| { + // now that we have a size, proceed with the body + read_exact(reader, vec![0u8; header.msg_len as usize]).map(|(reader, buf)| { (reader, header, buf) }).map_err(|e| ser::Error::IOErr(e)) + }) + .map(move |(reader, header, buf)| { + // add the count of bytes received let mut recv_bytes = recv_bytes.lock().unwrap(); *recv_bytes += header.serialized_len() + header.msg_len; // and handle the different message types - match header.msg_type { - Type::Ping => { - let data = ser::ser_vec(&MsgHeader::new(Type::Pong, 0)).unwrap(); - if let Err(e) = tx_inner.send(data) { - warn!("Couldn't send pong to remote peer: {}", e); - } - } - Type::Pong => {} - _ => { - error!("unknown message type {:?}", header.msg_type); - } - }; + if let Err(e) = handle_payload(adapter, &header, buf, &mut sender_inner) { + debug!("Invalid {:?} message: {}", header.msg_type, e); + } + reader }) }); @@ -193,3 +191,25 @@ impl ProtocolV1 { Ok(()) } } + +fn handle_payload(adapter: Arc, header: &MsgHeader, buf: Vec, sender: &mut UnboundedSender>) -> Result<(), ser::Error> { + match header.msg_type { + Type::Ping => { + let data = try!(ser::ser_vec(&MsgHeader::new(Type::Pong, 0))); + sender.send(data); + } + Type::Pong => {} + Type::Transaction => { + let tx = try!(ser::deserialize::(&mut &buf[..])); + adapter.transaction_received(tx); + } + Type::Block => { + let b = try!(ser::deserialize::(&mut &buf[..])); + adapter.block_received(b); + } + _ => { + debug!("unknown message type {:?}", header.msg_type); + } + }; + Ok(()) +} diff --git a/p2p/src/server.rs b/p2p/src/server.rs index 6ce52dd87..2901da6f6 100644 --- a/p2p/src/server.rs +++ b/p2p/src/server.rs @@ -33,6 +33,7 @@ use handshake::Handshake; use peer::Peer; use types::*; +/// A no-op network adapter used for testing. pub struct DummyAdapter {} impl NetAdapter for DummyAdapter { fn transaction_received(&self, tx: core::Transaction) {} @@ -83,7 +84,7 @@ impl Server { let timed_peer = with_timeout(Box::new(peer_accept), &hp); // run the main peer protocol - timed_peer.and_then(|(conn, peer)| peer.clone().run(conn, &DummyAdapter {})) + timed_peer.and_then(|(conn, peer)| peer.clone().run(conn, Arc::new(DummyAdapter {}))) }); // spawn each peer future to its own task @@ -128,7 +129,7 @@ impl Server { let peer_connect = add_to_peers(peers, Peer::connect(socket, &Handshake::new())); with_timeout(Box::new(peer_connect), &h) }) - .and_then(|(socket, peer)| peer.run(socket, &DummyAdapter {})); + .and_then(|(socket, peer)| peer.run(socket, Arc::new(DummyAdapter {}))); Box::new(request) } diff --git a/p2p/src/types.rs b/p2p/src/types.rs index e87e8609f..0dfafcc9d 100644 --- a/p2p/src/types.rs +++ b/p2p/src/types.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::net::{SocketAddr, IpAddr}; +use std::sync::Arc; use futures::Future; use tokio_core::net::TcpStream; @@ -65,7 +66,7 @@ pub trait Protocol { /// be known already, usually passed during construction. Will typically /// block so needs to be called withing a coroutine. Should also be called /// only once. - fn handle(&self, conn: TcpStream, na: &NetAdapter) -> Box>; + fn handle(&self, conn: TcpStream, na: Arc) -> Box>; /// Sends a ping message to the remote peer. fn send_ping(&self) -> Result<(), Error>; @@ -86,7 +87,7 @@ pub trait Protocol { /// Bridge between the networking layer and the rest of the system. Handles the /// forwarding or querying of blocks and transactions among other things. pub trait NetAdapter { - /// A transaction has been received from one of our peers + /// A valid transaction has been received from one of our peers fn transaction_received(&self, tx: core::Transaction); /// A block has been received from one of our peers diff --git a/p2p/tests/peer_handshake.rs b/p2p/tests/peer_handshake.rs index b13f1c6ff..2b4875f47 100644 --- a/p2p/tests/peer_handshake.rs +++ b/p2p/tests/peer_handshake.rs @@ -19,6 +19,7 @@ extern crate futures; extern crate tokio_core; use std::net::SocketAddr; +use std::sync::Arc; use std::time; use futures::future::Future; @@ -50,7 +51,7 @@ fn peer_handshake() { socket.and_then(move |socket| { Peer::connect(socket, &p2p::handshake::Handshake::new()) }).and_then(move |(socket, peer)| { - rhandle.spawn(peer.run(socket, &p2p::DummyAdapter {}).map_err(|e| { + rhandle.spawn(peer.run(socket, Arc::new(p2p::DummyAdapter {})).map_err(|e| { panic!("Client run failed: {}", e); })); peer.send_ping().unwrap();