diff --git a/p2p/src/handshake.rs b/p2p/src/handshake.rs index 659c571c3..3f517914a 100644 --- a/p2p/src/handshake.rs +++ b/p2p/src/handshake.rs @@ -44,9 +44,11 @@ impl Handshake { } /// Handles connecting to a new remote peer, starting the version handshake. - pub fn connect<'a>(&'a self, peer: &'a mut PeerConn) -> Result<&Protocol, Error> { + pub fn connect<'a>(&'a self, peer: &'a mut PeerConn) -> Result, Error> { + // get a new nonce that can be used on handshake to detect self-connection let nonce = self.next_nonce(); + // send the first part of the handshake let sender_addr = SockAddr(peer.local_addr()); let receiver_addr = SockAddr(peer.peer_addr()); let opt_err = serialize(peer, @@ -63,9 +65,10 @@ impl Handshake { None => {} } + // deserialize the handshake response and do version negotiation let shake = try!(deserialize::(peer)); if shake.version != 1 { - self.close(peer, ErrCodes::UNSUPPORTED_VERSION as u32, + self.close(peer, ErrCodes::UnsupportedVersion as u32, format!("Unsupported version: {}, ours: {})", shake.version, PROTOCOL_VERSION)); @@ -75,21 +78,23 @@ impl Handshake { peer.user_agent = shake.user_agent; // when more than one protocol version is supported, choosing should go here - Ok(&ProtocolV1::new(peer)) + Ok(Box::new(ProtocolV1::new(peer))) } /// Handles receiving a connection from a new remote peer that started the /// version handshake. - pub fn handshake<'a>(&'a self, peer: &'a mut PeerConn) -> Result<&Protocol, Error> { + pub fn handshake<'a>(&'a self, peer: &'a mut PeerConn) -> Result, Error> { + // deserialize first part of handshake sent to us and do version negotiation let hand = try!(deserialize::(peer)); if hand.version != 1 { - self.close(peer, ErrCodes::UNSUPPORTED_VERSION as u32, + self.close(peer, ErrCodes::UnsupportedVersion as u32, format!("Unsupported version: {}, ours: {})", hand.version, PROTOCOL_VERSION)); return Err(Error::UnexpectedData{expected: vec![PROTOCOL_VERSION as u8], received: vec![hand.version as u8]}); } { + // check the nonce to see if we could be trying to connect to ourselves let nonces = self.nonces.read().unwrap(); if nonces.contains(&hand.nonce) { return Err(Error::UnexpectedData { @@ -99,9 +104,11 @@ impl Handshake { } } + // all good, keep peer info peer.capabilities = hand.capabilities; peer.user_agent = hand.user_agent; + // send our reply with our info let opt_err = serialize(peer, &Shake { version: PROTOCOL_VERSION, @@ -114,9 +121,10 @@ impl Handshake { } // when more than one protocol version is supported, choosing should go here - Ok(&ProtocolV1::new(peer)) + Ok(Box::new(ProtocolV1::new(peer))) } + /// Generate a new random nonce and store it in our ring buffer fn next_nonce(&self) -> u64 { let mut rng = OsRng::new().unwrap(); let nonce = rng.next_u64(); diff --git a/p2p/src/msg.rs b/p2p/src/msg.rs index 915deef94..0488e033e 100644 --- a/p2p/src/msg.rs +++ b/p2p/src/msg.rs @@ -28,7 +28,7 @@ const MAGIC: [u8; 2] = [0x1e, 0xc5]; /// Codes for each error that can be produced reading a message. pub enum ErrCodes { - UNSUPPORTED_VERSION = 100, + UnsupportedVersion = 100, } bitflags! { @@ -50,7 +50,7 @@ pub enum Type { PING, PONG, /// Never used over the network but to detect unrecognized types. - MAX_MSG_TYPE, + MaxMsgType, } /// Header of any protocol message, used to identify incoming messages. @@ -61,7 +61,7 @@ pub struct MsgHeader { impl MsgHeader { pub fn acceptable(&self) -> bool { - (self.msg_type as u8) < (Type::MAX_MSG_TYPE as u8) + (self.msg_type as u8) < (Type::MaxMsgType as u8) } } @@ -80,7 +80,7 @@ impl Readable for MsgHeader { try!(reader.expect_u8(MAGIC[0])); try!(reader.expect_u8(MAGIC[1])); let t = try!(reader.read_u8()); - if t < (Type::MAX_MSG_TYPE as u8) { + if t < (Type::MaxMsgType as u8) { return Err(ser::Error::CorruptedData); } Ok(MsgHeader { diff --git a/p2p/src/peer.rs b/p2p/src/peer.rs index 88a72be74..0ee26b24c 100644 --- a/p2p/src/peer.rs +++ b/p2p/src/peer.rs @@ -13,13 +13,12 @@ // limitations under the License. use std::net::SocketAddr; -use std::io::{self, Read, Write, BufReader}; +use std::io::{self, Read, Write}; -use mioco::tcp::{TcpListener, TcpStream, Shutdown}; -use time::Duration; +use mioco::tcp::{TcpStream, Shutdown}; -use core::ser::{serialize, deserialize, Error}; use handshake::Handshake; +use core::ser::Error; use msg::*; use types::*; @@ -27,7 +26,6 @@ use types::*; /// low-level network communication and tracks peer information. pub struct PeerConn { conn: TcpStream, - reader: BufReader, pub capabilities: Capabilities, pub user_agent: String, } @@ -36,7 +34,7 @@ pub struct PeerConn { /// Allows the peer to track how much is received. impl Read for PeerConn { fn read(&mut self, buf: &mut [u8]) -> io::Result { - self.reader.read(buf) + self.conn.read(buf) } } @@ -66,7 +64,6 @@ impl PeerConn { PeerConn { conn: conn, - reader: BufReader::new(conn), capabilities: UNKNOWN, user_agent: "".to_string(), } diff --git a/p2p/src/server.rs b/p2p/src/server.rs index d69c31413..e39c349a5 100644 --- a/p2p/src/server.rs +++ b/p2p/src/server.rs @@ -21,9 +21,8 @@ use std::str::FromStr; use std::sync::Arc; use mioco; -use mioco::tcp::{TcpListener, TcpStream, Shutdown}; +use mioco::tcp::TcpListener; -use core::ser::{serialize, deserialize}; use handshake::Handshake; use peer::PeerConn; use types::*; @@ -54,7 +53,7 @@ impl Server { let hs = Arc::new(Handshake::new()); loop { - let mut conn = try!(listener.accept()); + let conn = try!(listener.accept()); let hs_child = hs.clone(); mioco::spawn(move || -> io::Result<()> {