p2p module getting close to first integration test allowing 2 peers to connect. Fleshed out handshake and protocol use. Fixed most errors, just need to make the borrow checker happy.

This commit is contained in:
Ignotus Peverell 2016-10-25 22:06:13 -07:00
parent 4657b09c4e
commit b1762cb5f4
No known key found for this signature in database
GPG key ID: 99CD25F39F8F8211
18 changed files with 426 additions and 210 deletions

1
.gitignore vendored
View file

@ -1,3 +1,4 @@
*.swp *.swp
.* .*
target target
Cargo.lock

View file

@ -24,7 +24,7 @@
extern crate bitflags; extern crate bitflags;
extern crate byteorder; extern crate byteorder;
#[macro_use(try_m)] #[macro_use(try_o)]
extern crate grin_core as core; extern crate grin_core as core;
extern crate grin_store; extern crate grin_store;
extern crate secp256k1zkp as secp; extern crate secp256k1zkp as secp;

View file

@ -66,11 +66,11 @@ pub fn process_block(b: &Block, store: &ChainStore, opts: Options) -> Option<Err
tip: None, tip: None,
}; };
try_m!(validate_header(&b, &mut ctx)); try_o!(validate_header(&b, &mut ctx));
try_m!(set_tip(&b.header, &mut ctx)); try_o!(set_tip(&b.header, &mut ctx));
try_m!(validate_block(b, &mut ctx)); try_o!(validate_block(b, &mut ctx));
try_m!(add_block(b, &mut ctx)); try_o!(add_block(b, &mut ctx));
try_m!(update_tips(&mut ctx)); try_o!(update_tips(&mut ctx));
None None
} }
@ -118,7 +118,7 @@ fn set_tip(h: &BlockHeader, ctx: &mut BlockContext) -> Option<Error> {
fn validate_block(b: &Block, ctx: &mut BlockContext) -> Option<Error> { fn validate_block(b: &Block, ctx: &mut BlockContext) -> Option<Error> {
// TODO check tx merkle tree // TODO check tx merkle tree
let curve = secp::Secp256k1::with_caps(secp::ContextFlag::Commit); let curve = secp::Secp256k1::with_caps(secp::ContextFlag::Commit);
try_m!(b.verify(&curve).err().map(&Error::InvalidBlockProof)); try_o!(b.verify(&curve).err().map(&Error::InvalidBlockProof));
None None
} }

View file

@ -51,7 +51,7 @@ impl ChainStore for ChainKVStore {
} }
fn save_head(&self, t: &Tip) -> Option<Error> { fn save_head(&self, t: &Tip) -> Option<Error> {
try_m!(self.save_tip(t)); try_o!(self.save_tip(t));
self.db.put_ser(&vec![HEAD_PREFIX], t).map(&to_store_err) self.db.put_ser(&vec![HEAD_PREFIX], t).map(&to_store_err)
} }

View file

