diff --git a/.gitignore b/.gitignore index 9d69b6472..0cd48e02b 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ *.swp .* target +Cargo.lock diff --git a/chain/src/lib.rs b/chain/src/lib.rs index 9fc83a0e8..e7e5d0886 100644 --- a/chain/src/lib.rs +++ b/chain/src/lib.rs @@ -24,7 +24,7 @@ extern crate bitflags; extern crate byteorder; -#[macro_use(try_m)] +#[macro_use(try_o)] extern crate grin_core as core; extern crate grin_store; extern crate secp256k1zkp as secp; diff --git a/chain/src/pipe.rs b/chain/src/pipe.rs index 050fceaae..c2f6335b8 100644 --- a/chain/src/pipe.rs +++ b/chain/src/pipe.rs @@ -66,11 +66,11 @@ pub fn process_block(b: &Block, store: &ChainStore, opts: Options) -> Option Option { fn validate_block(b: &Block, ctx: &mut BlockContext) -> Option { // TODO check tx merkle tree let curve = secp::Secp256k1::with_caps(secp::ContextFlag::Commit); - try_m!(b.verify(&curve).err().map(&Error::InvalidBlockProof)); + try_o!(b.verify(&curve).err().map(&Error::InvalidBlockProof)); None } diff --git a/chain/src/store.rs b/chain/src/store.rs index 4b5ef9cd6..f1ed27585 100644 --- a/chain/src/store.rs +++ b/chain/src/store.rs @@ -51,7 +51,7 @@ impl ChainStore for ChainKVStore { } fn save_head(&self, t: &Tip) -> Option { - try_m!(self.save_tip(t)); + try_o!(self.save_tip(t)); self.db.put_ser(&vec![HEAD_PREFIX], t).map(&to_store_err) } diff --git a/chain/src/types.rs b/chain/src/types.rs index 142af6f63..f1ce594bd 100644 --- a/chain/src/types.rs +++ b/chain/src/types.rs @@ -41,9 +41,9 @@ impl Lineage { /// Serialization for lineage, necessary to serialize fork tips. impl ser::Writeable for Lineage { fn write(&self, writer: &mut ser::Writer) -> Option { - try_m!(writer.write_u32(self.0.len() as u32)); + try_o!(writer.write_u32(self.0.len() as u32)); for num in &self.0 { - try_m!(writer.write_u32(*num)); + try_o!(writer.write_u32(*num)); } None } @@ -100,9 +100,9 @@ impl Tip { /// Serialization of a tip, required to save to datastore. impl ser::Writeable for Tip { fn write(&self, writer: &mut ser::Writer) -> Option { - try_m!(writer.write_u64(self.height)); - try_m!(writer.write_fixed_bytes(&self.last_block_h)); - try_m!(writer.write_fixed_bytes(&self.prev_block_h)); + try_o!(writer.write_u64(self.height)); + try_o!(writer.write_fixed_bytes(&self.last_block_h)); + try_o!(writer.write_fixed_bytes(&self.prev_block_h)); self.lineage.write(writer) } } diff --git a/core/src/core/block.rs b/core/src/core/block.rs index 9413246f9..5ad9d4feb 100644 --- a/core/src/core/block.rs +++ b/core/src/core/block.rs @@ -70,10 +70,10 @@ impl Writeable for BlockHeader { [write_u64, self.total_fees]); // make sure to not introduce any variable length data before the nonce to // avoid complicating PoW - try_m!(writer.write_u64(self.nonce)); + try_o!(writer.write_u64(self.nonce)); // cuckoo cycle of 42 nodes for n in 0..42 { - try_m!(writer.write_u32(self.pow.0[n])); + try_o!(writer.write_u32(self.pow.0[n])); } writer.write_u64(self.td) } @@ -102,20 +102,20 @@ pub struct Block { /// block as binary. impl Writeable for Block { fn write(&self, writer: &mut Writer) -> Option { - try_m!(self.header.write(writer)); + try_o!(self.header.write(writer)); ser_multiwrite!(writer, [write_u64, self.inputs.len() as u64], [write_u64, self.outputs.len() as u64], [write_u64, self.proofs.len() as u64]); for inp in &self.inputs { - try_m!(inp.write(writer)); + try_o!(inp.write(writer)); } for out in &self.outputs { - try_m!(out.write(writer)); + try_o!(out.write(writer)); } for proof in &self.proofs { - try_m!(proof.write(writer)); + try_o!(proof.write(writer)); } None } @@ -218,7 +218,7 @@ impl Block { // repeated iterations, revisit if a problem // validate each transaction and gather their proofs - let mut proofs = try_map_vec!(txs, |tx| tx.verify_sig(&secp)); + let mut proofs = try_oap_vec!(txs, |tx| tx.verify_sig(&secp)); proofs.push(reward_proof); // build vectors with all inputs and all outputs, ordering them by hash diff --git a/core/src/core/transaction.rs b/core/src/core/transaction.rs index 0e287613c..98566c83a 100644 --- a/core/src/core/transaction.rs +++ b/core/src/core/transaction.rs @@ -40,7 +40,7 @@ pub struct TxProof { impl Writeable for TxProof { fn write(&self, writer: &mut Writer) -> Option { - try_m!(writer.write_fixed_bytes(&self.remainder)); + try_o!(writer.write_fixed_bytes(&self.remainder)); writer.write_vec(&mut self.sig.clone()) } } @@ -75,10 +75,10 @@ impl Writeable for Transaction { [write_u64, self.inputs.len() as u64], [write_u64, self.outputs.len() as u64]); for inp in &self.inputs { - try_m!(inp.write(writer)); + try_o!(inp.write(writer)); } for out in &self.outputs { - try_m!(out.write(writer)); + try_o!(out.write(writer)); } None } @@ -303,7 +303,7 @@ pub enum Output { /// an Output as binary. impl Writeable for Output { fn write(&self, writer: &mut Writer) -> Option { - try_m!(writer.write_fixed_bytes(&self.commitment().unwrap())); + try_o!(writer.write_fixed_bytes(&self.commitment().unwrap())); writer.write_vec(&mut self.proof().unwrap().bytes().to_vec()) } } diff --git a/core/src/macros.rs b/core/src/macros.rs index aa0db0f83..c2f2588fd 100644 --- a/core/src/macros.rs +++ b/core/src/macros.rs @@ -17,6 +17,7 @@ /// Eliminates some of the verbosity in having iter and collect /// around every map call. +#[macro_export] macro_rules! map_vec { ($thing:expr, $mapfn:expr ) => { $thing.iter() @@ -27,7 +28,8 @@ macro_rules! map_vec { /// Same as map_vec when the map closure returns Results. Makes sure the /// results are "pushed up" and wraps with a try. -macro_rules! try_map_vec { +#[macro_export] +macro_rules! try_oap_vec { ($thing:expr, $mapfn:expr ) => { try!($thing.iter() .map($mapfn) @@ -59,11 +61,11 @@ macro_rules! tee { } } -/// Simple equivalent of try! but for a Maybe. Motivated mostly by the +/// Simple equivalent of try! but for an Option. Motivated mostly by the /// io package and our serialization as an alternative to silly Result<(), /// Error>. #[macro_export] -macro_rules! try_m { +macro_rules! try_o { ($trying:expr) => { let tried = $trying; if let Some(_) = tried { @@ -72,6 +74,17 @@ macro_rules! try_m { } } +#[macro_export] +macro_rules! try_to_o { + ($trying:expr) => {{ + let tried = $trying; + if let Err(e) = tried { + return Some(e); + } + tried.unwrap() + }} +} + /// Eliminate some of the boilerplate of deserialization (package ser) by /// passing just the list of reader function. /// Example before: @@ -86,9 +99,16 @@ macro_rules! ser_multiread { } } +/// Eliminate some of the boilerplate of serialization (package ser) by +/// passing directly pairs of writer function and data to write. +/// Example before: +/// try!(reader.write_u64(42)); +/// try!(reader.write_u32(100)); +/// Example after: +/// ser_multiwrite!(writer, [write_u64, 42], [write_u32, 100]); #[macro_export] macro_rules! ser_multiwrite { ($wrtr:ident, $([ $write_call:ident, $val:expr ]),* ) => { - $( try_m!($wrtr.$write_call($val)) );* + $( try_o!($wrtr.$write_call($val)) );* } } diff --git a/core/src/pow/mod.rs b/core/src/pow/mod.rs index df1808666..b06383aea 100644 --- a/core/src/pow/mod.rs +++ b/core/src/pow/mod.rs @@ -72,15 +72,15 @@ struct PowHeader { /// to make incrementing from the serialized form trivial. impl Writeable for PowHeader { fn write(&self, writer: &mut Writer) -> Option { - try_m!(writer.write_u64(self.nonce)); - try_m!(writer.write_u64(self.height)); - try_m!(writer.write_fixed_bytes(&self.previous)); - try_m!(writer.write_i64(self.timestamp.to_timespec().sec)); - try_m!(writer.write_fixed_bytes(&self.utxo_merkle)); - try_m!(writer.write_fixed_bytes(&self.tx_merkle)); - try_m!(writer.write_u64(self.total_fees)); - try_m!(writer.write_u64(self.n_in)); - try_m!(writer.write_u64(self.n_out)); + try_o!(writer.write_u64(self.nonce)); + try_o!(writer.write_u64(self.height)); + try_o!(writer.write_fixed_bytes(&self.previous)); + try_o!(writer.write_i64(self.timestamp.to_timespec().sec)); + try_o!(writer.write_fixed_bytes(&self.utxo_merkle)); + try_o!(writer.write_fixed_bytes(&self.tx_merkle)); + try_o!(writer.write_u64(self.total_fees)); + try_o!(writer.write_u64(self.n_in)); + try_o!(writer.write_u64(self.n_out)); writer.write_u64(self.n_proofs) } } diff --git a/core/src/ser.rs b/core/src/ser.rs index 94c84a873..99680c5f7 100644 --- a/core/src/ser.rs +++ b/core/src/ser.rs @@ -51,6 +51,8 @@ pub trait AsFixedBytes { pub trait Writer { /// Writes a u8 as bytes fn write_u8(&mut self, n: u8) -> Option; + /// Writes a u16 as bytes + fn write_u16(&mut self, n: u16) -> Option; /// Writes a u32 as bytes fn write_u32(&mut self, n: u32) -> Option; /// Writes a u64 as bytes @@ -70,6 +72,8 @@ pub trait Writer { pub trait Reader { /// Read a u8 from the underlying Read fn read_u8(&mut self) -> Result; + /// Read a u16 from the underlying Read + fn read_u16(&mut self) -> Result; /// Read a u32 from the underlying Read fn read_u32(&mut self) -> Result; /// Read a u64 from the underlying Read @@ -137,6 +141,9 @@ impl<'a> Reader for BinReader<'a> { fn read_u8(&mut self) -> Result { self.source.read_u8().map_err(Error::IOErr) } + fn read_u16(&mut self) -> Result { + self.source.read_u16::().map_err(Error::IOErr) + } fn read_u32(&mut self) -> Result { self.source.read_u32::().map_err(Error::IOErr) } @@ -188,6 +195,9 @@ impl<'a> Writer for BinWriter<'a> { fn write_u8(&mut self, n: u8) -> Option { self.sink.write_u8(n).err().map(Error::IOErr) } + fn write_u16(&mut self, n: u16) -> Option { + self.sink.write_u16::(n).err().map(Error::IOErr) + } fn write_u32(&mut self, n: u32) -> Option { self.sink.write_u32::(n).err().map(Error::IOErr) } @@ -202,7 +212,7 @@ impl<'a> Writer for BinWriter<'a> { fn write_vec(&mut self, vec: &mut Vec) -> Option { - try_m!(self.write_u64(vec.len() as u64)); + try_o!(self.write_u64(vec.len() as u64)); self.sink.write_all(vec).err().map(Error::IOErr) } diff --git a/p2p/Cargo.toml b/p2p/Cargo.toml index 866c13e03..5e82e3867 100644 --- a/p2p/Cargo.toml +++ b/p2p/Cargo.toml @@ -6,6 +6,8 @@ authors = ["Ignotus Peverell "] [dependencies] bitflags = "^0.7.0" byteorder = "^0.5" +log = "^0.3" +rand = "^0.3" mioco = "^0.8" time = "^0.1" diff --git a/p2p/src/handshake.rs b/p2p/src/handshake.rs new file mode 100644 index 000000000..659c571c3 --- /dev/null +++ b/p2p/src/handshake.rs @@ -0,0 +1,140 @@ +// Copyright 2016 The Grin Developers +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::VecDeque; +use std::sync::RwLock; + +use rand::Rng; +use rand::os::OsRng; + +use core::ser::{serialize, deserialize, Error}; +use msg::*; +use types::*; +use protocol::ProtocolV1; +use peer::PeerConn; + +const NONCES_CAP: usize = 100; + +/// Handles the handshake negotiation when two peers connect and decides on +/// protocol. +pub struct Handshake { + /// Ring buffer of nonces sent to detect self connections without requiring + /// a node id. + nonces: RwLock>, +} + +unsafe impl Sync for Handshake{} +unsafe impl Send for Handshake{} + +impl Handshake { + /// Creates a new handshake handler + pub fn new() -> Handshake { + Handshake { nonces: RwLock::new(VecDeque::with_capacity(NONCES_CAP)) } + } + + /// Handles connecting to a new remote peer, starting the version handshake. + pub fn connect<'a>(&'a self, peer: &'a mut PeerConn) -> Result<&Protocol, Error> { + let nonce = self.next_nonce(); + + let sender_addr = SockAddr(peer.local_addr()); + let receiver_addr = SockAddr(peer.peer_addr()); + let opt_err = serialize(peer, + &Hand { + version: PROTOCOL_VERSION, + capabilities: FULL_SYNC, + nonce: nonce, + sender_addr: sender_addr, + receiver_addr: receiver_addr, + user_agent: USER_AGENT.to_string(), + }); + match opt_err { + Some(err) => return Err(err), + None => {} + } + + let shake = try!(deserialize::(peer)); + if shake.version != 1 { + self.close(peer, ErrCodes::UNSUPPORTED_VERSION as u32, + format!("Unsupported version: {}, ours: {})", + shake.version, + PROTOCOL_VERSION)); + return Err(Error::UnexpectedData{expected: vec![PROTOCOL_VERSION as u8], received: vec![shake.version as u8]}); + } + peer.capabilities = shake.capabilities; + peer.user_agent = shake.user_agent; + + // when more than one protocol version is supported, choosing should go here + Ok(&ProtocolV1::new(peer)) + } + + /// Handles receiving a connection from a new remote peer that started the + /// version handshake. + pub fn handshake<'a>(&'a self, peer: &'a mut PeerConn) -> Result<&Protocol, Error> { + let hand = try!(deserialize::(peer)); + if hand.version != 1 { + self.close(peer, ErrCodes::UNSUPPORTED_VERSION as u32, + format!("Unsupported version: {}, ours: {})", + hand.version, + PROTOCOL_VERSION)); + return Err(Error::UnexpectedData{expected: vec![PROTOCOL_VERSION as u8], received: vec![hand.version as u8]}); + } + { + let nonces = self.nonces.read().unwrap(); + if nonces.contains(&hand.nonce) { + return Err(Error::UnexpectedData { + expected: vec![], + received: vec![], + }); + } + } + + peer.capabilities = hand.capabilities; + peer.user_agent = hand.user_agent; + + let opt_err = serialize(peer, + &Shake { + version: PROTOCOL_VERSION, + capabilities: FULL_SYNC, + user_agent: USER_AGENT.to_string(), + }); + match opt_err { + Some(err) => return Err(err), + None => {} + } + + // when more than one protocol version is supported, choosing should go here + Ok(&ProtocolV1::new(peer)) + } + + fn next_nonce(&self) -> u64 { + let mut rng = OsRng::new().unwrap(); + let nonce = rng.next_u64(); + + let mut nonces = self.nonces.write().unwrap(); + nonces.push_back(nonce); + if nonces.len() >= NONCES_CAP { + nonces.pop_front(); + } + nonce + } + + fn close(&self, peer: &mut PeerConn, err_code: u32, explanation: String) { + serialize(peer, + &PeerError { + code: err_code, + message: explanation, + }); + peer.close(); + } +} diff --git a/p2p/src/lib.rs b/p2p/src/lib.rs index bbb001763..cf287e34e 100644 --- a/p2p/src/lib.rs +++ b/p2p/src/lib.rs @@ -23,9 +23,17 @@ #[macro_use] extern crate bitflags; +#[macro_use] +extern crate grin_core as core; +#[macro_use] +extern crate log; extern crate mioco; +extern crate rand; +extern crate time; +mod types; mod msg; +mod handshake; +mod protocol; mod server; mod peer; -mod protocol; diff --git a/p2p/src/msg.rs b/p2p/src/msg.rs index 268373354..915deef94 100644 --- a/p2p/src/msg.rs +++ b/p2p/src/msg.rs @@ -14,15 +14,20 @@ //! Message types that transit over the network and related serialization code. -use std::net::SocketAddr; +use std::net::*; -use core::ser::{Writeable, Readable, Writer, Reader, Error}; +use core::ser::{self, Writeable, Readable, Writer, Reader, Error}; + +/// Current latest version of the protocol +pub const PROTOCOL_VERSION: u32 = 1; +/// Grin's user agent with current version (TODO externalize) +pub const USER_AGENT: &'static str = "MW/Grin 0.1"; /// Magic number expected in the header of every message const MAGIC: [u8; 2] = [0x1e, 0xc5]; /// Codes for each error that can be produced reading a message. -enum ErrCodes { +pub enum ErrCodes { UNSUPPORTED_VERSION = 100, } @@ -37,14 +42,15 @@ bitflags! { } /// Types of messages -enum Type { - HAND = 1, - SHAKE = 2, - ERROR = 3, - /// Never actually used over the network but used to detect unrecognized - /// types. - /// Increment as needed. - MAX_MSG_TYPE = 4, +#[derive(Clone, Copy)] +pub enum Type { + ERROR, + HAND, + SHAKE, + PING, + PONG, + /// Never used over the network but to detect unrecognized types. + MAX_MSG_TYPE, } /// Header of any protocol message, used to identify incoming messages. @@ -54,8 +60,8 @@ pub struct MsgHeader { } impl MsgHeader { - fn acceptable(&self) -> bool { - msg_type < MAX_MSG_TYPE; + pub fn acceptable(&self) -> bool { + (self.msg_type as u8) < (Type::MAX_MSG_TYPE as u8) } } @@ -74,9 +80,20 @@ impl Readable for MsgHeader { try!(reader.expect_u8(MAGIC[0])); try!(reader.expect_u8(MAGIC[1])); let t = try!(reader.read_u8()); + if t < (Type::MAX_MSG_TYPE as u8) { + return Err(ser::Error::CorruptedData); + } Ok(MsgHeader { magic: MAGIC, - msg_type: t, + msg_type: match t { + // TODO this is rather ugly, think of a better way + 0 => Type::ERROR, + 1 => Type::HAND, + 2 => Type::SHAKE, + 3 => Type::PING, + 4 => Type::PONG, + _ => panic!(), + }, }) } } @@ -84,54 +101,68 @@ impl Readable for MsgHeader { /// First part of a handshake, sender advertises its version and /// characteristics. pub struct Hand { - version: u32, - capabilities: Capabilities, - sender_addr: SocketAddr, - receiver_addr: SocketAddr, - user_agent: String, + /// protocol version of the sender + pub version: u32, + /// capabilities of the sender + pub capabilities: Capabilities, + /// randomly generated for each handshake, helps detect self + pub nonce: u64, + /// network address of the sender + pub sender_addr: SockAddr, + /// network address of the receiver + pub receiver_addr: SockAddr, + /// name of version of the software + pub user_agent: String, } impl Writeable for Hand { fn write(&self, writer: &mut Writer) -> Option { ser_multiwrite!(writer, [write_u32, self.version], - [write_u32, self.capabilities]); - sender_addr.write(writer); - receiver_addr.write(writer); - writer.write_vec(&mut self.user_agent.into_bytes()) + [write_u32, self.capabilities.bits()], + [write_u64, self.nonce]); + self.sender_addr.write(writer); + self.receiver_addr.write(writer); + writer.write_vec(&mut self.user_agent.clone().into_bytes()) } } impl Readable for Hand { fn read(reader: &mut Reader) -> Result { - let (version, capab) = ser_multiread!(reader, read_u32, read_u32); - let sender_addr = SocketAddr::read(reader); - let receiver_addr = SocketAddr::read(reader); - let user_agent = reader.read_vec(); - Hand { + let (version, capab, nonce) = ser_multiread!(reader, read_u32, read_u32, read_u64); + let sender_addr = try!(SockAddr::read(reader)); + let receiver_addr = try!(SockAddr::read(reader)); + let ua = try!(reader.read_vec()); + let user_agent = try!(String::from_utf8(ua).map_err(|_| ser::Error::CorruptedData)); + let capabilities = try!(Capabilities::from_bits(capab).ok_or(ser::Error::CorruptedData)); + Ok(Hand { version: version, - capabilities: capab, - server_addr: sender_addr, + capabilities: capabilities, + nonce: nonce, + sender_addr: sender_addr, receiver_addr: receiver_addr, user_agent: user_agent, - } + }) } } /// Second part of a handshake, receiver of the first part replies with its own /// version and characteristics. pub struct Shake { - version: u32, - capabilities: Capabilities, - user_agent: String, + /// sender version + pub version: u32, + /// sender capabilities + pub capabilities: Capabilities, + /// name of version of the software + pub user_agent: String, } -impl Writeable for MsgHeader { +impl Writeable for Shake { fn write(&self, writer: &mut Writer) -> Option { ser_multiwrite!(writer, [write_u32, self.version], - [write_u32, self.capabilities], - [write_vec, self.user_agent.as_mut_vec()]); + [write_u32, self.capabilities.bits()], + [write_vec, &mut self.user_agent.as_bytes().to_vec()]); None } } @@ -139,27 +170,30 @@ impl Writeable for MsgHeader { impl Readable for Shake { fn read(reader: &mut Reader) -> Result { let (version, capab, ua) = ser_multiread!(reader, read_u32, read_u32, read_vec); - let user_agent = try!(String::from_utf8(ua).map_err(|_| ser::Error: CorruptedData)); - Hand { + let user_agent = try!(String::from_utf8(ua).map_err(|_| ser::Error::CorruptedData)); + let capabilities = try!(Capabilities::from_bits(capab).ok_or(ser::Error::CorruptedData)); + Ok(Shake { version: version, - capabilities: capab, + capabilities: capabilities, user_agent: user_agent, - } + }) } } /// We found some issue in the communication, sending an error back, usually /// followed by closing the connection. pub struct PeerError { - code: u32, - message: String, + /// error code + pub code: u32, + /// slightly more user friendly message + pub message: String, } impl Writeable for PeerError { fn write(&self, writer: &mut Writer) -> Option { ser_multiwrite!(writer, [write_u32, self.code], - [write_vec, &mut self.message.into_bytes()]); + [write_vec, &mut self.message.clone().into_bytes()]); None } } @@ -167,47 +201,49 @@ impl Writeable for PeerError { impl Readable for PeerError { fn read(reader: &mut Reader) -> Result { let (code, msg) = ser_multiread!(reader, read_u32, read_vec); - let message = try!(String::from_utf8(msg).map_err(|_| ser::Error: CorruptedData)); - PeerError { + let message = try!(String::from_utf8(msg).map_err(|_| ser::Error::CorruptedData)); + Ok(PeerError { code: code, message: message, - } + }) } } -impl Writeable for SocketAddr { +/// Only necessary so we can implement Readable and Writeable. Rust disallows implementing traits when both types are outside of this crate (which is the case for SocketAddr and Readable/Writeable). +pub struct SockAddr(pub SocketAddr); + +impl Writeable for SockAddr { fn write(&self, writer: &mut Writer) -> Option { - match self { - V4(sav4) => { + match self.0 { + SocketAddr::V4(sav4) => { ser_multiwrite!(writer, [write_u8, 0], - [write_fixed_bytes, sav4.ip().octets()], + [write_fixed_bytes, &sav4.ip().octets().to_vec()], [write_u16, sav4.port()]); } - V6(sav6) => { - try_m(writer.write_u8(1)); - for seg in sav6.ip().segments() { - try_m(writer.write_u16(seg)); + SocketAddr::V6(sav6) => { + try_o!(writer.write_u8(1)); + for seg in &sav6.ip().segments() { + try_o!(writer.write_u16(*seg)); } - try_m(writer.write_u16(sav6.port())); + try_o!(writer.write_u16(sav6.port())); } } None } } -impl Readable for SocketAddr { - fn read(reader: &mut Reader) -> Result { - let v4_or_v6 = reader.read_u8(); +impl Readable for SockAddr { + fn read(reader: &mut Reader) -> Result { + let v4_or_v6 = try!(reader.read_u8()); if v4_or_v6 == 0 { - let ip = reader.read_fixed_bytes(4); - let port = reader.read_u16(); - SocketAddrV4::new(Ipv4Addr::new(ip[0], ip[1], ip[2], ip[3]), port) + let ip = try!(reader.read_fixed_bytes(4)); + let port = try!(reader.read_u16()); + Ok(SockAddr(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(ip[0], ip[1], ip[2], ip[3]), port)))) } else { - let ip = [0..8].map(|_| reader.read_u16()).collect::>(); - let port = reader.read_u16(); - SocketAddrV6::new(Ipv6Addr::new(ip[0], ip[1], ip[2], ip[3], ip[4], ip[5], ip[6], ip[7]), - port) + let ip = try_oap_vec!([0..8], |_| reader.read_u16()); + let port = try!(reader.read_u16()); + Ok(SockAddr(SocketAddr::V6(SocketAddrV6::new(Ipv6Addr::new(ip[0], ip[1], ip[2], ip[3], ip[4], ip[5], ip[6], ip[7]), port, 0, 0)))) } } } diff --git a/p2p/src/peer.rs b/p2p/src/peer.rs index 09ea42cea..88a72be74 100644 --- a/p2p/src/peer.rs +++ b/p2p/src/peer.rs @@ -12,117 +12,83 @@ // See the License for the specific language governing permissions and // limitations under the License. -use str::net::SocketAddr; -use std::str::FromStr; -use std::io::{Read, Write}; - -use time::Duration; +use std::net::SocketAddr; +use std::io::{self, Read, Write, BufReader}; use mioco::tcp::{TcpListener, TcpStream, Shutdown}; -use core::ser::{serialize, deserialize}; +use time::Duration; -const PROTOCOL_VERSION: u32 = 1; -const USER_AGENT: &'static str = "MW/Grin 0.1"; +use core::ser::{serialize, deserialize, Error}; +use handshake::Handshake; +use msg::*; +use types::*; /// The local representation of a remotely connected peer. Handles most /// low-level network communication and tracks peer information. -struct Peer { +pub struct PeerConn { conn: TcpStream, - reader: BufReader, - capabilities: Capabilities, - user_agent: String, + reader: BufReader, + pub capabilities: Capabilities, + pub user_agent: String, } /// Make the Peer a Reader for convenient access to the underlying connection. /// Allows the peer to track how much is received. -impl Read for Peer { - fn read(&mut self, buf: &mut [u8]) -> Result { +impl Read for PeerConn { + fn read(&mut self, buf: &mut [u8]) -> io::Result { self.reader.read(buf) } } /// Make the Peer a Writer for convenient access to the underlying connection. /// Allows the peer to track how much is sent. -impl Write for Peer { - fn write(&mut self, buf: &[u8]) -> Result { - self.conf.write(buf) +impl Write for PeerConn { + fn write(&mut self, buf: &[u8]) -> io::Result { + self.conn.write(buf) } + fn flush(&mut self) -> io::Result<()> { + self.conn.flush() + } } -impl Close for Peer { - fn close() { +impl Close for PeerConn { + fn close(&self) { self.conn.shutdown(Shutdown::Both); } } -impl Peer { +impl PeerConn { /// Create a new local peer instance connected to a remote peer with the /// provided TcpStream. - fn new(conn: TcpStream) -> Peer { + pub fn new(conn: TcpStream) -> PeerConn { // don't wait on read for more than 2 seconds by default - conn.set_read_timeout(Some(Duration::seconds(2))); + conn.set_keepalive(Some(2)); - Peer { + PeerConn { conn: conn, reader: BufReader::new(conn), capabilities: UNKNOWN, - user_agent: "", + user_agent: "".to_string(), } } - /// Handles connecting to a new remote peer, starting the version handshake. - fn connect(&mut self) -> Result { - serialize(self.peer, - &Hand { - version: PROTOCOL_VERSION, - capabilities: FULL_SYNC, - sender_addr: listen_addr(), - receiver_addr: self.peer.peer_addr(), - user_agent: USER_AGENT, - }); - let shake = deserialize(self.peer); - if shake.version != 1 { - self.close(ErrCodes::UNSUPPORTED_VERSION, - format!("Unsupported version: {}, ours: {})", - shake.version, - PROTOCOL_VERSION)); - return; - } - self.capabilities = shake.capabilities; - self.user_agent = shake.user_agent; - - // when more than one protocol version is supported, choosing should go here - ProtocolV1::new(&self); + pub fn connect(&mut self, hs: &Handshake, na: &NetAdapter) -> Option { + let mut proto = try_to_o!(hs.connect(self)); + proto.handle(na) } - /// Handles receiving a connection from a new remote peer that started the - /// version handshake. - fn handshake(&mut self) -> Result { - let hand = deserialize(self.peer); - if hand.version != 1 { - self.close(ErrCodes::UNSUPPORTED_VERSION, - format!("Unsupported version: {}, ours: {})", - hand.version, - PROTOCOL_VERSION)); - return; - } - - self.peer.capabilities = hand.capabilities; - self.peer.user_agent = hand.user_agent; - - serialize(self.peer, - &Shake { - version: PROTOCOL_VERSION, - capabilities: FULL_SYNC, - user_agent: USER_AGENT, - }); - self.accept_loop(); - - // when more than one protocol version is supported, choosing should go here - ProtocolV1::new(&self); - } - - fn peer_addr(&self) -> SocketAddr { - self.conn.peer_addr() + pub fn handshake(&mut self, hs: &Handshake, na: &NetAdapter) -> Option { + let mut proto = try_to_o!(hs.handshake(self)); + proto.handle(na) + } +} + +impl PeerInfo for PeerConn { + fn peer_addr(&self) -> SocketAddr { + self.conn.peer_addr().unwrap() + } + fn local_addr(&self) -> SocketAddr { + // TODO likely not exactly what we want (private vs public IP) + self.conn.local_addr().unwrap() } } diff --git a/p2p/src/protocol.rs b/p2p/src/protocol.rs index 57db11303..fe6a02912 100644 --- a/p2p/src/protocol.rs +++ b/p2p/src/protocol.rs @@ -12,20 +12,19 @@ // See the License for the specific language governing permissions and // limitations under the License. -use types::*; use core::ser; +use msg::*; +use types::*; +use peer::PeerConn; -pub struct ProtocolV1 { - comm: &mut Comm, +pub struct ProtocolV1<'a> { + peer: &'a mut PeerConn, } -impl Protocol for ProtocolV1 { - fn new(p: &mut Comm) -> Protocol { - Protocol { comm: p } - } - fn handle(&self, server: &Server) { +impl<'a> Protocol for ProtocolV1<'a> { + fn handle(&mut self, server: &NetAdapter) -> Option { loop { - let header = ser::deserialize::(); + let header = try_to_o!(ser::deserialize::(self.peer)); if !header.acceptable() { continue; } @@ -33,13 +32,17 @@ impl Protocol for ProtocolV1 { } } -impl ProtocolV1 { - fn close(err_code: u32, explanation: &'static str) { - serialize(self.peer, - &Err { +impl<'a> ProtocolV1<'a> { + pub fn new(p: &mut PeerConn) -> ProtocolV1 { + ProtocolV1{peer: p} + } + + fn close(&mut self, err_code: u32, explanation: &'static str) { + ser::serialize(self.peer, + &PeerError { code: err_code, - message: explanation, + message: explanation.to_string(), }); - self.comm.close(); + self.peer.close(); } } diff --git a/p2p/src/server.rs b/p2p/src/server.rs index 274652d8a..d69c31413 100644 --- a/p2p/src/server.rs +++ b/p2p/src/server.rs @@ -15,10 +15,18 @@ //! Grin server implementation, accepts incoming connections and connects to //! other peers in the network. +use std::io; +use std::net::SocketAddr; +use std::str::FromStr; +use std::sync::Arc; + +use mioco; use mioco::tcp::{TcpListener, TcpStream, Shutdown}; use core::ser::{serialize, deserialize}; -use msg::*; +use handshake::Handshake; +use peer::PeerConn; +use types::*; const DEFAULT_LISTEN_ADDR: &'static str = "127.0.0.1:555"; @@ -27,6 +35,9 @@ fn listen_addr() -> SocketAddr { FromStr::from_str(DEFAULT_LISTEN_ADDR).unwrap() } +struct DummyAdapter {} +impl NetAdapter for DummyAdapter {} + pub struct Server { } @@ -35,18 +46,28 @@ impl Server { /// connections and starts the bootstrapping process to find peers. pub fn new() -> Server { mioco::start(|| -> io::Result<()> { + // TODO SSL let addr = "127.0.0.1:3414".parse().unwrap(); let listener = try!(TcpListener::bind(&addr)); info!("P2P server started on {}", addr); + let hs = Arc::new(Handshake::new()); + loop { let mut conn = try!(listener.accept()); + let hs_child = hs.clone(); + mioco::spawn(move || -> io::Result<()> { - Peer::new(conn).handshake(); + let ret = PeerConn::new(conn).handshake(&hs_child, &DummyAdapter {}); + if let Some(err) = ret { + error!("{:?}", err); + } + Ok(()) }); } }) .unwrap() .unwrap(); + Server{} } } diff --git a/p2p/src/types.rs b/p2p/src/types.rs index 03b751df8..0f13b5437 100644 --- a/p2p/src/types.rs +++ b/p2p/src/types.rs @@ -13,21 +13,30 @@ // limitations under the License. use std::io::{Read, Write}; +use std::net::SocketAddr; +use core::ser::Error; /// Trait for pre-emptively and forcefully closing an underlying resource. -trait Close { - fn close(); +pub trait Close { + fn close(&self); } -/// Main trait we expect a peer to implement to be usable by a Protocol. -trait Comm : Read + Write + Close; - -/// A given communication protocol agreed upon between 2 peers (usually ourselves and a remove) after handshake. -trait Protocol { - /// Instantiate providing a reader and writer allowing to communicate to the other peer. - fn new(p: &mut Comm); - - /// Starts handling protocol communication, the peer(s) is expected to be known already, usually passed during construction. - fn handle(&self, server: &Server); +/// General information about a connected peer that's useful to other modules. +pub trait PeerInfo { + /// Address of the remote peer + fn peer_addr(&self) -> SocketAddr; + /// Our address, communicated to other peers + fn local_addr(&self) -> SocketAddr; } +/// A given communication protocol agreed upon between 2 peers (usually +/// ourselves and a remove) after handshake. +pub trait Protocol { + /// Starts handling protocol communication, the peer(s) is expected to be + /// known already, usually passed during construction. + fn handle(&mut self, na: &NetAdapter) -> Option; +} + +/// Bridge between the networking layer and the rest of the system. Handles the +/// forwarding or querying of blocks and transactions among other things. +pub trait NetAdapter {}