diff --git a/p2p/Cargo.toml b/p2p/Cargo.toml index 5e82e3867..80af62f02 100644 --- a/p2p/Cargo.toml +++ b/p2p/Cargo.toml @@ -12,3 +12,6 @@ mioco = "^0.8" time = "^0.1" grin_core = { path = "../core" } + +[dev-dependencies] +env_logger = "^0.3" diff --git a/p2p/src/handshake.rs b/p2p/src/handshake.rs index 3f517914a..0629c109d 100644 --- a/p2p/src/handshake.rs +++ b/p2p/src/handshake.rs @@ -34,8 +34,8 @@ pub struct Handshake { nonces: RwLock>, } -unsafe impl Sync for Handshake{} -unsafe impl Send for Handshake{} +unsafe impl Sync for Handshake {} +unsafe impl Send for Handshake {} impl Handshake { /// Creates a new handshake handler @@ -44,87 +44,97 @@ impl Handshake { } /// Handles connecting to a new remote peer, starting the version handshake. - pub fn connect<'a>(&'a self, peer: &'a mut PeerConn) -> Result, Error> { - // get a new nonce that can be used on handshake to detect self-connection + pub fn connect<'a>(&'a self, peer: &'a mut PeerConn) -> Result, Error> { + // get a new nonce that can be used on handshake to detect self-connection let nonce = self.next_nonce(); - // send the first part of the handshake - let sender_addr = SockAddr(peer.local_addr()); - let receiver_addr = SockAddr(peer.peer_addr()); + // send the first part of the handshake + let sender_addr = SockAddr(peer.local_addr()); + let receiver_addr = SockAddr(peer.peer_addr()); let opt_err = serialize(peer, - &Hand { - version: PROTOCOL_VERSION, - capabilities: FULL_SYNC, - nonce: nonce, - sender_addr: sender_addr, - receiver_addr: receiver_addr, - user_agent: USER_AGENT.to_string(), - }); - match opt_err { - Some(err) => return Err(err), - None => {} - } + &Hand { + version: PROTOCOL_VERSION, + capabilities: FULL_SYNC, + nonce: nonce, + sender_addr: sender_addr, + receiver_addr: receiver_addr, + user_agent: USER_AGENT.to_string(), + }); + match opt_err { + Some(err) => return Err(err), + None => {} + } - // deserialize the handshake response and do version negotiation + // deserialize the handshake response and do version negotiation let shake = try!(deserialize::(peer)); if shake.version != 1 { - self.close(peer, ErrCodes::UnsupportedVersion as u32, + self.close(peer, + ErrCodes::UnsupportedVersion as u32, format!("Unsupported version: {}, ours: {})", shake.version, PROTOCOL_VERSION)); - return Err(Error::UnexpectedData{expected: vec![PROTOCOL_VERSION as u8], received: vec![shake.version as u8]}); + return Err(Error::UnexpectedData { + expected: vec![PROTOCOL_VERSION as u8], + received: vec![shake.version as u8], + }); } peer.capabilities = shake.capabilities; peer.user_agent = shake.user_agent; - + + info!("Connected to peer {}", peer); // when more than one protocol version is supported, choosing should go here Ok(Box::new(ProtocolV1::new(peer))) } /// Handles receiving a connection from a new remote peer that started the /// version handshake. - pub fn handshake<'a>(&'a self, peer: &'a mut PeerConn) -> Result, Error> { - // deserialize first part of handshake sent to us and do version negotiation + pub fn handshake<'a>(&'a self, peer: &'a mut PeerConn) -> Result, Error> { + // deserialize first part of handshake sent to us and do version negotiation let hand = try!(deserialize::(peer)); if hand.version != 1 { - self.close(peer, ErrCodes::UnsupportedVersion as u32, + self.close(peer, + ErrCodes::UnsupportedVersion as u32, format!("Unsupported version: {}, ours: {})", hand.version, PROTOCOL_VERSION)); - return Err(Error::UnexpectedData{expected: vec![PROTOCOL_VERSION as u8], received: vec![hand.version as u8]}); + return Err(Error::UnexpectedData { + expected: vec![PROTOCOL_VERSION as u8], + received: vec![hand.version as u8], + }); + } + { + // check the nonce to see if we could be trying to connect to ourselves + let nonces = self.nonces.read().unwrap(); + if nonces.contains(&hand.nonce) { + return Err(Error::UnexpectedData { + expected: vec![], + received: vec![], + }); + } } - { - // check the nonce to see if we could be trying to connect to ourselves - let nonces = self.nonces.read().unwrap(); - if nonces.contains(&hand.nonce) { - return Err(Error::UnexpectedData { - expected: vec![], - received: vec![], - }); - } - } - // all good, keep peer info + // all good, keep peer info peer.capabilities = hand.capabilities; peer.user_agent = hand.user_agent; - // send our reply with our info + // send our reply with our info let opt_err = serialize(peer, - &Shake { - version: PROTOCOL_VERSION, - capabilities: FULL_SYNC, - user_agent: USER_AGENT.to_string(), - }); - match opt_err { - Some(err) => return Err(err), - None => {} - } + &Shake { + version: PROTOCOL_VERSION, + capabilities: FULL_SYNC, + user_agent: USER_AGENT.to_string(), + }); + match opt_err { + Some(err) => return Err(err), + None => {} + } + info!("Received connection from peer {}", peer); // when more than one protocol version is supported, choosing should go here Ok(Box::new(ProtocolV1::new(peer))) } - /// Generate a new random nonce and store it in our ring buffer + /// Generate a new random nonce and store it in our ring buffer fn next_nonce(&self) -> u64 { let mut rng = OsRng::new().unwrap(); let nonce = rng.next_u64(); diff --git a/p2p/src/lib.rs b/p2p/src/lib.rs index cf287e34e..02904c23e 100644 --- a/p2p/src/lib.rs +++ b/p2p/src/lib.rs @@ -37,3 +37,6 @@ mod handshake; mod protocol; mod server; mod peer; + +pub use server::Server; +pub use server::DEFAULT_LISTEN_ADDR; diff --git a/p2p/src/msg.rs b/p2p/src/msg.rs index 0488e033e..8e120e136 100644 --- a/p2p/src/msg.rs +++ b/p2p/src/msg.rs @@ -80,20 +80,20 @@ impl Readable for MsgHeader { try!(reader.expect_u8(MAGIC[0])); try!(reader.expect_u8(MAGIC[1])); let t = try!(reader.read_u8()); - if t < (Type::MaxMsgType as u8) { - return Err(ser::Error::CorruptedData); - } + if t < (Type::MaxMsgType as u8) { + return Err(ser::Error::CorruptedData); + } Ok(MsgHeader { magic: MAGIC, msg_type: match t { - // TODO this is rather ugly, think of a better way - 0 => Type::ERROR, - 1 => Type::HAND, - 2 => Type::SHAKE, - 3 => Type::PING, - 4 => Type::PONG, - _ => panic!(), - }, + // TODO this is rather ugly, think of a better way + 0 => Type::ERROR, + 1 => Type::HAND, + 2 => Type::SHAKE, + 3 => Type::PING, + 4 => Type::PONG, + _ => panic!(), + }, }) } } @@ -120,7 +120,7 @@ impl Writeable for Hand { ser_multiwrite!(writer, [write_u32, self.version], [write_u32, self.capabilities.bits()], - [write_u64, self.nonce]); + [write_u64, self.nonce]); self.sender_addr.write(writer); self.receiver_addr.write(writer); writer.write_vec(&mut self.user_agent.clone().into_bytes()) @@ -133,12 +133,12 @@ impl Readable for Hand { let sender_addr = try!(SockAddr::read(reader)); let receiver_addr = try!(SockAddr::read(reader)); let ua = try!(reader.read_vec()); - let user_agent = try!(String::from_utf8(ua).map_err(|_| ser::Error::CorruptedData)); - let capabilities = try!(Capabilities::from_bits(capab).ok_or(ser::Error::CorruptedData)); + let user_agent = try!(String::from_utf8(ua).map_err(|_| ser::Error::CorruptedData)); + let capabilities = try!(Capabilities::from_bits(capab).ok_or(ser::Error::CorruptedData)); Ok(Hand { version: version, capabilities: capabilities, - nonce: nonce, + nonce: nonce, sender_addr: sender_addr, receiver_addr: receiver_addr, user_agent: user_agent, @@ -171,7 +171,7 @@ impl Readable for Shake { fn read(reader: &mut Reader) -> Result { let (version, capab, ua) = ser_multiread!(reader, read_u32, read_u32, read_vec); let user_agent = try!(String::from_utf8(ua).map_err(|_| ser::Error::CorruptedData)); - let capabilities = try!(Capabilities::from_bits(capab).ok_or(ser::Error::CorruptedData)); + let capabilities = try!(Capabilities::from_bits(capab).ok_or(ser::Error::CorruptedData)); Ok(Shake { version: version, capabilities: capabilities, @@ -209,7 +209,9 @@ impl Readable for PeerError { } } -/// Only necessary so we can implement Readable and Writeable. Rust disallows implementing traits when both types are outside of this crate (which is the case for SocketAddr and Readable/Writeable). +/// Only necessary so we can implement Readable and Writeable. Rust disallows +/// implementing traits when both types are outside of this crate (which is the +/// case for SocketAddr and Readable/Writeable). pub struct SockAddr(pub SocketAddr); impl Writeable for SockAddr { @@ -239,11 +241,25 @@ impl Readable for SockAddr { if v4_or_v6 == 0 { let ip = try!(reader.read_fixed_bytes(4)); let port = try!(reader.read_u16()); - Ok(SockAddr(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(ip[0], ip[1], ip[2], ip[3]), port)))) + Ok(SockAddr(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(ip[0], + ip[1], + ip[2], + ip[3]), + port)))) } else { let ip = try_oap_vec!([0..8], |_| reader.read_u16()); let port = try!(reader.read_u16()); - Ok(SockAddr(SocketAddr::V6(SocketAddrV6::new(Ipv6Addr::new(ip[0], ip[1], ip[2], ip[3], ip[4], ip[5], ip[6], ip[7]), port, 0, 0)))) + Ok(SockAddr(SocketAddr::V6(SocketAddrV6::new(Ipv6Addr::new(ip[0], + ip[1], + ip[2], + ip[3], + ip[4], + ip[5], + ip[6], + ip[7]), + port, + 0, + 0)))) } } } diff --git a/p2p/src/peer.rs b/p2p/src/peer.rs index 0ee26b24c..28ebb9fb2 100644 --- a/p2p/src/peer.rs +++ b/p2p/src/peer.rs @@ -12,8 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::net::SocketAddr; +use std::fmt; use std::io::{self, Read, Write}; +use std::net::SocketAddr; use mioco::tcp::{TcpStream, Shutdown}; @@ -30,6 +31,13 @@ pub struct PeerConn { pub user_agent: String, } +impl fmt::Display for PeerConn { + // This trait requires `fmt` with this exact signature. + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "{} {}", self.peer_addr(), self.user_agent) + } +} + /// Make the Peer a Reader for convenient access to the underlying connection. /// Allows the peer to track how much is received. impl Read for PeerConn { @@ -44,9 +52,9 @@ impl Write for PeerConn { fn write(&mut self, buf: &[u8]) -> io::Result { self.conn.write(buf) } - fn flush(&mut self) -> io::Result<()> { - self.conn.flush() - } + fn flush(&mut self) -> io::Result<()> { + self.conn.flush() + } } impl Close for PeerConn { @@ -85,7 +93,7 @@ impl PeerInfo for PeerConn { self.conn.peer_addr().unwrap() } fn local_addr(&self) -> SocketAddr { - // TODO likely not exactly what we want (private vs public IP) + // TODO likely not exactly what we want (private vs public IP) self.conn.local_addr().unwrap() } } diff --git a/p2p/src/protocol.rs b/p2p/src/protocol.rs index fe6a02912..039989ae9 100644 --- a/p2p/src/protocol.rs +++ b/p2p/src/protocol.rs @@ -33,16 +33,16 @@ impl<'a> Protocol for ProtocolV1<'a> { } impl<'a> ProtocolV1<'a> { - pub fn new(p: &mut PeerConn) -> ProtocolV1 { - ProtocolV1{peer: p} - } + pub fn new(p: &mut PeerConn) -> ProtocolV1 { + ProtocolV1 { peer: p } + } fn close(&mut self, err_code: u32, explanation: &'static str) { ser::serialize(self.peer, - &PeerError { - code: err_code, - message: explanation.to_string(), - }); + &PeerError { + code: err_code, + message: explanation.to_string(), + }); self.peer.close(); } } diff --git a/p2p/src/server.rs b/p2p/src/server.rs index e39c349a5..b6b1652e4 100644 --- a/p2p/src/server.rs +++ b/p2p/src/server.rs @@ -21,13 +21,14 @@ use std::str::FromStr; use std::sync::Arc; use mioco; -use mioco::tcp::TcpListener; +use mioco::tcp::{TcpListener, TcpStream}; +use core::ser::Error; use handshake::Handshake; use peer::PeerConn; use types::*; -const DEFAULT_LISTEN_ADDR: &'static str = "127.0.0.1:555"; +pub const DEFAULT_LISTEN_ADDR: &'static str = "127.0.0.1:3414"; // replace with some config lookup or something fn listen_addr() -> SocketAddr { @@ -43,30 +44,41 @@ pub struct Server { impl Server { /// Creates a new p2p server. Opens a TCP port to allow incoming /// connections and starts the bootstrapping process to find peers. - pub fn new() -> Server { - mioco::start(|| -> io::Result<()> { - // TODO SSL - let addr = "127.0.0.1:3414".parse().unwrap(); - let listener = try!(TcpListener::bind(&addr)); - info!("P2P server started on {}", addr); + pub fn start() -> Result { + // TODO TLS + mioco::spawn(move || -> io::Result<()> { + let addr = DEFAULT_LISTEN_ADDR.parse().unwrap(); + let listener = try!(TcpListener::bind(&addr)); + warn!("P2P server started on {}", addr); - let hs = Arc::new(Handshake::new()); + let hs = Arc::new(Handshake::new()); - loop { - let conn = try!(listener.accept()); - let hs_child = hs.clone(); + loop { + let conn = try!(listener.accept()); + let hs_child = hs.clone(); - mioco::spawn(move || -> io::Result<()> { - let ret = PeerConn::new(conn).handshake(&hs_child, &DummyAdapter {}); - if let Some(err) = ret { - error!("{:?}", err); - } - Ok(()) - }); - } - }) - .unwrap() - .unwrap(); - Server{} + mioco::spawn(move || -> io::Result<()> { + let ret = PeerConn::new(conn).handshake(&hs_child, &DummyAdapter {}); + if let Some(err) = ret { + error!("{:?}", err); + } + Ok(()) + }); + } + Ok(()) + }); + Ok(Server {}) + } + + /// Simulates an unrelated client connecting to our server. Mostly used for + /// tests. + pub fn connect_as_client(addr: SocketAddr) -> Option { + let tcp_client = TcpStream::connect(&addr).unwrap(); + let mut peer = PeerConn::new(tcp_client); + let hs = Handshake::new(); + if let Err(e) = hs.connect(&mut peer) { + return Some(e); + } + None } } diff --git a/p2p/src/types.rs b/p2p/src/types.rs index 0f13b5437..01b213b2a 100644 --- a/p2p/src/types.rs +++ b/p2p/src/types.rs @@ -23,9 +23,9 @@ pub trait Close { /// General information about a connected peer that's useful to other modules. pub trait PeerInfo { - /// Address of the remote peer + /// Address of the remote peer fn peer_addr(&self) -> SocketAddr; - /// Our address, communicated to other peers + /// Our address, communicated to other peers fn local_addr(&self) -> SocketAddr; }