@ -41,9 +41,9 @@ impl Lineage {
/// Serialization for lineage, necessary to serialize fork tips. /// Serialization for lineage, necessary to serialize fork tips.
impl ser::Writeable for Lineage { impl ser::Writeable for Lineage {
fn write(&self, writer: &mut ser::Writer) -> Option<ser::Error> { fn write(&self, writer: &mut ser::Writer) -> Option<ser::Error> {
try_m!(writer.write_u32(self.0.len() as u32)); try_o!(writer.write_u32(self.0.len() as u32));
for num in &self.0 { for num in &self.0 {
try_m!(writer.write_u32(*num)); try_o!(writer.write_u32(*num));
} }
None None
} }
@ -100,9 +100,9 @@ impl Tip {
/// Serialization of a tip, required to save to datastore. /// Serialization of a tip, required to save to datastore.
impl ser::Writeable for Tip { impl ser::Writeable for Tip {
fn write(&self, writer: &mut ser::Writer) -> Option<ser::Error> { fn write(&self, writer: &mut ser::Writer) -> Option<ser::Error> {
try_m!(writer.write_u64(self.height)); try_o!(writer.write_u64(self.height));
try_m!(writer.write_fixed_bytes(&self.last_block_h)); try_o!(writer.write_fixed_bytes(&self.last_block_h));
try_m!(writer.write_fixed_bytes(&self.prev_block_h)); try_o!(writer.write_fixed_bytes(&self.prev_block_h));
self.lineage.write(writer) self.lineage.write(writer)
} }
} }

View file

@ -70,10 +70,10 @@ impl Writeable for BlockHeader {
[write_u64, self.total_fees]); [write_u64, self.total_fees]);
// make sure to not introduce any variable length data before the nonce to // make sure to not introduce any variable length data before the nonce to
// avoid complicating PoW // avoid complicating PoW
try_m!(writer.write_u64(self.nonce)); try_o!(writer.write_u64(self.nonce));
// cuckoo cycle of 42 nodes // cuckoo cycle of 42 nodes
for n in 0..42 { for n in 0..42 {
try_m!(writer.write_u32(self.pow.0[n])); try_o!(writer.write_u32(self.pow.0[n]));
} }
writer.write_u64(self.td) writer.write_u64(self.td)
} }
@ -102,20 +102,20 @@ pub struct Block {
/// block as binary. /// block as binary.
impl Writeable for Block { impl Writeable for Block {
fn write(&self, writer: &mut Writer) -> Option<ser::Error> { fn write(&self, writer: &mut Writer) -> Option<ser::Error> {
try_m!(self.header.write(writer)); try_o!(self.header.write(writer));
ser_multiwrite!(writer, ser_multiwrite!(writer,
[write_u64, self.inputs.len() as u64], [write_u64, self.inputs.len() as u64],
[write_u64, self.outputs.len() as u64], [write_u64, self.outputs.len() as u64],
[write_u64, self.proofs.len() as u64]); [write_u64, self.proofs.len() as u64]);
for inp in &self.inputs { for inp in &self.inputs {
try_m!(inp.write(writer)); try_o!(inp.write(writer));
} }
for out in &self.outputs { for out in &self.outputs {
try_m!(out.write(writer)); try_o!(out.write(writer));
} }
for proof in &self.proofs { for proof in &self.proofs {
try_m!(proof.write(writer)); try_o!(proof.write(writer));
} }
None None
} }
@ -218,7 +218,7 @@ impl Block {
// repeated iterations, revisit if a problem // repeated iterations, revisit if a problem
// validate each transaction and gather their proofs // validate each transaction and gather their proofs
let mut proofs = try_map_vec!(txs, |tx| tx.verify_sig(&secp)); let mut proofs = try_oap_vec!(txs, |tx| tx.verify_sig(&secp));
proofs.push(reward_proof); proofs.push(reward_proof);
// build vectors with all inputs and all outputs, ordering them by hash // build vectors with all inputs and all outputs, ordering them by hash

View file

@ -40,7 +40,7 @@ pub struct TxProof {
impl Writeable for TxProof { impl Writeable for TxProof {
fn write(&self, writer: &mut Writer) -> Option<ser::Error> { fn write(&self, writer: &mut Writer) -> Option<ser::Error> {
try_m!(writer.write_fixed_bytes(&self.remainder)); try_o!(writer.write_fixed_bytes(&self.remainder));
writer.write_vec(&mut self.sig.clone()) writer.write_vec(&mut self.sig.clone())
} }
} }
@ -75,10 +75,10 @@ impl Writeable for Transaction {
[write_u64, self.inputs.len() as u64], [write_u64, self.inputs.len() as u64],
[write_u64, self.outputs.len() as u64]); [write_u64, self.outputs.len() as u64]);
for inp in &self.inputs { for inp in &self.inputs {
try_m!(inp.write(writer)); try_o!(inp.write(writer));
} }
for out in &self.outputs { for out in &self.outputs {
try_m!(out.write(writer)); try_o!(out.write(writer));
} }
None None
} }
@ -303,7 +303,7 @@ pub enum Output {
/// an Output as binary. /// an Output as binary.
impl Writeable for Output { impl Writeable for Output {
fn write(&self, writer: &mut Writer) -> Option<ser::Error> { fn write(&self, writer: &mut Writer) -> Option<ser::Error> {
try_m!(writer.write_fixed_bytes(&self.commitment().unwrap())); try_o!(writer.write_fixed_bytes(&self.commitment().unwrap()));
writer.write_vec(&mut self.proof().unwrap().bytes().to_vec()) writer.write_vec(&mut self.proof().unwrap().bytes().to_vec())
} }
} }

View file

@ -17,6 +17,7 @@
/// Eliminates some of the verbosity in having iter and collect /// Eliminates some of the verbosity in having iter and collect
/// around every map call. /// around every map call.
#[macro_export]
macro_rules! map_vec { macro_rules! map_vec {
($thing:expr, $mapfn:expr ) => { ($thing:expr, $mapfn:expr ) => {
$thing.iter() $thing.iter()
@ -27,7 +28,8 @@ macro_rules! map_vec {
/// Same as map_vec when the map closure returns Results. Makes sure the /// Same as map_vec when the map closure returns Results. Makes sure the
/// results are "pushed up" and wraps with a try. /// results are "pushed up" and wraps with a try.
macro_rules! try_map_vec { #[macro_export]
macro_rules! try_oap_vec {
($thing:expr, $mapfn:expr ) => { ($thing:expr, $mapfn:expr ) => {
try!($thing.iter() try!($thing.iter()
.map($mapfn) .map($mapfn)
@ -59,11 +61,11 @@ macro_rules! tee {
} }
} }
/// Simple equivalent of try! but for a Maybe<Error>. Motivated mostly by the /// Simple equivalent of try! but for an Option<Error>. Motivated mostly by the
/// io package and our serialization as an alternative to silly Result<(), /// io package and our serialization as an alternative to silly Result<(),
/// Error>. /// Error>.
#[macro_export] #[macro_export]
macro_rules! try_m { macro_rules! try_o {
($trying:expr) => { ($trying:expr) => {
let tried = $trying; let tried = $trying;
if let Some(_) = tried { if let Some(_) = tried {
@ -72,6 +74,17 @@ macro_rules! try_m {
} }
} }
#[macro_export]
macro_rules! try_to_o {
($trying:expr) => {{
let tried = $trying;
if let Err(e) = tried {
return Some(e);
}
tried.unwrap()
}}
}
/// Eliminate some of the boilerplate of deserialization (package ser) by /// Eliminate some of the boilerplate of deserialization (package ser) by
/// passing just the list of reader function. /// passing just the list of reader function.
/// Example before: /// Example before:
@ -86,9 +99,16 @@ macro_rules! ser_multiread {
} }
} }
/// Eliminate some of the boilerplate of serialization (package ser) by
/// passing directly pairs of writer function and data to write.
/// Example before:
/// try!(reader.write_u64(42));
/// try!(reader.write_u32(100));
/// Example after:
/// ser_multiwrite!(writer, [write_u64, 42], [write_u32, 100]);
#[macro_export] #[macro_export]
macro_rules! ser_multiwrite { macro_rules! ser_multiwrite {
($wrtr:ident, $([ $write_call:ident, $val:expr ]),* ) => { ($wrtr:ident, $([ $write_call:ident, $val:expr ]),* ) => {
$( try_m!($wrtr.$write_call($val)) );* $( try_o!($wrtr.$write_call($val)) );*
} }
} }

View file

@ -72,15 +72,15 @@ struct PowHeader {
/// to make incrementing from the serialized form trivial. /// to make incrementing from the serialized form trivial.
impl Writeable for PowHeader { impl Writeable for PowHeader {
fn write(&self, writer: &mut Writer) -> Option<ser::Error> { fn write(&self, writer: &mut Writer) -> Option<ser::Error> {
try_m!(writer.write_u64(self.nonce)); try_o!(writer.write_u64(self.nonce));
try_m!(writer.write_u64(self.height)); try_o!(writer.write_u64(self.height));
try_m!(writer.write_fixed_bytes(&self.previous)); try_o!(writer.write_fixed_bytes(&self.previous));
try_m!(writer.write_i64(self.timestamp.to_timespec().sec)); try_o!(writer.write_i64(self.timestamp.to_timespec().sec));
try_m!(writer.write_fixed_bytes(&self.utxo_merkle)); try_o!(writer.write_fixed_bytes(&self.utxo_merkle));
try_m!(writer.write_fixed_bytes(&self.tx_merkle)); try_o!(writer.write_fixed_bytes(&self.tx_merkle));
try_m!(writer.write_u64(self.total_fees)); try_o!(writer.write_u64(self.total_fees));
try_m!(writer.write_u64(self.n_in)); try_o!(writer.write_u64(self.n_in));
try_m!(writer.write_u64(self.n_out)); try_o!(writer.write_u64(self.n_out));
writer.write_u64(self.n_proofs) writer.write_u64(self.n_proofs)
} }
} }

View file

@ -51,6 +51,8 @@ pub trait AsFixedBytes {
pub trait Writer { pub trait Writer {
/// Writes a u8 as bytes /// Writes a u8 as bytes
fn write_u8(&mut self, n: u8) -> Option<Error>; fn write_u8(&mut self, n: u8) -> Option<Error>;
/// Writes a u16 as bytes
fn write_u16(&mut self, n: u16) -> Option<Error>;
/// Writes a u32 as bytes /// Writes a u32 as bytes
fn write_u32(&mut self, n: u32) -> Option<Error>; fn write_u32(&mut self, n: u32) -> Option<Error>;
/// Writes a u64 as bytes /// Writes a u64 as bytes
@ -70,6 +72,8 @@ pub trait Writer {
pub trait Reader { pub trait Reader {
/// Read a u8 from the underlying Read /// Read a u8 from the underlying Read
fn read_u8(&mut self) -> Result<u8, Error>; fn read_u8(&mut self) -> Result<u8, Error>;
/// Read a u16 from the underlying Read
fn read_u16(&mut self) -> Result<u16, Error>;
/// Read a u32 from the underlying Read /// Read a u32 from the underlying Read
fn read_u32(&mut self) -> Result<u32, Error>; fn read_u32(&mut self) -> Result<u32, Error>;
/// Read a u64 from the underlying Read /// Read a u64 from the underlying Read
@ -137,6 +141,9 @@ impl<'a> Reader for BinReader<'a> {
fn read_u8(&mut self) -> Result<u8, Error> { fn read_u8(&mut self) -> Result<u8, Error> {
self.source.read_u8().map_err(Error::IOErr) self.source.read_u8().map_err(Error::IOErr)
} }
fn read_u16(&mut self) -> Result<u16, Error> {
self.source.read_u16::<BigEndian>().map_err(Error::IOErr)
}
fn read_u32(&mut self) -> Result<u32, Error> { fn read_u32(&mut self) -> Result<u32, Error> {
self.source.read_u32::<BigEndian>().map_err(Error::IOErr) self.source.read_u32::<BigEndian>().map_err(Error::IOErr)
} }
@ -188,6 +195,9 @@ impl<'a> Writer for BinWriter<'a> {
fn write_u8(&mut self, n: u8) -> Option<Error> { fn write_u8(&mut self, n: u8) -> Option<Error> {
self.sink.write_u8(n).err().map(Error::IOErr) self.sink.write_u8(n).err().map(Error::IOErr)
} }
fn write_u16(&mut self, n: u16) -> Option<Error> {
self.sink.write_u16::<BigEndian>(n).err().map(Error::IOErr)
}
fn write_u32(&mut self, n: u32) -> Option<Error> { fn write_u32(&mut self, n: u32) -> Option<Error> {
self.sink.write_u32::<BigEndian>(n).err().map(Error::IOErr) self.sink.write_u32::<BigEndian>(n).err().map(Error::IOErr)
} }
@ -202,7 +212,7 @@ impl<'a> Writer for BinWriter<'a> {
fn write_vec(&mut self, vec: &mut Vec<u8>) -> Option<Error> { fn write_vec(&mut self, vec: &mut Vec<u8>) -> Option<Error> {
try_m!(self.write_u64(vec.len() as u64)); try_o!(self.write_u64(vec.len() as u64));
self.sink.write_all(vec).err().map(Error::IOErr) self.sink.write_all(vec).err().map(Error::IOErr)
} }

View file

@ -6,6 +6,8 @@ authors = ["Ignotus Peverell <igno.peverell@protonmail.com>"]
[dependencies] [dependencies]
bitflags = "^0.7.0" bitflags = "^0.7.0"
byteorder = "^0.5" byteorder = "^0.5"
log = "^0.3"
rand = "^0.3"
mioco = "^0.8" mioco = "^0.8"
time = "^0.1" time = "^0.1"

140
p2p/src/handshake.rs Normal file
View file

@ -0,0 +1,140 @@
// Copyright 2016 The Grin Developers
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::VecDeque;
use std::sync::RwLock;
use rand::Rng;
use rand::os::OsRng;
use core::ser::{serialize, deserialize, Error};
use msg::*;
use types::*;
use protocol::ProtocolV1;
use peer::PeerConn;
const NONCES_CAP: usize = 100;
/// Handles the handshake negotiation when two peers connect and decides on
/// protocol.
pub struct Handshake {
/// Ring buffer of nonces sent to detect self connections without requiring
/// a node id.
nonces: RwLock<VecDeque<u64>>,
}
unsafe impl Sync for Handshake{}
unsafe impl Send for Handshake{}
impl Handshake {
/// Creates a new handshake handler
pub fn new() -> Handshake {
Handshake { nonces: RwLock::new(VecDeque::with_capacity(NONCES_CAP)) }
}
/// Handles connecting to a new remote peer, starting the version handshake.
pub fn connect<'a>(&'a self, peer: &'a mut PeerConn) -> Result<&Protocol, Error> {
let nonce = self.next_nonce();
let sender_addr = SockAddr(peer.local_addr());
let receiver_addr = SockAddr(peer.peer_addr());
let opt_err = serialize(peer,
&Hand {
version: PROTOCOL_VERSION,
capabilities: FULL_SYNC,
nonce: nonce,
sender_addr: sender_addr,
receiver_addr: receiver_addr,
user_agent: USER_AGENT.to_string(),
});
match opt_err {
Some(err) => return Err(err),
None => {}
}
let shake = try!(deserialize::<Shake>(peer));
if shake.version != 1 {
self.close(peer, ErrCodes::UNSUPPORTED_VERSION as u32,
format!("Unsupported version: {}, ours: {})",
shake.version,
PROTOCOL_VERSION));
return Err(Error::UnexpectedData{expected: vec![PROTOCOL_VERSION as u8], received: vec![shake.version as u8]});
}
peer.capabilities = shake.capabilities;
peer.user_agent = shake.user_agent;
// when more than one protocol version is supported, choosing should go here
Ok(&ProtocolV1::new(peer))
}
/// Handles receiving a connection from a new remote peer that started the
/// version handshake.
pub fn handshake<'a>(&'a self, peer: &'a mut PeerConn) -> Result<&Protocol, Error> {
let hand = try!(deserialize::<Hand>(peer));
if hand.version != 1 {
self.close(peer, ErrCodes::UNSUPPORTED_VERSION as u32,
format!("Unsupported version: {}, ours: {})",
hand.version,
PROTOCOL_VERSION));
return Err(Error::UnexpectedData{expected: vec![PROTOCOL_VERSION as u8], received: vec![hand.version as u8]});
}
{
let nonces = self.nonces.read().unwrap();
if nonces.contains(&hand.nonce) {
return Err(Error::UnexpectedData {
expected: vec![],
received: vec![],
});
}
}
peer.capabilities = hand.capabilities;
peer.user_agent = hand.user_agent;
let opt_err = serialize(peer,
&Shake {
version: PROTOCOL_VERSION,
capabilities: FULL_SYNC,
user_agent: USER_AGENT.to_string(),
});
match opt_err {
Some(err) => return Err(err),
None => {}
}
// when more than one protocol version is supported, choosing should go here
Ok(&ProtocolV1::new(peer))
}
fn next_nonce(&self) -> u64 {
let mut rng = OsRng::new().unwrap();
let nonce = rng.next_u64();
let mut nonces = self.nonces.write().unwrap();
nonces.push_back(nonce);
if nonces.len() >= NONCES_CAP {
nonces.pop_front();
}
nonce
}
fn close(&self, peer: &mut PeerConn, err_code: u32, explanation: String) {
serialize(peer,
&PeerError {
code: err_code,
message: explanation,
});
peer.close();
}
}

View file

@ -23,9 +23,17 @@
#[macro_use] #[macro_use]
extern crate bitflags; extern crate bitflags;
#[macro_use]
extern crate grin_core as core;
#[macro_use]
extern crate log;
extern crate mioco; extern crate mioco;
extern crate rand;
extern crate time;
mod types;
mod msg; mod msg;
mod handshake;
mod protocol;
mod server; mod server;
mod peer; mod peer;
mod protocol;

View file

@ -14,15 +14,20 @@
//! Message types that transit over the network and related serialization code. //! Message types that transit over the network and related serialization code.
use std::net::SocketAddr; use std::net::*;
use core::ser::{Writeable, Readable, Writer, Reader, Error}; use core::ser::{self, Writeable, Readable, Writer, Reader, Error};
/// Current latest version of the protocol
pub const PROTOCOL_VERSION: u32 = 1;
/// Grin's user agent with current version (TODO externalize)
pub const USER_AGENT: &'static str = "MW/Grin 0.1";
/// Magic number expected in the header of every message /// Magic number expected in the header of every message
const MAGIC: [u8; 2] = [0x1e, 0xc5]; const MAGIC: [u8; 2] = [0x1e, 0xc5];
/// Codes for each error that can be produced reading a message. /// Codes for each error that can be produced reading a message.
enum ErrCodes { pub enum ErrCodes {
UNSUPPORTED_VERSION = 100, UNSUPPORTED_VERSION = 100,
} }
@ -37,14 +42,15 @@ bitflags! {
} }
/// Types of messages /// Types of messages
enum Type { #[derive(Clone, Copy)]
HAND = 1, pub enum Type {
SHAKE = 2, ERROR,
ERROR = 3, HAND,
/// Never actually used over the network but used to detect unrecognized SHAKE,
/// types. PING,
/// Increment as needed. PONG,
MAX_MSG_TYPE = 4, /// Never used over the network but to detect unrecognized types.
MAX_MSG_TYPE,
} }
/// Header of any protocol message, used to identify incoming messages. /// Header of any protocol message, used to identify incoming messages.
@ -54,8 +60,8 @@ pub struct MsgHeader {
} }
impl MsgHeader { impl MsgHeader {
fn acceptable(&self) -> bool { pub fn acceptable(&self) -> bool {
msg_type < MAX_MSG_TYPE; (self.msg_type as u8) < (Type::MAX_MSG_TYPE as u8)
} }
} }
@ -74,9 +80,20 @@ impl Readable<MsgHeader> for MsgHeader {
try!(reader.expect_u8(MAGIC[0])); try!(reader.expect_u8(MAGIC[0]));
try!(reader.expect_u8(MAGIC[1])); try!(reader.expect_u8(MAGIC[1]));
let t = try!(reader.read_u8()); let t = try!(reader.read_u8());
if t < (Type::MAX_MSG_TYPE as u8) {
return Err(ser::Error::CorruptedData);
}
Ok(MsgHeader { Ok(MsgHeader {
magic: MAGIC, magic: MAGIC,
msg_type: t, msg_type: match t {
// TODO this is rather ugly, think of a better way
0 => Type::ERROR,
1 => Type::HAND,
2 => Type::SHAKE,
3 => Type::PING,
4 => Type::PONG,
_ => panic!(),
},
}) })
} }
} }
@ -84,54 +101,68 @@ impl Readable<MsgHeader> for MsgHeader {
/// First part of a handshake, sender advertises its version and /// First part of a handshake, sender advertises its version and
/// characteristics. /// characteristics.
pub struct Hand { pub struct Hand {
version: u32, /// protocol version of the sender
capabilities: Capabilities, pub version: u32,
sender_addr: SocketAddr, /// capabilities of the sender
receiver_addr: SocketAddr, pub capabilities: Capabilities,
user_agent: String, /// randomly generated for each handshake, helps detect self
pub nonce: u64,
/// network address of the sender
pub sender_addr: SockAddr,
/// network address of the receiver
pub receiver_addr: SockAddr,
/// name of version of the software
pub user_agent: String,
} }
impl Writeable for Hand { impl Writeable for Hand {
fn write(&self, writer: &mut Writer) -> Option<ser::Error> { fn write(&self, writer: &mut Writer) -> Option<ser::Error> {
ser_multiwrite!(writer, ser_multiwrite!(writer,
[write_u32, self.version], [write_u32, self.version],
[write_u32, self.capabilities]); [write_u32, self.capabilities.bits()],
sender_addr.write(writer); [write_u64, self.nonce]);
receiver_addr.write(writer); self.sender_addr.write(writer);
writer.write_vec(&mut self.user_agent.into_bytes()) self.receiver_addr.write(writer);
writer.write_vec(&mut self.user_agent.clone().into_bytes())
} }
} }
impl Readable<Hand> for Hand { impl Readable<Hand> for Hand {
fn read(reader: &mut Reader) -> Result<Hand, ser::Error> { fn read(reader: &mut Reader) -> Result<Hand, ser::Error> {
let (version, capab) = ser_multiread!(reader, read_u32, read_u32); let (version, capab, nonce) = ser_multiread!(reader, read_u32, read_u32, read_u64);
let sender_addr = SocketAddr::read(reader); let sender_addr = try!(SockAddr::read(reader));
let receiver_addr = SocketAddr::read(reader); let receiver_addr = try!(SockAddr::read(reader));
let user_agent = reader.read_vec(); let ua = try!(reader.read_vec());
Hand { let user_agent = try!(String::from_utf8(ua).map_err(|_| ser::Error::CorruptedData));
let capabilities = try!(Capabilities::from_bits(capab).ok_or(ser::Error::CorruptedData));
Ok(Hand {
version: version, version: version,
capabilities: capab, capabilities: capabilities,
server_addr: sender_addr, nonce: nonce,
sender_addr: sender_addr,
receiver_addr: receiver_addr, receiver_addr: receiver_addr,
user_agent: user_agent, user_agent: user_agent,
} })
} }
} }
/// Second part of a handshake, receiver of the first part replies with its own /// Second part of a handshake, receiver of the first part replies with its own
/// version and characteristics. /// version and characteristics.
pub struct Shake { pub struct Shake {
version: u32, /// sender version
capabilities: Capabilities, pub version: u32,
user_agent: String, /// sender capabilities
pub capabilities: Capabilities,
/// name of version of the software
pub user_agent: String,
} }
impl Writeable for MsgHeader { impl Writeable for Shake {
fn write(&self, writer: &mut Writer) -> Option<ser::Error> { fn write(&self, writer: &mut Writer) -> Option<ser::Error> {
ser_multiwrite!(writer, ser_multiwrite!(writer,
[write_u32, self.version], [write_u32, self.version],
[write_u32, self.capabilities], [write_u32, self.capabilities.bits()],
[write_vec, self.user_agent.as_mut_vec()]); [write_vec, &mut self.user_agent.as_bytes().to_vec()]);
None None
} }
} }
@ -139,27 +170,30 @@ impl Writeable for MsgHeader {
impl Readable<Shake> for Shake { impl Readable<Shake> for Shake {
fn read(reader: &mut Reader) -> Result<Shake, ser::Error> { fn read(reader: &mut Reader) -> Result<Shake, ser::Error> {
let (version, capab, ua) = ser_multiread!(reader, read_u32, read_u32, read_vec); let (version, capab, ua) = ser_multiread!(reader, read_u32, read_u32, read_vec);
let user_agent = try!(String::from_utf8(ua).map_err(|_| ser::Error: CorruptedData)); let user_agent = try!(String::from_utf8(ua).map_err(|_| ser::Error::CorruptedData));
Hand { let capabilities = try!(Capabilities::from_bits(capab).ok_or(ser::Error::CorruptedData));
Ok(Shake {
version: version, version: version,
capabilities: capab, capabilities: capabilities,
user_agent: user_agent, user_agent: user_agent,
} })
} }
} }
/// We found some issue in the communication, sending an error back, usually /// We found some issue in the communication, sending an error back, usually
/// followed by closing the connection. /// followed by closing the connection.
pub struct PeerError { pub struct PeerError {
code: u32, /// error code
message: String, pub code: u32,
/// slightly more user friendly message
pub message: String,
} }
impl Writeable for PeerError { impl Writeable for PeerError {
fn write(&self, writer: &mut Writer) -> Option<ser::Error> { fn write(&self, writer: &mut Writer) -> Option<ser::Error> {
ser_multiwrite!(writer, ser_multiwrite!(writer,
[write_u32, self.code], [write_u32, self.code],
[write_vec, &mut self.message.into_bytes()]); [write_vec, &mut self.message.clone().into_bytes()]);
None None
} }
} }
@ -167,47 +201,49 @@ impl Writeable for PeerError {
impl Readable<PeerError> for PeerError { impl Readable<PeerError> for PeerError {
fn read(reader: &mut Reader) -> Result<PeerError, ser::Error> { fn read(reader: &mut Reader) -> Result<PeerError, ser::Error> {
let (code, msg) = ser_multiread!(reader, read_u32, read_vec); let (code, msg) = ser_multiread!(reader, read_u32, read_vec);
let message = try!(String::from_utf8(msg).map_err(|_| ser::Error: CorruptedData)); let message = try!(String::from_utf8(msg).map_err(|_| ser::Error::CorruptedData));
PeerError { Ok(PeerError {
code: code, code: code,
message: message, message: message,
} })
} }
} }
impl Writeable for SocketAddr { /// Only necessary so we can implement Readable and Writeable. Rust disallows implementing traits when both types are outside of this crate (which is the case for SocketAddr and Readable/Writeable).
pub struct SockAddr(pub SocketAddr);
impl Writeable for SockAddr {
fn write(&self, writer: &mut Writer) -> Option<ser::Error> { fn write(&self, writer: &mut Writer) -> Option<ser::Error> {
match self { match self.0 {
V4(sav4) => { SocketAddr::V4(sav4) => {
ser_multiwrite!(writer, ser_multiwrite!(writer,
[write_u8, 0], [write_u8, 0],
[write_fixed_bytes, sav4.ip().octets()], [write_fixed_bytes, &sav4.ip().octets().to_vec()],
[write_u16, sav4.port()]); [write_u16, sav4.port()]);
} }
V6(sav6) => { SocketAddr::V6(sav6) => {
try_m(writer.write_u8(1)); try_o!(writer.write_u8(1));
for seg in sav6.ip().segments() { for seg in &sav6.ip().segments() {
try_m(writer.write_u16(seg)); try_o!(writer.write_u16(*seg));
} }
try_m(writer.write_u16(sav6.port())); try_o!(writer.write_u16(sav6.port()));
} }
} }
None None
} }
} }
impl Readable<SocketAddr> for SocketAddr { impl Readable<SockAddr> for SockAddr {
fn read(reader: &mut Reader) -> Result<SocketAddr, ser::Error> { fn read(reader: &mut Reader) -> Result<SockAddr, ser::Error> {
let v4_or_v6 = reader.read_u8(); let v4_or_v6 = try!(reader.read_u8());
if v4_or_v6 == 0 { if v4_or_v6 == 0 {
let ip = reader.read_fixed_bytes(4); let ip = try!(reader.read_fixed_bytes(4));
let port = reader.read_u16(); let port = try!(reader.read_u16());
SocketAddrV4::new(Ipv4Addr::new(ip[0], ip[1], ip[2], ip[3]), port) Ok(SockAddr(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(ip[0], ip[1], ip[2], ip[3]), port))))
} else { } else {
let ip = [0..8].map(|_| reader.read_u16()).collect::<Vec<u16>>(); let ip = try_oap_vec!([0..8], |_| reader.read_u16());
let port = reader.read_u16(); let port = try!(reader.read_u16());
SocketAddrV6::new(Ipv6Addr::new(ip[0], ip[1], ip[2], ip[3], ip[4], ip[5], ip[6], ip[7]), Ok(SockAddr(SocketAddr::V6(SocketAddrV6::new(Ipv6Addr::new(ip[0], ip[1], ip[2], ip[3], ip[4], ip[5], ip[6], ip[7]), port, 0, 0))))
port)
} }
} }
} }

View file

@ -12,117 +12,83 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
use str::net::SocketAddr; use std::net::SocketAddr;
use std::str::FromStr; use std::io::{self, Read, Write, BufReader};
use std::io::{Read, Write};
use time::Duration;
use mioco::tcp::{TcpListener, TcpStream, Shutdown}; use mioco::tcp::{TcpListener, TcpStream, Shutdown};
use core::ser::{serialize, deserialize}; use time::Duration;
const PROTOCOL_VERSION: u32 = 1; use core::ser::{serialize, deserialize, Error};
const USER_AGENT: &'static str = "MW/Grin 0.1"; use handshake::Handshake;
use msg::*;
use types::*;
/// The local representation of a remotely connected peer. Handles most /// The local representation of a remotely connected peer. Handles most
/// low-level network communication and tracks peer information. /// low-level network communication and tracks peer information.
struct Peer { pub struct PeerConn {
conn: TcpStream, conn: TcpStream,
reader: BufReader, reader: BufReader<TcpStream>,
capabilities: Capabilities, pub capabilities: Capabilities,
user_agent: String, pub user_agent: String,
} }
/// Make the Peer a Reader for convenient access to the underlying connection. /// Make the Peer a Reader for convenient access to the underlying connection.
/// Allows the peer to track how much is received. /// Allows the peer to track how much is received.
impl Read for Peer { impl Read for PeerConn {
fn read(&mut self, buf: &mut [u8]) -> Result<usize> { fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
self.reader.read(buf) self.reader.read(buf)
} }
} }
/// Make the Peer a Writer for convenient access to the underlying connection. /// Make the Peer a Writer for convenient access to the underlying connection.
/// Allows the peer to track how much is sent. /// Allows the peer to track how much is sent.
impl Write for Peer { impl Write for PeerConn {
fn write(&mut self, buf: &[u8]) -> Result<usize> { fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.conf.write(buf) self.conn.write(buf)
} }
fn flush(&mut self) -> io::Result<()> {
self.conn.flush()
}
} }
impl Close for Peer { impl Close for PeerConn {
fn close() { fn close(&self) {
self.conn.shutdown(Shutdown::Both); self.conn.shutdown(Shutdown::Both);
} }
} }
impl Peer { impl PeerConn {
/// Create a new local peer instance connected to a remote peer with the /// Create a new local peer instance connected to a remote peer with the
/// provided TcpStream. /// provided TcpStream.
fn new(conn: TcpStream) -> Peer { pub fn new(conn: TcpStream) -> PeerConn {
// don't wait on read for more than 2 seconds by default // don't wait on read for more than 2 seconds by default
conn.set_read_timeout(Some(Duration::seconds(2))); conn.set_keepalive(Some(2));
Peer { PeerConn {
conn: conn, conn: conn,
reader: BufReader::new(conn), reader: BufReader::new(conn),
capabilities: UNKNOWN, capabilities: UNKNOWN,
user_agent: "", user_agent: "".to_string(),
} }
} }
/// Handles connecting to a new remote peer, starting the version handshake. pub fn connect(&mut self, hs: &Handshake, na: &NetAdapter) -> Option<Error> {
fn connect(&mut self) -> Result<Protocol, Error> { let mut proto = try_to_o!(hs.connect(self));
serialize(self.peer, proto.handle(na)
&Hand {
version: PROTOCOL_VERSION,
capabilities: FULL_SYNC,
sender_addr: listen_addr(),
receiver_addr: self.peer.peer_addr(),
user_agent: USER_AGENT,
});
let shake = deserialize(self.peer);
if shake.version != 1 {
self.close(ErrCodes::UNSUPPORTED_VERSION,
format!("Unsupported version: {}, ours: {})",
shake.version,
PROTOCOL_VERSION));
return;
}
self.capabilities = shake.capabilities;
self.user_agent = shake.user_agent;
// when more than one protocol version is supported, choosing should go here
ProtocolV1::new(&self);
} }
/// Handles receiving a connection from a new remote peer that started the pub fn handshake(&mut self, hs: &Handshake, na: &NetAdapter) -> Option<Error> {
/// version handshake. let mut proto = try_to_o!(hs.handshake(self));
fn handshake(&mut self) -> Result<Protocol, Error> { proto.handle(na)
let hand = deserialize(self.peer); }
if hand.version != 1 { }
self.close(ErrCodes::UNSUPPORTED_VERSION,
format!("Unsupported version: {}, ours: {})", impl PeerInfo for PeerConn {
hand.version, fn peer_addr(&self) -> SocketAddr {
PROTOCOL_VERSION)); self.conn.peer_addr().unwrap()
return; }
} fn local_addr(&self) -> SocketAddr {
// TODO likely not exactly what we want (private vs public IP)
self.peer.capabilities = hand.capabilities; self.conn.local_addr().unwrap()
self.peer.user_agent = hand.user_agent;
serialize(self.peer,
&Shake {
version: PROTOCOL_VERSION,
capabilities: FULL_SYNC,
user_agent: USER_AGENT,
});
self.accept_loop();
// when more than one protocol version is supported, choosing should go here
ProtocolV1::new(&self);
}
fn peer_addr(&self) -> SocketAddr {
self.conn.peer_addr()
} }
} }

View file

@ -12,20 +12,19 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
use types::*;
use core::ser; use core::ser;
use msg::*;
use types::*;
use peer::PeerConn;
pub struct ProtocolV1 { pub struct ProtocolV1<'a> {
comm: &mut Comm, peer: &'a mut PeerConn,
} }
impl Protocol for ProtocolV1 { impl<'a> Protocol for ProtocolV1<'a> {
fn new(p: &mut Comm) -> Protocol { fn handle(&mut self, server: &NetAdapter) -> Option<ser::Error> {
Protocol { comm: p }
}
fn handle(&self, server: &Server) {
loop { loop {
let header = ser::deserialize::<MsgHeader>(); let header = try_to_o!(ser::deserialize::<MsgHeader>(self.peer));
if !header.acceptable() { if !header.acceptable() {
continue; continue;
} }
@ -33,13 +32,17 @@ impl Protocol for ProtocolV1 {
} }
} }
impl ProtocolV1 { impl<'a> ProtocolV1<'a> {
fn close(err_code: u32, explanation: &'static str) { pub fn new(p: &mut PeerConn) -> ProtocolV1 {
serialize(self.peer, ProtocolV1{peer: p}
&Err { }
fn close(&mut self, err_code: u32, explanation: &'static str) {
ser::serialize(self.peer,
&PeerError {
code: err_code, code: err_code,
message: explanation, message: explanation.to_string(),
}); });
self.comm.close(); self.peer.close();
} }
} }

View file

@ -15,10 +15,18 @@
//! Grin server implementation, accepts incoming connections and connects to //! Grin server implementation, accepts incoming connections and connects to
//! other peers in the network. //! other peers in the network.
use std::io;
use std::net::SocketAddr;
use std::str::FromStr;
use std::sync::Arc;
use mioco;
use mioco::tcp::{TcpListener, TcpStream, Shutdown}; use mioco::tcp::{TcpListener, TcpStream, Shutdown};
use core::ser::{serialize, deserialize}; use core::ser::{serialize, deserialize};
use msg::*; use handshake::Handshake;
use peer::PeerConn;
use types::*;
const DEFAULT_LISTEN_ADDR: &'static str = "127.0.0.1:555"; const DEFAULT_LISTEN_ADDR: &'static str = "127.0.0.1:555";
@ -27,6 +35,9 @@ fn listen_addr() -> SocketAddr {
FromStr::from_str(DEFAULT_LISTEN_ADDR).unwrap() FromStr::from_str(DEFAULT_LISTEN_ADDR).unwrap()
} }
struct DummyAdapter {}
impl NetAdapter for DummyAdapter {}
pub struct Server { pub struct Server {
} }
@ -35,18 +46,28 @@ impl Server {
/// connections and starts the bootstrapping process to find peers. /// connections and starts the bootstrapping process to find peers.
pub fn new() -> Server { pub fn new() -> Server {
mioco::start(|| -> io::Result<()> { mioco::start(|| -> io::Result<()> {
// TODO SSL
let addr = "127.0.0.1:3414".parse().unwrap(); let addr = "127.0.0.1:3414".parse().unwrap();
let listener = try!(TcpListener::bind(&addr)); let listener = try!(TcpListener::bind(&addr));
info!("P2P server started on {}", addr); info!("P2P server started on {}", addr);
let hs = Arc::new(Handshake::new());
loop { loop {
let mut conn = try!(listener.accept()); let mut conn = try!(listener.accept());
let hs_child = hs.clone();
mioco::spawn(move || -> io::Result<()> { mioco::spawn(move || -> io::Result<()> {
Peer::new(conn).handshake(); let ret = PeerConn::new(conn).handshake(&hs_child, &DummyAdapter {});
if let Some(err) = ret {
error!("{:?}", err);
}
Ok(())
}); });
} }
}) })
.unwrap() .unwrap()
.unwrap(); .unwrap();
Server{}
} }
} }

View file

@ -13,21 +13,30 @@
// limitations under the License. // limitations under the License.
use std::io::{Read, Write}; use std::io::{Read, Write};
use std::net::SocketAddr;
use core::ser::Error;
/// Trait for pre-emptively and forcefully closing an underlying resource. /// Trait for pre-emptively and forcefully closing an underlying resource.
trait Close { pub trait Close {
fn close(); fn close(&self);
} }
/// Main trait we expect a peer to implement to be usable by a Protocol. /// General information about a connected peer that's useful to other modules.
trait Comm : Read + Write + Close; pub trait PeerInfo {
/// Address of the remote peer
/// A given communication protocol agreed upon between 2 peers (usually ourselves and a remove) after handshake. fn peer_addr(&self) -> SocketAddr;
trait Protocol { /// Our address, communicated to other peers
/// Instantiate providing a reader and writer allowing to communicate to the other peer. fn local_addr(&self) -> SocketAddr;
fn new(p: &mut Comm);
/// Starts handling protocol communication, the peer(s) is expected to be known already, usually passed during construction.
fn handle(&self, server: &Server);
} }
/// A given communication protocol agreed upon between 2 peers (usually
/// ourselves and a remove) after handshake.
pub trait Protocol {
/// Starts handling protocol communication, the peer(s) is expected to be
/// known already, usually passed during construction.
fn handle(&mut self, na: &NetAdapter) -> Option<Error>;
}
/// Bridge between the networking layer and the rest of the system. Handles the
/// forwarding or querying of blocks and transactions among other things.
pub trait NetAdapter {}