Rewrote most of p2p code to use futures-rs instead of mioco. Need some cleanup and support for a few more message types.

This commit is contained in:
Ignotus Peverell 2016-12-10 19:11:49 -08:00
parent 9ea3389aea
commit 4b5c010b05
No known key found for this signature in database
GPG key ID: 99CD25F39F8F8211
13 changed files with 439 additions and 655 deletions

View file

@ -36,7 +36,7 @@ pub enum Error {
/// Data wasn't in a consumable format
CorruptedData,
/// When asked to read too much data
TooLargeReadErr(String),
TooLargeReadErr,
}
impl From<io::Error> for Error {
@ -53,7 +53,7 @@ impl fmt::Display for Error {
write!(f, "expected {:?}, got {:?}", e, r)
}
Error::CorruptedData => f.write_str("corrupted data"),
Error::TooLargeReadErr(ref s) => f.write_str(&s),
Error::TooLargeReadErr => f.write_str("too large read"),
}
}
}
@ -71,7 +71,7 @@ impl error::Error for Error {
Error::IOErr(ref e) => error::Error::description(e),
Error::UnexpectedData { expected: _, received: _ } => "unexpected data",
Error::CorruptedData => "corrupted data",
Error::TooLargeReadErr(ref s) => s,
Error::TooLargeReadErr => "too large read",
}
}
}
@ -238,7 +238,7 @@ impl<'a> Reader for BinReader<'a> {
fn read_fixed_bytes(&mut self, length: usize) -> Result<Vec<u8>, Error> {
// not reading more than 100k in a single read
if length > 100000 {
return Err(Error::TooLargeReadErr(format!("fixed bytes length too large: {}", length)));
return Err(Error::TooLargeReadErr);
}
let mut buf = vec![0; length];
self.source.read_exact(&mut buf).map(move |_| buf).map_err(Error::IOErr)

View file

@ -1,14 +1,15 @@
[package]
name = "grin_p2p"
name = "grin_p2p_fut"
version = "0.1.0"
authors = ["Ignotus Peverell <igno.peverell@protonmail.com>"]
[dependencies]
bitflags = "^0.7.0"
byteorder = "^0.5"
futures = "^0.1.6"
log = "^0.3"
rand = "^0.3"
mioco = "^0.8"
tokio-core="^0.1.1"
time = "^0.1"
enum_primitive = "^0.1.0"
num = "^0.1.36"

View file

@ -13,11 +13,14 @@
// limitations under the License.
use std::collections::VecDeque;
use std::sync::RwLock;
use std::sync::{Arc, RwLock};
use futures::Future;
use futures::future::ok;
use rand::Rng;
use rand::os::OsRng;
use mioco::tcp::{TcpStream, Shutdown};
use tokio_core::net::TcpStream;
use tokio_core::io::{write_all, read_exact, read_to_end};
use core::ser::{serialize, deserialize, Error};
use msg::*;
@ -31,7 +34,7 @@ const NONCES_CAP: usize = 100;
pub struct Handshake {
/// Ring buffer of nonces sent to detect self connections without requiring
/// a node id.
nonces: RwLock<VecDeque<u64>>,
nonces: Arc<RwLock<VecDeque<u64>>>,
}
unsafe impl Sync for Handshake {}
@ -40,64 +43,56 @@ unsafe impl Send for Handshake {}
impl Handshake {
/// Creates a new handshake handler
pub fn new() -> Handshake {
Handshake { nonces: RwLock::new(VecDeque::with_capacity(NONCES_CAP)) }
Handshake { nonces: Arc::new(RwLock::new(VecDeque::with_capacity(NONCES_CAP))) }
}
/// Handles connecting to a new remote peer, starting the version handshake.
pub fn connect(&self, mut conn: TcpStream) -> Result<(Box<Protocol>, PeerInfo), Error> {
// get a new nonce that can be used on handshake to detect self-connection
pub fn connect(&self,
conn: TcpStream)
-> Box<Future<Item = (TcpStream, ProtocolV1, PeerInfo), Error = Error>> {
let nonce = self.next_nonce();
// send the first part of the handshake
let sender_addr = conn.local_addr().unwrap();
let receiver_addr = conn.peer_addr().unwrap();
try!(serialize(&mut conn,
&Hand {
let hand = Hand {
version: PROTOCOL_VERSION,
capabilities: FULL_SYNC,
nonce: nonce,
sender_addr: SockAddr(sender_addr),
receiver_addr: SockAddr(receiver_addr),
sender_addr: SockAddr(conn.local_addr().unwrap()),
receiver_addr: SockAddr(conn.peer_addr().unwrap()),
user_agent: USER_AGENT.to_string(),
}));
// deserialize the handshake response and do version negotiation
let shake = try!(deserialize::<Shake>(&mut conn));
};
Box::new(write_msg(conn, hand, Type::Hand)
.and_then(|conn| {
read_msg::<Shake>(conn)
})
.and_then(|(conn, shake)| {
if shake.version != 1 {
self.close(&mut conn,
ErrCodes::UnsupportedVersion as u32,
format!("Unsupported version: {}, ours: {})",
shake.version,
PROTOCOL_VERSION));
return Err(Error::UnexpectedData {
Err(Error::UnexpectedData {
expected: vec![PROTOCOL_VERSION as u8],
received: vec![shake.version as u8],
});
}
})
} else {
let peer_info = PeerInfo {
capabilities: shake.capabilities,
user_agent: shake.user_agent,
addr: receiver_addr,
addr: conn.peer_addr().unwrap(),
version: shake.version,
};
info!("Connected to peer {:?}", peer_info);
// when more than one protocol version is supported, choosing should go here
Ok((Box::new(ProtocolV1::new(conn)), peer_info))
Ok((conn, ProtocolV1::new(), peer_info))
}
}))
}
/// Handles receiving a connection from a new remote peer that started the
/// version handshake.
pub fn handshake(&self, mut conn: TcpStream) -> Result<(Box<Protocol>, PeerInfo), Error> {
// deserialize first part of handshake sent to us and do version negotiation
let hand = try!(deserialize::<Hand>(&mut conn));
pub fn handshake(&self,
conn: TcpStream)
-> Box<Future<Item = (TcpStream, ProtocolV1, PeerInfo), Error = Error>> {
let nonces = self.nonces.clone();
Box::new(read_msg::<Hand>(conn)
.and_then(move |(conn, hand)| {
if hand.version != 1 {
self.close(&mut conn,
ErrCodes::UnsupportedVersion as u32,
format!("Unsupported version: {}, ours: {})",
hand.version,
PROTOCOL_VERSION));
return Err(Error::UnexpectedData {
expected: vec![PROTOCOL_VERSION as u8],
received: vec![hand.version as u8],
@ -105,7 +100,7 @@ impl Handshake {
}
{
// check the nonce to see if we could be trying to connect to ourselves
let nonces = self.nonces.read().unwrap();
let nonces = nonces.read().unwrap();
if nonces.contains(&hand.nonce) {
return Err(Error::UnexpectedData {
expected: vec![],
@ -113,7 +108,6 @@ impl Handshake {
});
}
}
// all good, keep peer info
let peer_info = PeerInfo {
capabilities: hand.capabilities,
@ -121,18 +115,19 @@ impl Handshake {
addr: conn.peer_addr().unwrap(),
version: hand.version,
};
// send our reply with our info
try!(serialize(&mut conn,
&Shake {
let shake = Shake {
version: PROTOCOL_VERSION,
capabilities: FULL_SYNC,
user_agent: USER_AGENT.to_string(),
}));
info!("Received connection from peer {:?}", peer_info);
};
Ok((conn, shake, peer_info))
})
.and_then(|(conn, shake, peer_info)| {
write_msg(conn, shake, Type::Shake)
// when more than one protocol version is supported, choosing should go here
Ok((Box::new(ProtocolV1::new(conn)), peer_info))
.map(|conn| (conn, ProtocolV1::new(), peer_info))
}))
}
/// Generate a new random nonce and store it in our ring buffer
@ -147,13 +142,4 @@ impl Handshake {
}
nonce
}
fn close(&self, conn: &mut TcpStream, err_code: u32, explanation: String) {
serialize(conn,
&PeerError {
code: err_code,
message: explanation,
});
conn.shutdown(Shutdown::Both);
}
}

View file

@ -28,19 +28,19 @@ extern crate enum_primitive;
extern crate grin_core as core;
#[macro_use]
extern crate log;
extern crate futures;
#[macro_use]
extern crate mioco;
extern crate tokio_core;
extern crate rand;
extern crate time;
extern crate num;
mod types;
mod msg;
pub mod handshake;
mod rw;
mod msg;
mod peer;
mod protocol;
mod server;
mod peer;
mod types;
pub use server::{Server, DummyAdapter};
pub use peer::Peer;

View file

@ -14,11 +14,15 @@
//! Message types that transit over the network and related serialization code.
use std::net::*;
use std::net::{SocketAddr, SocketAddrV4, SocketAddrV6, Ipv4Addr, Ipv6Addr};
use num::FromPrimitive;
use futures::future::{Future, ok};
use tokio_core::net::TcpStream;
use tokio_core::io::{write_all, read_exact};
use core::ser::{self, Writeable, Readable, Writer, Reader};
use core::consensus::MAX_MSG_LEN;
use types::*;
@ -30,6 +34,9 @@ pub const USER_AGENT: &'static str = "MW/Grin 0.1";
/// Magic number expected in the header of every message
const MAGIC: [u8; 2] = [0x1e, 0xc5];
/// Size in bytes of a message header
pub const HEADER_LEN: u64 = 11;
/// Codes for each error that can be produced reading a message.
pub enum ErrCodes {
UnsupportedVersion = 100,
@ -51,27 +58,79 @@ enum_from_primitive! {
}
}
/// Future combinator to read any message where the body is a Readable. Reads
/// the header first, handles its validation and then reads the Readable body,
/// allocating buffers of the right size.
pub fn read_msg<T>(conn: TcpStream) -> Box<Future<Item = (TcpStream, T), Error = ser::Error>>
where T: Readable<T> + 'static
{
let read_header = read_exact(conn, vec![0u8; HEADER_LEN as usize])
.map_err(|e| ser::Error::IOErr(e))
.and_then(|(reader, buf)| {
let header = try!(ser::deserialize::<MsgHeader>(&mut &buf[..]));
if header.msg_len > MAX_MSG_LEN {
// TODO add additional restrictions on a per-message-type basis to avoid 20MB
// pings
return Err(ser::Error::TooLargeReadErr);
}
Ok((reader, header))
});
let read_msg = read_header.and_then(|(reader, header)| {
read_exact(reader, vec![0u8; header.msg_len as usize]).map_err(|e| ser::Error::IOErr(e))
})
.and_then(|(reader, buf)| {
let body = try!(ser::deserialize(&mut &buf[..]));
Ok((reader, body))
});
Box::new(read_msg)
}
/// Future combinator to write a full message from a Writeable payload.
/// Serializes the payload first and then sends the message header and that
/// payload.
pub fn write_msg<T>(conn: TcpStream,
msg: T,
msg_type: Type)
-> Box<Future<Item = TcpStream, Error = ser::Error>>
where T: Writeable + 'static
{
let write_msg = ok((conn)).and_then(move |conn| {
// prepare the body first so we know its serialized length
let mut body_buf = vec![];
ser::serialize(&mut body_buf, &msg);
// build and send the header using the body size
let mut header_buf = vec![];
let blen = body_buf.len() as u64;
ser::serialize(&mut header_buf, &MsgHeader::new(msg_type, blen));
write_all(conn, header_buf)
.and_then(|(conn, _)| write_all(conn, body_buf))
.map(|(conn, _)| conn)
.map_err(|e| ser::Error::IOErr(e))
});
Box::new(write_msg)
}
/// Header of any protocol message, used to identify incoming messages.
pub struct MsgHeader {
magic: [u8; 2],
pub msg_type: Type,
pub msg_len: u64,
}
impl MsgHeader {
pub fn new(msg_type: Type) -> MsgHeader {
pub fn new(msg_type: Type, len: u64) -> MsgHeader {
MsgHeader {
magic: MAGIC,
msg_type: msg_type,
msg_len: len,
}
}
pub fn acceptable(&self) -> bool {
Type::from_u8(self.msg_type as u8).is_some()
}
/// Serialized length of the header in bytes
pub fn serialized_len(&self) -> u64 {
3
HEADER_LEN
}
}
@ -80,7 +139,8 @@ impl Writeable for MsgHeader {
ser_multiwrite!(writer,
[write_u8, self.magic[0]],
[write_u8, self.magic[1]],
[write_u8, self.msg_type as u8]);
[write_u8, self.msg_type as u8],
[write_u64, self.msg_len]);
Ok(())
}
}
@ -89,12 +149,13 @@ impl Readable<MsgHeader> for MsgHeader {
fn read(reader: &mut Reader) -> Result<MsgHeader, ser::Error> {
try!(reader.expect_u8(MAGIC[0]));
try!(reader.expect_u8(MAGIC[1]));
let t = try!(reader.read_u8());
let (t, len) = ser_multiread!(reader, read_u8, read_u64);
match Type::from_u8(t) {
Some(ty) => {
Ok(MsgHeader {
magic: MAGIC,
msg_type: ty,
msg_len: len,
})
}
None => Err(ser::Error::CorruptedData),
@ -224,8 +285,7 @@ impl Readable<PeerAddrs> for PeerAddrs {
fn read(reader: &mut Reader) -> Result<PeerAddrs, ser::Error> {
let peer_count = try!(reader.read_u32());
if peer_count > 1000 {
return Err(ser::Error::TooLargeReadErr(format!("Too many peers provided: {}",
peer_count)));
return Err(ser::Error::TooLargeReadErr);
}
let peers = try_map_vec!([0..peer_count], |_| SockAddr::read(reader));
Ok(PeerAddrs { peers: peers })
@ -313,3 +373,19 @@ impl Readable<SockAddr> for SockAddr {
}
}
}
/// Placeholder for messages like Ping and Pong that don't send anything but
/// the header.
pub struct Empty {}
impl Writeable for Empty {
fn write(&self, writer: &mut Writer) -> Result<(), ser::Error> {
Ok(())
}
}
impl Readable<Empty> for Empty {
fn read(reader: &mut Reader) -> Result<Empty, ser::Error> {
Ok(Empty {})
}
}

View file

@ -12,7 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use mioco::tcp::TcpStream;
use futures::Future;
use tokio_core::net::TcpStream;
use core::core;
use core::ser::Error;
@ -28,45 +29,45 @@ unsafe impl Sync for Peer {}
unsafe impl Send for Peer {}
impl Peer {
pub fn connect(conn: TcpStream, hs: &Handshake) -> Result<Peer, Error> {
let (proto, info) = try!(hs.connect(conn));
Ok(Peer {
pub fn connect(conn: TcpStream,
hs: &Handshake)
-> Box<Future<Item = (TcpStream, Peer), Error = Error>> {
let connect_peer = hs.connect(conn).and_then(|(conn, proto, info)| {
Ok((conn,
Peer {
info: info,
proto: proto,
})
proto: Box::new(proto),
}))
});
Box::new(connect_peer)
}
pub fn accept(conn: TcpStream, hs: &Handshake) -> Result<Peer, Error> {
let (proto, info) = try!(hs.handshake(conn));
Ok(Peer {
pub fn accept(conn: TcpStream,
hs: &Handshake)
-> Box<Future<Item = (TcpStream, Peer), Error = Error>> {
let hs_peer = hs.handshake(conn).and_then(|(conn, proto, info)| {
Ok((conn,
Peer {
info: info,
proto: proto,
})
proto: Box::new(proto),
}))
});
Box::new(hs_peer)
}
pub fn run(&self, na: &NetAdapter) -> Result<(), Error> {
self.proto.handle(na)
}
pub fn send_ping(&self) -> Result<(), Error> {
self.proto.send_ping()
}
pub fn send_block(&self, b: &core::Block) -> Result<(), Error> {
// TODO don't send if we already got the block from peer
self.proto.send_block(b)
}
pub fn send_transaction(&self, tx: &core::Transaction) -> Result<(), Error> {
// TODO don't relay if we already got the tx from peer
self.proto.send_transaction(tx)
pub fn run(&self, conn: TcpStream, na: &NetAdapter) -> Box<Future<Item = (), Error = Error>> {
self.proto.handle(conn, na)
}
pub fn transmitted_bytes(&self) -> (u64, u64) {
self.proto.transmitted_bytes()
}
pub fn send_ping(&self) -> Result<(), Error> {
self.proto.send_ping()
}
pub fn stop(&self) {
self.proto.as_ref().close()
self.proto.close();
}
}

View file

@ -13,210 +13,147 @@
// limitations under the License.
use std::cell::RefCell;
use std::io::Write;
use std::ops::{Deref, DerefMut};
use std::sync::Mutex;
use std::iter;
use std::ops::DerefMut;
use std::sync::{Mutex, Arc};
use mioco;
use mioco::sync::mpsc::{sync_channel, SyncSender, Receiver};
use mioco::tcp::{TcpStream, Shutdown};
use futures;
use futures::{Stream, Future};
use futures::stream;
use futures::sync::mpsc::UnboundedSender;
use tokio_core::io::{Io, write_all, read_exact, read_to_end};
use tokio_core::net::TcpStream;
use core::core;
use core::ser;
use msg::*;
use rw;
use types::*;
/// In normal peer operation we don't want to be sent more than 100Mb in a
/// single message.
const MAX_DATA_BYTES: usize = 100 * 1000 * 1000;
/// Number of errors before we disconnect from a peer.
const MAX_ERRORS: u64 = 5;
/// First version of our communication protocol. Manages the underlying
/// connection, listening to incoming messages and transmitting outgoing ones.
pub struct ProtocolV1 {
// The underlying tcp connection.
conn: RefCell<TcpStream>,
outbound_chan: RefCell<Option<UnboundedSender<Vec<u8>>>>,
// Send channel for the rest of the local system to send messages to the peer we're connected to.
msg_send: RefCell<Option<SyncSender<Vec<u8>>>>,
// Stop channel to exit the send/listen loop.
stop_send: RefCell<Option<SyncSender<u8>>>,
// Used both to count the amount of data sent and lock writing to the conn. We can't wrap conn with
// the lock as we're always listening to receive.
sent_bytes: Mutex<u64>,
// Bytes we've sent.
sent_bytes: Arc<Mutex<u64>>,
// Bytes we've received.
received_bytes: Mutex<u64>,
received_bytes: Arc<Mutex<u64>>,
// Counter for read errors.
error_count: Mutex<u64>,
}
impl ProtocolV1 {
/// Creates a new protocol v1
pub fn new(conn: TcpStream) -> ProtocolV1 {
pub fn new() -> ProtocolV1 {
ProtocolV1 {
conn: RefCell::new(conn),
msg_send: RefCell::new(None),
stop_send: RefCell::new(None),
sent_bytes: Mutex::new(0),
received_bytes: Mutex::new(0),
outbound_chan: RefCell::new(None),
sent_bytes: Arc::new(Mutex::new(0)),
received_bytes: Arc::new(Mutex::new(0)),
error_count: Mutex::new(0),
}
}
}
impl Protocol for ProtocolV1 {
/// Main protocol connection handling loop, starts listening to incoming
/// messages and transmitting messages the rest of the local system wants
/// to send. Must be called before any interaction with a protocol instance
/// and should only be called once. Will block so also needs to be called
/// within a coroutine.
fn handle(&self, adapter: &NetAdapter) -> Result<(), ser::Error> {
// setup channels so we can switch between reads, writes and close
let (msg_recv, stop_recv) = self.setup_channels();
fn handle(&self,
conn: TcpStream,
adapter: &NetAdapter)
-> Box<Future<Item = (), Error = ser::Error>> {
let (reader, writer) = conn.split();
let mut conn = self.conn.borrow_mut();
loop {
// main select loop, switches between listening, sending or stopping
select!(
r:conn => {
let res = self.process_msg(&mut conn, adapter);
if let Err(_) = res {
let mut cnt = self.error_count.lock().unwrap();
*cnt += 1;
if *cnt > MAX_ERRORS {
return res.map(|_| ());
// prepare the channel that will transmit data to the connection writer
let (tx, rx) = futures::sync::mpsc::unbounded();
{
let mut out_mut = self.outbound_chan.borrow_mut();
*out_mut = Some(tx.clone());
}
// infinite iterator stream so we repeat the message reading logic until the
// peer is stopped
let iter = stream::iter(iter::repeat(()).map(Ok::<(), ser::Error>));
// setup the reading future, getting messages from the peer and processing them
let recv_bytes = self.received_bytes.clone();
let read_msg = iter.fold(reader, move |reader, _| {
let mut tx_inner = tx.clone();
let recv_bytes = recv_bytes.clone();
read_exact(reader, vec![0u8; HEADER_LEN as usize])
.map_err(|e| ser::Error::IOErr(e))
.and_then(move |(reader, buf)| {
// first read the message header
let header = try!(ser::deserialize::<MsgHeader>(&mut &buf[..]));
Ok((reader, header))
})
.map(move |(reader, header)| {
// add the count of bytes sent
let mut recv_bytes = recv_bytes.lock().unwrap();
*recv_bytes += header.serialized_len() + header.msg_len;
// and handle the different message types
match header.msg_type {
Type::Ping => {
let data = ser::ser_vec(&MsgHeader::new(Type::Pong, 0)).unwrap();
if let Err(e) = tx_inner.send(data) {
warn!("Couldn't send pong to remote peer: {}", e);
}
}
},
r:msg_recv => {
// relay a message originated from the rest of the local system
let data = &msg_recv.recv().unwrap()[..];
let mut sent_bytes = self.sent_bytes.lock().unwrap();
Type::Pong => {}
_ => {
error!("unknown message type {:?}", header.msg_type);
}
};
reader
})
});
// setting the writing future, getting messages from our system and sending
// them out
let sent_bytes = self.sent_bytes.clone();
let send_data = rx.map(move |data| {
// add the count of bytes sent
let mut sent_bytes = sent_bytes.lock().unwrap();
*sent_bytes += data.len() as u64;
try!(conn.deref_mut().write_all(data).map_err(&ser::Error::IOErr));
},
r:stop_recv => {
// shuts the connection don and end the loop
stop_recv.recv();
conn.shutdown(Shutdown::Both);
return Ok(());
}
);
data
})
// write the data and make sure the future returns the right types
.fold(writer,
|writer, data| write_all(writer, data).map_err(|_| ()).map(|(writer, buf)| writer))
.map(|_| ())
.map_err(|_| ser::Error::CorruptedData);
// select between our different futures and return them
Box::new(read_msg.map(|_| ()).select(send_data).map(|_| ()).map_err(|(e, _)| e))
}
/// Bytes sent and received by this peer to the remote peer.
fn transmitted_bytes(&self) -> (u64, u64) {
let sent = *self.sent_bytes.lock().unwrap();
let recv = *self.received_bytes.lock().unwrap();
(sent, recv)
}
/// Sends a ping message to the remote peer. Will panic if handle has never
/// been called on this protocol.
fn send_ping(&self) -> Result<(), ser::Error> {
let data = try!(ser::ser_vec(&MsgHeader::new(Type::Ping)));
let msg_send = self.msg_send.borrow();
msg_send.as_ref().unwrap().send(data);
let data = try!(ser::ser_vec(&MsgHeader::new(Type::Ping, 0)));
let mut msg_send = self.outbound_chan.borrow_mut();
if let Err(e) = msg_send.deref_mut().as_mut().unwrap().send(data) {
warn!("Couldn't send message to remote peer: {}", e);
}
Ok(())
}
/// Serializes and sends a block to our remote peer
fn send_block(&self, b: &core::Block) -> Result<(), ser::Error> {
self.send_msg(Type::Block, b)
unimplemented!();
}
/// Serializes and sends a transaction to our remote peer
fn send_transaction(&self, tx: &core::Transaction) -> Result<(), ser::Error> {
self.send_msg(Type::Transaction, tx)
}
/// Bytes sent and received by this peer to the remote peer.
fn transmitted_bytes(&self) -> (u64, u64) {
let sent = *self.sent_bytes.lock().unwrap().deref();
let received = *self.received_bytes.lock().unwrap().deref();
(sent, received)
unimplemented!();
}
/// Close the connection to the remote peer
fn close(&self) {
let stop_send = self.stop_send.borrow();
stop_send.as_ref().unwrap().send(0);
}
}
impl ProtocolV1 {
/// Reads a message from the peer tcp stream and process it, usually simply
/// forwarding to the adapter.
fn process_msg(&self,
mut conn: &mut TcpStream,
adapter: &NetAdapter)
-> Result<(), ser::Error> {
// deser the header to get the message type
let header = try!(ser::deserialize::<MsgHeader>(conn.deref_mut()));
if !header.acceptable() {
return Err(ser::Error::CorruptedData);
}
// wrap our connection with limited byte-counting readers
let mut limit_conn = rw::LimitedRead::new(conn.deref_mut(), MAX_DATA_BYTES);
let mut read_conn = rw::CountingRead::new(&mut limit_conn);
// check the message type and hopefully do what's expected with it
match header.msg_type {
Type::Ping => {
// respond with pong
try!(self.send_pong());
}
Type::Pong => {}
Type::Transaction => {
let tx = try!(ser::deserialize(&mut read_conn));
adapter.transaction_received(tx);
}
Type::Block => {
let b = try!(ser::deserialize(&mut read_conn));
adapter.block_received(b);
}
_ => error!("uncaught unknown"),
}
// update total of bytes sent
let mut received_bytes = self.received_bytes.lock().unwrap();
*received_bytes += header.serialized_len() + (read_conn.bytes_read() as u64);
Ok(())
}
/// Helper function to avoid boilerplate, builds a header followed by the
/// Writeable body and send the whole thing.
// TODO serialize straight to the connection
fn send_msg(&self, t: Type, body: &ser::Writeable) -> Result<(), ser::Error> {
let mut data = Vec::new();
try!(ser::serialize(&mut data, &MsgHeader::new(t)));
try!(ser::serialize(&mut data, body));
let msg_send = self.msg_send.borrow();
msg_send.as_ref().unwrap().send(data);
Ok(())
}
/// Sends a pong message (usually in reply to ping)
fn send_pong(&self) -> Result<(), ser::Error> {
let data = try!(ser::ser_vec(&MsgHeader::new(Type::Pong)));
let msg_send = self.msg_send.borrow();
msg_send.as_ref().unwrap().send(data);
Ok(())
}
/// Setup internal communication channels to select over
fn setup_channels(&self) -> (Receiver<Vec<u8>>, Receiver<u8>) {
let (msg_send, msg_recv) = sync_channel(10);
let (stop_send, stop_recv) = sync_channel(1);
{
let mut msg_mut = self.msg_send.borrow_mut();
*msg_mut = Some(msg_send);
let mut stop_mut = self.stop_send.borrow_mut();
*stop_mut = Some(stop_send);
}
(msg_recv, stop_recv)
// TODO some kind of shutdown signal
}
}

View file

@ -1,87 +0,0 @@
use std::io::{self, Read, Write, Result};
use core::ser;
/// A Read implementation that counts the number of bytes consumed from an
/// underlying Read.
pub struct CountingRead<'a> {
counter: usize,
source: &'a mut Read,
}
impl<'a> CountingRead<'a> {
/// Creates a new Read wrapping the underlying one, counting bytes consumed
pub fn new(source: &mut Read) -> CountingRead {
CountingRead {
counter: 0,
source: source,
}
}
/// Number of bytes that have been read from the underlying reader
pub fn bytes_read(&self) -> usize {
self.counter
}
}
impl<'a> Read for CountingRead<'a> {
fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
let r = self.source.read(buf);
if let Ok(sz) = r {
self.counter += sz;
}
r
}
}
/// A Read implementation that errors out past a maximum number of bytes read.
pub struct LimitedRead<'a> {
counter: usize,
max: usize,
source: &'a mut Read,
}
impl<'a> LimitedRead<'a> {
/// Creates a new Read wrapping the underlying one, erroring once the
/// max_read bytes has been reached.
pub fn new(source: &mut Read, max_read: usize) -> LimitedRead {
LimitedRead {
counter: 0,
max: max_read,
source: source,
}
}
}
impl<'a> Read for LimitedRead<'a> {
fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
let r = self.source.read(buf);
if let Ok(sz) = r {
self.counter += sz;
}
if self.counter > self.max {
Err(io::Error::new(io::ErrorKind::Interrupted, "Reached read limit."))
} else {
r
}
}
}
/// A Write implementation that counts the number of bytes wrote to an
/// underlying Write.
struct CountingWrite<'a> {
counter: usize,
dest: &'a mut Write,
}
impl<'a> Write for CountingWrite<'a> {
fn write(&mut self, buf: &[u8]) -> Result<usize> {
let w = self.dest.write(buf);
if let Ok(sz) = w {
self.counter += sz;
}
w
}
fn flush(&mut self) -> Result<()> {
self.dest.flush()
}
}

View file

@ -15,17 +15,16 @@
//! Grin server implementation, accepts incoming connections and connects to
//! other peers in the network.
use rand::{self, Rng};
use std::cell::RefCell;
use std::io;
use std::net::{SocketAddr, ToSocketAddrs};
use std::net::SocketAddr;
use std::ops::Deref;
use std::str::FromStr;
use std::sync::{Arc, RwLock};
use std::time::Duration;
use mioco;
use mioco::sync::mpsc::{sync_channel, SyncSender};
use mioco::tcp::{TcpListener, TcpStream};
use futures;
use futures::{Future, Stream};
use tokio_core::net::{TcpListener, TcpStream};
use tokio_core::reactor;
use core::core;
use core::ser::Error;
@ -43,8 +42,8 @@ impl NetAdapter for DummyAdapter {
/// peers, receiving connections from other peers and keep track of all of them.
pub struct Server {
config: P2PConfig,
peers: RwLock<Vec<Arc<Peer>>>,
stop_send: RefCell<Option<SyncSender<u8>>>,
peers: Arc<RwLock<Vec<Arc<Peer>>>>,
stop: RefCell<Option<futures::sync::oneshot::Sender<()>>>,
}
unsafe impl Sync for Server {}
@ -56,111 +55,100 @@ impl Server {
pub fn new(config: P2PConfig) -> Server {
Server {
config: config,
peers: RwLock::new(Vec::new()),
stop_send: RefCell::new(None),
peers: Arc::new(RwLock::new(Vec::new())),
stop: RefCell::new(None),
}
}
/// Starts the p2p server. Opens a TCP port to allow incoming
/// connections and starts the bootstrapping process to find peers.
pub fn start(&self) -> Result<(), Error> {
pub fn start(&self, h: reactor::Handle) -> Box<Future<Item = (), Error = Error>> {
let addr = SocketAddr::new(self.config.host, self.config.port);
let listener = try!(TcpListener::bind(&addr).map_err(&Error::IOErr));
let socket = TcpListener::bind(&addr, &h.clone()).unwrap();
warn!("P2P server started on {}", addr);
let hs = Arc::new(Handshake::new());
let (stop_send, stop_recv) = sync_channel(1);
{
let mut stop_mut = self.stop_send.borrow_mut();
*stop_mut = Some(stop_send);
}
let peers = self.peers.clone();
loop {
select!(
r:listener => {
let conn = try!(listener.accept().map_err(&Error::IOErr));
let hs = hs.clone();
// main peer acceptance future handling handshake
let hp = h.clone();
let peers = socket.incoming().map_err(|e| Error::IOErr(e)).map(move |(conn, addr)| {
let peers = peers.clone();
// accept the peer and add it to the server map
let peer_accept = Peer::accept(conn, &hs.clone()).map(move |(conn, peer)| {
let apeer = Arc::new(peer);
let mut peers = peers.write().unwrap();
peers.push(apeer.clone());
Ok((conn, apeer))
});
let peer = try!(Peer::accept(conn, &hs));
let wpeer = Arc::new(peer);
{
let mut peers = self.peers.write().unwrap();
peers.push(wpeer.clone());
// wire in a future to timeout the accept after 5 secs
let timeout = reactor::Timeout::new(Duration::new(5, 0), &hp).unwrap();
let timed_peer = peer_accept.select(timeout.map(Err).map_err(|e| Error::IOErr(e)))
.then(|res| {
match res {
Ok((Ok((conn, p)), _timeout)) => Ok((conn, p)),
Ok((_, _accept)) => Err(Error::TooLargeReadErr),
Err((e, _other)) => Err(e),
}
});
mioco::spawn(move || -> io::Result<()> {
if let Err(err) = wpeer.run(&DummyAdapter{}) {
error!("{:?}", err);
// run the main peer protocol
timed_peer.and_then(|(conn, peer)| peer.clone().run(conn, &DummyAdapter {}))
});
// spawn each peer future to its own task
let hs = h.clone();
let server = peers.for_each(move |peer| {
hs.spawn(peer.then(|res| {
match res {
Err(e) => info!("Client error: {}", e),
_ => {}
}
futures::finished(())
}));
Ok(())
});
},
r:stop_recv => {
stop_recv.recv();
return Ok(());
// setup the stopping oneshot on the server and join it with the peer future
let (stop, stop_rx) = futures::sync::oneshot::channel();
{
let mut stop_mut = self.stop.borrow_mut();
*stop_mut = Some(stop);
}
);
Box::new(server.select(stop_rx.map_err(|_| Error::CorruptedData)).then(|res| {
match res {
Ok((_, _)) => Ok(()),
Err((e, _)) => Err(e),
}
}))
}
pub fn connect_peer<A: ToSocketAddrs>(&self, addr: A) -> Result<(), Error> {
for sock_addr in addr.to_socket_addrs().unwrap() {
info!("Connecting to peer {}", sock_addr);
let tcp_client = TcpStream::connect(&sock_addr).unwrap();
let peer = try!(Peer::connect(tcp_client, &Handshake::new())
.map_err(|_| io::Error::last_os_error()));
let peer = Arc::new(peer);
let in_peer = peer.clone();
mioco::spawn(move || -> io::Result<()> {
in_peer.run(&DummyAdapter {});
Ok(())
});
self.peers.write().unwrap().push(peer);
}
Ok(())
}
/// Asks all the peers to relay the provided block. A peer may choose to
/// ignore the relay request if it has knowledge that the remote peer
/// already knows the block.
pub fn relay_block(&self, b: &core::Block) -> Result<(), Error> {
let peers = self.peers.write().unwrap();
for p in peers.deref() {
try!(p.send_block(b));
}
Ok(())
}
/// Asks all the peers to relay the provided transaction. A peer may choose
/// to ignore the relay request if it has knowledge that the remote peer
/// already knows the transaction.
pub fn relay_transaction(&self, tx: &core::Transaction) -> Result<(), Error> {
let peers = self.peers.write().unwrap();
for p in peers.deref() {
try!(p.send_transaction(tx));
}
Ok(())
}
/// Number of peers this server is connected to.
pub fn peers_count(&self) -> u32 {
self.peers.read().unwrap().len() as u32
}
/// Gets a random peer from our set of connected peers.
pub fn get_any_peer(&self) -> Arc<Peer> {
let mut rng = rand::thread_rng();
let peers = self.peers.read().unwrap();
peers[rng.gen_range(0, peers.len())].clone()
pub fn connect_peer(&self,
addr: SocketAddr,
h: reactor::Handle)
-> Box<Future<Item = (), Error = Error>> {
let socket = TcpStream::connect(&addr, &h).map_err(|e| Error::IOErr(e));
let peers = self.peers.clone();
let request = socket.and_then(move |socket| {
let peers = peers.clone();
Peer::connect(socket, &Handshake::new()).map(move |(conn, peer)| {
let apeer = Arc::new(peer);
let mut peers = peers.write().unwrap();
peers.push(apeer.clone());
(conn, apeer)
})
})
.and_then(|(socket, peer)| peer.run(socket, &DummyAdapter {}));
Box::new(request)
}
/// Stops the server. Disconnect from all peers at the same time.
pub fn stop(&self) {
pub fn stop(self) {
let peers = self.peers.write().unwrap();
for p in peers.deref() {
p.stop();
}
let stop_send = self.stop_send.borrow();
stop_send.as_ref().unwrap().send(0);
self.stop.into_inner().unwrap().complete(());
}
}

View file

@ -13,6 +13,10 @@
// limitations under the License.
use std::net::{SocketAddr, IpAddr};
use futures::Future;
use tokio_core::net::TcpStream;
use core::core;
use core::ser::Error;
@ -29,7 +33,7 @@ impl Default for P2PConfig {
let ipaddr = "127.0.0.1".parse().unwrap();
P2PConfig {
host: ipaddr,
port: 3414,
port: 13414,
}
}
}
@ -61,7 +65,7 @@ pub trait Protocol {
/// be known already, usually passed during construction. Will typically
/// block so needs to be called withing a coroutine. Should also be called
/// only once.
fn handle(&self, na: &NetAdapter) -> Result<(), Error>;
fn handle(&self, conn: TcpStream, na: &NetAdapter) -> Box<Future<Item = (), Error = Error>>;
/// Sends a ping message to the remote peer.
fn send_ping(&self) -> Result<(), Error>;

View file

@ -1,58 +0,0 @@
// Copyright 2016 The Grin Developers
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use mioco;
use mioco::tcp::TcpStream;
use std::io;
use std::sync::Arc;
use std::time;
use p2p;
use p2p::Peer;
/// Server setup and teardown around the provided closure.
pub fn with_server<F>(closure: F) where F: Fn(Arc<p2p::Server>) -> io::Result<()>, F: Send + 'static {
mioco::start(move || -> io::Result<()> {
// start a server in its own coroutine
let server = Arc::new(p2p::Server::new());
let in_server = server.clone();
mioco::spawn(move || -> io::Result<()> {
try!(in_server.start().map_err(|_| io::Error::last_os_error()));
Ok(())
});
// giving server a little time to start
mioco::sleep(time::Duration::from_millis(50));
try!(closure(server.clone()));
server.stop();
Ok(())
}).unwrap().unwrap();
}
pub fn connect_peer() -> io::Result<Arc<Peer>> {
let addr = p2p::DEFAULT_LISTEN_ADDR.parse().unwrap();
let tcp_client = TcpStream::connect(&addr).unwrap();
let peer = try!(Peer::accept(tcp_client, &p2p::handshake::Handshake::new()).map_err(|_| io::Error::last_os_error()));
mioco::sleep(time::Duration::from_millis(50));
let peer = Arc::new(peer);
let in_peer = peer.clone();
mioco::spawn(move || -> io::Result<()> {
in_peer.run(&p2p::DummyAdapter{});
Ok(())
});
mioco::sleep(time::Duration::from_millis(50));
Ok(peer.clone())
}

View file

@ -13,40 +13,58 @@
// limitations under the License.
extern crate grin_core as core;
extern crate grin_p2p as p2p;
extern crate mioco;
extern crate grin_p2p_fut as p2p;
extern crate env_logger;
extern crate futures;
extern crate tokio_core;
mod common;
use std::io;
use std::net::SocketAddr;
use std::time;
use core::core::*;
use futures::future::Future;
use tokio_core::net::TcpStream;
use tokio_core::reactor::{self, Core};
use core::ser;
use p2p::Peer;
use common::*;
// Starts a server and connects a client peer to it to check handshake, followed by a ping/pong exchange to make sure the connection is live.
#[test]
fn peer_handshake() {
env_logger::init().unwrap();
with_server(|server| -> io::Result<()> {
// connect a client peer to the server
let peer = try!(connect_peer());
let mut evtlp = Core::new().unwrap();
let handle = evtlp.handle();
let p2p_conf = p2p::P2PConfig::default();
let server = p2p::Server::new(p2p_conf);
let run_server = server.start(handle.clone());
// check server peer count
let pc = server.peers_count();
assert_eq!(pc, 1);
// send a ping and check we got ponged (received data back)
peer.send_ping();
mioco::sleep(time::Duration::from_millis(50));
let phandle = handle.clone();
let rhandle = handle.clone();
let timeout = reactor::Timeout::new(time::Duration::new(1, 0), &handle).unwrap();
let timeout_send = reactor::Timeout::new(time::Duration::new(2, 0), &handle).unwrap();
handle.spawn(timeout.map_err(|e| ser::Error::IOErr(e)).and_then(move |_| {
let p2p_conf = p2p::P2PConfig::default();
let addr = SocketAddr::new(p2p_conf.host, p2p_conf.port);
let socket = TcpStream::connect(&addr, &phandle).map_err(|e| ser::Error::IOErr(e));
socket.and_then(move |socket| {
Peer::connect(socket, &p2p::handshake::Handshake::new())
}).and_then(move |(socket, peer)| {
rhandle.spawn(peer.run(socket, &p2p::DummyAdapter {}).map_err(|e| {
panic!("Client run failed: {}", e);
}));
peer.send_ping().unwrap();
timeout_send.map_err(|e| ser::Error::IOErr(e)).map(|_| peer)
}).and_then(|peer| {
let (sent, recv) = peer.transmitted_bytes();
assert!(sent > 0);
assert!(recv > 0);
peer.stop();
Ok(())
});
}).and_then(|_| {server.stop(); Ok(())})
}).map_err(|e| {
panic!("Client connection failed: {}", e);
}));
evtlp.run(run_server).unwrap();
}

View file

@ -1,82 +0,0 @@
// Copyright 2016 The Grin Developers
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
extern crate grin_core as core;
extern crate grin_p2p as p2p;
extern crate mioco;
extern crate env_logger;
extern crate rand;
extern crate secp256k1zkp as secp;
mod common;
use rand::Rng;
use rand::os::OsRng;
use std::io;
use std::sync::Arc;
use std::time;
use mioco::tcp::TcpStream;
use secp::Secp256k1;
use secp::key::SecretKey;
use core::core::*;
use p2p::Peer;
use common::*;
// Connects a client peer and send a transaction.
#[test]
fn peer_tx_send() {
with_server(|server| -> io::Result<()> {
// connect a client peer to the server
let peer = try!(connect_peer());
let tx1 = tx2i1o();
peer.send_transaction(&tx1);
mioco::sleep(time::Duration::from_millis(50));
let (sent,_) = peer.transmitted_bytes();
assert!(sent > 1000);
let s_peer = server.get_any_peer();
let (_, recv) = s_peer.transmitted_bytes();
assert!(recv > 1000);
peer.stop();
Ok(())
});
}
// utility producing a transaction with 2 inputs and a single outputs
pub fn tx2i1o() -> Transaction {
let mut rng = OsRng::new().unwrap();
let ref secp = secp::Secp256k1::with_caps(secp::ContextFlag::Commit);
let outh = core::core::hash::ZERO_HASH;
Transaction::new(vec![Input::OvertInput {
output: outh,
value: 10,
blindkey: SecretKey::new(secp, &mut rng),
},
Input::OvertInput {
output: outh,
value: 11,
blindkey: SecretKey::new(secp, &mut rng),
}],
vec![Output::OvertOutput {
value: 20,
blindkey: SecretKey::new(secp, &mut rng),
}],
1).blind(&secp).unwrap()
}