Proper server and protocol event loop. Channels for shutdown and ping/pong trivial roundtrip. Working test.

This commit is contained in:
Ignotus Peverell 2016-10-30 18:23:52 -07:00
parent 2efa8ce706
commit ee6fcab8db
No known key found for this signature in database
GPG key ID: 99CD25F39F8F8211
8 changed files with 268 additions and 156 deletions

View file

@ -79,12 +79,12 @@ impl Handshake {
});
}
let peer_info = PeerInfo{
capabilities: shake.capabilities,
user_agent: shake.user_agent,
addr: receiver_addr,
version: shake.version,
};
let peer_info = PeerInfo {
capabilities: shake.capabilities,
user_agent: shake.user_agent,
addr: receiver_addr,
version: shake.version,
};
info!("Connected to peer {:?}", peer_info);
// when more than one protocol version is supported, choosing should go here
@ -94,54 +94,54 @@ impl Handshake {
/// Handles receiving a connection from a new remote peer that started the
/// version handshake.
pub fn handshake(&self, mut conn: TcpStream) -> Result<(Box<Protocol>, PeerInfo), Error> {
// deserialize first part of handshake sent to us and do version negotiation
let hand = try!(deserialize::<Hand>(&mut conn));
if hand.version != 1 {
self.close(&mut conn,
ErrCodes::UnsupportedVersion as u32,
format!("Unsupported version: {}, ours: {})",
hand.version,
PROTOCOL_VERSION));
return Err(Error::UnexpectedData {
expected: vec![PROTOCOL_VERSION as u8],
received: vec![hand.version as u8],
});
}
{
// check the nonce to see if we could be trying to connect to ourselves
let nonces = self.nonces.read().unwrap();
if nonces.contains(&hand.nonce) {
return Err(Error::UnexpectedData {
expected: vec![],
received: vec![],
});
}
}
// all good, keep peer info
let peer_info = PeerInfo{
capabilities: hand.capabilities,
user_agent: hand.user_agent,
addr: conn.peer_addr().unwrap(),
version: hand.version,
};
// deserialize first part of handshake sent to us and do version negotiation
let hand = try!(deserialize::<Hand>(&mut conn));
if hand.version != 1 {
self.close(&mut conn,
ErrCodes::UnsupportedVersion as u32,
format!("Unsupported version: {}, ours: {})",
hand.version,
PROTOCOL_VERSION));
return Err(Error::UnexpectedData {
expected: vec![PROTOCOL_VERSION as u8],
received: vec![hand.version as u8],
});
}
{
// check the nonce to see if we could be trying to connect to ourselves
let nonces = self.nonces.read().unwrap();
if nonces.contains(&hand.nonce) {
return Err(Error::UnexpectedData {
expected: vec![],
received: vec![],
});
}
}
// send our reply with our info
let opt_err = serialize(&mut conn,
&Shake {
version: PROTOCOL_VERSION,
capabilities: FULL_SYNC,
user_agent: USER_AGENT.to_string(),
});
match opt_err {
Some(err) => return Err(err),
None => {}
}
info!("Received connection from peer {:?}", peer_info);
// when more than one protocol version is supported, choosing should go here
// all good, keep peer info
let peer_info = PeerInfo {
capabilities: hand.capabilities,
user_agent: hand.user_agent,
addr: conn.peer_addr().unwrap(),
version: hand.version,
};
// send our reply with our info
let opt_err = serialize(&mut conn,
&Shake {
version: PROTOCOL_VERSION,
capabilities: FULL_SYNC,
user_agent: USER_AGENT.to_string(),
});
match opt_err {
Some(err) => return Err(err),
None => {}
}
info!("Received connection from peer {:?}", peer_info);
// when more than one protocol version is supported, choosing should go here
Ok((Box::new(ProtocolV1::new(conn)), peer_info))
}
}
/// Generate a new random nonce and store it in our ring buffer
fn next_nonce(&self) -> u64 {

View file

@ -39,5 +39,5 @@ mod protocol;
mod server;
mod peer;
pub use server::Server;
pub use server::{Server, DummyAdapter};
pub use server::DEFAULT_LISTEN_ADDR;

View file

@ -36,13 +36,13 @@ pub enum ErrCodes {
/// Types of messages
#[derive(Clone, Copy)]
pub enum Type {
ERROR,
HAND,
SHAKE,
PING,
PONG,
GETPEERADDRS,
PEERADDRS,
Error,
Hand,
Shake,
Ping,
Pong,
GetPeerAddrs,
PeerAddrs,
/// Never used over the network but to detect unrecognized types.
MaxMsgType,
}
@ -50,10 +50,17 @@ pub enum Type {
/// Header of any protocol message, used to identify incoming messages.
pub struct MsgHeader {
magic: [u8; 2],
msg_type: Type,
pub msg_type: Type,
}
impl MsgHeader {
pub fn new(msg_type: Type) -> MsgHeader {
MsgHeader {
magic: MAGIC,
msg_type: msg_type,
}
}
pub fn acceptable(&self) -> bool {
(self.msg_type as u8) < (Type::MaxMsgType as u8)
}
@ -74,20 +81,20 @@ impl Readable<MsgHeader> for MsgHeader {
try!(reader.expect_u8(MAGIC[0]));
try!(reader.expect_u8(MAGIC[1]));
let t = try!(reader.read_u8());
if t < (Type::MaxMsgType as u8) {
if t >= (Type::MaxMsgType as u8) {
return Err(ser::Error::CorruptedData);
}
Ok(MsgHeader {
magic: MAGIC,
msg_type: match t {
// TODO this is rather ugly, think of a better way
0 => Type::ERROR,
1 => Type::HAND,
2 => Type::SHAKE,
3 => Type::PING,
4 => Type::PONG,
5 => Type::GETPEERADDRS,
6 => Type::PEERADDRS,
0 => Type::Error,
1 => Type::Hand,
2 => Type::Shake,
3 => Type::Ping,
4 => Type::Pong,
5 => Type::GetPeerAddrs,
6 => Type::PeerAddrs,
_ => panic!(),
},
})

View file

@ -16,40 +16,46 @@ use mioco::tcp::TcpStream;
use core::ser::Error;
use handshake::Handshake;
use msg::*;
use types::*;
pub struct Peer {
info: PeerInfo,
proto: Box<Protocol>,
info: PeerInfo,
proto: Box<Protocol>,
}
unsafe impl Sync for Peer {}
unsafe impl Send for Peer {}
impl Peer {
pub fn connect(conn: TcpStream, hs: &Handshake) -> Result<Peer, Error> {
let (proto, info) = try!(hs.connect(conn));
Ok(Peer{
info: info,
proto: proto,
})
}
Ok(Peer {
info: info,
proto: proto,
})
}
pub fn accept(conn: TcpStream, hs: &Handshake) -> Result<Peer, Error> {
let (proto, info) = try!(hs.handshake(conn));
Ok(Peer{
info: info,
proto: proto,
})
}
Ok(Peer {
info: info,
proto: proto,
})
}
pub fn run(&self, na: &NetAdapter) -> Option<Error> {
self.proto.handle(na)
}
pub fn run(&self, na: &NetAdapter) -> Option<Error> {
self.proto.handle(na)
}
pub fn stop(&self) {
self.proto.as_ref().close()
}
pub fn send_ping(&self) -> Option<Error> {
self.proto.send_ping()
}
pub fn sent_bytes(&self) -> u64 {
self.proto.sent_bytes()
}
pub fn stop(&self) {
self.proto.as_ref().close()
}
}

View file

@ -13,8 +13,10 @@
// limitations under the License.
use std::cell::RefCell;
use std::ops::DerefMut;
use std::io::{Read, Write};
use std::ops::{Deref, DerefMut};
use std::rc::Rc;
use std::sync::Mutex;
use mioco;
use mioco::sync::mpsc::{sync_channel, SyncSender};
@ -25,51 +27,103 @@ use handshake::Handshake;
use msg::*;
use types::*;
/// First version of our communication protocol. Manages the underlying
/// connection, listening to incoming messages and transmitting outgoing ones.
pub struct ProtocolV1 {
// The underlying tcp connection.
conn: RefCell<TcpStream>,
//msg_send: Option<SyncSender<ser::Writeable>>,
stop_send: RefCell<Option<SyncSender<u8>>>,
// Send channel for the rest of the local system to send messages to the peer we're connected to.
msg_send: RefCell<Option<SyncSender<Vec<u8>>>>,
// Stop channel to exit the send/listen loop.
stop_send: RefCell<Option<SyncSender<u8>>>,
// Used both to count the amount of data sent and lock writing to the conn. We can't wrap conn with
// the lock as we're always listening to receive.
sent_bytes: Mutex<u64>,
}
impl Protocol for ProtocolV1 {
/// Main protocol connection handling loop, starts listening to incoming
/// messages and transmitting messages the rest of the local system wants
/// to send. Must be called before any interaction with a protocol instance
/// and should only be called once. Will block so also needs to be called
/// within a coroutine.
fn handle(&self, server: &NetAdapter) -> Option<ser::Error> {
// setup channels so we can switch between reads, writes and close
let (msg_send, msg_recv) = sync_channel(10);
let (stop_send, stop_recv) = sync_channel(1);
// setup channels so we can switch between reads, writes and close
let (msg_send, msg_recv) = sync_channel(10);
let (stop_send, stop_recv) = sync_channel(1);
{
let mut msg_mut = self.msg_send.borrow_mut();
*msg_mut = Some(msg_send);
let mut stop_mut = self.stop_send.borrow_mut();
*stop_mut = Some(stop_send);
}
//self.msg_send = Some(msg_send);
let mut stop_mut = self.stop_send.borrow_mut();
*stop_mut = Some(stop_send);
let mut conn = self.conn.borrow_mut();
let mut conn = self.conn.borrow_mut();
loop {
select!(
// main select loop, switches between listening, sending or stopping
select!(
r:conn => {
// deser the header ot get the message type
let header = try_to_o!(ser::deserialize::<MsgHeader>(conn.deref_mut()));
if !header.acceptable() {
continue;
}
// check the message and hopefully do what's expected
match header.msg_type {
Type::Ping => {
// respond with pong
let data = try_to_o!(ser::ser_vec(&MsgHeader::new(Type::Pong)));
let mut sent_bytes = self.sent_bytes.lock().unwrap();
*sent_bytes += data.len() as u64;
try_to_o!(conn.deref_mut().write_all(&data[..]).map_err(&ser::Error::IOErr));
},
Type::Pong => {},
_ => error!("uncaught unknown"),
}
},
r:msg_recv => {
ser::serialize(conn.deref_mut(), msg_recv.recv().unwrap());
// relay a message originated from the rest of the local system
let data = &msg_recv.recv().unwrap()[..];
let mut sent_bytes = self.sent_bytes.lock().unwrap();
*sent_bytes += data.len() as u64;
try_to_o!(conn.deref_mut().write_all(data).map_err(&ser::Error::IOErr));
},
r:stop_recv => {
// shuts the connection don and end the loop
stop_recv.recv();
conn.shutdown(Shutdown::Both);
return None;;
return None;
}
);
}
}
fn close(&self) {
let stop_send = self.stop_send.borrow();
stop_send.as_ref().unwrap().send(0);
}
/// Sends a ping message to the remote peer. Will panic if handle has never
/// been called on this protocol.
fn send_ping(&self) -> Option<ser::Error> {
let data = try_to_o!(ser::ser_vec(&MsgHeader::new(Type::Ping)));
let msg_send = self.msg_send.borrow();
msg_send.as_ref().unwrap().send(data);
None
}
fn sent_bytes(&self) -> u64 {
*self.sent_bytes.lock().unwrap().deref()
}
fn close(&self) {
let stop_send = self.stop_send.borrow();
stop_send.as_ref().unwrap().send(0);
}
}
impl ProtocolV1 {
pub fn new(conn: TcpStream) -> ProtocolV1 {
ProtocolV1 { conn: RefCell::new(conn), /* msg_send: None, */ stop_send: RefCell::new(None) }
ProtocolV1 {
conn: RefCell::new(conn),
msg_send: RefCell::new(None),
stop_send: RefCell::new(None),
sent_bytes: Mutex::new(0),
}
}
}

View file

@ -23,6 +23,7 @@ use std::str::FromStr;
use std::sync::{Arc, RwLock};
use mioco;
use mioco::sync::mpsc::{sync_channel, SyncSender};
use mioco::tcp::{TcpListener, TcpStream};
use core::ser::Error;
@ -37,58 +38,79 @@ fn listen_addr() -> SocketAddr {
FromStr::from_str(DEFAULT_LISTEN_ADDR).unwrap()
}
struct DummyAdapter {}
pub struct DummyAdapter {}
impl NetAdapter for DummyAdapter {}
/// P2P server implementation, handling bootstrapping to find and connect to
/// peers, receiving connections from other peers and keep track of all of them.
pub struct Server {
peers: RwLock<Vec<Arc<Peer>>>,
peers: RwLock<Vec<Arc<Peer>>>,
stop_send: RefCell<Option<SyncSender<u8>>>,
}
unsafe impl Sync for Server {}
unsafe impl Send for Server {}
// TODO TLS
impl Server {
pub fn new() -> Server {
Server{peers: RwLock::new(Vec::new())}
}
/// Creates a new p2p server. Opens a TCP port to allow incoming
/// Creates a new idle p2p server with no peers
pub fn new() -> Server {
Server {
peers: RwLock::new(Vec::new()),
stop_send: RefCell::new(None),
}
}
/// Starts the p2p server. Opens a TCP port to allow incoming
/// connections and starts the bootstrapping process to find peers.
pub fn start(& self) -> Result<(), Error> {
mioco::spawn(move || -> io::Result<()> {
let addr = DEFAULT_LISTEN_ADDR.parse().unwrap();
let listener = try!(TcpListener::bind(&addr));
warn!("P2P server started on {}", addr);
pub fn start(&self) -> Result<(), Error> {
let addr = DEFAULT_LISTEN_ADDR.parse().unwrap();
let listener = try!(TcpListener::bind(&addr).map_err(&Error::IOErr));
warn!("P2P server started on {}", addr);
let hs = Arc::new(Handshake::new());
let hs = Arc::new(Handshake::new());
let (stop_send, stop_recv) = sync_channel(1);
{
let mut stop_mut = self.stop_send.borrow_mut();
*stop_mut = Some(stop_send);
}
loop {
let conn = try!(listener.accept());
let hs = hs.clone();
loop {
select!(
r:listener => {
let conn = try!(listener.accept().map_err(&Error::IOErr));
let hs = hs.clone();
mioco::spawn(move || -> io::Result<()> {
let peer = try!(Peer::connect(conn, &hs).map_err(|_| io::Error::last_os_error()));
let peer = try!(Peer::connect(conn, &hs));
let wpeer = Arc::new(peer);
// {
// let mut peers = self.peers.write().unwrap();
// peers.push(wpeer.clone());
// }
if let Some(err) = wpeer.run(&DummyAdapter{}) {
error!("{:?}", err);
}
Ok(())
});
}
Ok(())
});
Ok(())
{
let mut peers = self.peers.write().unwrap();
peers.push(wpeer.clone());
}
mioco::spawn(move || -> io::Result<()> {
if let Some(err) = wpeer.run(&DummyAdapter{}) {
error!("{:?}", err);
}
Ok(())
});
},
r:stop_recv => {
stop_recv.recv();
return Ok(());
}
);
}
}
pub fn stop(&self) {
let peers = self.peers.write().unwrap();
for p in peers.deref() {
p.stop();
}
}
/// Stops the server. Disconnect from all peers at the same time.
pub fn stop(&self) {
let peers = self.peers.write().unwrap();
for p in peers.deref() {
p.stop();
}
let stop_send = self.stop_send.borrow();
stop_send.as_ref().unwrap().send(0);
}
/// Simulates an unrelated client connecting to our server. Mostly used for
/// tests.

View file

@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::io::{Read, Write};
use std::net::SocketAddr;
use core::ser::Error;
@ -31,18 +30,28 @@ bitflags! {
pub struct PeerInfo {
pub capabilities: Capabilities,
pub user_agent: String,
pub version: u32,
pub addr: SocketAddr,
pub version: u32,
pub addr: SocketAddr,
}
/// A given communication protocol agreed upon between 2 peers (usually
/// ourselves and a remove) after handshake.
/// ourselves and a remote) after handshake. This trait is necessary to allow
/// protocol negotiation as it gets upgraded to multiple versions.
pub trait Protocol {
/// Starts handling protocol communication, the peer(s) is expected to be
/// known already, usually passed during construction.
/// Starts handling protocol communication, the connection) is expected to
/// be known already, usually passed during construction. Will typically
/// block so needs to be called withing a coroutine. Should also be called
/// only once.
fn handle(&self, na: &NetAdapter) -> Option<Error>;
fn close(&self);
/// Sends a ping message to the remote peer.
fn send_ping(&self) -> Option<Error>;
/// How many bytes have been sent to the remote peer.
fn sent_bytes(&self) -> u64;
/// Close the connection to the remote peer.
fn close(&self);
}
/// Bridge between the networking layer and the rest of the system. Handles the

View file

@ -17,7 +17,7 @@ extern crate mioco;
extern crate env_logger;
use std::io;
use std::thread;
use std::sync::Arc;
use std::time;
#[test]
@ -25,16 +25,30 @@ fn peer_handshake() {
env_logger::init().unwrap();
mioco::start(|| -> io::Result<()> {
let server = p2p::Server::new();
server.start();
let server = Arc::new(p2p::Server::new());
let in_server = server.clone();
mioco::spawn(move || -> io::Result<()> {
try!(in_server.start());
Ok(())
});
// given server a little time to start
mioco::sleep(time::Duration::from_millis(200));
let addr = p2p::DEFAULT_LISTEN_ADDR.parse().unwrap();
try!(p2p::Server::connect_as_client(addr).map_err(|_| io::Error::last_os_error()));
let peer = try!(p2p::Server::connect_as_client(addr).map_err(|_| io::Error::last_os_error()));
let peer = Arc::new(peer);
let in_peer = peer.clone();
mioco::spawn(move || -> io::Result<()> {
in_peer.run(&p2p::DummyAdapter{});
Ok(())
});
mioco::sleep(time::Duration::from_millis(100));
peer.send_ping();
mioco::sleep(time::Duration::from_millis(100));
assert!(peer.sent_bytes() > 0);
server.stop();
mioco::shutdown();
Ok(())
}).unwrap().unwrap();
}