First working test with related fixes demonstrating a full peer handshake.

This commit is contained in:
Ignotus Peverell 2016-10-27 14:28:02 -07:00
parent a9dc8a05ac
commit fdaf2ba6af
No known key found for this signature in database
GPG key ID: 99CD25F39F8F8211
8 changed files with 158 additions and 106 deletions

View file

@ -12,3 +12,6 @@ mioco = "^0.8"
time = "^0.1" time = "^0.1"
grin_core = { path = "../core" } grin_core = { path = "../core" }
[dev-dependencies]
env_logger = "^0.3"

View file

@ -34,8 +34,8 @@ pub struct Handshake {
nonces: RwLock<VecDeque<u64>>, nonces: RwLock<VecDeque<u64>>,
} }
unsafe impl Sync for Handshake{} unsafe impl Sync for Handshake {}
unsafe impl Send for Handshake{} unsafe impl Send for Handshake {}
impl Handshake { impl Handshake {
/// Creates a new handshake handler /// Creates a new handshake handler
@ -44,87 +44,97 @@ impl Handshake {
} }
/// Handles connecting to a new remote peer, starting the version handshake. /// Handles connecting to a new remote peer, starting the version handshake.
pub fn connect<'a>(&'a self, peer: &'a mut PeerConn) -> Result<Box<Protocol+'a>, Error> { pub fn connect<'a>(&'a self, peer: &'a mut PeerConn) -> Result<Box<Protocol + 'a>, Error> {
// get a new nonce that can be used on handshake to detect self-connection // get a new nonce that can be used on handshake to detect self-connection
let nonce = self.next_nonce(); let nonce = self.next_nonce();
// send the first part of the handshake // send the first part of the handshake
let sender_addr = SockAddr(peer.local_addr()); let sender_addr = SockAddr(peer.local_addr());
let receiver_addr = SockAddr(peer.peer_addr()); let receiver_addr = SockAddr(peer.peer_addr());
let opt_err = serialize(peer, let opt_err = serialize(peer,
&Hand { &Hand {
version: PROTOCOL_VERSION, version: PROTOCOL_VERSION,
capabilities: FULL_SYNC, capabilities: FULL_SYNC,
nonce: nonce, nonce: nonce,
sender_addr: sender_addr, sender_addr: sender_addr,
receiver_addr: receiver_addr, receiver_addr: receiver_addr,
user_agent: USER_AGENT.to_string(), user_agent: USER_AGENT.to_string(),
}); });
match opt_err { match opt_err {
Some(err) => return Err(err), Some(err) => return Err(err),
None => {} None => {}
} }
// deserialize the handshake response and do version negotiation // deserialize the handshake response and do version negotiation
let shake = try!(deserialize::<Shake>(peer)); let shake = try!(deserialize::<Shake>(peer));
if shake.version != 1 { if shake.version != 1 {
self.close(peer, ErrCodes::UnsupportedVersion as u32, self.close(peer,
ErrCodes::UnsupportedVersion as u32,
format!("Unsupported version: {}, ours: {})", format!("Unsupported version: {}, ours: {})",
shake.version, shake.version,
PROTOCOL_VERSION)); PROTOCOL_VERSION));
return Err(Error::UnexpectedData{expected: vec![PROTOCOL_VERSION as u8], received: vec![shake.version as u8]}); return Err(Error::UnexpectedData {
expected: vec![PROTOCOL_VERSION as u8],
received: vec![shake.version as u8],
});
} }
peer.capabilities = shake.capabilities; peer.capabilities = shake.capabilities;
peer.user_agent = shake.user_agent; peer.user_agent = shake.user_agent;
info!("Connected to peer {}", peer);
// when more than one protocol version is supported, choosing should go here // when more than one protocol version is supported, choosing should go here
Ok(Box::new(ProtocolV1::new(peer))) Ok(Box::new(ProtocolV1::new(peer)))
} }
/// Handles receiving a connection from a new remote peer that started the /// Handles receiving a connection from a new remote peer that started the
/// version handshake. /// version handshake.
pub fn handshake<'a>(&'a self, peer: &'a mut PeerConn) -> Result<Box<Protocol+'a>, Error> { pub fn handshake<'a>(&'a self, peer: &'a mut PeerConn) -> Result<Box<Protocol + 'a>, Error> {
// deserialize first part of handshake sent to us and do version negotiation // deserialize first part of handshake sent to us and do version negotiation
let hand = try!(deserialize::<Hand>(peer)); let hand = try!(deserialize::<Hand>(peer));
if hand.version != 1 { if hand.version != 1 {
self.close(peer, ErrCodes::UnsupportedVersion as u32, self.close(peer,
ErrCodes::UnsupportedVersion as u32,
format!("Unsupported version: {}, ours: {})", format!("Unsupported version: {}, ours: {})",
hand.version, hand.version,
PROTOCOL_VERSION)); PROTOCOL_VERSION));
return Err(Error::UnexpectedData{expected: vec![PROTOCOL_VERSION as u8], received: vec![hand.version as u8]}); return Err(Error::UnexpectedData {
expected: vec![PROTOCOL_VERSION as u8],
received: vec![hand.version as u8],
});
}
{
// check the nonce to see if we could be trying to connect to ourselves
let nonces = self.nonces.read().unwrap();
if nonces.contains(&hand.nonce) {
return Err(Error::UnexpectedData {
expected: vec![],
received: vec![],
});
}
} }
{
// check the nonce to see if we could be trying to connect to ourselves
let nonces = self.nonces.read().unwrap();
if nonces.contains(&hand.nonce) {
return Err(Error::UnexpectedData {
expected: vec![],
received: vec![],
});
}
}
// all good, keep peer info // all good, keep peer info
peer.capabilities = hand.capabilities; peer.capabilities = hand.capabilities;
peer.user_agent = hand.user_agent; peer.user_agent = hand.user_agent;
// send our reply with our info // send our reply with our info
let opt_err = serialize(peer, let opt_err = serialize(peer,
&Shake { &Shake {
version: PROTOCOL_VERSION, version: PROTOCOL_VERSION,
capabilities: FULL_SYNC, capabilities: FULL_SYNC,
user_agent: USER_AGENT.to_string(), user_agent: USER_AGENT.to_string(),
}); });
match opt_err { match opt_err {
Some(err) => return Err(err), Some(err) => return Err(err),
None => {} None => {}
} }
info!("Received connection from peer {}", peer);
// when more than one protocol version is supported, choosing should go here // when more than one protocol version is supported, choosing should go here
Ok(Box::new(ProtocolV1::new(peer))) Ok(Box::new(ProtocolV1::new(peer)))
} }
/// Generate a new random nonce and store it in our ring buffer /// Generate a new random nonce and store it in our ring buffer
fn next_nonce(&self) -> u64 { fn next_nonce(&self) -> u64 {
let mut rng = OsRng::new().unwrap(); let mut rng = OsRng::new().unwrap();
let nonce = rng.next_u64(); let nonce = rng.next_u64();

View file

@ -37,3 +37,6 @@ mod handshake;
mod protocol; mod protocol;
mod server; mod server;
mod peer; mod peer;
pub use server::Server;
pub use server::DEFAULT_LISTEN_ADDR;

View file

@ -80,20 +80,20 @@ impl Readable<MsgHeader> for MsgHeader {
try!(reader.expect_u8(MAGIC[0])); try!(reader.expect_u8(MAGIC[0]));
try!(reader.expect_u8(MAGIC[1])); try!(reader.expect_u8(MAGIC[1]));
let t = try!(reader.read_u8()); let t = try!(reader.read_u8());
if t < (Type::MaxMsgType as u8) { if t < (Type::MaxMsgType as u8) {
return Err(ser::Error::CorruptedData); return Err(ser::Error::CorruptedData);
} }
Ok(MsgHeader { Ok(MsgHeader {
magic: MAGIC, magic: MAGIC,
msg_type: match t { msg_type: match t {
// TODO this is rather ugly, think of a better way // TODO this is rather ugly, think of a better way
0 => Type::ERROR, 0 => Type::ERROR,
1 => Type::HAND, 1 => Type::HAND,
2 => Type::SHAKE, 2 => Type::SHAKE,
3 => Type::PING, 3 => Type::PING,
4 => Type::PONG, 4 => Type::PONG,
_ => panic!(), _ => panic!(),
}, },
}) })
} }
} }
@ -120,7 +120,7 @@ impl Writeable for Hand {
ser_multiwrite!(writer, ser_multiwrite!(writer,
[write_u32, self.version], [write_u32, self.version],
[write_u32, self.capabilities.bits()], [write_u32, self.capabilities.bits()],
[write_u64, self.nonce]); [write_u64, self.nonce]);
self.sender_addr.write(writer); self.sender_addr.write(writer);
self.receiver_addr.write(writer); self.receiver_addr.write(writer);
writer.write_vec(&mut self.user_agent.clone().into_bytes()) writer.write_vec(&mut self.user_agent.clone().into_bytes())
@ -133,12 +133,12 @@ impl Readable<Hand> for Hand {
let sender_addr = try!(SockAddr::read(reader)); let sender_addr = try!(SockAddr::read(reader));
let receiver_addr = try!(SockAddr::read(reader)); let receiver_addr = try!(SockAddr::read(reader));
let ua = try!(reader.read_vec()); let ua = try!(reader.read_vec());
let user_agent = try!(String::from_utf8(ua).map_err(|_| ser::Error::CorruptedData)); let user_agent = try!(String::from_utf8(ua).map_err(|_| ser::Error::CorruptedData));
let capabilities = try!(Capabilities::from_bits(capab).ok_or(ser::Error::CorruptedData)); let capabilities = try!(Capabilities::from_bits(capab).ok_or(ser::Error::CorruptedData));
Ok(Hand { Ok(Hand {
version: version, version: version,
capabilities: capabilities, capabilities: capabilities,
nonce: nonce, nonce: nonce,
sender_addr: sender_addr, sender_addr: sender_addr,
receiver_addr: receiver_addr, receiver_addr: receiver_addr,
user_agent: user_agent, user_agent: user_agent,
@ -171,7 +171,7 @@ impl Readable<Shake> for Shake {
fn read(reader: &mut Reader) -> Result<Shake, ser::Error> { fn read(reader: &mut Reader) -> Result<Shake, ser::Error> {
let (version, capab, ua) = ser_multiread!(reader, read_u32, read_u32, read_vec); let (version, capab, ua) = ser_multiread!(reader, read_u32, read_u32, read_vec);
let user_agent = try!(String::from_utf8(ua).map_err(|_| ser::Error::CorruptedData)); let user_agent = try!(String::from_utf8(ua).map_err(|_| ser::Error::CorruptedData));
let capabilities = try!(Capabilities::from_bits(capab).ok_or(ser::Error::CorruptedData)); let capabilities = try!(Capabilities::from_bits(capab).ok_or(ser::Error::CorruptedData));
Ok(Shake { Ok(Shake {
version: version, version: version,
capabilities: capabilities, capabilities: capabilities,
@ -209,7 +209,9 @@ impl Readable<PeerError> for PeerError {
} }
} }
/// Only necessary so we can implement Readable and Writeable. Rust disallows implementing traits when both types are outside of this crate (which is the case for SocketAddr and Readable/Writeable). /// Only necessary so we can implement Readable and Writeable. Rust disallows
/// implementing traits when both types are outside of this crate (which is the
/// case for SocketAddr and Readable/Writeable).
pub struct SockAddr(pub SocketAddr); pub struct SockAddr(pub SocketAddr);
impl Writeable for SockAddr { impl Writeable for SockAddr {
@ -239,11 +241,25 @@ impl Readable<SockAddr> for SockAddr {
if v4_or_v6 == 0 { if v4_or_v6 == 0 {
let ip = try!(reader.read_fixed_bytes(4)); let ip = try!(reader.read_fixed_bytes(4));
let port = try!(reader.read_u16()); let port = try!(reader.read_u16());
Ok(SockAddr(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(ip[0], ip[1], ip[2], ip[3]), port)))) Ok(SockAddr(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(ip[0],
ip[1],
ip[2],
ip[3]),
port))))
} else { } else {
let ip = try_oap_vec!([0..8], |_| reader.read_u16()); let ip = try_oap_vec!([0..8], |_| reader.read_u16());
let port = try!(reader.read_u16()); let port = try!(reader.read_u16());
Ok(SockAddr(SocketAddr::V6(SocketAddrV6::new(Ipv6Addr::new(ip[0], ip[1], ip[2], ip[3], ip[4], ip[5], ip[6], ip[7]), port, 0, 0)))) Ok(SockAddr(SocketAddr::V6(SocketAddrV6::new(Ipv6Addr::new(ip[0],
ip[1],
ip[2],
ip[3],
ip[4],
ip[5],
ip[6],
ip[7]),
port,
0,
0))))
} }
} }
} }

View file

@ -12,8 +12,9 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
use std::net::SocketAddr; use std::fmt;
use std::io::{self, Read, Write}; use std::io::{self, Read, Write};
use std::net::SocketAddr;
use mioco::tcp::{TcpStream, Shutdown}; use mioco::tcp::{TcpStream, Shutdown};
@ -30,6 +31,13 @@ pub struct PeerConn {
pub user_agent: String, pub user_agent: String,
} }
impl fmt::Display for PeerConn {
// This trait requires `fmt` with this exact signature.
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{} {}", self.peer_addr(), self.user_agent)
}
}
/// Make the Peer a Reader for convenient access to the underlying connection. /// Make the Peer a Reader for convenient access to the underlying connection.
/// Allows the peer to track how much is received. /// Allows the peer to track how much is received.
impl Read for PeerConn { impl Read for PeerConn {
@ -44,9 +52,9 @@ impl Write for PeerConn {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> { fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.conn.write(buf) self.conn.write(buf)
} }
fn flush(&mut self) -> io::Result<()> { fn flush(&mut self) -> io::Result<()> {
self.conn.flush() self.conn.flush()
} }
} }
impl Close for PeerConn { impl Close for PeerConn {
@ -85,7 +93,7 @@ impl PeerInfo for PeerConn {
self.conn.peer_addr().unwrap() self.conn.peer_addr().unwrap()
} }
fn local_addr(&self) -> SocketAddr { fn local_addr(&self) -> SocketAddr {
// TODO likely not exactly what we want (private vs public IP) // TODO likely not exactly what we want (private vs public IP)
self.conn.local_addr().unwrap() self.conn.local_addr().unwrap()
} }
} }

View file

@ -33,16 +33,16 @@ impl<'a> Protocol for ProtocolV1<'a> {
} }
impl<'a> ProtocolV1<'a> { impl<'a> ProtocolV1<'a> {
pub fn new(p: &mut PeerConn) -> ProtocolV1 { pub fn new(p: &mut PeerConn) -> ProtocolV1 {
ProtocolV1{peer: p} ProtocolV1 { peer: p }
} }
fn close(&mut self, err_code: u32, explanation: &'static str) { fn close(&mut self, err_code: u32, explanation: &'static str) {
ser::serialize(self.peer, ser::serialize(self.peer,
&PeerError { &PeerError {
code: err_code, code: err_code,
message: explanation.to_string(), message: explanation.to_string(),
}); });
self.peer.close(); self.peer.close();
} }
} }

