From ee6fcab8db54b2d6938e75ce9a47577a41c7636a Mon Sep 17 00:00:00 2001 From: Ignotus Peverell Date: Sun, 30 Oct 2016 18:23:52 -0700 Subject: [PATCH] Proper server and protocol event loop. Channels for shutdown and ping/pong trivial roundtrip. Working test. --- p2p/src/handshake.rs | 104 +++++++++++++++++++------------------- p2p/src/lib.rs | 2 +- p2p/src/msg.rs | 39 ++++++++------ p2p/src/peer.rs | 46 +++++++++-------- p2p/src/protocol.rs | 92 ++++++++++++++++++++++++++------- p2p/src/server.rs | 94 +++++++++++++++++++++------------- p2p/src/types.rs | 23 ++++++--- p2p/tests/network_conn.rs | 24 +++++++-- 8 files changed, 268 insertions(+), 156 deletions(-) diff --git a/p2p/src/handshake.rs b/p2p/src/handshake.rs index 983827a39..24305c8b4 100644 --- a/p2p/src/handshake.rs +++ b/p2p/src/handshake.rs @@ -79,12 +79,12 @@ impl Handshake { }); } - let peer_info = PeerInfo{ - capabilities: shake.capabilities, - user_agent: shake.user_agent, - addr: receiver_addr, - version: shake.version, - }; + let peer_info = PeerInfo { + capabilities: shake.capabilities, + user_agent: shake.user_agent, + addr: receiver_addr, + version: shake.version, + }; info!("Connected to peer {:?}", peer_info); // when more than one protocol version is supported, choosing should go here @@ -94,54 +94,54 @@ impl Handshake { /// Handles receiving a connection from a new remote peer that started the /// version handshake. pub fn handshake(&self, mut conn: TcpStream) -> Result<(Box, PeerInfo), Error> { - // deserialize first part of handshake sent to us and do version negotiation - let hand = try!(deserialize::(&mut conn)); - if hand.version != 1 { - self.close(&mut conn, - ErrCodes::UnsupportedVersion as u32, - format!("Unsupported version: {}, ours: {})", - hand.version, - PROTOCOL_VERSION)); - return Err(Error::UnexpectedData { - expected: vec![PROTOCOL_VERSION as u8], - received: vec![hand.version as u8], - }); - } - { - // check the nonce to see if we could be trying to connect to ourselves - let nonces = self.nonces.read().unwrap(); - if nonces.contains(&hand.nonce) { - return Err(Error::UnexpectedData { - expected: vec![], - received: vec![], - }); - } - } - - // all good, keep peer info - let peer_info = PeerInfo{ - capabilities: hand.capabilities, - user_agent: hand.user_agent, - addr: conn.peer_addr().unwrap(), - version: hand.version, - }; + // deserialize first part of handshake sent to us and do version negotiation + let hand = try!(deserialize::(&mut conn)); + if hand.version != 1 { + self.close(&mut conn, + ErrCodes::UnsupportedVersion as u32, + format!("Unsupported version: {}, ours: {})", + hand.version, + PROTOCOL_VERSION)); + return Err(Error::UnexpectedData { + expected: vec![PROTOCOL_VERSION as u8], + received: vec![hand.version as u8], + }); + } + { + // check the nonce to see if we could be trying to connect to ourselves + let nonces = self.nonces.read().unwrap(); + if nonces.contains(&hand.nonce) { + return Err(Error::UnexpectedData { + expected: vec![], + received: vec![], + }); + } + } - // send our reply with our info - let opt_err = serialize(&mut conn, - &Shake { - version: PROTOCOL_VERSION, - capabilities: FULL_SYNC, - user_agent: USER_AGENT.to_string(), - }); - match opt_err { - Some(err) => return Err(err), - None => {} - } - - info!("Received connection from peer {:?}", peer_info); - // when more than one protocol version is supported, choosing should go here + // all good, keep peer info + let peer_info = PeerInfo { + capabilities: hand.capabilities, + user_agent: hand.user_agent, + addr: conn.peer_addr().unwrap(), + version: hand.version, + }; + + // send our reply with our info + let opt_err = serialize(&mut conn, + &Shake { + version: PROTOCOL_VERSION, + capabilities: FULL_SYNC, + user_agent: USER_AGENT.to_string(), + }); + match opt_err { + Some(err) => return Err(err), + None => {} + } + + info!("Received connection from peer {:?}", peer_info); + // when more than one protocol version is supported, choosing should go here Ok((Box::new(ProtocolV1::new(conn)), peer_info)) - } + } /// Generate a new random nonce and store it in our ring buffer fn next_nonce(&self) -> u64 { diff --git a/p2p/src/lib.rs b/p2p/src/lib.rs index bd51f079d..ed1dd38d5 100644 --- a/p2p/src/lib.rs +++ b/p2p/src/lib.rs @@ -39,5 +39,5 @@ mod protocol; mod server; mod peer; -pub use server::Server; +pub use server::{Server, DummyAdapter}; pub use server::DEFAULT_LISTEN_ADDR; diff --git a/p2p/src/msg.rs b/p2p/src/msg.rs index d5f21aa50..8a283be95 100644 --- a/p2p/src/msg.rs +++ b/p2p/src/msg.rs @@ -36,13 +36,13 @@ pub enum ErrCodes { /// Types of messages #[derive(Clone, Copy)] pub enum Type { - ERROR, - HAND, - SHAKE, - PING, - PONG, - GETPEERADDRS, - PEERADDRS, + Error, + Hand, + Shake, + Ping, + Pong, + GetPeerAddrs, + PeerAddrs, /// Never used over the network but to detect unrecognized types. MaxMsgType, } @@ -50,10 +50,17 @@ pub enum Type { /// Header of any protocol message, used to identify incoming messages. pub struct MsgHeader { magic: [u8; 2], - msg_type: Type, + pub msg_type: Type, } impl MsgHeader { + pub fn new(msg_type: Type) -> MsgHeader { + MsgHeader { + magic: MAGIC, + msg_type: msg_type, + } + } + pub fn acceptable(&self) -> bool { (self.msg_type as u8) < (Type::MaxMsgType as u8) } @@ -74,20 +81,20 @@ impl Readable for MsgHeader { try!(reader.expect_u8(MAGIC[0])); try!(reader.expect_u8(MAGIC[1])); let t = try!(reader.read_u8()); - if t < (Type::MaxMsgType as u8) { + if t >= (Type::MaxMsgType as u8) { return Err(ser::Error::CorruptedData); } Ok(MsgHeader { magic: MAGIC, msg_type: match t { // TODO this is rather ugly, think of a better way - 0 => Type::ERROR, - 1 => Type::HAND, - 2 => Type::SHAKE, - 3 => Type::PING, - 4 => Type::PONG, - 5 => Type::GETPEERADDRS, - 6 => Type::PEERADDRS, + 0 => Type::Error, + 1 => Type::Hand, + 2 => Type::Shake, + 3 => Type::Ping, + 4 => Type::Pong, + 5 => Type::GetPeerAddrs, + 6 => Type::PeerAddrs, _ => panic!(), }, }) diff --git a/p2p/src/peer.rs b/p2p/src/peer.rs index 2ea61201a..69440d4d8 100644 --- a/p2p/src/peer.rs +++ b/p2p/src/peer.rs @@ -16,40 +16,46 @@ use mioco::tcp::TcpStream; use core::ser::Error; use handshake::Handshake; -use msg::*; use types::*; pub struct Peer { - info: PeerInfo, - proto: Box, + info: PeerInfo, + proto: Box, } unsafe impl Sync for Peer {} unsafe impl Send for Peer {} impl Peer { - pub fn connect(conn: TcpStream, hs: &Handshake) -> Result { let (proto, info) = try!(hs.connect(conn)); - Ok(Peer{ - info: info, - proto: proto, - }) - } + Ok(Peer { + info: info, + proto: proto, + }) + } pub fn accept(conn: TcpStream, hs: &Handshake) -> Result { let (proto, info) = try!(hs.handshake(conn)); - Ok(Peer{ - info: info, - proto: proto, - }) - } + Ok(Peer { + info: info, + proto: proto, + }) + } - pub fn run(&self, na: &NetAdapter) -> Option { - self.proto.handle(na) - } + pub fn run(&self, na: &NetAdapter) -> Option { + self.proto.handle(na) + } - pub fn stop(&self) { - self.proto.as_ref().close() - } + pub fn send_ping(&self) -> Option { + self.proto.send_ping() + } + + pub fn sent_bytes(&self) -> u64 { + self.proto.sent_bytes() + } + + pub fn stop(&self) { + self.proto.as_ref().close() + } } diff --git a/p2p/src/protocol.rs b/p2p/src/protocol.rs index 64c8c1ee5..8f5d9652c 100644 --- a/p2p/src/protocol.rs +++ b/p2p/src/protocol.rs @@ -13,8 +13,10 @@ // limitations under the License. use std::cell::RefCell; -use std::ops::DerefMut; +use std::io::{Read, Write}; +use std::ops::{Deref, DerefMut}; use std::rc::Rc; +use std::sync::Mutex; use mioco; use mioco::sync::mpsc::{sync_channel, SyncSender}; @@ -25,51 +27,103 @@ use handshake::Handshake; use msg::*; use types::*; +/// First version of our communication protocol. Manages the underlying +/// connection, listening to incoming messages and transmitting outgoing ones. pub struct ProtocolV1 { + // The underlying tcp connection. conn: RefCell, - //msg_send: Option>, - stop_send: RefCell>>, + // Send channel for the rest of the local system to send messages to the peer we're connected to. + msg_send: RefCell>>>, + // Stop channel to exit the send/listen loop. + stop_send: RefCell>>, + // Used both to count the amount of data sent and lock writing to the conn. We can't wrap conn with + // the lock as we're always listening to receive. + sent_bytes: Mutex, } impl Protocol for ProtocolV1 { + /// Main protocol connection handling loop, starts listening to incoming + /// messages and transmitting messages the rest of the local system wants + /// to send. Must be called before any interaction with a protocol instance + /// and should only be called once. Will block so also needs to be called + /// within a coroutine. fn handle(&self, server: &NetAdapter) -> Option { - // setup channels so we can switch between reads, writes and close - let (msg_send, msg_recv) = sync_channel(10); - let (stop_send, stop_recv) = sync_channel(1); + // setup channels so we can switch between reads, writes and close + let (msg_send, msg_recv) = sync_channel(10); + let (stop_send, stop_recv) = sync_channel(1); + { + let mut msg_mut = self.msg_send.borrow_mut(); + *msg_mut = Some(msg_send); + let mut stop_mut = self.stop_send.borrow_mut(); + *stop_mut = Some(stop_send); + } - //self.msg_send = Some(msg_send); - let mut stop_mut = self.stop_send.borrow_mut(); - *stop_mut = Some(stop_send); - - let mut conn = self.conn.borrow_mut(); + let mut conn = self.conn.borrow_mut(); loop { - select!( + // main select loop, switches between listening, sending or stopping + select!( r:conn => { + // deser the header ot get the message type let header = try_to_o!(ser::deserialize::(conn.deref_mut())); if !header.acceptable() { continue; + } + // check the message and hopefully do what's expected + match header.msg_type { + Type::Ping => { + // respond with pong + let data = try_to_o!(ser::ser_vec(&MsgHeader::new(Type::Pong))); + let mut sent_bytes = self.sent_bytes.lock().unwrap(); + *sent_bytes += data.len() as u64; + try_to_o!(conn.deref_mut().write_all(&data[..]).map_err(&ser::Error::IOErr)); + }, + Type::Pong => {}, + _ => error!("uncaught unknown"), } }, r:msg_recv => { - ser::serialize(conn.deref_mut(), msg_recv.recv().unwrap()); + // relay a message originated from the rest of the local system + let data = &msg_recv.recv().unwrap()[..]; + let mut sent_bytes = self.sent_bytes.lock().unwrap(); + *sent_bytes += data.len() as u64; + try_to_o!(conn.deref_mut().write_all(data).map_err(&ser::Error::IOErr)); }, r:stop_recv => { + // shuts the connection don and end the loop stop_recv.recv(); conn.shutdown(Shutdown::Both); - return None;; + return None; } ); } } - fn close(&self) { - let stop_send = self.stop_send.borrow(); - stop_send.as_ref().unwrap().send(0); - } + /// Sends a ping message to the remote peer. Will panic if handle has never + /// been called on this protocol. + fn send_ping(&self) -> Option { + let data = try_to_o!(ser::ser_vec(&MsgHeader::new(Type::Ping))); + let msg_send = self.msg_send.borrow(); + msg_send.as_ref().unwrap().send(data); + None + } + + fn sent_bytes(&self) -> u64 { + *self.sent_bytes.lock().unwrap().deref() + } + + fn close(&self) { + let stop_send = self.stop_send.borrow(); + stop_send.as_ref().unwrap().send(0); + } } impl ProtocolV1 { pub fn new(conn: TcpStream) -> ProtocolV1 { - ProtocolV1 { conn: RefCell::new(conn), /* msg_send: None, */ stop_send: RefCell::new(None) } + ProtocolV1 { + conn: RefCell::new(conn), + msg_send: RefCell::new(None), + stop_send: RefCell::new(None), + sent_bytes: Mutex::new(0), + } } } diff --git a/p2p/src/server.rs b/p2p/src/server.rs index d8ec1b033..65458e0cb 100644 --- a/p2p/src/server.rs +++ b/p2p/src/server.rs @@ -23,6 +23,7 @@ use std::str::FromStr; use std::sync::{Arc, RwLock}; use mioco; +use mioco::sync::mpsc::{sync_channel, SyncSender}; use mioco::tcp::{TcpListener, TcpStream}; use core::ser::Error; @@ -37,58 +38,79 @@ fn listen_addr() -> SocketAddr { FromStr::from_str(DEFAULT_LISTEN_ADDR).unwrap() } -struct DummyAdapter {} +pub struct DummyAdapter {} impl NetAdapter for DummyAdapter {} /// P2P server implementation, handling bootstrapping to find and connect to /// peers, receiving connections from other peers and keep track of all of them. pub struct Server { - peers: RwLock>>, + peers: RwLock>>, + stop_send: RefCell>>, } +unsafe impl Sync for Server {} +unsafe impl Send for Server {} + // TODO TLS impl Server { - pub fn new() -> Server { - Server{peers: RwLock::new(Vec::new())} - } - /// Creates a new p2p server. Opens a TCP port to allow incoming + /// Creates a new idle p2p server with no peers + pub fn new() -> Server { + Server { + peers: RwLock::new(Vec::new()), + stop_send: RefCell::new(None), + } + } + /// Starts the p2p server. Opens a TCP port to allow incoming /// connections and starts the bootstrapping process to find peers. - pub fn start(& self) -> Result<(), Error> { - mioco::spawn(move || -> io::Result<()> { - let addr = DEFAULT_LISTEN_ADDR.parse().unwrap(); - let listener = try!(TcpListener::bind(&addr)); - warn!("P2P server started on {}", addr); + pub fn start(&self) -> Result<(), Error> { + let addr = DEFAULT_LISTEN_ADDR.parse().unwrap(); + let listener = try!(TcpListener::bind(&addr).map_err(&Error::IOErr)); + warn!("P2P server started on {}", addr); - let hs = Arc::new(Handshake::new()); + let hs = Arc::new(Handshake::new()); + let (stop_send, stop_recv) = sync_channel(1); + { + let mut stop_mut = self.stop_send.borrow_mut(); + *stop_mut = Some(stop_send); + } - loop { - let conn = try!(listener.accept()); - let hs = hs.clone(); + loop { + select!( + r:listener => { + let conn = try!(listener.accept().map_err(&Error::IOErr)); + let hs = hs.clone(); - mioco::spawn(move || -> io::Result<()> { - let peer = try!(Peer::connect(conn, &hs).map_err(|_| io::Error::last_os_error())); + let peer = try!(Peer::connect(conn, &hs)); let wpeer = Arc::new(peer); - // { - // let mut peers = self.peers.write().unwrap(); - // peers.push(wpeer.clone()); - // } - if let Some(err) = wpeer.run(&DummyAdapter{}) { - error!("{:?}", err); - } - Ok(()) - }); - } - Ok(()) - }); - Ok(()) + { + let mut peers = self.peers.write().unwrap(); + peers.push(wpeer.clone()); + } + + mioco::spawn(move || -> io::Result<()> { + if let Some(err) = wpeer.run(&DummyAdapter{}) { + error!("{:?}", err); + } + Ok(()) + }); + }, + r:stop_recv => { + stop_recv.recv(); + return Ok(()); + } + ); + } } - pub fn stop(&self) { - let peers = self.peers.write().unwrap(); - for p in peers.deref() { - p.stop(); - } - } + /// Stops the server. Disconnect from all peers at the same time. + pub fn stop(&self) { + let peers = self.peers.write().unwrap(); + for p in peers.deref() { + p.stop(); + } + let stop_send = self.stop_send.borrow(); + stop_send.as_ref().unwrap().send(0); + } /// Simulates an unrelated client connecting to our server. Mostly used for /// tests. diff --git a/p2p/src/types.rs b/p2p/src/types.rs index 37abbd930..6c350a00c 100644 --- a/p2p/src/types.rs +++ b/p2p/src/types.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::io::{Read, Write}; use std::net::SocketAddr; use core::ser::Error; @@ -31,18 +30,28 @@ bitflags! { pub struct PeerInfo { pub capabilities: Capabilities, pub user_agent: String, - pub version: u32, - pub addr: SocketAddr, + pub version: u32, + pub addr: SocketAddr, } /// A given communication protocol agreed upon between 2 peers (usually -/// ourselves and a remove) after handshake. +/// ourselves and a remote) after handshake. This trait is necessary to allow +/// protocol negotiation as it gets upgraded to multiple versions. pub trait Protocol { - /// Starts handling protocol communication, the peer(s) is expected to be - /// known already, usually passed during construction. + /// Starts handling protocol communication, the connection) is expected to + /// be known already, usually passed during construction. Will typically + /// block so needs to be called withing a coroutine. Should also be called + /// only once. fn handle(&self, na: &NetAdapter) -> Option; - fn close(&self); + /// Sends a ping message to the remote peer. + fn send_ping(&self) -> Option; + + /// How many bytes have been sent to the remote peer. + fn sent_bytes(&self) -> u64; + + /// Close the connection to the remote peer. + fn close(&self); } /// Bridge between the networking layer and the rest of the system. Handles the diff --git a/p2p/tests/network_conn.rs b/p2p/tests/network_conn.rs index 44d4c66ba..17929763f 100644 --- a/p2p/tests/network_conn.rs +++ b/p2p/tests/network_conn.rs @@ -17,7 +17,7 @@ extern crate mioco; extern crate env_logger; use std::io; -use std::thread; +use std::sync::Arc; use std::time; #[test] @@ -25,16 +25,30 @@ fn peer_handshake() { env_logger::init().unwrap(); mioco::start(|| -> io::Result<()> { - let server = p2p::Server::new(); - server.start(); + let server = Arc::new(p2p::Server::new()); + let in_server = server.clone(); + mioco::spawn(move || -> io::Result<()> { + try!(in_server.start()); + Ok(()) + }); // given server a little time to start mioco::sleep(time::Duration::from_millis(200)); let addr = p2p::DEFAULT_LISTEN_ADDR.parse().unwrap(); - try!(p2p::Server::connect_as_client(addr).map_err(|_| io::Error::last_os_error())); + let peer = try!(p2p::Server::connect_as_client(addr).map_err(|_| io::Error::last_os_error())); + let peer = Arc::new(peer); + let in_peer = peer.clone(); + mioco::spawn(move || -> io::Result<()> { + in_peer.run(&p2p::DummyAdapter{}); + Ok(()) + }); + mioco::sleep(time::Duration::from_millis(100)); + peer.send_ping(); + mioco::sleep(time::Duration::from_millis(100)); + assert!(peer.sent_bytes() > 0); server.stop(); - mioco::shutdown(); + Ok(()) }).unwrap().unwrap(); }