From 4b5c010b054f31431fe540f0dad3828a2d1af1b6 Mon Sep 17 00:00:00 2001 From: Ignotus Peverell Date: Sat, 10 Dec 2016 19:11:49 -0800 Subject: [PATCH] Rewrote most of p2p code to use futures-rs instead of mioco. Need some cleanup and support for a few more message types. --- core/src/ser.rs | 8 +- p2p/Cargo.toml | 5 +- p2p/src/handshake.rs | 176 +++++++++++------------- p2p/src/lib.rs | 10 +- p2p/src/msg.rs | 100 ++++++++++++-- p2p/src/peer.rs | 61 ++++----- p2p/src/protocol.rs | 259 ++++++++++++++---------------------- p2p/src/rw.rs | 87 ------------ p2p/src/server.rs | 178 ++++++++++++------------- p2p/src/types.rs | 8 +- p2p/tests/common/mod.rs | 58 -------- p2p/tests/peer_handshake.rs | 62 ++++++--- p2p/tests/peer_tx_send.rs | 82 ------------ 13 files changed, 439 insertions(+), 655 deletions(-) delete mode 100644 p2p/src/rw.rs delete mode 100644 p2p/tests/common/mod.rs delete mode 100644 p2p/tests/peer_tx_send.rs diff --git a/core/src/ser.rs b/core/src/ser.rs index 795f1b63a..9240b7a74 100644 --- a/core/src/ser.rs +++ b/core/src/ser.rs @@ -36,7 +36,7 @@ pub enum Error { /// Data wasn't in a consumable format CorruptedData, /// When asked to read too much data - TooLargeReadErr(String), + TooLargeReadErr, } impl From for Error { @@ -53,7 +53,7 @@ impl fmt::Display for Error { write!(f, "expected {:?}, got {:?}", e, r) } Error::CorruptedData => f.write_str("corrupted data"), - Error::TooLargeReadErr(ref s) => f.write_str(&s), + Error::TooLargeReadErr => f.write_str("too large read"), } } } @@ -71,7 +71,7 @@ impl error::Error for Error { Error::IOErr(ref e) => error::Error::description(e), Error::UnexpectedData { expected: _, received: _ } => "unexpected data", Error::CorruptedData => "corrupted data", - Error::TooLargeReadErr(ref s) => s, + Error::TooLargeReadErr => "too large read", } } } @@ -238,7 +238,7 @@ impl<'a> Reader for BinReader<'a> { fn read_fixed_bytes(&mut self, length: usize) -> Result, Error> { // not reading more than 100k in a single read if length > 100000 { - return Err(Error::TooLargeReadErr(format!("fixed bytes length too large: {}", length))); + return Err(Error::TooLargeReadErr); } let mut buf = vec![0; length]; self.source.read_exact(&mut buf).map(move |_| buf).map_err(Error::IOErr) diff --git a/p2p/Cargo.toml b/p2p/Cargo.toml index 4d443a646..e0c9c153b 100644 --- a/p2p/Cargo.toml +++ b/p2p/Cargo.toml @@ -1,14 +1,15 @@ [package] -name = "grin_p2p" +name = "grin_p2p_fut" version = "0.1.0" authors = ["Ignotus Peverell "] [dependencies] bitflags = "^0.7.0" byteorder = "^0.5" +futures = "^0.1.6" log = "^0.3" rand = "^0.3" -mioco = "^0.8" +tokio-core="^0.1.1" time = "^0.1" enum_primitive = "^0.1.0" num = "^0.1.36" diff --git a/p2p/src/handshake.rs b/p2p/src/handshake.rs index 0534d03e6..29156ed75 100644 --- a/p2p/src/handshake.rs +++ b/p2p/src/handshake.rs @@ -13,11 +13,14 @@ // limitations under the License. use std::collections::VecDeque; -use std::sync::RwLock; +use std::sync::{Arc, RwLock}; +use futures::Future; +use futures::future::ok; use rand::Rng; use rand::os::OsRng; -use mioco::tcp::{TcpStream, Shutdown}; +use tokio_core::net::TcpStream; +use tokio_core::io::{write_all, read_exact, read_to_end}; use core::ser::{serialize, deserialize, Error}; use msg::*; @@ -31,7 +34,7 @@ const NONCES_CAP: usize = 100; pub struct Handshake { /// Ring buffer of nonces sent to detect self connections without requiring /// a node id. - nonces: RwLock>, + nonces: Arc>>, } unsafe impl Sync for Handshake {} @@ -40,99 +43,91 @@ unsafe impl Send for Handshake {} impl Handshake { /// Creates a new handshake handler pub fn new() -> Handshake { - Handshake { nonces: RwLock::new(VecDeque::with_capacity(NONCES_CAP)) } + Handshake { nonces: Arc::new(RwLock::new(VecDeque::with_capacity(NONCES_CAP))) } } /// Handles connecting to a new remote peer, starting the version handshake. - pub fn connect(&self, mut conn: TcpStream) -> Result<(Box, PeerInfo), Error> { - // get a new nonce that can be used on handshake to detect self-connection + pub fn connect(&self, + conn: TcpStream) + -> Box> { let nonce = self.next_nonce(); - - // send the first part of the handshake - let sender_addr = conn.local_addr().unwrap(); - let receiver_addr = conn.peer_addr().unwrap(); - try!(serialize(&mut conn, - &Hand { - version: PROTOCOL_VERSION, - capabilities: FULL_SYNC, - nonce: nonce, - sender_addr: SockAddr(sender_addr), - receiver_addr: SockAddr(receiver_addr), - user_agent: USER_AGENT.to_string(), - })); - - // deserialize the handshake response and do version negotiation - let shake = try!(deserialize::(&mut conn)); - if shake.version != 1 { - self.close(&mut conn, - ErrCodes::UnsupportedVersion as u32, - format!("Unsupported version: {}, ours: {})", - shake.version, - PROTOCOL_VERSION)); - return Err(Error::UnexpectedData { - expected: vec![PROTOCOL_VERSION as u8], - received: vec![shake.version as u8], - }); - } - - let peer_info = PeerInfo { - capabilities: shake.capabilities, - user_agent: shake.user_agent, - addr: receiver_addr, - version: shake.version, + let hand = Hand { + version: PROTOCOL_VERSION, + capabilities: FULL_SYNC, + nonce: nonce, + sender_addr: SockAddr(conn.local_addr().unwrap()), + receiver_addr: SockAddr(conn.peer_addr().unwrap()), + user_agent: USER_AGENT.to_string(), }; + Box::new(write_msg(conn, hand, Type::Hand) + .and_then(|conn| { + read_msg::(conn) + }) + .and_then(|(conn, shake)| { + if shake.version != 1 { + Err(Error::UnexpectedData { + expected: vec![PROTOCOL_VERSION as u8], + received: vec![shake.version as u8], + }) + } else { + let peer_info = PeerInfo { + capabilities: shake.capabilities, + user_agent: shake.user_agent, + addr: conn.peer_addr().unwrap(), + version: shake.version, + }; - info!("Connected to peer {:?}", peer_info); - // when more than one protocol version is supported, choosing should go here - Ok((Box::new(ProtocolV1::new(conn)), peer_info)) + info!("Connected to peer {:?}", peer_info); + // when more than one protocol version is supported, choosing should go here + Ok((conn, ProtocolV1::new(), peer_info)) + } + })) } /// Handles receiving a connection from a new remote peer that started the /// version handshake. - pub fn handshake(&self, mut conn: TcpStream) -> Result<(Box, PeerInfo), Error> { - // deserialize first part of handshake sent to us and do version negotiation - let hand = try!(deserialize::(&mut conn)); - if hand.version != 1 { - self.close(&mut conn, - ErrCodes::UnsupportedVersion as u32, - format!("Unsupported version: {}, ours: {})", - hand.version, - PROTOCOL_VERSION)); - return Err(Error::UnexpectedData { - expected: vec![PROTOCOL_VERSION as u8], - received: vec![hand.version as u8], - }); - } - { - // check the nonce to see if we could be trying to connect to ourselves - let nonces = self.nonces.read().unwrap(); - if nonces.contains(&hand.nonce) { - return Err(Error::UnexpectedData { - expected: vec![], - received: vec![], - }); - } - } - - // all good, keep peer info - let peer_info = PeerInfo { - capabilities: hand.capabilities, - user_agent: hand.user_agent, - addr: conn.peer_addr().unwrap(), - version: hand.version, - }; - - // send our reply with our info - try!(serialize(&mut conn, - &Shake { - version: PROTOCOL_VERSION, - capabilities: FULL_SYNC, - user_agent: USER_AGENT.to_string(), - })); - - info!("Received connection from peer {:?}", peer_info); - // when more than one protocol version is supported, choosing should go here - Ok((Box::new(ProtocolV1::new(conn)), peer_info)) + pub fn handshake(&self, + conn: TcpStream) + -> Box> { + let nonces = self.nonces.clone(); + Box::new(read_msg::(conn) + .and_then(move |(conn, hand)| { + if hand.version != 1 { + return Err(Error::UnexpectedData { + expected: vec![PROTOCOL_VERSION as u8], + received: vec![hand.version as u8], + }); + } + { + // check the nonce to see if we could be trying to connect to ourselves + let nonces = nonces.read().unwrap(); + if nonces.contains(&hand.nonce) { + return Err(Error::UnexpectedData { + expected: vec![], + received: vec![], + }); + } + } + // all good, keep peer info + let peer_info = PeerInfo { + capabilities: hand.capabilities, + user_agent: hand.user_agent, + addr: conn.peer_addr().unwrap(), + version: hand.version, + }; + // send our reply with our info + let shake = Shake { + version: PROTOCOL_VERSION, + capabilities: FULL_SYNC, + user_agent: USER_AGENT.to_string(), + }; + Ok((conn, shake, peer_info)) + }) + .and_then(|(conn, shake, peer_info)| { + write_msg(conn, shake, Type::Shake) + // when more than one protocol version is supported, choosing should go here + .map(|conn| (conn, ProtocolV1::new(), peer_info)) + })) } /// Generate a new random nonce and store it in our ring buffer @@ -147,13 +142,4 @@ impl Handshake { } nonce } - - fn close(&self, conn: &mut TcpStream, err_code: u32, explanation: String) { - serialize(conn, - &PeerError { - code: err_code, - message: explanation, - }); - conn.shutdown(Shutdown::Both); - } } diff --git a/p2p/src/lib.rs b/p2p/src/lib.rs index 40c69b09b..c6c43377f 100644 --- a/p2p/src/lib.rs +++ b/p2p/src/lib.rs @@ -28,19 +28,19 @@ extern crate enum_primitive; extern crate grin_core as core; #[macro_use] extern crate log; +extern crate futures; #[macro_use] -extern crate mioco; +extern crate tokio_core; extern crate rand; extern crate time; extern crate num; -mod types; -mod msg; pub mod handshake; -mod rw; +mod msg; +mod peer; mod protocol; mod server; -mod peer; +mod types; pub use server::{Server, DummyAdapter}; pub use peer::Peer; diff --git a/p2p/src/msg.rs b/p2p/src/msg.rs index 39917c6b4..67758359c 100644 --- a/p2p/src/msg.rs +++ b/p2p/src/msg.rs @@ -14,11 +14,15 @@ //! Message types that transit over the network and related serialization code. -use std::net::*; - +use std::net::{SocketAddr, SocketAddrV4, SocketAddrV6, Ipv4Addr, Ipv6Addr}; use num::FromPrimitive; +use futures::future::{Future, ok}; +use tokio_core::net::TcpStream; +use tokio_core::io::{write_all, read_exact}; + use core::ser::{self, Writeable, Readable, Writer, Reader}; +use core::consensus::MAX_MSG_LEN; use types::*; @@ -30,6 +34,9 @@ pub const USER_AGENT: &'static str = "MW/Grin 0.1"; /// Magic number expected in the header of every message const MAGIC: [u8; 2] = [0x1e, 0xc5]; +/// Size in bytes of a message header +pub const HEADER_LEN: u64 = 11; + /// Codes for each error that can be produced reading a message. pub enum ErrCodes { UnsupportedVersion = 100, @@ -51,27 +58,79 @@ enum_from_primitive! { } } +/// Future combinator to read any message where the body is a Readable. Reads +/// the header first, handles its validation and then reads the Readable body, +/// allocating buffers of the right size. +pub fn read_msg(conn: TcpStream) -> Box> + where T: Readable + 'static +{ + let read_header = read_exact(conn, vec![0u8; HEADER_LEN as usize]) + .map_err(|e| ser::Error::IOErr(e)) + .and_then(|(reader, buf)| { + let header = try!(ser::deserialize::(&mut &buf[..])); + if header.msg_len > MAX_MSG_LEN { + // TODO add additional restrictions on a per-message-type basis to avoid 20MB + // pings + return Err(ser::Error::TooLargeReadErr); + } + Ok((reader, header)) + }); + + let read_msg = read_header.and_then(|(reader, header)| { + read_exact(reader, vec![0u8; header.msg_len as usize]).map_err(|e| ser::Error::IOErr(e)) + }) + .and_then(|(reader, buf)| { + let body = try!(ser::deserialize(&mut &buf[..])); + Ok((reader, body)) + }); + Box::new(read_msg) +} + +/// Future combinator to write a full message from a Writeable payload. +/// Serializes the payload first and then sends the message header and that +/// payload. +pub fn write_msg(conn: TcpStream, + msg: T, + msg_type: Type) + -> Box> + where T: Writeable + 'static +{ + let write_msg = ok((conn)).and_then(move |conn| { + // prepare the body first so we know its serialized length + let mut body_buf = vec![]; + ser::serialize(&mut body_buf, &msg); + + // build and send the header using the body size + let mut header_buf = vec![]; + let blen = body_buf.len() as u64; + ser::serialize(&mut header_buf, &MsgHeader::new(msg_type, blen)); + write_all(conn, header_buf) + .and_then(|(conn, _)| write_all(conn, body_buf)) + .map(|(conn, _)| conn) + .map_err(|e| ser::Error::IOErr(e)) + }); + Box::new(write_msg) +} + /// Header of any protocol message, used to identify incoming messages. pub struct MsgHeader { magic: [u8; 2], pub msg_type: Type, + pub msg_len: u64, } impl MsgHeader { - pub fn new(msg_type: Type) -> MsgHeader { + pub fn new(msg_type: Type, len: u64) -> MsgHeader { MsgHeader { magic: MAGIC, msg_type: msg_type, + msg_len: len, } } - pub fn acceptable(&self) -> bool { - Type::from_u8(self.msg_type as u8).is_some() - } - /// Serialized length of the header in bytes pub fn serialized_len(&self) -> u64 { - 3 + HEADER_LEN } } @@ -80,7 +139,8 @@ impl Writeable for MsgHeader { ser_multiwrite!(writer, [write_u8, self.magic[0]], [write_u8, self.magic[1]], - [write_u8, self.msg_type as u8]); + [write_u8, self.msg_type as u8], + [write_u64, self.msg_len]); Ok(()) } } @@ -89,12 +149,13 @@ impl Readable for MsgHeader { fn read(reader: &mut Reader) -> Result { try!(reader.expect_u8(MAGIC[0])); try!(reader.expect_u8(MAGIC[1])); - let t = try!(reader.read_u8()); + let (t, len) = ser_multiread!(reader, read_u8, read_u64); match Type::from_u8(t) { Some(ty) => { Ok(MsgHeader { magic: MAGIC, msg_type: ty, + msg_len: len, }) } None => Err(ser::Error::CorruptedData), @@ -224,8 +285,7 @@ impl Readable for PeerAddrs { fn read(reader: &mut Reader) -> Result { let peer_count = try!(reader.read_u32()); if peer_count > 1000 { - return Err(ser::Error::TooLargeReadErr(format!("Too many peers provided: {}", - peer_count))); + return Err(ser::Error::TooLargeReadErr); } let peers = try_map_vec!([0..peer_count], |_| SockAddr::read(reader)); Ok(PeerAddrs { peers: peers }) @@ -313,3 +373,19 @@ impl Readable for SockAddr { } } } + +/// Placeholder for messages like Ping and Pong that don't send anything but +/// the header. +pub struct Empty {} + +impl Writeable for Empty { + fn write(&self, writer: &mut Writer) -> Result<(), ser::Error> { + Ok(()) + } +} + +impl Readable for Empty { + fn read(reader: &mut Reader) -> Result { + Ok(Empty {}) + } +} diff --git a/p2p/src/peer.rs b/p2p/src/peer.rs index 99c9b4b0d..0d1c572d4 100644 --- a/p2p/src/peer.rs +++ b/p2p/src/peer.rs @@ -12,7 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -use mioco::tcp::TcpStream; +use futures::Future; +use tokio_core::net::TcpStream; use core::core; use core::ser::Error; @@ -28,45 +29,45 @@ unsafe impl Sync for Peer {} unsafe impl Send for Peer {} impl Peer { - pub fn connect(conn: TcpStream, hs: &Handshake) -> Result { - let (proto, info) = try!(hs.connect(conn)); - Ok(Peer { - info: info, - proto: proto, - }) + pub fn connect(conn: TcpStream, + hs: &Handshake) + -> Box> { + let connect_peer = hs.connect(conn).and_then(|(conn, proto, info)| { + Ok((conn, + Peer { + info: info, + proto: Box::new(proto), + })) + }); + Box::new(connect_peer) } - pub fn accept(conn: TcpStream, hs: &Handshake) -> Result { - let (proto, info) = try!(hs.handshake(conn)); - Ok(Peer { - info: info, - proto: proto, - }) + pub fn accept(conn: TcpStream, + hs: &Handshake) + -> Box> { + let hs_peer = hs.handshake(conn).and_then(|(conn, proto, info)| { + Ok((conn, + Peer { + info: info, + proto: Box::new(proto), + })) + }); + Box::new(hs_peer) } - pub fn run(&self, na: &NetAdapter) -> Result<(), Error> { - self.proto.handle(na) - } - - pub fn send_ping(&self) -> Result<(), Error> { - self.proto.send_ping() - } - - pub fn send_block(&self, b: &core::Block) -> Result<(), Error> { - // TODO don't send if we already got the block from peer - self.proto.send_block(b) - } - - pub fn send_transaction(&self, tx: &core::Transaction) -> Result<(), Error> { - // TODO don't relay if we already got the tx from peer - self.proto.send_transaction(tx) + pub fn run(&self, conn: TcpStream, na: &NetAdapter) -> Box> { + self.proto.handle(conn, na) } pub fn transmitted_bytes(&self) -> (u64, u64) { self.proto.transmitted_bytes() } + pub fn send_ping(&self) -> Result<(), Error> { + self.proto.send_ping() + } + pub fn stop(&self) { - self.proto.as_ref().close() + self.proto.close(); } } diff --git a/p2p/src/protocol.rs b/p2p/src/protocol.rs index bf0d9f517..6897d18f1 100644 --- a/p2p/src/protocol.rs +++ b/p2p/src/protocol.rs @@ -13,210 +13,147 @@ // limitations under the License. use std::cell::RefCell; -use std::io::Write; -use std::ops::{Deref, DerefMut}; -use std::sync::Mutex; +use std::iter; +use std::ops::DerefMut; +use std::sync::{Mutex, Arc}; -use mioco; -use mioco::sync::mpsc::{sync_channel, SyncSender, Receiver}; -use mioco::tcp::{TcpStream, Shutdown}; +use futures; +use futures::{Stream, Future}; +use futures::stream; +use futures::sync::mpsc::UnboundedSender; +use tokio_core::io::{Io, write_all, read_exact, read_to_end}; +use tokio_core::net::TcpStream; use core::core; use core::ser; use msg::*; -use rw; use types::*; -/// In normal peer operation we don't want to be sent more than 100Mb in a -/// single message. -const MAX_DATA_BYTES: usize = 100 * 1000 * 1000; - -/// Number of errors before we disconnect from a peer. -const MAX_ERRORS: u64 = 5; - -/// First version of our communication protocol. Manages the underlying -/// connection, listening to incoming messages and transmitting outgoing ones. pub struct ProtocolV1 { - // The underlying tcp connection. - conn: RefCell, + outbound_chan: RefCell>>>, - // Send channel for the rest of the local system to send messages to the peer we're connected to. - msg_send: RefCell>>>, - - // Stop channel to exit the send/listen loop. - stop_send: RefCell>>, - - // Used both to count the amount of data sent and lock writing to the conn. We can't wrap conn with - // the lock as we're always listening to receive. - sent_bytes: Mutex, + // Bytes we've sent. + sent_bytes: Arc>, // Bytes we've received. - received_bytes: Mutex, + received_bytes: Arc>, // Counter for read errors. error_count: Mutex, } impl ProtocolV1 { - /// Creates a new protocol v1 - pub fn new(conn: TcpStream) -> ProtocolV1 { + pub fn new() -> ProtocolV1 { ProtocolV1 { - conn: RefCell::new(conn), - msg_send: RefCell::new(None), - stop_send: RefCell::new(None), - sent_bytes: Mutex::new(0), - received_bytes: Mutex::new(0), + outbound_chan: RefCell::new(None), + sent_bytes: Arc::new(Mutex::new(0)), + received_bytes: Arc::new(Mutex::new(0)), error_count: Mutex::new(0), } } } impl Protocol for ProtocolV1 { - /// Main protocol connection handling loop, starts listening to incoming - /// messages and transmitting messages the rest of the local system wants - /// to send. Must be called before any interaction with a protocol instance - /// and should only be called once. Will block so also needs to be called - /// within a coroutine. - fn handle(&self, adapter: &NetAdapter) -> Result<(), ser::Error> { - // setup channels so we can switch between reads, writes and close - let (msg_recv, stop_recv) = self.setup_channels(); + fn handle(&self, + conn: TcpStream, + adapter: &NetAdapter) + -> Box> { + let (reader, writer) = conn.split(); - let mut conn = self.conn.borrow_mut(); - loop { - // main select loop, switches between listening, sending or stopping - select!( - r:conn => { - let res = self.process_msg(&mut conn, adapter); - if let Err(_) = res { - let mut cnt = self.error_count.lock().unwrap(); - *cnt += 1; - if *cnt > MAX_ERRORS { - return res.map(|_| ()); - } - } - }, - r:msg_recv => { - // relay a message originated from the rest of the local system - let data = &msg_recv.recv().unwrap()[..]; - let mut sent_bytes = self.sent_bytes.lock().unwrap(); - *sent_bytes += data.len() as u64; - try!(conn.deref_mut().write_all(data).map_err(&ser::Error::IOErr)); - }, - r:stop_recv => { - // shuts the connection don and end the loop - stop_recv.recv(); - conn.shutdown(Shutdown::Both); - return Ok(()); - } - ); + // prepare the channel that will transmit data to the connection writer + let (tx, rx) = futures::sync::mpsc::unbounded(); + { + let mut out_mut = self.outbound_chan.borrow_mut(); + *out_mut = Some(tx.clone()); } + + // infinite iterator stream so we repeat the message reading logic until the + // peer is stopped + let iter = stream::iter(iter::repeat(()).map(Ok::<(), ser::Error>)); + + // setup the reading future, getting messages from the peer and processing them + let recv_bytes = self.received_bytes.clone(); + let read_msg = iter.fold(reader, move |reader, _| { + let mut tx_inner = tx.clone(); + let recv_bytes = recv_bytes.clone(); + read_exact(reader, vec![0u8; HEADER_LEN as usize]) + .map_err(|e| ser::Error::IOErr(e)) + .and_then(move |(reader, buf)| { + // first read the message header + let header = try!(ser::deserialize::(&mut &buf[..])); + Ok((reader, header)) + }) + .map(move |(reader, header)| { + // add the count of bytes sent + let mut recv_bytes = recv_bytes.lock().unwrap(); + *recv_bytes += header.serialized_len() + header.msg_len; + + // and handle the different message types + match header.msg_type { + Type::Ping => { + let data = ser::ser_vec(&MsgHeader::new(Type::Pong, 0)).unwrap(); + if let Err(e) = tx_inner.send(data) { + warn!("Couldn't send pong to remote peer: {}", e); + } + } + Type::Pong => {} + _ => { + error!("unknown message type {:?}", header.msg_type); + } + }; + reader + }) + }); + + // setting the writing future, getting messages from our system and sending + // them out + let sent_bytes = self.sent_bytes.clone(); + let send_data = rx.map(move |data| { + // add the count of bytes sent + let mut sent_bytes = sent_bytes.lock().unwrap(); + *sent_bytes += data.len() as u64; + data + }) + // write the data and make sure the future returns the right types + .fold(writer, + |writer, data| write_all(writer, data).map_err(|_| ()).map(|(writer, buf)| writer)) + .map(|_| ()) + .map_err(|_| ser::Error::CorruptedData); + + // select between our different futures and return them + Box::new(read_msg.map(|_| ()).select(send_data).map(|_| ()).map_err(|(e, _)| e)) } + /// Bytes sent and received by this peer to the remote peer. + fn transmitted_bytes(&self) -> (u64, u64) { + let sent = *self.sent_bytes.lock().unwrap(); + let recv = *self.received_bytes.lock().unwrap(); + (sent, recv) + } + /// Sends a ping message to the remote peer. Will panic if handle has never /// been called on this protocol. fn send_ping(&self) -> Result<(), ser::Error> { - let data = try!(ser::ser_vec(&MsgHeader::new(Type::Ping))); - let msg_send = self.msg_send.borrow(); - msg_send.as_ref().unwrap().send(data); + let data = try!(ser::ser_vec(&MsgHeader::new(Type::Ping, 0))); + let mut msg_send = self.outbound_chan.borrow_mut(); + if let Err(e) = msg_send.deref_mut().as_mut().unwrap().send(data) { + warn!("Couldn't send message to remote peer: {}", e); + } Ok(()) } /// Serializes and sends a block to our remote peer fn send_block(&self, b: &core::Block) -> Result<(), ser::Error> { - self.send_msg(Type::Block, b) + unimplemented!(); } /// Serializes and sends a transaction to our remote peer fn send_transaction(&self, tx: &core::Transaction) -> Result<(), ser::Error> { - self.send_msg(Type::Transaction, tx) - } - - /// Bytes sent and received by this peer to the remote peer. - fn transmitted_bytes(&self) -> (u64, u64) { - let sent = *self.sent_bytes.lock().unwrap().deref(); - let received = *self.received_bytes.lock().unwrap().deref(); - (sent, received) + unimplemented!(); } /// Close the connection to the remote peer fn close(&self) { - let stop_send = self.stop_send.borrow(); - stop_send.as_ref().unwrap().send(0); - } -} - -impl ProtocolV1 { - /// Reads a message from the peer tcp stream and process it, usually simply - /// forwarding to the adapter. - fn process_msg(&self, - mut conn: &mut TcpStream, - adapter: &NetAdapter) - -> Result<(), ser::Error> { - // deser the header to get the message type - let header = try!(ser::deserialize::(conn.deref_mut())); - if !header.acceptable() { - return Err(ser::Error::CorruptedData); - } - - // wrap our connection with limited byte-counting readers - let mut limit_conn = rw::LimitedRead::new(conn.deref_mut(), MAX_DATA_BYTES); - let mut read_conn = rw::CountingRead::new(&mut limit_conn); - - // check the message type and hopefully do what's expected with it - match header.msg_type { - Type::Ping => { - // respond with pong - try!(self.send_pong()); - } - Type::Pong => {} - Type::Transaction => { - let tx = try!(ser::deserialize(&mut read_conn)); - adapter.transaction_received(tx); - } - Type::Block => { - let b = try!(ser::deserialize(&mut read_conn)); - adapter.block_received(b); - } - _ => error!("uncaught unknown"), - } - - // update total of bytes sent - let mut received_bytes = self.received_bytes.lock().unwrap(); - *received_bytes += header.serialized_len() + (read_conn.bytes_read() as u64); - - Ok(()) - } - - /// Helper function to avoid boilerplate, builds a header followed by the - /// Writeable body and send the whole thing. - // TODO serialize straight to the connection - fn send_msg(&self, t: Type, body: &ser::Writeable) -> Result<(), ser::Error> { - let mut data = Vec::new(); - try!(ser::serialize(&mut data, &MsgHeader::new(t))); - try!(ser::serialize(&mut data, body)); - let msg_send = self.msg_send.borrow(); - msg_send.as_ref().unwrap().send(data); - Ok(()) - } - - /// Sends a pong message (usually in reply to ping) - fn send_pong(&self) -> Result<(), ser::Error> { - let data = try!(ser::ser_vec(&MsgHeader::new(Type::Pong))); - let msg_send = self.msg_send.borrow(); - msg_send.as_ref().unwrap().send(data); - Ok(()) - } - - /// Setup internal communication channels to select over - fn setup_channels(&self) -> (Receiver>, Receiver) { - let (msg_send, msg_recv) = sync_channel(10); - let (stop_send, stop_recv) = sync_channel(1); - { - let mut msg_mut = self.msg_send.borrow_mut(); - *msg_mut = Some(msg_send); - let mut stop_mut = self.stop_send.borrow_mut(); - *stop_mut = Some(stop_send); - } - (msg_recv, stop_recv) + // TODO some kind of shutdown signal } } diff --git a/p2p/src/rw.rs b/p2p/src/rw.rs deleted file mode 100644 index cd9cb03c1..000000000 --- a/p2p/src/rw.rs +++ /dev/null @@ -1,87 +0,0 @@ -use std::io::{self, Read, Write, Result}; -use core::ser; - -/// A Read implementation that counts the number of bytes consumed from an -/// underlying Read. -pub struct CountingRead<'a> { - counter: usize, - source: &'a mut Read, -} - -impl<'a> CountingRead<'a> { - /// Creates a new Read wrapping the underlying one, counting bytes consumed - pub fn new(source: &mut Read) -> CountingRead { - CountingRead { - counter: 0, - source: source, - } - } - - /// Number of bytes that have been read from the underlying reader - pub fn bytes_read(&self) -> usize { - self.counter - } -} - -impl<'a> Read for CountingRead<'a> { - fn read(&mut self, buf: &mut [u8]) -> Result { - let r = self.source.read(buf); - if let Ok(sz) = r { - self.counter += sz; - } - r - } -} - -/// A Read implementation that errors out past a maximum number of bytes read. -pub struct LimitedRead<'a> { - counter: usize, - max: usize, - source: &'a mut Read, -} - -impl<'a> LimitedRead<'a> { - /// Creates a new Read wrapping the underlying one, erroring once the - /// max_read bytes has been reached. - pub fn new(source: &mut Read, max_read: usize) -> LimitedRead { - LimitedRead { - counter: 0, - max: max_read, - source: source, - } - } -} - -impl<'a> Read for LimitedRead<'a> { - fn read(&mut self, buf: &mut [u8]) -> Result { - let r = self.source.read(buf); - if let Ok(sz) = r { - self.counter += sz; - } - if self.counter > self.max { - Err(io::Error::new(io::ErrorKind::Interrupted, "Reached read limit.")) - } else { - r - } - } -} - -/// A Write implementation that counts the number of bytes wrote to an -/// underlying Write. -struct CountingWrite<'a> { - counter: usize, - dest: &'a mut Write, -} - -impl<'a> Write for CountingWrite<'a> { - fn write(&mut self, buf: &[u8]) -> Result { - let w = self.dest.write(buf); - if let Ok(sz) = w { - self.counter += sz; - } - w - } - fn flush(&mut self) -> Result<()> { - self.dest.flush() - } -} diff --git a/p2p/src/server.rs b/p2p/src/server.rs index 12afefaaa..c5b2afbd1 100644 --- a/p2p/src/server.rs +++ b/p2p/src/server.rs @@ -15,17 +15,16 @@ //! Grin server implementation, accepts incoming connections and connects to //! other peers in the network. -use rand::{self, Rng}; use std::cell::RefCell; -use std::io; -use std::net::{SocketAddr, ToSocketAddrs}; +use std::net::SocketAddr; use std::ops::Deref; -use std::str::FromStr; use std::sync::{Arc, RwLock}; +use std::time::Duration; -use mioco; -use mioco::sync::mpsc::{sync_channel, SyncSender}; -use mioco::tcp::{TcpListener, TcpStream}; +use futures; +use futures::{Future, Stream}; +use tokio_core::net::{TcpListener, TcpStream}; +use tokio_core::reactor; use core::core; use core::ser::Error; @@ -43,8 +42,8 @@ impl NetAdapter for DummyAdapter { /// peers, receiving connections from other peers and keep track of all of them. pub struct Server { config: P2PConfig, - peers: RwLock>>, - stop_send: RefCell>>, + peers: Arc>>>, + stop: RefCell>>, } unsafe impl Sync for Server {} @@ -56,111 +55,100 @@ impl Server { pub fn new(config: P2PConfig) -> Server { Server { config: config, - peers: RwLock::new(Vec::new()), - stop_send: RefCell::new(None), + peers: Arc::new(RwLock::new(Vec::new())), + stop: RefCell::new(None), } } + /// Starts the p2p server. Opens a TCP port to allow incoming /// connections and starts the bootstrapping process to find peers. - pub fn start(&self) -> Result<(), Error> { + pub fn start(&self, h: reactor::Handle) -> Box> { let addr = SocketAddr::new(self.config.host, self.config.port); - let listener = try!(TcpListener::bind(&addr).map_err(&Error::IOErr)); + let socket = TcpListener::bind(&addr, &h.clone()).unwrap(); warn!("P2P server started on {}", addr); let hs = Arc::new(Handshake::new()); - let (stop_send, stop_recv) = sync_channel(1); - { - let mut stop_mut = self.stop_send.borrow_mut(); - *stop_mut = Some(stop_send); - } + let peers = self.peers.clone(); - loop { - select!( - r:listener => { - let conn = try!(listener.accept().map_err(&Error::IOErr)); - let hs = hs.clone(); - - let peer = try!(Peer::accept(conn, &hs)); - let wpeer = Arc::new(peer); - { - let mut peers = self.peers.write().unwrap(); - peers.push(wpeer.clone()); - } - - mioco::spawn(move || -> io::Result<()> { - if let Err(err) = wpeer.run(&DummyAdapter{}) { - error!("{:?}", err); - } - Ok(()) - }); - }, - r:stop_recv => { - stop_recv.recv(); - return Ok(()); - } - ); - } - } - - pub fn connect_peer(&self, addr: A) -> Result<(), Error> { - for sock_addr in addr.to_socket_addrs().unwrap() { - info!("Connecting to peer {}", sock_addr); - let tcp_client = TcpStream::connect(&sock_addr).unwrap(); - let peer = try!(Peer::connect(tcp_client, &Handshake::new()) - .map_err(|_| io::Error::last_os_error())); - - let peer = Arc::new(peer); - let in_peer = peer.clone(); - mioco::spawn(move || -> io::Result<()> { - in_peer.run(&DummyAdapter {}); - Ok(()) + // main peer acceptance future handling handshake + let hp = h.clone(); + let peers = socket.incoming().map_err(|e| Error::IOErr(e)).map(move |(conn, addr)| { + let peers = peers.clone(); + // accept the peer and add it to the server map + let peer_accept = Peer::accept(conn, &hs.clone()).map(move |(conn, peer)| { + let apeer = Arc::new(peer); + let mut peers = peers.write().unwrap(); + peers.push(apeer.clone()); + Ok((conn, apeer)) }); - self.peers.write().unwrap().push(peer); + + // wire in a future to timeout the accept after 5 secs + let timeout = reactor::Timeout::new(Duration::new(5, 0), &hp).unwrap(); + let timed_peer = peer_accept.select(timeout.map(Err).map_err(|e| Error::IOErr(e))) + .then(|res| { + match res { + Ok((Ok((conn, p)), _timeout)) => Ok((conn, p)), + Ok((_, _accept)) => Err(Error::TooLargeReadErr), + Err((e, _other)) => Err(e), + } + }); + + // run the main peer protocol + timed_peer.and_then(|(conn, peer)| peer.clone().run(conn, &DummyAdapter {})) + }); + + // spawn each peer future to its own task + let hs = h.clone(); + let server = peers.for_each(move |peer| { + hs.spawn(peer.then(|res| { + match res { + Err(e) => info!("Client error: {}", e), + _ => {} + } + futures::finished(()) + })); + Ok(()) + }); + + // setup the stopping oneshot on the server and join it with the peer future + let (stop, stop_rx) = futures::sync::oneshot::channel(); + { + let mut stop_mut = self.stop.borrow_mut(); + *stop_mut = Some(stop); } - Ok(()) + Box::new(server.select(stop_rx.map_err(|_| Error::CorruptedData)).then(|res| { + match res { + Ok((_, _)) => Ok(()), + Err((e, _)) => Err(e), + } + })) } - /// Asks all the peers to relay the provided block. A peer may choose to - /// ignore the relay request if it has knowledge that the remote peer - /// already knows the block. - pub fn relay_block(&self, b: &core::Block) -> Result<(), Error> { - let peers = self.peers.write().unwrap(); - for p in peers.deref() { - try!(p.send_block(b)); - } - Ok(()) - } - - /// Asks all the peers to relay the provided transaction. A peer may choose - /// to ignore the relay request if it has knowledge that the remote peer - /// already knows the transaction. - pub fn relay_transaction(&self, tx: &core::Transaction) -> Result<(), Error> { - let peers = self.peers.write().unwrap(); - for p in peers.deref() { - try!(p.send_transaction(tx)); - } - Ok(()) - } - - /// Number of peers this server is connected to. - pub fn peers_count(&self) -> u32 { - self.peers.read().unwrap().len() as u32 - } - - /// Gets a random peer from our set of connected peers. - pub fn get_any_peer(&self) -> Arc { - let mut rng = rand::thread_rng(); - let peers = self.peers.read().unwrap(); - peers[rng.gen_range(0, peers.len())].clone() + pub fn connect_peer(&self, + addr: SocketAddr, + h: reactor::Handle) + -> Box> { + let socket = TcpStream::connect(&addr, &h).map_err(|e| Error::IOErr(e)); + let peers = self.peers.clone(); + let request = socket.and_then(move |socket| { + let peers = peers.clone(); + Peer::connect(socket, &Handshake::new()).map(move |(conn, peer)| { + let apeer = Arc::new(peer); + let mut peers = peers.write().unwrap(); + peers.push(apeer.clone()); + (conn, apeer) + }) + }) + .and_then(|(socket, peer)| peer.run(socket, &DummyAdapter {})); + Box::new(request) } /// Stops the server. Disconnect from all peers at the same time. - pub fn stop(&self) { + pub fn stop(self) { let peers = self.peers.write().unwrap(); for p in peers.deref() { p.stop(); } - let stop_send = self.stop_send.borrow(); - stop_send.as_ref().unwrap().send(0); + self.stop.into_inner().unwrap().complete(()); } } diff --git a/p2p/src/types.rs b/p2p/src/types.rs index b0a6fc83b..e87e8609f 100644 --- a/p2p/src/types.rs +++ b/p2p/src/types.rs @@ -13,6 +13,10 @@ // limitations under the License. use std::net::{SocketAddr, IpAddr}; + +use futures::Future; +use tokio_core::net::TcpStream; + use core::core; use core::ser::Error; @@ -29,7 +33,7 @@ impl Default for P2PConfig { let ipaddr = "127.0.0.1".parse().unwrap(); P2PConfig { host: ipaddr, - port: 3414, + port: 13414, } } } @@ -61,7 +65,7 @@ pub trait Protocol { /// be known already, usually passed during construction. Will typically /// block so needs to be called withing a coroutine. Should also be called /// only once. - fn handle(&self, na: &NetAdapter) -> Result<(), Error>; + fn handle(&self, conn: TcpStream, na: &NetAdapter) -> Box>; /// Sends a ping message to the remote peer. fn send_ping(&self) -> Result<(), Error>; diff --git a/p2p/tests/common/mod.rs b/p2p/tests/common/mod.rs deleted file mode 100644 index 77105aef0..000000000 --- a/p2p/tests/common/mod.rs +++ /dev/null @@ -1,58 +0,0 @@ -// Copyright 2016 The Grin Developers -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use mioco; -use mioco::tcp::TcpStream; -use std::io; -use std::sync::Arc; -use std::time; -use p2p; -use p2p::Peer; - -/// Server setup and teardown around the provided closure. -pub fn with_server(closure: F) where F: Fn(Arc) -> io::Result<()>, F: Send + 'static { - mioco::start(move || -> io::Result<()> { - // start a server in its own coroutine - let server = Arc::new(p2p::Server::new()); - let in_server = server.clone(); - mioco::spawn(move || -> io::Result<()> { - try!(in_server.start().map_err(|_| io::Error::last_os_error())); - Ok(()) - }); - - // giving server a little time to start - mioco::sleep(time::Duration::from_millis(50)); - - try!(closure(server.clone())); - - server.stop(); - Ok(()) - }).unwrap().unwrap(); -} - -pub fn connect_peer() -> io::Result> { - let addr = p2p::DEFAULT_LISTEN_ADDR.parse().unwrap(); - let tcp_client = TcpStream::connect(&addr).unwrap(); - let peer = try!(Peer::accept(tcp_client, &p2p::handshake::Handshake::new()).map_err(|_| io::Error::last_os_error())); - mioco::sleep(time::Duration::from_millis(50)); - - let peer = Arc::new(peer); - let in_peer = peer.clone(); - mioco::spawn(move || -> io::Result<()> { - in_peer.run(&p2p::DummyAdapter{}); - Ok(()) - }); - mioco::sleep(time::Duration::from_millis(50)); - Ok(peer.clone()) -} diff --git a/p2p/tests/peer_handshake.rs b/p2p/tests/peer_handshake.rs index 7f67f2e4b..8768aeb77 100644 --- a/p2p/tests/peer_handshake.rs +++ b/p2p/tests/peer_handshake.rs @@ -13,40 +13,58 @@ // limitations under the License. extern crate grin_core as core; -extern crate grin_p2p as p2p; -extern crate mioco; +extern crate grin_p2p_fut as p2p; extern crate env_logger; +extern crate futures; +extern crate tokio_core; -mod common; - -use std::io; +use std::net::SocketAddr; use std::time; -use core::core::*; +use futures::future::Future; +use tokio_core::net::TcpStream; +use tokio_core::reactor::{self, Core}; + +use core::ser; use p2p::Peer; -use common::*; // Starts a server and connects a client peer to it to check handshake, followed by a ping/pong exchange to make sure the connection is live. #[test] fn peer_handshake() { env_logger::init().unwrap(); - with_server(|server| -> io::Result<()> { - // connect a client peer to the server - let peer = try!(connect_peer()); + let mut evtlp = Core::new().unwrap(); + let handle = evtlp.handle(); + let p2p_conf = p2p::P2PConfig::default(); + let server = p2p::Server::new(p2p_conf); + let run_server = server.start(handle.clone()); - // check server peer count - let pc = server.peers_count(); - assert_eq!(pc, 1); + let phandle = handle.clone(); + let rhandle = handle.clone(); + let timeout = reactor::Timeout::new(time::Duration::new(1, 0), &handle).unwrap(); + let timeout_send = reactor::Timeout::new(time::Duration::new(2, 0), &handle).unwrap(); + handle.spawn(timeout.map_err(|e| ser::Error::IOErr(e)).and_then(move |_| { + let p2p_conf = p2p::P2PConfig::default(); + let addr = SocketAddr::new(p2p_conf.host, p2p_conf.port); + let socket = TcpStream::connect(&addr, &phandle).map_err(|e| ser::Error::IOErr(e)); + socket.and_then(move |socket| { + Peer::connect(socket, &p2p::handshake::Handshake::new()) + }).and_then(move |(socket, peer)| { + rhandle.spawn(peer.run(socket, &p2p::DummyAdapter {}).map_err(|e| { + panic!("Client run failed: {}", e); + })); + peer.send_ping().unwrap(); + timeout_send.map_err(|e| ser::Error::IOErr(e)).map(|_| peer) + }).and_then(|peer| { + let (sent, recv) = peer.transmitted_bytes(); + assert!(sent > 0); + assert!(recv > 0); + Ok(()) + }).and_then(|_| {server.stop(); Ok(())}) + }).map_err(|e| { + panic!("Client connection failed: {}", e); + })); - // send a ping and check we got ponged (received data back) - peer.send_ping(); - mioco::sleep(time::Duration::from_millis(50)); - let (sent, recv) = peer.transmitted_bytes(); - assert!(sent > 0); - assert!(recv > 0); + evtlp.run(run_server).unwrap(); - peer.stop(); - Ok(()) - }); } diff --git a/p2p/tests/peer_tx_send.rs b/p2p/tests/peer_tx_send.rs deleted file mode 100644 index 07b86da20..000000000 --- a/p2p/tests/peer_tx_send.rs +++ /dev/null @@ -1,82 +0,0 @@ -// Copyright 2016 The Grin Developers -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -extern crate grin_core as core; -extern crate grin_p2p as p2p; -extern crate mioco; -extern crate env_logger; -extern crate rand; -extern crate secp256k1zkp as secp; - -mod common; - -use rand::Rng; -use rand::os::OsRng; -use std::io; -use std::sync::Arc; -use std::time; - -use mioco::tcp::TcpStream; -use secp::Secp256k1; -use secp::key::SecretKey; - -use core::core::*; -use p2p::Peer; -use common::*; - -// Connects a client peer and send a transaction. -#[test] -fn peer_tx_send() { - with_server(|server| -> io::Result<()> { - // connect a client peer to the server - let peer = try!(connect_peer()); - let tx1 = tx2i1o(); - - peer.send_transaction(&tx1); - mioco::sleep(time::Duration::from_millis(50)); - let (sent,_) = peer.transmitted_bytes(); - assert!(sent > 1000); - - let s_peer = server.get_any_peer(); - let (_, recv) = s_peer.transmitted_bytes(); - assert!(recv > 1000); - - peer.stop(); - - Ok(()) - }); -} - -// utility producing a transaction with 2 inputs and a single outputs -pub fn tx2i1o() -> Transaction { - let mut rng = OsRng::new().unwrap(); - let ref secp = secp::Secp256k1::with_caps(secp::ContextFlag::Commit); - - let outh = core::core::hash::ZERO_HASH; - Transaction::new(vec![Input::OvertInput { - output: outh, - value: 10, - blindkey: SecretKey::new(secp, &mut rng), - }, - Input::OvertInput { - output: outh, - value: 11, - blindkey: SecretKey::new(secp, &mut rng), - }], - vec![Output::OvertOutput { - value: 20, - blindkey: SecretKey::new(secp, &mut rng), - }], - 1).blind(&secp).unwrap() -}