View file

@ -21,13 +21,14 @@ use std::str::FromStr;
use std::sync::Arc; use std::sync::Arc;
use mioco; use mioco;
use mioco::tcp::TcpListener; use mioco::tcp::{TcpListener, TcpStream};
use core::ser::Error;
use handshake::Handshake; use handshake::Handshake;
use peer::PeerConn; use peer::PeerConn;
use types::*; use types::*;
const DEFAULT_LISTEN_ADDR: &'static str = "127.0.0.1:555"; pub const DEFAULT_LISTEN_ADDR: &'static str = "127.0.0.1:3414";
// replace with some config lookup or something // replace with some config lookup or something
fn listen_addr() -> SocketAddr { fn listen_addr() -> SocketAddr {
@ -43,30 +44,41 @@ pub struct Server {
impl Server { impl Server {
/// Creates a new p2p server. Opens a TCP port to allow incoming /// Creates a new p2p server. Opens a TCP port to allow incoming
/// connections and starts the bootstrapping process to find peers. /// connections and starts the bootstrapping process to find peers.
pub fn new() -> Server { pub fn start() -> Result<Server, Error> {
mioco::start(|| -> io::Result<()> { // TODO TLS
// TODO SSL mioco::spawn(move || -> io::Result<()> {
let addr = "127.0.0.1:3414".parse().unwrap(); let addr = DEFAULT_LISTEN_ADDR.parse().unwrap();
let listener = try!(TcpListener::bind(&addr)); let listener = try!(TcpListener::bind(&addr));
info!("P2P server started on {}", addr); warn!("P2P server started on {}", addr);
let hs = Arc::new(Handshake::new()); let hs = Arc::new(Handshake::new());
loop { loop {
let conn = try!(listener.accept()); let conn = try!(listener.accept());
let hs_child = hs.clone(); let hs_child = hs.clone();
mioco::spawn(move || -> io::Result<()> { mioco::spawn(move || -> io::Result<()> {
let ret = PeerConn::new(conn).handshake(&hs_child, &DummyAdapter {}); let ret = PeerConn::new(conn).handshake(&hs_child, &DummyAdapter {});
if let Some(err) = ret { if let Some(err) = ret {
error!("{:?}", err); error!("{:?}", err);
} }
Ok(()) Ok(())
}); });
} }
}) Ok(())
.unwrap() });
.unwrap(); Ok(Server {})
Server{} }
/// Simulates an unrelated client connecting to our server. Mostly used for
/// tests.
pub fn connect_as_client(addr: SocketAddr) -> Option<Error> {
let tcp_client = TcpStream::connect(&addr).unwrap();
let mut peer = PeerConn::new(tcp_client);
let hs = Handshake::new();
if let Err(e) = hs.connect(&mut peer) {
return Some(e);
}
None
} }
} }

View file

@ -23,9 +23,9 @@ pub trait Close {
/// General information about a connected peer that's useful to other modules. /// General information about a connected peer that's useful to other modules.
pub trait PeerInfo { pub trait PeerInfo {
/// Address of the remote peer /// Address of the remote peer
fn peer_addr(&self) -> SocketAddr; fn peer_addr(&self) -> SocketAddr;
/// Our address, communicated to other peers /// Our address, communicated to other peers
fn local_addr(&self) -> SocketAddr; fn local_addr(&self) -> SocketAddr;
} }