Preliminary impl of p2p package compiles now. Next step: basic handshake integration test.

This commit is contained in:
Ignotus Peverell 2016-10-26 11:21:45 -07:00
parent b1762cb5f4
commit a9dc8a05ac
No known key found for this signature in database
GPG key ID: 99CD25F39F8F8211
4 changed files with 24 additions and 20 deletions

View file

@ -44,9 +44,11 @@ impl Handshake {
} }
/// Handles connecting to a new remote peer, starting the version handshake. /// Handles connecting to a new remote peer, starting the version handshake.
pub fn connect<'a>(&'a self, peer: &'a mut PeerConn) -> Result<&Protocol, Error> { pub fn connect<'a>(&'a self, peer: &'a mut PeerConn) -> Result<Box<Protocol+'a>, Error> {
// get a new nonce that can be used on handshake to detect self-connection
let nonce = self.next_nonce(); let nonce = self.next_nonce();
// send the first part of the handshake
let sender_addr = SockAddr(peer.local_addr()); let sender_addr = SockAddr(peer.local_addr());
let receiver_addr = SockAddr(peer.peer_addr()); let receiver_addr = SockAddr(peer.peer_addr());
let opt_err = serialize(peer, let opt_err = serialize(peer,
@ -63,9 +65,10 @@ impl Handshake {
None => {} None => {}
} }
// deserialize the handshake response and do version negotiation
let shake = try!(deserialize::<Shake>(peer)); let shake = try!(deserialize::<Shake>(peer));
if shake.version != 1 { if shake.version != 1 {
self.close(peer, ErrCodes::UNSUPPORTED_VERSION as u32, self.close(peer, ErrCodes::UnsupportedVersion as u32,
format!("Unsupported version: {}, ours: {})", format!("Unsupported version: {}, ours: {})",
shake.version, shake.version,
PROTOCOL_VERSION)); PROTOCOL_VERSION));
@ -75,21 +78,23 @@ impl Handshake {
peer.user_agent = shake.user_agent; peer.user_agent = shake.user_agent;
// when more than one protocol version is supported, choosing should go here // when more than one protocol version is supported, choosing should go here
Ok(&ProtocolV1::new(peer)) Ok(Box::new(ProtocolV1::new(peer)))
} }
/// Handles receiving a connection from a new remote peer that started the /// Handles receiving a connection from a new remote peer that started the
/// version handshake. /// version handshake.
pub fn handshake<'a>(&'a self, peer: &'a mut PeerConn) -> Result<&Protocol, Error> { pub fn handshake<'a>(&'a self, peer: &'a mut PeerConn) -> Result<Box<Protocol+'a>, Error> {
// deserialize first part of handshake sent to us and do version negotiation
let hand = try!(deserialize::<Hand>(peer)); let hand = try!(deserialize::<Hand>(peer));
if hand.version != 1 { if hand.version != 1 {
self.close(peer, ErrCodes::UNSUPPORTED_VERSION as u32, self.close(peer, ErrCodes::UnsupportedVersion as u32,
format!("Unsupported version: {}, ours: {})", format!("Unsupported version: {}, ours: {})",
hand.version, hand.version,
PROTOCOL_VERSION)); PROTOCOL_VERSION));
return Err(Error::UnexpectedData{expected: vec![PROTOCOL_VERSION as u8], received: vec![hand.version as u8]}); return Err(Error::UnexpectedData{expected: vec![PROTOCOL_VERSION as u8], received: vec![hand.version as u8]});
} }
{ {
// check the nonce to see if we could be trying to connect to ourselves
let nonces = self.nonces.read().unwrap(); let nonces = self.nonces.read().unwrap();
if nonces.contains(&hand.nonce) { if nonces.contains(&hand.nonce) {
return Err(Error::UnexpectedData { return Err(Error::UnexpectedData {
@ -99,9 +104,11 @@ impl Handshake {
} }
} }
// all good, keep peer info
peer.capabilities = hand.capabilities; peer.capabilities = hand.capabilities;
peer.user_agent = hand.user_agent; peer.user_agent = hand.user_agent;
// send our reply with our info
let opt_err = serialize(peer, let opt_err = serialize(peer,
&Shake { &Shake {
version: PROTOCOL_VERSION, version: PROTOCOL_VERSION,
@ -114,9 +121,10 @@ impl Handshake {
} }
// when more than one protocol version is supported, choosing should go here // when more than one protocol version is supported, choosing should go here
Ok(&ProtocolV1::new(peer)) Ok(Box::new(ProtocolV1::new(peer)))
} }
/// Generate a new random nonce and store it in our ring buffer
fn next_nonce(&self) -> u64 { fn next_nonce(&self) -> u64 {
let mut rng = OsRng::new().unwrap(); let mut rng = OsRng::new().unwrap();
let nonce = rng.next_u64(); let nonce = rng.next_u64();

View file

@ -28,7 +28,7 @@ const MAGIC: [u8; 2] = [0x1e, 0xc5];
/// Codes for each error that can be produced reading a message. /// Codes for each error that can be produced reading a message.
pub enum ErrCodes { pub enum ErrCodes {
UNSUPPORTED_VERSION = 100, UnsupportedVersion = 100,
} }
bitflags! { bitflags! {
@ -50,7 +50,7 @@ pub enum Type {
PING, PING,
PONG, PONG,
/// Never used over the network but to detect unrecognized types. /// Never used over the network but to detect unrecognized types.
MAX_MSG_TYPE, MaxMsgType,
} }
/// Header of any protocol message, used to identify incoming messages. /// Header of any protocol message, used to identify incoming messages.
@ -61,7 +61,7 @@ pub struct MsgHeader {
impl MsgHeader { impl MsgHeader {
pub fn acceptable(&self) -> bool { pub fn acceptable(&self) -> bool {
(self.msg_type as u8) < (Type::MAX_MSG_TYPE as u8) (self.msg_type as u8) < (Type::MaxMsgType as u8)
} }
} }
@ -80,7 +80,7 @@ impl Readable<MsgHeader> for MsgHeader {
try!(reader.expect_u8(MAGIC[0])); try!(reader.expect_u8(MAGIC[0]));
try!(reader.expect_u8(MAGIC[1])); try!(reader.expect_u8(MAGIC[1]));
let t = try!(reader.read_u8()); let t = try!(reader.read_u8());
if t < (Type::MAX_MSG_TYPE as u8) { if t < (Type::MaxMsgType as u8) {
return Err(ser::Error::CorruptedData); return Err(ser::Error::CorruptedData);
} }
Ok(MsgHeader { Ok(MsgHeader {

View file

@ -13,13 +13,12 @@
// limitations under the License. // limitations under the License.
use std::net::SocketAddr; use std::net::SocketAddr;
use std::io::{self, Read, Write, BufReader}; use std::io::{self, Read, Write};
use mioco::tcp::{TcpListener, TcpStream, Shutdown}; use mioco::tcp::{TcpStream, Shutdown};
use time::Duration;
use core::ser::{serialize, deserialize, Error};
use handshake::Handshake; use handshake::Handshake;
use core::ser::Error;
use msg::*; use msg::*;
use types::*; use types::*;
@ -27,7 +26,6 @@ use types::*;
/// low-level network communication and tracks peer information. /// low-level network communication and tracks peer information.
pub struct PeerConn { pub struct PeerConn {
conn: TcpStream, conn: TcpStream,
reader: BufReader<TcpStream>,
pub capabilities: Capabilities, pub capabilities: Capabilities,
pub user_agent: String, pub user_agent: String,
} }
@ -36,7 +34,7 @@ pub struct PeerConn {
/// Allows the peer to track how much is received. /// Allows the peer to track how much is received.
impl Read for PeerConn { impl Read for PeerConn {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
self.reader.read(buf) self.conn.read(buf)
} }
} }
@ -66,7 +64,6 @@ impl PeerConn {
PeerConn { PeerConn {
conn: conn, conn: conn,
reader: BufReader::new(conn),
capabilities: UNKNOWN, capabilities: UNKNOWN,
user_agent: "".to_string(), user_agent: "".to_string(),
} }

View file

@ -21,9 +21,8 @@ use std::str::FromStr;
use std::sync::Arc; use std::sync::Arc;
use mioco; use mioco;
use mioco::tcp::{TcpListener, TcpStream, Shutdown}; use mioco::tcp::TcpListener;
use core::ser::{serialize, deserialize};
use handshake::Handshake; use handshake::Handshake;
use peer::PeerConn; use peer::PeerConn;
use types::*; use types::*;
@ -54,7 +53,7 @@ impl Server {
let hs = Arc::new(Handshake::new()); let hs = Arc::new(Handshake::new());
loop { loop {
let mut conn = try!(listener.accept()); let conn = try!(listener.accept());
let hs_child = hs.clone(); let hs_child = hs.clone();
mioco::spawn(move || -> io::Result<()> { mioco::spawn(move || -> io::Result<()> {