From 572c1951e1a3a692edb93245e7174fe2d84ac515 Mon Sep 17 00:00:00 2001 From: Ignotus Peverell Date: Sun, 29 Jan 2017 15:52:01 -0800 Subject: [PATCH] Introduced a higher-level connection abstraction, allowing the protocol to stay relatively simple. Deals with the lower level details of sending, receiving data, timeouts and dealing with futures. --- grin/Cargo.toml | 2 +- p2p/Cargo.toml | 4 +- p2p/src/conn.rs | 199 ++++++++++++++++++++++++++++++++++++++++++ p2p/src/handshake.rs | 4 +- p2p/src/lib.rs | 2 + p2p/src/peer.rs | 5 ++ p2p/src/protocol.rs | 202 ++++++++++++------------------------------- p2p/src/server.rs | 1 + p2p/src/types.rs | 2 +- 9 files changed, 270 insertions(+), 151 deletions(-) create mode 100644 p2p/src/conn.rs diff --git a/grin/Cargo.toml b/grin/Cargo.toml index 9c215f75d..8eb6f3bcb 100644 --- a/grin/Cargo.toml +++ b/grin/Cargo.toml @@ -12,7 +12,7 @@ grin_util = { path = "../util" } secp256k1zkp = { path = "../secp256k1zkp" } env_logger="^0.3.5" -futures = "^0.1.6" +futures = "^0.1.9" log = "^0.3" time = "^0.1" tokio-core="^0.1.1" diff --git a/p2p/Cargo.toml b/p2p/Cargo.toml index 3c5d71777..4d949e47f 100644 --- a/p2p/Cargo.toml +++ b/p2p/Cargo.toml @@ -6,8 +6,9 @@ authors = ["Ignotus Peverell "] [dependencies] bitflags = "^0.7.0" byteorder = "^0.5" -futures = "^0.1.6" +futures = "^0.1.9" log = "^0.3" +net2 = "0.2.0" rand = "^0.3" tokio-core="^0.1.1" time = "^0.1" @@ -15,6 +16,7 @@ enum_primitive = "^0.1.0" num = "^0.1.36" grin_core = { path = "../core" } +grin_util = { path = "../util" } [dev-dependencies] env_logger = "^0.3" diff --git a/p2p/src/conn.rs b/p2p/src/conn.rs new file mode 100644 index 000000000..383871df9 --- /dev/null +++ b/p2p/src/conn.rs @@ -0,0 +1,199 @@ +// Copyright 2016 The Grin Developers +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Provides a connection wrapper that handles the lower level tasks in sending or +//! receiving data from the TCP socket, as well as dealing with timeouts. + +use std::iter; +use std::sync::{Mutex, Arc}; + +use futures; +use futures::{Stream, Future}; +use futures::stream; +use futures::sync::mpsc::{Sender, UnboundedSender, UnboundedReceiver}; +use tokio_core::io::{Io, WriteHalf, ReadHalf, write_all, read_exact}; +use tokio_core::net::TcpStream; + +use core::ser; +use msg::*; + +/// Handler to provide to the connection, will be called back anytime a message is +/// received. The provided sender can be use to immediately send back another +/// message. +pub trait Handler: Sync + Send { + /// Handle function to implement to process incoming messages. A sender to reply + /// immediately as well as the message header and its unparsed body are provided. + fn handle(&self, sender: UnboundedSender>, header: MsgHeader, body: Vec) -> Result<(), ser::Error>; +} + +impl Handler for F + where F: Fn(UnboundedSender>, MsgHeader, Vec) -> Result<(), ser::Error>, F: Sync + Send { + + fn handle(&self, sender: UnboundedSender>, header: MsgHeader, body: Vec) -> Result<(), ser::Error> { + self(sender, header, body) + } +} + +/// A higher level connection wrapping the TcpStream. Maintains the amount of data +/// transmitted and deals with the low-level task of sending and receiving +/// data, parsing message headers and timeouts. +pub struct Connection { + // Channel to push bytes to the remote peer + outbound_chan: UnboundedSender>, + + // Close the connection with the remote peer + close_chan: Sender<()>, + + // Bytes we've sent. + sent_bytes: Arc>, + + // Bytes we've received. + received_bytes: Arc>, + + // Counter for read errors. + error_count: Mutex, +} + +impl Connection { + + /// Start listening on the provided connection and wraps it. Does not hang the + /// current thread, instead just returns a future and the Connection itself. + pub fn listen(conn: TcpStream, handler: F) -> (Connection, Box>) + where F: Handler + 'static { + + let (reader, writer) = conn.split(); + + // prepare the channel that will transmit data to the connection writer + let (tx, rx) = futures::sync::mpsc::unbounded(); + + // same for closing the connection + let (close_tx, close_rx) = futures::sync::mpsc::channel(1); + let close_conn = close_rx.for_each(|_| Ok(())).map_err(|_| ser::Error::CorruptedData); + + let me = Connection { + outbound_chan: tx.clone(), + close_chan: close_tx, + sent_bytes: Arc::new(Mutex::new(0)), + received_bytes: Arc::new(Mutex::new(0)), + error_count: Mutex::new(0), + }; + + // setup the reading future, getting messages from the peer and processing them + let read_msg = me.read_msg(tx, reader, handler).map(|_| ()); + + // setting the writing future, getting messages from our system and sending + // them out + let write_msg = me.write_msg(rx, writer).map(|_| ()); + + // select between our different futures and return them + let fut = Box::new(close_conn.select(read_msg.select(write_msg).map(|_| ()).map_err(|(e, _)| e)) + .map(|_| ()) + .map_err(|(e, _)| e)); + + (me, fut) + } + + /// Prepares the future that gets message data produced by our system and + /// sends it to the peer connection + fn write_msg(&self, + rx: UnboundedReceiver>, + writer: WriteHalf) + -> Box, Error = ser::Error>> { + + let sent_bytes = self.sent_bytes.clone(); + let send_data = rx.map(move |data| { + // add the count of bytes sent + let mut sent_bytes = sent_bytes.lock().unwrap(); + *sent_bytes += data.len() as u64; + data + }) + // write the data and make sure the future returns the right types + .fold(writer, + |writer, data| write_all(writer, data).map_err(|_| ()).map(|(writer, buf)| writer)) + .map_err(|_| ser::Error::CorruptedData); + Box::new(send_data) + } + + /// Prepares the future reading from the peer connection, parsing each + /// message and forwarding them appropriately based on their type + fn read_msg(&self, + sender: UnboundedSender>, + reader: ReadHalf, + handler: F) + -> Box, Error = ser::Error>> + where F: Handler + 'static { + + // infinite iterator stream so we repeat the message reading logic until the + // peer is stopped + let iter = stream::iter(iter::repeat(()).map(Ok::<(), ser::Error>)); + + // setup the reading future, getting messages from the peer and processing them + let recv_bytes = self.received_bytes.clone(); + let handler = Arc::new(handler); + + let read_msg = iter.fold(reader, move |reader, _| { + let recv_bytes = recv_bytes.clone(); + let handler = handler.clone(); + let sender_inner = sender.clone(); + + // first read the message header + read_exact(reader, vec![0u8; HEADER_LEN as usize]) + .map_err(|e| ser::Error::IOErr(e)) + .and_then(move |(reader, buf)| { + let header = try!(ser::deserialize::(&mut &buf[..])); + Ok((reader, header)) + }) + .and_then(move |(reader, header)| { + // now that we have a size, proceed with the body + read_exact(reader, vec![0u8; header.msg_len as usize]) + .map(|(reader, buf)| (reader, header, buf)) + .map_err(|e| ser::Error::IOErr(e)) + }) + .map(move |(reader, header, buf)| { + // add the count of bytes received + let mut recv_bytes = recv_bytes.lock().unwrap(); + *recv_bytes += header.serialized_len() + header.msg_len; + + // and handle the different message types + let msg_type = header.msg_type; + if let Err(e) = handler.handle(sender_inner.clone(), header, buf) { + debug!("Invalid {:?} message: {}", msg_type, e); + } + + reader + }) + }); + Box::new(read_msg) + } + + /// Utility function to send any Writeable. Handles adding the header and + /// serialization. + pub fn send_msg(&self, t: Type, body: &ser::Writeable) -> Result<(), ser::Error> { + + let mut body_data = vec![]; + try!(ser::serialize(&mut body_data, body)); + let mut data = vec![]; + try!(ser::serialize(&mut data, &MsgHeader::new(t, body_data.len() as u64))); + data.append(&mut body_data); + + self.outbound_chan.send(data).map_err(|_| ser::Error::CorruptedData) + } + + /// Bytes sent and received by this peer to the remote peer. + pub fn transmitted_bytes(&self) -> (u64, u64) { + let sent = *self.sent_bytes.lock().unwrap(); + let recv = *self.received_bytes.lock().unwrap(); + (sent, recv) + } +} diff --git a/p2p/src/handshake.rs b/p2p/src/handshake.rs index d25c3f514..e644c95ca 100644 --- a/p2p/src/handshake.rs +++ b/p2p/src/handshake.rs @@ -16,13 +16,11 @@ use std::collections::VecDeque; use std::sync::{Arc, RwLock}; use futures::Future; -use futures::future::ok; use rand::Rng; use rand::os::OsRng; use tokio_core::net::TcpStream; -use tokio_core::io::{write_all, read_exact, read_to_end}; -use core::ser::{serialize, deserialize, Error}; +use core::ser::Error; use msg::*; use types::*; use protocol::ProtocolV1; diff --git a/p2p/src/lib.rs b/p2p/src/lib.rs index 4905136b3..f061b8e07 100644 --- a/p2p/src/lib.rs +++ b/p2p/src/lib.rs @@ -26,6 +26,7 @@ extern crate bitflags; extern crate enum_primitive; #[macro_use] extern crate grin_core as core; +extern crate grin_util as util; #[macro_use] extern crate log; extern crate futures; @@ -35,6 +36,7 @@ extern crate rand; extern crate time; extern crate num; +mod conn; pub mod handshake; mod msg; mod peer; diff --git a/p2p/src/peer.rs b/p2p/src/peer.rs index 89660ac4e..73fe49184 100644 --- a/p2p/src/peer.rs +++ b/p2p/src/peer.rs @@ -31,6 +31,7 @@ unsafe impl Sync for Peer {} unsafe impl Send for Peer {} impl Peer { + /// Initiates the handshake with another peer. pub fn connect(conn: TcpStream, hs: &Handshake) -> Box> { @@ -44,6 +45,7 @@ impl Peer { Box::new(connect_peer) } + /// Accept a handshake initiated by another peer. pub fn accept(conn: TcpStream, hs: &Handshake) -> Box> { @@ -57,6 +59,8 @@ impl Peer { Box::new(hs_peer) } + /// Main peer loop listening for messages and forwarding to the rest of the + /// system. pub fn run(&self, conn: TcpStream, na: Arc) @@ -64,6 +68,7 @@ impl Peer { self.proto.handle(conn, na) } + /// Bytes sent and received by this peer to the remote peer. pub fn transmitted_bytes(&self) -> (u64, u64) { self.proto.transmitted_bytes() } diff --git a/p2p/src/protocol.rs b/p2p/src/protocol.rs index fb4c354df..3af8f7757 100644 --- a/p2p/src/protocol.rs +++ b/p2p/src/protocol.rs @@ -12,77 +12,57 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::cell::RefCell; -use std::iter; -use std::ops::DerefMut; use std::sync::{Mutex, Arc}; use futures; -use futures::{Stream, Future}; +use futures::Future; use futures::stream; -use futures::sync::mpsc::{UnboundedSender, UnboundedReceiver}; -use tokio_core::io::{Io, WriteHalf, ReadHalf, write_all, read_exact}; +use futures::sync::mpsc::UnboundedSender; use tokio_core::net::TcpStream; use core::core; +use core::core::hash::Hash; use core::ser; +use conn::Connection; use msg::*; use types::*; +use util::OneTime; pub struct ProtocolV1 { - outbound_chan: RefCell>>>, + conn: OneTime, - // Bytes we've sent. - sent_bytes: Arc>, - - // Bytes we've received. - received_bytes: Arc>, - - // Counter for read errors. - error_count: Mutex, + expected_responses: Mutex>, } impl ProtocolV1 { - pub fn new() -> ProtocolV1 { - ProtocolV1 { - outbound_chan: RefCell::new(None), - sent_bytes: Arc::new(Mutex::new(0)), - received_bytes: Arc::new(Mutex::new(0)), - error_count: Mutex::new(0), - } - } + pub fn new() -> ProtocolV1 { + ProtocolV1 { + conn: OneTime::new(), + expected_responses: Mutex::new(vec![]), + } + } } impl Protocol for ProtocolV1 { + /// Sets up the protocol reading, writing and closing logic. fn handle(&self, conn: TcpStream, adapter: Arc) -> Box> { - let (reader, writer) = conn.split(); - // prepare the channel that will transmit data to the connection writer - let (tx, rx) = futures::sync::mpsc::unbounded(); - { - let mut out_mut = self.outbound_chan.borrow_mut(); - *out_mut = Some(tx.clone()); - } + let (conn, listener) = Connection::listen(conn, move |sender, header, data| { + let adapt = adapter.as_ref(); + handle_payload(adapt, sender, header, data) + }); - // setup the reading future, getting messages from the peer and processing them - let read_msg = self.read_msg(tx, reader, adapter).map(|_| ()); + self.conn.init(conn); - // setting the writing future, getting messages from our system and sending - // them out - let write_msg = self.write_msg(rx, writer).map(|_| ()); - - // select between our different futures and return them - Box::new(read_msg.select(write_msg).map(|_| ()).map_err(|(e, _)| e)) + listener } - /// Bytes sent and received by this peer to the remote peer. + /// Bytes sent and received. fn transmitted_bytes(&self) -> (u64, u64) { - let sent = *self.sent_bytes.lock().unwrap(); - let recv = *self.received_bytes.lock().unwrap(); - (sent, recv) + self.conn.borrow().transmitted_bytes() } /// Sends a ping message to the remote peer. Will panic if handle has never @@ -108,114 +88,46 @@ impl Protocol for ProtocolV1 { } impl ProtocolV1 { - /// Prepares the future reading from the peer connection, parsing each - /// message and forwarding them appropriately based on their type - fn read_msg(&self, - sender: UnboundedSender>, - reader: ReadHalf, - adapter: Arc) - -> Box, Error = ser::Error>> { - // infinite iterator stream so we repeat the message reading logic until the - // peer is stopped - let iter = stream::iter(iter::repeat(()).map(Ok::<(), ser::Error>)); - - // setup the reading future, getting messages from the peer and processing them - let recv_bytes = self.received_bytes.clone(); - let read_msg = iter.fold(reader, move |reader, _| { - let mut sender_inner = sender.clone(); - let recv_bytes = recv_bytes.clone(); - let adapter = adapter.clone(); - - // first read the message header - read_exact(reader, vec![0u8; HEADER_LEN as usize]) - .map_err(|e| ser::Error::IOErr(e)) - .and_then(move |(reader, buf)| { - let header = try!(ser::deserialize::(&mut &buf[..])); - Ok((reader, header)) - }) - .and_then(move |(reader, header)| { - // now that we have a size, proceed with the body - read_exact(reader, vec![0u8; header.msg_len as usize]) - .map(|(reader, buf)| (reader, header, buf)) - .map_err(|e| ser::Error::IOErr(e)) - }) - .map(move |(reader, header, buf)| { - // add the count of bytes received - let mut recv_bytes = recv_bytes.lock().unwrap(); - *recv_bytes += header.serialized_len() + header.msg_len; - - // and handle the different message types - if let Err(e) = handle_payload(adapter, &header, buf, &mut sender_inner) { - debug!("Invalid {:?} message: {}", header.msg_type, e); - } - - reader - }) - }); - Box::new(read_msg) - } - - /// Prepares the future that gets message data produced by our system and - /// sends it to the peer connection - fn write_msg(&self, - rx: UnboundedReceiver>, - writer: WriteHalf) - -> Box, Error = ser::Error>> { - - let sent_bytes = self.sent_bytes.clone(); - let send_data = rx.map(move |data| { - // add the count of bytes sent - let mut sent_bytes = sent_bytes.lock().unwrap(); - *sent_bytes += data.len() as u64; - data - }) - // write the data and make sure the future returns the right types - .fold(writer, - |writer, data| write_all(writer, data).map_err(|_| ()).map(|(writer, buf)| writer)) - .map_err(|_| ser::Error::CorruptedData); - Box::new(send_data) - } - - /// Utility function to send any Writeable. Handles adding the header and - /// serialization. fn send_msg(&self, t: Type, body: &ser::Writeable) -> Result<(), ser::Error> { - let mut body_data = vec![]; - try!(ser::serialize(&mut body_data, body)); - let mut data = vec![]; - try!(ser::serialize(&mut data, &MsgHeader::new(t, body_data.len() as u64))); - data.append(&mut body_data); + self.conn.borrow().send_msg(t, body) + } - let mut msg_send = self.outbound_chan.borrow_mut(); - if let Err(e) = msg_send.deref_mut().as_mut().unwrap().send(data) { + fn send_request(&self, t: Type, body: &ser::Writeable, expect_resp: Option<(Type, Hash)>) -> Result<(), ser::Error> { + let sent = self.send_msg(t, body); + + if let Err(e) = sent { warn!("Couldn't send message to remote peer: {}", e); - } - Ok(()) - } + } else if let Some(exp) = expect_resp { + let mut expects = self.expected_responses.lock().unwrap(); + expects.push(exp); + } + Ok(()) + } } -fn handle_payload(adapter: Arc, - header: &MsgHeader, - buf: Vec, - sender: &mut UnboundedSender>) +fn handle_payload(adapter: &NetAdapter, + sender: UnboundedSender>, + header: MsgHeader, + buf: Vec) -> Result<(), ser::Error> { - match header.msg_type { - Type::Ping => { - let data = try!(ser::ser_vec(&MsgHeader::new(Type::Pong, 0))); - sender.send(data); - } - Type::Pong => {} - Type::Transaction => { - let tx = try!(ser::deserialize::(&mut &buf[..])); - adapter.transaction_received(tx); - } - Type::Block => { - let b = try!(ser::deserialize::(&mut &buf[..])); - adapter.block_received(b); - } - _ => { - debug!("unknown message type {:?}", header.msg_type); - } - }; - Ok(()) + match header.msg_type { + Type::Ping => { + let data = try!(ser::ser_vec(&MsgHeader::new(Type::Pong, 0))); + sender.send(data); + } + Type::Pong => {} + Type::Transaction => { + let tx = try!(ser::deserialize::(&mut &buf[..])); + adapter.transaction_received(tx); + } + Type::Block => { + let b = try!(ser::deserialize::(&mut &buf[..])); + adapter.block_received(b); + } + _ => { + debug!("unknown message type {:?}", header.msg_type); + } + }; + Ok(()) } diff --git a/p2p/src/server.rs b/p2p/src/server.rs index 62c0b1908..19784da5e 100644 --- a/p2p/src/server.rs +++ b/p2p/src/server.rs @@ -151,6 +151,7 @@ impl Server { } } + /// Number of peers we're currently connected to. pub fn peers_count(&self) -> u32 { self.peers.read().unwrap().len() as u32 } diff --git a/p2p/src/types.rs b/p2p/src/types.rs index 7035ca317..773933d8c 100644 --- a/p2p/src/types.rs +++ b/p2p/src/types.rs @@ -90,7 +90,7 @@ pub trait Protocol { /// Bridge between the networking layer and the rest of the system. Handles the /// forwarding or querying of blocks and transactions from the network among /// other things. -pub trait NetAdapter { +pub trait NetAdapter: Sync + Send { /// A valid transaction has been received from one of our peers fn transaction_received(&self, tx: core::Transaction);