From 4657b09c4e687c61ce1f66ccb6c88fc011fb7e3c Mon Sep 17 00:00:00 2001 From: Ignotus Peverell Date: Mon, 24 Oct 2016 21:35:10 -0700 Subject: [PATCH] WIP: more p2p code, mostly interaction between peer, protocol and server as well as ser/deser for first message types. --- core/src/ser.rs | 2 + p2p/src/lib.rs | 2 + p2p/src/msg.rs | 169 +++++++++++++++++++++++++++++++++++++++++++- p2p/src/peer.rs | 128 +++++++++++++++++++++++++++++++++ p2p/src/protocol.rs | 45 ++++++++++++ p2p/src/server.rs | 76 +------------------- p2p/src/types.rs | 33 +++++++++ 7 files changed, 378 insertions(+), 77 deletions(-) create mode 100644 p2p/src/peer.rs create mode 100644 p2p/src/protocol.rs create mode 100644 p2p/src/types.rs diff --git a/core/src/ser.rs b/core/src/ser.rs index 41049dac9..94c84a873 100644 --- a/core/src/ser.rs +++ b/core/src/ser.rs @@ -33,6 +33,8 @@ pub enum Error { expected: Vec, received: Vec, }, + /// Data wasn't in a consumable format + CorruptedData, /// When asked to read too much data TooLargeReadErr(String), } diff --git a/p2p/src/lib.rs b/p2p/src/lib.rs index c4e657ad9..bbb001763 100644 --- a/p2p/src/lib.rs +++ b/p2p/src/lib.rs @@ -27,3 +27,5 @@ extern crate mioco; mod msg; mod server; +mod peer; +mod protocol; diff --git a/p2p/src/msg.rs b/p2p/src/msg.rs index 9d3901209..268373354 100644 --- a/p2p/src/msg.rs +++ b/p2p/src/msg.rs @@ -16,20 +16,73 @@ use std::net::SocketAddr; -use core::ser::{Writeable, Readable, Writer, Reader}; +use core::ser::{Writeable, Readable, Writer, Reader, Error}; -mod ErrCodes { - const UNSUPPORTED_VERSION: u32 = 100; +/// Magic number expected in the header of every message +const MAGIC: [u8; 2] = [0x1e, 0xc5]; + +/// Codes for each error that can be produced reading a message. +enum ErrCodes { + UNSUPPORTED_VERSION = 100, } bitflags! { /// Options for block validation pub flags Capabilities: u32 { + /// We don't know (yet) what the peer can do. + const UNKNOWN = 0b00000000, /// Runs with the easier version of the Proof of Work, mostly to make testing easier. const FULL_SYNC = 0b00000001, } } +/// Types of messages +enum Type { + HAND = 1, + SHAKE = 2, + ERROR = 3, + /// Never actually used over the network but used to detect unrecognized + /// types. + /// Increment as needed. + MAX_MSG_TYPE = 4, +} + +/// Header of any protocol message, used to identify incoming messages. +pub struct MsgHeader { + magic: [u8; 2], + msg_type: Type, +} + +impl MsgHeader { + fn acceptable(&self) -> bool { + msg_type < MAX_MSG_TYPE; + } +} + +impl Writeable for MsgHeader { + fn write(&self, writer: &mut Writer) -> Option { + ser_multiwrite!(writer, + [write_u8, self.magic[0]], + [write_u8, self.magic[1]], + [write_u8, self.msg_type as u8]); + None + } +} + +impl Readable for MsgHeader { + fn read(reader: &mut Reader) -> Result { + try!(reader.expect_u8(MAGIC[0])); + try!(reader.expect_u8(MAGIC[1])); + let t = try!(reader.read_u8()); + Ok(MsgHeader { + magic: MAGIC, + msg_type: t, + }) + } +} + +/// First part of a handshake, sender advertises its version and +/// characteristics. pub struct Hand { version: u32, capabilities: Capabilities, @@ -38,13 +91,123 @@ pub struct Hand { user_agent: String, } +impl Writeable for Hand { + fn write(&self, writer: &mut Writer) -> Option { + ser_multiwrite!(writer, + [write_u32, self.version], + [write_u32, self.capabilities]); + sender_addr.write(writer); + receiver_addr.write(writer); + writer.write_vec(&mut self.user_agent.into_bytes()) + } +} + +impl Readable for Hand { + fn read(reader: &mut Reader) -> Result { + let (version, capab) = ser_multiread!(reader, read_u32, read_u32); + let sender_addr = SocketAddr::read(reader); + let receiver_addr = SocketAddr::read(reader); + let user_agent = reader.read_vec(); + Hand { + version: version, + capabilities: capab, + server_addr: sender_addr, + receiver_addr: receiver_addr, + user_agent: user_agent, + } + } +} + +/// Second part of a handshake, receiver of the first part replies with its own +/// version and characteristics. pub struct Shake { version: u32, capabilities: Capabilities, user_agent: String, } +impl Writeable for MsgHeader { + fn write(&self, writer: &mut Writer) -> Option { + ser_multiwrite!(writer, + [write_u32, self.version], + [write_u32, self.capabilities], + [write_vec, self.user_agent.as_mut_vec()]); + None + } +} + +impl Readable for Shake { + fn read(reader: &mut Reader) -> Result { + let (version, capab, ua) = ser_multiread!(reader, read_u32, read_u32, read_vec); + let user_agent = try!(String::from_utf8(ua).map_err(|_| ser::Error: CorruptedData)); + Hand { + version: version, + capabilities: capab, + user_agent: user_agent, + } + } +} + +/// We found some issue in the communication, sending an error back, usually +/// followed by closing the connection. pub struct PeerError { code: u32, message: String, } + +impl Writeable for PeerError { + fn write(&self, writer: &mut Writer) -> Option { + ser_multiwrite!(writer, + [write_u32, self.code], + [write_vec, &mut self.message.into_bytes()]); + None + } +} + +impl Readable for PeerError { + fn read(reader: &mut Reader) -> Result { + let (code, msg) = ser_multiread!(reader, read_u32, read_vec); + let message = try!(String::from_utf8(msg).map_err(|_| ser::Error: CorruptedData)); + PeerError { + code: code, + message: message, + } + } +} + +impl Writeable for SocketAddr { + fn write(&self, writer: &mut Writer) -> Option { + match self { + V4(sav4) => { + ser_multiwrite!(writer, + [write_u8, 0], + [write_fixed_bytes, sav4.ip().octets()], + [write_u16, sav4.port()]); + } + V6(sav6) => { + try_m(writer.write_u8(1)); + for seg in sav6.ip().segments() { + try_m(writer.write_u16(seg)); + } + try_m(writer.write_u16(sav6.port())); + } + } + None + } +} + +impl Readable for SocketAddr { + fn read(reader: &mut Reader) -> Result { + let v4_or_v6 = reader.read_u8(); + if v4_or_v6 == 0 { + let ip = reader.read_fixed_bytes(4); + let port = reader.read_u16(); + SocketAddrV4::new(Ipv4Addr::new(ip[0], ip[1], ip[2], ip[3]), port) + } else { + let ip = [0..8].map(|_| reader.read_u16()).collect::>(); + let port = reader.read_u16(); + SocketAddrV6::new(Ipv6Addr::new(ip[0], ip[1], ip[2], ip[3], ip[4], ip[5], ip[6], ip[7]), + port) + } + } +} diff --git a/p2p/src/peer.rs b/p2p/src/peer.rs new file mode 100644 index 000000000..09ea42cea --- /dev/null +++ b/p2p/src/peer.rs @@ -0,0 +1,128 @@ +// Copyright 2016 The Grin Developers +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use str::net::SocketAddr; +use std::str::FromStr; +use std::io::{Read, Write}; + +use time::Duration; + +use mioco::tcp::{TcpListener, TcpStream, Shutdown}; +use core::ser::{serialize, deserialize}; + +const PROTOCOL_VERSION: u32 = 1; +const USER_AGENT: &'static str = "MW/Grin 0.1"; + +/// The local representation of a remotely connected peer. Handles most +/// low-level network communication and tracks peer information. +struct Peer { + conn: TcpStream, + reader: BufReader, + capabilities: Capabilities, + user_agent: String, +} + +/// Make the Peer a Reader for convenient access to the underlying connection. +/// Allows the peer to track how much is received. +impl Read for Peer { + fn read(&mut self, buf: &mut [u8]) -> Result { + self.reader.read(buf) + } +} + +/// Make the Peer a Writer for convenient access to the underlying connection. +/// Allows the peer to track how much is sent. +impl Write for Peer { + fn write(&mut self, buf: &[u8]) -> Result { + self.conf.write(buf) + } +} + +impl Close for Peer { + fn close() { + self.conn.shutdown(Shutdown::Both); + } +} + +impl Peer { + /// Create a new local peer instance connected to a remote peer with the + /// provided TcpStream. + fn new(conn: TcpStream) -> Peer { + // don't wait on read for more than 2 seconds by default + conn.set_read_timeout(Some(Duration::seconds(2))); + + Peer { + conn: conn, + reader: BufReader::new(conn), + capabilities: UNKNOWN, + user_agent: "", + } + } + + /// Handles connecting to a new remote peer, starting the version handshake. + fn connect(&mut self) -> Result { + serialize(self.peer, + &Hand { + version: PROTOCOL_VERSION, + capabilities: FULL_SYNC, + sender_addr: listen_addr(), + receiver_addr: self.peer.peer_addr(), + user_agent: USER_AGENT, + }); + let shake = deserialize(self.peer); + if shake.version != 1 { + self.close(ErrCodes::UNSUPPORTED_VERSION, + format!("Unsupported version: {}, ours: {})", + shake.version, + PROTOCOL_VERSION)); + return; + } + self.capabilities = shake.capabilities; + self.user_agent = shake.user_agent; + + // when more than one protocol version is supported, choosing should go here + ProtocolV1::new(&self); + } + + /// Handles receiving a connection from a new remote peer that started the + /// version handshake. + fn handshake(&mut self) -> Result { + let hand = deserialize(self.peer); + if hand.version != 1 { + self.close(ErrCodes::UNSUPPORTED_VERSION, + format!("Unsupported version: {}, ours: {})", + hand.version, + PROTOCOL_VERSION)); + return; + } + + self.peer.capabilities = hand.capabilities; + self.peer.user_agent = hand.user_agent; + + serialize(self.peer, + &Shake { + version: PROTOCOL_VERSION, + capabilities: FULL_SYNC, + user_agent: USER_AGENT, + }); + self.accept_loop(); + + // when more than one protocol version is supported, choosing should go here + ProtocolV1::new(&self); + } + + fn peer_addr(&self) -> SocketAddr { + self.conn.peer_addr() + } +} diff --git a/p2p/src/protocol.rs b/p2p/src/protocol.rs new file mode 100644 index 000000000..57db11303 --- /dev/null +++ b/p2p/src/protocol.rs @@ -0,0 +1,45 @@ +// Copyright 2016 The Grin Developers +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use types::*; +use core::ser; + +pub struct ProtocolV1 { + comm: &mut Comm, +} + +impl Protocol for ProtocolV1 { + fn new(p: &mut Comm) -> Protocol { + Protocol { comm: p } + } + fn handle(&self, server: &Server) { + loop { + let header = ser::deserialize::(); + if !header.acceptable() { + continue; + } + } + } +} + +impl ProtocolV1 { + fn close(err_code: u32, explanation: &'static str) { + serialize(self.peer, + &Err { + code: err_code, + message: explanation, + }); + self.comm.close(); + } +} diff --git a/p2p/src/server.rs b/p2p/src/server.rs index 25d2d3e1e..274652d8a 100644 --- a/p2p/src/server.rs +++ b/p2p/src/server.rs @@ -13,11 +13,7 @@ // limitations under the License. //! Grin server implementation, accepts incoming connections and connects to -//! other peers in the network, handling handshake and message receive/send. - -use str::net::SocketAddr; -use std::str::FromStr; -use time::Duration; +//! other peers in the network. use mioco::tcp::{TcpListener, TcpStream, Shutdown}; @@ -25,80 +21,12 @@ use core::ser::{serialize, deserialize}; use msg::*; const DEFAULT_LISTEN_ADDR: &'static str = "127.0.0.1:555"; -const PROTOCOL_VERSION: u32 = 1; -const USER_AGENT: &'static str = "MW/Grin 0.1"; // replace with some config lookup or something fn listen_addr() -> SocketAddr { FromStr::from_str(DEFAULT_LISTEN_ADDR).unwrap() } -/// The local representation of a remotely connected peer. Handles most -/// low-level network communication. -struct Peer { - conn: TcpStream, - reader: BufReader, - capabilities: Capabilities, - user_agent: String, -} - -impl Peer { - /// Create a new local peer instance connected to a remote peer with the - /// provided TcpStream. - fn new(conn: TcpStream) -> Peer { - // don't wait on read for more than 2 seconds by default - conn.set_read_timeout(Some(Duration::seconds(2))); - - Peer { - conn: conn, - reader: BufReader::new(conn), - } - } - - /// Handles connecting to a new remote peer, starting the version handshake. - fn connect(&mut self) { - serialize(self.conn, - &Hand { - version: PROTOCOL_VERSION, - capabilities: FULL_SYNC, - sender_addr: listen_addr(), - receiver_addr: conn.peer_addr(), - user_agent: USER_AGENT, - }); - let shake = deserialize(self.reader); - if shake.version != 1 { - self.close(ErrCodes::UNSUPPORTED_VERSION, - format!("Unsupported version: {}, ours: {})", - shake.version, - PROTOCOL_VERSION)); - return; - } - self.capabilities = shake.capabilities; - self.user_agent = shake.user_agent; - - self.accept_loop(); - } - - /// Handles receiving a connection from a new remote peer that started the - /// version handshake. - fn handshake(&mut self) {} - - fn accept_loop(&mut self) { - loop { - let msg = deserialize(self.reader); - } - } - - fn close(err_code: u32, explanation: &'static str) { - serialize(self.conn, - &Err { - code: err_code, - message: explanation, - }); - self.conn.shutdown(Shutdown::Both); - } -} - pub struct Server { } @@ -114,7 +42,7 @@ impl Server { loop { let mut conn = try!(listener.accept()); mioco::spawn(move || -> io::Result<()> { - Peer::new(conn).connect(); + Peer::new(conn).handshake(); }); } }) diff --git a/p2p/src/types.rs b/p2p/src/types.rs new file mode 100644 index 000000000..03b751df8 --- /dev/null +++ b/p2p/src/types.rs @@ -0,0 +1,33 @@ +// Copyright 2016 The Grin Developers +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::io::{Read, Write}; + +/// Trait for pre-emptively and forcefully closing an underlying resource. +trait Close { + fn close(); +} + +/// Main trait we expect a peer to implement to be usable by a Protocol. +trait Comm : Read + Write + Close; + +/// A given communication protocol agreed upon between 2 peers (usually ourselves and a remove) after handshake. +trait Protocol { + /// Instantiate providing a reader and writer allowing to communicate to the other peer. + fn new(p: &mut Comm); + + /// Starts handling protocol communication, the peer(s) is expected to be known already, usually passed during construction. + fn handle(&self, server: &Server); +} +