Re-implemented the whole interaction between server, peer and protocol to be more Rust-ish. Server maintains peer references and protocol is internally mutable.

This commit is contained in:
Ignotus Peverell 2016-10-29 12:36:45 -07:00
parent fdaf2ba6af
commit 42769c373c
No known key found for this signature in database
GPG key ID: 99CD25F39F8F8211
9 changed files with 232 additions and 194 deletions

View file

@ -218,7 +218,7 @@ impl Block {
// repeated iterations, revisit if a problem // repeated iterations, revisit if a problem
// validate each transaction and gather their proofs // validate each transaction and gather their proofs
let mut proofs = try_oap_vec!(txs, |tx| tx.verify_sig(&secp)); let mut proofs = try_map_vec!(txs, |tx| tx.verify_sig(&secp));
proofs.push(reward_proof); proofs.push(reward_proof);
// build vectors with all inputs and all outputs, ordering them by hash // build vectors with all inputs and all outputs, ordering them by hash

View file

@ -29,7 +29,7 @@ macro_rules! map_vec {
/// Same as map_vec when the map closure returns Results. Makes sure the /// Same as map_vec when the map closure returns Results. Makes sure the
/// results are "pushed up" and wraps with a try. /// results are "pushed up" and wraps with a try.
#[macro_export] #[macro_export]
macro_rules! try_oap_vec { macro_rules! try_map_vec {
($thing:expr, $mapfn:expr ) => { ($thing:expr, $mapfn:expr ) => {
try!($thing.iter() try!($thing.iter()
.map($mapfn) .map($mapfn)

View file

@ -17,12 +17,12 @@ use std::sync::RwLock;
use rand::Rng; use rand::Rng;
use rand::os::OsRng; use rand::os::OsRng;
use mioco::tcp::{TcpStream, Shutdown};
use core::ser::{serialize, deserialize, Error}; use core::ser::{serialize, deserialize, Error};
use msg::*; use msg::*;
use types::*; use types::*;
use protocol::ProtocolV1; use protocol::ProtocolV1;
use peer::PeerConn;
const NONCES_CAP: usize = 100; const NONCES_CAP: usize = 100;
@ -44,20 +44,20 @@ impl Handshake {
} }
/// Handles connecting to a new remote peer, starting the version handshake. /// Handles connecting to a new remote peer, starting the version handshake.
pub fn connect<'a>(&'a self, peer: &'a mut PeerConn) -> Result<Box<Protocol + 'a>, Error> { pub fn connect(&self, mut conn: TcpStream) -> Result<(Box<Protocol>, PeerInfo), Error> {
// get a new nonce that can be used on handshake to detect self-connection // get a new nonce that can be used on handshake to detect self-connection
let nonce = self.next_nonce(); let nonce = self.next_nonce();
// send the first part of the handshake // send the first part of the handshake
let sender_addr = SockAddr(peer.local_addr()); let sender_addr = conn.local_addr().unwrap();
let receiver_addr = SockAddr(peer.peer_addr()); let receiver_addr = conn.peer_addr().unwrap();
let opt_err = serialize(peer, let opt_err = serialize(&mut conn,
&Hand { &Hand {
version: PROTOCOL_VERSION, version: PROTOCOL_VERSION,
capabilities: FULL_SYNC, capabilities: FULL_SYNC,
nonce: nonce, nonce: nonce,
sender_addr: sender_addr, sender_addr: SockAddr(sender_addr),
receiver_addr: receiver_addr, receiver_addr: SockAddr(receiver_addr),
user_agent: USER_AGENT.to_string(), user_agent: USER_AGENT.to_string(),
}); });
match opt_err { match opt_err {
@ -66,9 +66,9 @@ impl Handshake {
} }
// deserialize the handshake response and do version negotiation // deserialize the handshake response and do version negotiation
let shake = try!(deserialize::<Shake>(peer)); let shake = try!(deserialize::<Shake>(&mut conn));
if shake.version != 1 { if shake.version != 1 {
self.close(peer, self.close(&mut conn,
ErrCodes::UnsupportedVersion as u32, ErrCodes::UnsupportedVersion as u32,
format!("Unsupported version: {}, ours: {})", format!("Unsupported version: {}, ours: {})",
shake.version, shake.version,
@ -78,21 +78,26 @@ impl Handshake {
received: vec![shake.version as u8], received: vec![shake.version as u8],
}); });
} }
peer.capabilities = shake.capabilities;
peer.user_agent = shake.user_agent;
info!("Connected to peer {}", peer); let peer_info = PeerInfo{
capabilities: shake.capabilities,
user_agent: shake.user_agent,
addr: receiver_addr,
version: shake.version,
};
info!("Connected to peer {:?}", peer_info);
// when more than one protocol version is supported, choosing should go here // when more than one protocol version is supported, choosing should go here
Ok(Box::new(ProtocolV1::new(peer))) Ok((Box::new(ProtocolV1::new(conn)), peer_info))
} }
/// Handles receiving a connection from a new remote peer that started the /// Handles receiving a connection from a new remote peer that started the
/// version handshake. /// version handshake.
pub fn handshake<'a>(&'a self, peer: &'a mut PeerConn) -> Result<Box<Protocol + 'a>, Error> { pub fn handshake(&self, mut conn: TcpStream) -> Result<(Box<Protocol>, PeerInfo), Error> {
// deserialize first part of handshake sent to us and do version negotiation // deserialize first part of handshake sent to us and do version negotiation
let hand = try!(deserialize::<Hand>(peer)); let hand = try!(deserialize::<Hand>(&mut conn));
if hand.version != 1 { if hand.version != 1 {
self.close(peer, self.close(&mut conn,
ErrCodes::UnsupportedVersion as u32, ErrCodes::UnsupportedVersion as u32,
format!("Unsupported version: {}, ours: {})", format!("Unsupported version: {}, ours: {})",
hand.version, hand.version,
@ -114,11 +119,15 @@ impl Handshake {
} }
// all good, keep peer info // all good, keep peer info
peer.capabilities = hand.capabilities; let peer_info = PeerInfo{
peer.user_agent = hand.user_agent; capabilities: hand.capabilities,
user_agent: hand.user_agent,
addr: conn.peer_addr().unwrap(),
version: hand.version,
};
// send our reply with our info // send our reply with our info
let opt_err = serialize(peer, let opt_err = serialize(&mut conn,
&Shake { &Shake {
version: PROTOCOL_VERSION, version: PROTOCOL_VERSION,
capabilities: FULL_SYNC, capabilities: FULL_SYNC,
@ -129,9 +138,9 @@ impl Handshake {
None => {} None => {}
} }
info!("Received connection from peer {}", peer); info!("Received connection from peer {:?}", peer_info);
// when more than one protocol version is supported, choosing should go here // when more than one protocol version is supported, choosing should go here
Ok(Box::new(ProtocolV1::new(peer))) Ok((Box::new(ProtocolV1::new(conn)), peer_info))
} }
/// Generate a new random nonce and store it in our ring buffer /// Generate a new random nonce and store it in our ring buffer
@ -147,12 +156,12 @@ impl Handshake {
nonce nonce
} }
fn close(&self, peer: &mut PeerConn, err_code: u32, explanation: String) { fn close(&self, conn: &mut TcpStream, err_code: u32, explanation: String) {
serialize(peer, serialize(conn,
&PeerError { &PeerError {
code: err_code, code: err_code,
message: explanation, message: explanation,
}); });
peer.close(); conn.shutdown(Shutdown::Both);
} }
} }

View file

@ -27,6 +27,7 @@ extern crate bitflags;
extern crate grin_core as core; extern crate grin_core as core;
#[macro_use] #[macro_use]
extern crate log; extern crate log;
#[macro_use]
extern crate mioco; extern crate mioco;
extern crate rand; extern crate rand;
extern crate time; extern crate time;

View file

@ -18,6 +18,8 @@ use std::net::*;
use core::ser::{self, Writeable, Readable, Writer, Reader, Error}; use core::ser::{self, Writeable, Readable, Writer, Reader, Error};
use types::*;
/// Current latest version of the protocol /// Current latest version of the protocol
pub const PROTOCOL_VERSION: u32 = 1; pub const PROTOCOL_VERSION: u32 = 1;
/// Grin's user agent with current version (TODO externalize) /// Grin's user agent with current version (TODO externalize)
@ -31,16 +33,6 @@ pub enum ErrCodes {
UnsupportedVersion = 100, UnsupportedVersion = 100,
} }
bitflags! {
/// Options for block validation
pub flags Capabilities: u32 {
/// We don't know (yet) what the peer can do.
const UNKNOWN = 0b00000000,
/// Runs with the easier version of the Proof of Work, mostly to make testing easier.
const FULL_SYNC = 0b00000001,
}
}
/// Types of messages /// Types of messages
#[derive(Clone, Copy)] #[derive(Clone, Copy)]
pub enum Type { pub enum Type {
@ -49,6 +41,8 @@ pub enum Type {
SHAKE, SHAKE,
PING, PING,
PONG, PONG,
GETPEERADDRS,
PEERADDRS,
/// Never used over the network but to detect unrecognized types. /// Never used over the network but to detect unrecognized types.
MaxMsgType, MaxMsgType,
} }
@ -92,6 +86,8 @@ impl Readable<MsgHeader> for MsgHeader {
2 => Type::SHAKE, 2 => Type::SHAKE,
3 => Type::PING, 3 => Type::PING,
4 => Type::PONG, 4 => Type::PONG,
5 => Type::GETPEERADDRS,
6 => Type::PEERADDRS,
_ => panic!(), _ => panic!(),
}, },
}) })
@ -180,6 +176,54 @@ impl Readable<Shake> for Shake {
} }
} }
/// Ask for other peers addresses, required for network discovery.
pub struct GetPeerAddrs {
/// Filters on the capabilities we'd like the peers to have
pub capabilities: Capabilities,
}
impl Writeable for GetPeerAddrs {
fn write(&self, writer: &mut Writer) -> Option<ser::Error> {
writer.write_u32(self.capabilities.bits())
}
}
impl Readable<GetPeerAddrs> for GetPeerAddrs {
fn read(reader: &mut Reader) -> Result<GetPeerAddrs, ser::Error> {
let capab = try!(reader.read_u32());
let capabilities = try!(Capabilities::from_bits(capab).ok_or(ser::Error::CorruptedData));
Ok(GetPeerAddrs { capabilities: capabilities })
}
}
/// Peer addresses we know of that are fresh enough, in response to
/// GetPeerAddrs.
pub struct PeerAddrs {
pub peers: Vec<SockAddr>,
}
impl Writeable for PeerAddrs {
fn write(&self, writer: &mut Writer) -> Option<ser::Error> {
try_o!(writer.write_u32(self.peers.len() as u32));
for p in &self.peers {
p.write(writer);
}
None
}
}
impl Readable<PeerAddrs> for PeerAddrs {
fn read(reader: &mut Reader) -> Result<PeerAddrs, ser::Error> {
let peer_count = try!(reader.read_u32());
if peer_count > 1000 {
return Err(ser::Error::TooLargeReadErr(format!("Too many peers provided: {}",
peer_count)));
}
let peers = try_map_vec!([0..peer_count], |_| SockAddr::read(reader));
Ok(PeerAddrs { peers: peers })
}
}
/// We found some issue in the communication, sending an error back, usually /// We found some issue in the communication, sending an error back, usually
/// followed by closing the connection. /// followed by closing the connection.
pub struct PeerError { pub struct PeerError {
@ -247,7 +291,7 @@ impl Readable<SockAddr> for SockAddr {
ip[3]), ip[3]),
port)))) port))))
} else { } else {
let ip = try_oap_vec!([0..8], |_| reader.read_u16()); let ip = try_map_vec!([0..8], |_| reader.read_u16());
let port = try!(reader.read_u16()); let port = try!(reader.read_u16());
Ok(SockAddr(SocketAddr::V6(SocketAddrV6::new(Ipv6Addr::new(ip[0], Ok(SockAddr(SocketAddr::V6(SocketAddrV6::new(Ipv6Addr::new(ip[0],
ip[1], ip[1],

View file

@ -12,88 +12,40 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
use std::fmt; use mioco::tcp::TcpStream;
use std::io::{self, Read, Write};
use std::net::SocketAddr;
use mioco::tcp::{TcpStream, Shutdown};
use handshake::Handshake;
use core::ser::Error; use core::ser::Error;
use handshake::Handshake;
use msg::*; use msg::*;
use types::*; use types::*;
/// The local representation of a remotely connected peer. Handles most pub struct Peer {
/// low-level network communication and tracks peer information. info: PeerInfo,
pub struct PeerConn { proto: Box<Protocol>,
conn: TcpStream,
pub capabilities: Capabilities,
pub user_agent: String,
} }
impl fmt::Display for PeerConn { unsafe impl Sync for Peer {}
// This trait requires `fmt` with this exact signature. unsafe impl Send for Peer {}
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{} {}", self.peer_addr(), self.user_agent) impl Peer {
}
pub fn connect(conn: TcpStream, hs: &Handshake) -> Result<Peer, Error> {
let (proto, info) = try!(hs.connect(conn));
Ok(Peer{
info: info,
proto: proto,
})
} }
/// Make the Peer a Reader for convenient access to the underlying connection. pub fn accept(conn: TcpStream, hs: &Handshake) -> Result<Peer, Error> {
/// Allows the peer to track how much is received. let (proto, info) = try!(hs.handshake(conn));
impl Read for PeerConn { Ok(Peer{
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { info: info,
self.conn.read(buf) proto: proto,
} })
} }
/// Make the Peer a Writer for convenient access to the underlying connection. pub fn run(&self, na: &NetAdapter) -> Option<Error> {
/// Allows the peer to track how much is sent. self.proto.handle(na)
impl Write for PeerConn {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.conn.write(buf)
}
fn flush(&mut self) -> io::Result<()> {
self.conn.flush()
}
}
impl Close for PeerConn {
fn close(&self) {
self.conn.shutdown(Shutdown::Both);
}
}
impl PeerConn {
/// Create a new local peer instance connected to a remote peer with the
/// provided TcpStream.
pub fn new(conn: TcpStream) -> PeerConn {
// don't wait on read for more than 2 seconds by default
conn.set_keepalive(Some(2));
PeerConn {
conn: conn,
capabilities: UNKNOWN,
user_agent: "".to_string(),
}
}
pub fn connect(&mut self, hs: &Handshake, na: &NetAdapter) -> Option<Error> {
let mut proto = try_to_o!(hs.connect(self));
proto.handle(na)
}
pub fn handshake(&mut self, hs: &Handshake, na: &NetAdapter) -> Option<Error> {
let mut proto = try_to_o!(hs.handshake(self));
proto.handle(na)
}
}
impl PeerInfo for PeerConn {
fn peer_addr(&self) -> SocketAddr {
self.conn.peer_addr().unwrap()
}
fn local_addr(&self) -> SocketAddr {
// TODO likely not exactly what we want (private vs public IP)
self.conn.local_addr().unwrap()
} }
} }

View file

@ -12,37 +12,56 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
use std::cell::RefCell;
use std::ops::DerefMut;
use std::rc::Rc;
use mioco;
use mioco::sync::mpsc::sync_channel;
use mioco::tcp::{TcpStream, Shutdown};
use core::ser; use core::ser;
use handshake::Handshake;
use msg::*; use msg::*;
use types::*; use types::*;
use peer::PeerConn;
pub struct ProtocolV1<'a> { pub struct ProtocolV1 {
peer: &'a mut PeerConn, conn: RefCell<TcpStream>,
} }
impl<'a> Protocol for ProtocolV1<'a> { impl Protocol for ProtocolV1 {
fn handle(&mut self, server: &NetAdapter) -> Option<ser::Error> { fn handle(&self, server: &NetAdapter) -> Option<ser::Error> {
// setup a channel so we can switch between reads and writes
let (send, recv) = sync_channel(10);
let mut conn = self.conn.borrow_mut();
loop { loop {
let header = try_to_o!(ser::deserialize::<MsgHeader>(self.peer)); select!(
r:conn => {
let header = try_to_o!(ser::deserialize::<MsgHeader>(conn.deref_mut()));
if !header.acceptable() { if !header.acceptable() {
continue; continue;
} }
},
r:recv => {
ser::serialize(conn.deref_mut(), recv.recv().unwrap());
}
);
} }
} }
} }
impl<'a> ProtocolV1<'a> { impl ProtocolV1 {
pub fn new(p: &mut PeerConn) -> ProtocolV1 { pub fn new(conn: TcpStream) -> ProtocolV1 {
ProtocolV1 { peer: p } ProtocolV1 { conn: RefCell::new(conn) }
} }
fn close(&mut self, err_code: u32, explanation: &'static str) { // fn close(&mut self, err_code: u32, explanation: &'static str) {
ser::serialize(self.peer, // ser::serialize(self.conn,
&PeerError { // &PeerError {
code: err_code, // code: err_code,
message: explanation.to_string(), // message: explanation.to_string(),
}); // });
self.peer.close(); // self.conn.shutdown(Shutdown::Both);
} // }
} }

View file

@ -15,17 +15,18 @@
//! Grin server implementation, accepts incoming connections and connects to //! Grin server implementation, accepts incoming connections and connects to
//! other peers in the network. //! other peers in the network.
use std::cell::RefCell;
use std::io; use std::io;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::str::FromStr; use std::str::FromStr;
use std::sync::Arc; use std::sync::{Arc, RwLock};
use mioco; use mioco;
use mioco::tcp::{TcpListener, TcpStream}; use mioco::tcp::{TcpListener, TcpStream};
use core::ser::Error; use core::ser::Error;
use handshake::Handshake; use handshake::Handshake;
use peer::PeerConn; use peer::Peer;
use types::*; use types::*;
pub const DEFAULT_LISTEN_ADDR: &'static str = "127.0.0.1:3414"; pub const DEFAULT_LISTEN_ADDR: &'static str = "127.0.0.1:3414";
@ -38,14 +39,20 @@ fn listen_addr() -> SocketAddr {
struct DummyAdapter {} struct DummyAdapter {}
impl NetAdapter for DummyAdapter {} impl NetAdapter for DummyAdapter {}
/// P2P server implementation, handling bootstrapping to find and connect to
/// peers, receiving connections from other peers and keep track of all of them.
pub struct Server { pub struct Server {
peers: RwLock<Vec<Arc<Peer>>>,
} }
// TODO TLS
impl Server { impl Server {
pub fn new() -> Server {
Server{peers: RwLock::new(Vec::new())}
}
/// Creates a new p2p server. Opens a TCP port to allow incoming /// Creates a new p2p server. Opens a TCP port to allow incoming
/// connections and starts the bootstrapping process to find peers. /// connections and starts the bootstrapping process to find peers.
pub fn start() -> Result<Server, Error> { pub fn start(&'static self) -> Result<(), Error> {
// TODO TLS
mioco::spawn(move || -> io::Result<()> { mioco::spawn(move || -> io::Result<()> {
let addr = DEFAULT_LISTEN_ADDR.parse().unwrap(); let addr = DEFAULT_LISTEN_ADDR.parse().unwrap();
let listener = try!(TcpListener::bind(&addr)); let listener = try!(TcpListener::bind(&addr));
@ -55,11 +62,16 @@ impl Server {
loop { loop {
let conn = try!(listener.accept()); let conn = try!(listener.accept());
let hs_child = hs.clone(); let hs = hs.clone();
mioco::spawn(move || -> io::Result<()> { mioco::spawn(move || -> io::Result<()> {
let ret = PeerConn::new(conn).handshake(&hs_child, &DummyAdapter {}); let peer = try!(Peer::connect(conn, &hs).map_err(|_| io::Error::last_os_error()));
if let Some(err) = ret { let wpeer = Arc::new(peer);
{
let mut peers = self.peers.write().unwrap();
peers.push(wpeer.clone());
}
if let Some(err) = wpeer.run(&DummyAdapter{}) {
error!("{:?}", err); error!("{:?}", err);
} }
Ok(()) Ok(())
@ -67,18 +79,13 @@ impl Server {
} }
Ok(()) Ok(())
}); });
Ok(Server {}) Ok(())
} }
/// Simulates an unrelated client connecting to our server. Mostly used for /// Simulates an unrelated client connecting to our server. Mostly used for
/// tests. /// tests.
pub fn connect_as_client(addr: SocketAddr) -> Option<Error> { pub fn connect_as_client(addr: SocketAddr) -> Result<Peer, Error> {
let tcp_client = TcpStream::connect(&addr).unwrap(); let tcp_client = TcpStream::connect(&addr).unwrap();
let mut peer = PeerConn::new(tcp_client); Peer::accept(tcp_client, &Handshake::new())
let hs = Handshake::new();
if let Err(e) = hs.connect(&mut peer) {
return Some(e);
}
None
} }
} }

View file

@ -16,17 +16,23 @@ use std::io::{Read, Write};
use std::net::SocketAddr; use std::net::SocketAddr;
use core::ser::Error; use core::ser::Error;
/// Trait for pre-emptively and forcefully closing an underlying resource. bitflags! {
pub trait Close { /// Options for block validation
fn close(&self); pub flags Capabilities: u32 {
/// We don't know (yet) what the peer can do.
const UNKNOWN = 0b00000000,
/// Runs with the easier version of the Proof of Work, mostly to make testing easier.
const FULL_SYNC = 0b00000001,
}
} }
/// General information about a connected peer that's useful to other modules. /// General information about a connected peer that's useful to other modules.
pub trait PeerInfo { #[derive(Debug)]
/// Address of the remote peer pub struct PeerInfo {
fn peer_addr(&self) -> SocketAddr; pub capabilities: Capabilities,
/// Our address, communicated to other peers pub user_agent: String,
fn local_addr(&self) -> SocketAddr; pub version: u32,
pub addr: SocketAddr,
} }
/// A given communication protocol agreed upon between 2 peers (usually /// A given communication protocol agreed upon between 2 peers (usually
@ -34,7 +40,7 @@ pub trait PeerInfo {
pub trait Protocol { pub trait Protocol {
/// Starts handling protocol communication, the peer(s) is expected to be /// Starts handling protocol communication, the peer(s) is expected to be
/// known already, usually passed during construction. /// known already, usually passed during construction.
fn handle(&mut self, na: &NetAdapter) -> Option<Error>; fn handle(&self, na: &NetAdapter) -> Option<Error>;
} }
/// Bridge between the networking layer and the rest of the system. Handles the /// Bridge between the networking layer and the rest of the system. Handles the