From 7bea7341abbf2bccd0eef74d3547b84414a25be0 Mon Sep 17 00:00:00 2001 From: Ignotus Peverell Date: Sun, 11 Dec 2016 13:04:52 -0800 Subject: [PATCH] Various cleanup and reorganization. --- p2p/src/handshake.rs | 7 +-- p2p/src/msg.rs | 4 +- p2p/src/protocol.rs | 112 ++++++++++++++++++++++++++++--------------- p2p/src/server.rs | 62 ++++++++++++++++-------- 4 files changed, 122 insertions(+), 63 deletions(-) diff --git a/p2p/src/handshake.rs b/p2p/src/handshake.rs index 29156ed75..d25c3f514 100644 --- a/p2p/src/handshake.rs +++ b/p2p/src/handshake.rs @@ -50,6 +50,7 @@ impl Handshake { pub fn connect(&self, conn: TcpStream) -> Box> { + // prepare the first part of the hanshake let nonce = self.next_nonce(); let hand = Hand { version: PROTOCOL_VERSION, @@ -59,10 +60,10 @@ impl Handshake { receiver_addr: SockAddr(conn.peer_addr().unwrap()), user_agent: USER_AGENT.to_string(), }; + + // write and read the handshake response Box::new(write_msg(conn, hand, Type::Hand) - .and_then(|conn| { - read_msg::(conn) - }) + .and_then(|conn| read_msg::(conn)) .and_then(|(conn, shake)| { if shake.version != 1 { Err(Error::UnexpectedData { diff --git a/p2p/src/msg.rs b/p2p/src/msg.rs index 67758359c..25339cd6e 100644 --- a/p2p/src/msg.rs +++ b/p2p/src/msg.rs @@ -100,10 +100,12 @@ pub fn write_msg(conn: TcpStream, let mut body_buf = vec![]; ser::serialize(&mut body_buf, &msg); - // build and send the header using the body size + // build and serialize the header using the body size let mut header_buf = vec![]; let blen = body_buf.len() as u64; ser::serialize(&mut header_buf, &MsgHeader::new(msg_type, blen)); + + // send the whole thing write_all(conn, header_buf) .and_then(|(conn, _)| write_all(conn, body_buf)) .map(|(conn, _)| conn) diff --git a/p2p/src/protocol.rs b/p2p/src/protocol.rs index 6897d18f1..af8b58850 100644 --- a/p2p/src/protocol.rs +++ b/p2p/src/protocol.rs @@ -20,8 +20,8 @@ use std::sync::{Mutex, Arc}; use futures; use futures::{Stream, Future}; use futures::stream; -use futures::sync::mpsc::UnboundedSender; -use tokio_core::io::{Io, write_all, read_exact, read_to_end}; +use futures::sync::mpsc::{UnboundedSender, UnboundedReceiver}; +use tokio_core::io::{Io, WriteHalf, ReadHalf, write_all, read_exact, read_to_end}; use tokio_core::net::TcpStream; use core::core; @@ -67,6 +67,54 @@ impl Protocol for ProtocolV1 { *out_mut = Some(tx.clone()); } + // setup the reading future, getting messages from the peer and processing them + let read_msg = self.read_msg(tx, reader).map(|_| ()); + + // setting the writing future, getting messages from our system and sending + // them out + let write_msg = self.write_msg(rx, writer).map(|_| ()); + + // select between our different futures and return them + Box::new(read_msg.select(write_msg).map(|_| ()).map_err(|(e, _)| e)) + } + + /// Bytes sent and received by this peer to the remote peer. + fn transmitted_bytes(&self) -> (u64, u64) { + let sent = *self.sent_bytes.lock().unwrap(); + let recv = *self.received_bytes.lock().unwrap(); + (sent, recv) + } + + /// Sends a ping message to the remote peer. Will panic if handle has never + /// been called on this protocol. + fn send_ping(&self) -> Result<(), ser::Error> { + self.send_msg(Type::Ping, &Empty {}) + } + + /// Serializes and sends a block to our remote peer + fn send_block(&self, b: &core::Block) -> Result<(), ser::Error> { + self.send_msg(Type::Block, b) + } + + /// Serializes and sends a transaction to our remote peer + fn send_transaction(&self, tx: &core::Transaction) -> Result<(), ser::Error> { + self.send_msg(Type::Transaction, tx) + } + + /// Close the connection to the remote peer + fn close(&self) { + // TODO some kind of shutdown signal + } +} + +impl ProtocolV1 { + /// Prepares the future reading from the peer connection, parsing each + /// message and forwarding them appropriately based on their type + fn read_msg(&self, + tx: UnboundedSender>, + reader: ReadHalf) + -> Box, Error = ser::Error>> { + // infinite iterator stream so we repeat the message reading logic until the // peer is stopped let iter = stream::iter(iter::repeat(()).map(Ok::<(), ser::Error>)); @@ -75,18 +123,19 @@ impl Protocol for ProtocolV1 { let recv_bytes = self.received_bytes.clone(); let read_msg = iter.fold(reader, move |reader, _| { let mut tx_inner = tx.clone(); - let recv_bytes = recv_bytes.clone(); + let recv_bytes = recv_bytes.clone(); + + // first read the message header read_exact(reader, vec![0u8; HEADER_LEN as usize]) .map_err(|e| ser::Error::IOErr(e)) .and_then(move |(reader, buf)| { - // first read the message header let header = try!(ser::deserialize::(&mut &buf[..])); Ok((reader, header)) }) .map(move |(reader, header)| { - // add the count of bytes sent - let mut recv_bytes = recv_bytes.lock().unwrap(); - *recv_bytes += header.serialized_len() + header.msg_len; + // add the count of bytes sent + let mut recv_bytes = recv_bytes.lock().unwrap(); + *recv_bytes += header.serialized_len() + header.msg_len; // and handle the different message types match header.msg_type { @@ -104,9 +153,16 @@ impl Protocol for ProtocolV1 { reader }) }); + Box::new(read_msg) + } + + /// Prepares the future that gets message data produced by our system and + /// sends it to the peer connection + fn write_msg(&self, + rx: UnboundedReceiver>, + writer: WriteHalf) + -> Box, Error = ser::Error>> { - // setting the writing future, getting messages from our system and sending - // them out let sent_bytes = self.sent_bytes.clone(); let send_data = rx.map(move |data| { // add the count of bytes sent @@ -117,43 +173,23 @@ impl Protocol for ProtocolV1 { // write the data and make sure the future returns the right types .fold(writer, |writer, data| write_all(writer, data).map_err(|_| ()).map(|(writer, buf)| writer)) - .map(|_| ()) .map_err(|_| ser::Error::CorruptedData); - - // select between our different futures and return them - Box::new(read_msg.map(|_| ()).select(send_data).map(|_| ()).map_err(|(e, _)| e)) + Box::new(send_data) } - /// Bytes sent and received by this peer to the remote peer. - fn transmitted_bytes(&self) -> (u64, u64) { - let sent = *self.sent_bytes.lock().unwrap(); - let recv = *self.received_bytes.lock().unwrap(); - (sent, recv) - } + /// Utility function to send any Writeable. Handles adding the header and + /// serialization. + fn send_msg(&self, t: Type, body: &ser::Writeable) -> Result<(), ser::Error> { + let mut body_data = vec![]; + try!(ser::serialize(&mut body_data, body)); + let mut data = vec![]; + try!(ser::serialize(&mut data, &MsgHeader::new(t, body_data.len() as u64))); + data.append(&mut body_data); - /// Sends a ping message to the remote peer. Will panic if handle has never - /// been called on this protocol. - fn send_ping(&self) -> Result<(), ser::Error> { - let data = try!(ser::ser_vec(&MsgHeader::new(Type::Ping, 0))); let mut msg_send = self.outbound_chan.borrow_mut(); if let Err(e) = msg_send.deref_mut().as_mut().unwrap().send(data) { warn!("Couldn't send message to remote peer: {}", e); } Ok(()) } - - /// Serializes and sends a block to our remote peer - fn send_block(&self, b: &core::Block) -> Result<(), ser::Error> { - unimplemented!(); - } - - /// Serializes and sends a transaction to our remote peer - fn send_transaction(&self, tx: &core::Transaction) -> Result<(), ser::Error> { - unimplemented!(); - } - - /// Close the connection to the remote peer - fn close(&self) { - // TODO some kind of shutdown signal - } } diff --git a/p2p/src/server.rs b/p2p/src/server.rs index c5b2afbd1..423f21c47 100644 --- a/p2p/src/server.rs +++ b/p2p/src/server.rs @@ -23,6 +23,7 @@ use std::time::Duration; use futures; use futures::{Future, Stream}; +use futures::future::IntoFuture; use tokio_core::net::{TcpListener, TcpStream}; use tokio_core::reactor; @@ -74,24 +75,12 @@ impl Server { let hp = h.clone(); let peers = socket.incoming().map_err(|e| Error::IOErr(e)).map(move |(conn, addr)| { let peers = peers.clone(); + // accept the peer and add it to the server map - let peer_accept = Peer::accept(conn, &hs.clone()).map(move |(conn, peer)| { - let apeer = Arc::new(peer); - let mut peers = peers.write().unwrap(); - peers.push(apeer.clone()); - Ok((conn, apeer)) - }); + let peer_accept = add_to_peers(peers, Peer::accept(conn, &hs.clone())); // wire in a future to timeout the accept after 5 secs - let timeout = reactor::Timeout::new(Duration::new(5, 0), &hp).unwrap(); - let timed_peer = peer_accept.select(timeout.map(Err).map_err(|e| Error::IOErr(e))) - .then(|res| { - match res { - Ok((Ok((conn, p)), _timeout)) => Ok((conn, p)), - Ok((_, _accept)) => Err(Error::TooLargeReadErr), - Err((e, _other)) => Err(e), - } - }); + let timed_peer = with_timeout(Box::new(peer_accept), &hp); // run the main peer protocol timed_peer.and_then(|(conn, peer)| peer.clone().run(conn, &DummyAdapter {})) @@ -124,6 +113,7 @@ impl Server { })) } + /// Asks the server to connect to a new peer. pub fn connect_peer(&self, addr: SocketAddr, h: reactor::Handle) @@ -132,12 +122,11 @@ impl Server { let peers = self.peers.clone(); let request = socket.and_then(move |socket| { let peers = peers.clone(); - Peer::connect(socket, &Handshake::new()).map(move |(conn, peer)| { - let apeer = Arc::new(peer); - let mut peers = peers.write().unwrap(); - peers.push(apeer.clone()); - (conn, apeer) - }) + + // connect to the peer and add it to the server map, wiring it a timeout for + // the handhake + let peer_connect = add_to_peers(peers, Peer::connect(socket, &Handshake::new())); + with_timeout(Box::new(peer_connect), &h) }) .and_then(|(socket, peer)| peer.run(socket, &DummyAdapter {})); Box::new(request) @@ -152,3 +141,34 @@ impl Server { self.stop.into_inner().unwrap().complete(()); } } + +// Adds the peer built by the provided future in the peers map +fn add_to_peers(peers: Arc>>>, + peer_fut: A) + -> Box), ()>, Error = Error>> + where A: IntoFuture + 'static +{ + let peer_add = peer_fut.into_future().map(move |(conn, peer)| { + let apeer = Arc::new(peer); + let mut peers = peers.write().unwrap(); + peers.push(apeer.clone()); + Ok((conn, apeer)) + }); + Box::new(peer_add) +} + +// Adds a timeout to a future +fn with_timeout(fut: Box, Error = Error>>, + h: &reactor::Handle) + -> Box> { + let timeout = reactor::Timeout::new(Duration::new(5, 0), h).unwrap(); + let timed = fut.select(timeout.map(Err).map_err(|e| Error::IOErr(e))) + .then(|res| { + match res { + Ok((Ok(inner), _timeout)) => Ok(inner), + Ok((_, _accept)) => Err(Error::TooLargeReadErr), + Err((e, _other)) => Err(e), + } + }); + Box::new(timed) +}