WIP: more p2p code, mostly interaction between peer, protocol and server as well as ser/deser for first message types.

This commit is contained in:
Ignotus Peverell 2016-10-24 21:35:10 -07:00
parent 9f780f6865
commit 4657b09c4e
No known key found for this signature in database
GPG key ID: 99CD25F39F8F8211
7 changed files with 378 additions and 77 deletions

View file

@ -33,6 +33,8 @@ pub enum Error {
expected: Vec<u8>,
received: Vec<u8>,
},
/// Data wasn't in a consumable format
CorruptedData,
/// When asked to read too much data
TooLargeReadErr(String),
}

View file

@ -27,3 +27,5 @@ extern crate mioco;
mod msg;
mod server;
mod peer;
mod protocol;

View file

@ -16,20 +16,73 @@
use std::net::SocketAddr;
use core::ser::{Writeable, Readable, Writer, Reader};
use core::ser::{Writeable, Readable, Writer, Reader, Error};
mod ErrCodes {
const UNSUPPORTED_VERSION: u32 = 100;
/// Magic number expected in the header of every message
const MAGIC: [u8; 2] = [0x1e, 0xc5];
/// Codes for each error that can be produced reading a message.
enum ErrCodes {
UNSUPPORTED_VERSION = 100,
}
bitflags! {
/// Options for block validation
pub flags Capabilities: u32 {
/// We don't know (yet) what the peer can do.
const UNKNOWN = 0b00000000,
/// Runs with the easier version of the Proof of Work, mostly to make testing easier.
const FULL_SYNC = 0b00000001,
}
}
/// Types of messages
enum Type {
HAND = 1,
SHAKE = 2,
ERROR = 3,
/// Never actually used over the network but used to detect unrecognized
/// types.
/// Increment as needed.
MAX_MSG_TYPE = 4,
}
/// Header of any protocol message, used to identify incoming messages.
pub struct MsgHeader {
magic: [u8; 2],
msg_type: Type,
}
impl MsgHeader {
fn acceptable(&self) -> bool {
msg_type < MAX_MSG_TYPE;
}
}
impl Writeable for MsgHeader {
fn write(&self, writer: &mut Writer) -> Option<ser::Error> {
ser_multiwrite!(writer,
[write_u8, self.magic[0]],
[write_u8, self.magic[1]],
[write_u8, self.msg_type as u8]);
None
}
}
impl Readable<MsgHeader> for MsgHeader {
fn read(reader: &mut Reader) -> Result<MsgHeader, ser::Error> {
try!(reader.expect_u8(MAGIC[0]));
try!(reader.expect_u8(MAGIC[1]));
let t = try!(reader.read_u8());
Ok(MsgHeader {
magic: MAGIC,
msg_type: t,
})
}
}
/// First part of a handshake, sender advertises its version and
/// characteristics.
pub struct Hand {
version: u32,
capabilities: Capabilities,
@ -38,13 +91,123 @@ pub struct Hand {
user_agent: String,
}
impl Writeable for Hand {
fn write(&self, writer: &mut Writer) -> Option<ser::Error> {
ser_multiwrite!(writer,
[write_u32, self.version],
[write_u32, self.capabilities]);
sender_addr.write(writer);
receiver_addr.write(writer);
writer.write_vec(&mut self.user_agent.into_bytes())
}
}
impl Readable<Hand> for Hand {
fn read(reader: &mut Reader) -> Result<Hand, ser::Error> {
let (version, capab) = ser_multiread!(reader, read_u32, read_u32);
let sender_addr = SocketAddr::read(reader);
let receiver_addr = SocketAddr::read(reader);
let user_agent = reader.read_vec();
Hand {
version: version,
capabilities: capab,
server_addr: sender_addr,
receiver_addr: receiver_addr,
user_agent: user_agent,
}
}
}
/// Second part of a handshake, receiver of the first part replies with its own
/// version and characteristics.
pub struct Shake {
version: u32,
capabilities: Capabilities,
user_agent: String,
}
impl Writeable for MsgHeader {
fn write(&self, writer: &mut Writer) -> Option<ser::Error> {
ser_multiwrite!(writer,
[write_u32, self.version],
[write_u32, self.capabilities],
[write_vec, self.user_agent.as_mut_vec()]);
None
}
}
impl Readable<Shake> for Shake {
fn read(reader: &mut Reader) -> Result<Shake, ser::Error> {
let (version, capab, ua) = ser_multiread!(reader, read_u32, read_u32, read_vec);
let user_agent = try!(String::from_utf8(ua).map_err(|_| ser::Error: CorruptedData));
Hand {
version: version,
capabilities: capab,
user_agent: user_agent,
}
}
}
/// We found some issue in the communication, sending an error back, usually
/// followed by closing the connection.
pub struct PeerError {
code: u32,
message: String,
}
impl Writeable for PeerError {
fn write(&self, writer: &mut Writer) -> Option<ser::Error> {
ser_multiwrite!(writer,
[write_u32, self.code],
[write_vec, &mut self.message.into_bytes()]);
None
}
}
impl Readable<PeerError> for PeerError {
fn read(reader: &mut Reader) -> Result<PeerError, ser::Error> {
let (code, msg) = ser_multiread!(reader, read_u32, read_vec);
let message = try!(String::from_utf8(msg).map_err(|_| ser::Error: CorruptedData));
PeerError {
code: code,
message: message,
}
}
}
impl Writeable for SocketAddr {
fn write(&self, writer: &mut Writer) -> Option<ser::Error> {
match self {
V4(sav4) => {
ser_multiwrite!(writer,
[write_u8, 0],
[write_fixed_bytes, sav4.ip().octets()],
[write_u16, sav4.port()]);
}
V6(sav6) => {
try_m(writer.write_u8(1));
for seg in sav6.ip().segments() {
try_m(writer.write_u16(seg));
}
try_m(writer.write_u16(sav6.port()));
}
}
None
}
}
impl Readable<SocketAddr> for SocketAddr {
fn read(reader: &mut Reader) -> Result<SocketAddr, ser::Error> {
let v4_or_v6 = reader.read_u8();
if v4_or_v6 == 0 {
let ip = reader.read_fixed_bytes(4);
let port = reader.read_u16();
SocketAddrV4::new(Ipv4Addr::new(ip[0], ip[1], ip[2], ip[3]), port)
} else {
let ip = [0..8].map(|_| reader.read_u16()).collect::<Vec<u16>>();
let port = reader.read_u16();
SocketAddrV6::new(Ipv6Addr::new(ip[0], ip[1], ip[2], ip[3], ip[4], ip[5], ip[6], ip[7]),
port)
}
}
}

128
p2p/src/peer.rs Normal file
View file

@ -0,0 +1,128 @@
// Copyright 2016 The Grin Developers
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use str::net::SocketAddr;
use std::str::FromStr;
use std::io::{Read, Write};
use time::Duration;
use mioco::tcp::{TcpListener, TcpStream, Shutdown};
use core::ser::{serialize, deserialize};
const PROTOCOL_VERSION: u32 = 1;
const USER_AGENT: &'static str = "MW/Grin 0.1";
/// The local representation of a remotely connected peer. Handles most
/// low-level network communication and tracks peer information.
struct Peer {
conn: TcpStream,
reader: BufReader,
capabilities: Capabilities,
user_agent: String,
}
/// Make the Peer a Reader for convenient access to the underlying connection.
/// Allows the peer to track how much is received.
impl Read for Peer {
fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
self.reader.read(buf)
}
}
/// Make the Peer a Writer for convenient access to the underlying connection.
/// Allows the peer to track how much is sent.
impl Write for Peer {
fn write(&mut self, buf: &[u8]) -> Result<usize> {
self.conf.write(buf)
}
}
impl Close for Peer {
fn close() {
self.conn.shutdown(Shutdown::Both);
}
}
impl Peer {
/// Create a new local peer instance connected to a remote peer with the
/// provided TcpStream.
fn new(conn: TcpStream) -> Peer {
// don't wait on read for more than 2 seconds by default
conn.set_read_timeout(Some(Duration::seconds(2)));
Peer {
conn: conn,
reader: BufReader::new(conn),
capabilities: UNKNOWN,
user_agent: "",
}
}
/// Handles connecting to a new remote peer, starting the version handshake.
fn connect(&mut self) -> Result<Protocol, Error> {
serialize(self.peer,
&Hand {
version: PROTOCOL_VERSION,
capabilities: FULL_SYNC,
sender_addr: listen_addr(),
receiver_addr: self.peer.peer_addr(),
user_agent: USER_AGENT,
});
let shake = deserialize(self.peer);
if shake.version != 1 {
self.close(ErrCodes::UNSUPPORTED_VERSION,
format!("Unsupported version: {}, ours: {})",
shake.version,
PROTOCOL_VERSION));
return;
}
self.capabilities = shake.capabilities;
self.user_agent = shake.user_agent;
// when more than one protocol version is supported, choosing should go here
ProtocolV1::new(&self);
}
/// Handles receiving a connection from a new remote peer that started the
/// version handshake.
fn handshake(&mut self) -> Result<Protocol, Error> {
let hand = deserialize(self.peer);
if hand.version != 1 {
self.close(ErrCodes::UNSUPPORTED_VERSION,
format!("Unsupported version: {}, ours: {})",
hand.version,
PROTOCOL_VERSION));
return;
}
self.peer.capabilities = hand.capabilities;
self.peer.user_agent = hand.user_agent;
serialize(self.peer,
&Shake {
version: PROTOCOL_VERSION,
capabilities: FULL_SYNC,
user_agent: USER_AGENT,
});
self.accept_loop();
// when more than one protocol version is supported, choosing should go here
ProtocolV1::new(&self);
}
fn peer_addr(&self) -> SocketAddr {
self.conn.peer_addr()
}
}

45
p2p/src/protocol.rs Normal file
View file

@ -0,0 +1,45 @@
// Copyright 2016 The Grin Developers
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use types::*;
use core::ser;
pub struct ProtocolV1 {
comm: &mut Comm,
}
impl Protocol for ProtocolV1 {
fn new(p: &mut Comm) -> Protocol {
Protocol { comm: p }
}
fn handle(&self, server: &Server) {
loop {
let header = ser::deserialize::<MsgHeader>();
if !header.acceptable() {
continue;
}
}
}
}
impl ProtocolV1 {
fn close(err_code: u32, explanation: &'static str) {
serialize(self.peer,
&Err {
code: err_code,
message: explanation,
});
self.comm.close();
}
}

View file

@ -13,11 +13,7 @@
// limitations under the License.
//! Grin server implementation, accepts incoming connections and connects to
//! other peers in the network, handling handshake and message receive/send.
use str::net::SocketAddr;
use std::str::FromStr;
use time::Duration;
//! other peers in the network.
use mioco::tcp::{TcpListener, TcpStream, Shutdown};
@ -25,80 +21,12 @@ use core::ser::{serialize, deserialize};
use msg::*;
const DEFAULT_LISTEN_ADDR: &'static str = "127.0.0.1:555";
const PROTOCOL_VERSION: u32 = 1;
const USER_AGENT: &'static str = "MW/Grin 0.1";
// replace with some config lookup or something
fn listen_addr() -> SocketAddr {
FromStr::from_str(DEFAULT_LISTEN_ADDR).unwrap()
}
/// The local representation of a remotely connected peer. Handles most
/// low-level network communication.
struct Peer {
conn: TcpStream,
reader: BufReader,
capabilities: Capabilities,
user_agent: String,
}
impl Peer {
/// Create a new local peer instance connected to a remote peer with the
/// provided TcpStream.
fn new(conn: TcpStream) -> Peer {
// don't wait on read for more than 2 seconds by default
conn.set_read_timeout(Some(Duration::seconds(2)));
Peer {
conn: conn,
reader: BufReader::new(conn),
}
}
/// Handles connecting to a new remote peer, starting the version handshake.
fn connect(&mut self) {
serialize(self.conn,
&Hand {
version: PROTOCOL_VERSION,
capabilities: FULL_SYNC,
sender_addr: listen_addr(),
receiver_addr: conn.peer_addr(),
user_agent: USER_AGENT,
});
let shake = deserialize(self.reader);
if shake.version != 1 {
self.close(ErrCodes::UNSUPPORTED_VERSION,
format!("Unsupported version: {}, ours: {})",
shake.version,
PROTOCOL_VERSION));
return;
}
self.capabilities = shake.capabilities;
self.user_agent = shake.user_agent;
self.accept_loop();
}
/// Handles receiving a connection from a new remote peer that started the
/// version handshake.
fn handshake(&mut self) {}
fn accept_loop(&mut self) {
loop {
let msg = deserialize(self.reader);
}
}
fn close(err_code: u32, explanation: &'static str) {
serialize(self.conn,
&Err {
code: err_code,
message: explanation,
});
self.conn.shutdown(Shutdown::Both);
}
}
pub struct Server {
}
@ -114,7 +42,7 @@ impl Server {
loop {
let mut conn = try!(listener.accept());
mioco::spawn(move || -> io::Result<()> {
Peer::new(conn).connect();
Peer::new(conn).handshake();
});
}
})

33
p2p/src/types.rs Normal file
View file

@ -0,0 +1,33 @@
// Copyright 2016 The Grin Developers
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::io::{Read, Write};
/// Trait for pre-emptively and forcefully closing an underlying resource.
trait Close {
fn close();
}
/// Main trait we expect a peer to implement to be usable by a Protocol.
trait Comm : Read + Write + Close;
/// A given communication protocol agreed upon between 2 peers (usually ourselves and a remove) after handshake.
trait Protocol {
/// Instantiate providing a reader and writer allowing to communicate to the other peer.
fn new(p: &mut Comm);
/// Starts handling protocol communication, the peer(s) is expected to be known already, usually passed during construction.
fn handle(&self, server: &Server);
}