2016-10-25 07:35:10 +03:00
|
|
|
// Copyright 2016 The Grin Developers
|
|
|
|
//
|
|
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
|
|
// you may not use this file except in compliance with the License.
|
|
|
|
// You may obtain a copy of the License at
|
|
|
|
//
|
|
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
//
|
|
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
|
// See the License for the specific language governing permissions and
|
|
|
|
// limitations under the License.
|
|
|
|
|
2016-10-29 22:36:45 +03:00
|
|
|
use std::cell::RefCell;
|
2016-12-11 06:11:49 +03:00
|
|
|
use std::iter;
|
|
|
|
use std::ops::DerefMut;
|
|
|
|
use std::sync::{Mutex, Arc};
|
2016-10-29 22:36:45 +03:00
|
|
|
|
2016-12-11 06:11:49 +03:00
|
|
|
use futures;
|
|
|
|
use futures::{Stream, Future};
|
|
|
|
use futures::stream;
|
2016-12-12 00:04:52 +03:00
|
|
|
use futures::sync::mpsc::{UnboundedSender, UnboundedReceiver};
|
2016-12-16 01:57:04 +03:00
|
|
|
use tokio_core::io::{Io, WriteHalf, ReadHalf, write_all, read_exact};
|
2016-12-11 06:11:49 +03:00
|
|
|
use tokio_core::net::TcpStream;
|
2016-10-29 22:36:45 +03:00
|
|
|
|
2016-11-01 20:42:33 +03:00
|
|
|
use core::core;
|
2016-10-25 07:35:10 +03:00
|
|
|
use core::ser;
|
2016-10-26 08:06:13 +03:00
|
|
|
use msg::*;
|
|
|
|
use types::*;
|
2016-10-25 07:35:10 +03:00
|
|
|
|
2016-10-29 22:36:45 +03:00
|
|
|
pub struct ProtocolV1 {
|
2016-12-11 06:11:49 +03:00
|
|
|
outbound_chan: RefCell<Option<UnboundedSender<Vec<u8>>>>,
|
2016-11-03 00:19:40 +03:00
|
|
|
|
2016-12-11 06:11:49 +03:00
|
|
|
// Bytes we've sent.
|
|
|
|
sent_bytes: Arc<Mutex<u64>>,
|
2016-11-03 00:19:40 +03:00
|
|
|
|
2016-11-01 20:42:33 +03:00
|
|
|
// Bytes we've received.
|
2016-12-11 06:11:49 +03:00
|
|
|
received_bytes: Arc<Mutex<u64>>,
|
2016-11-03 00:19:40 +03:00
|
|
|
|
|
|
|
// Counter for read errors.
|
|
|
|
error_count: Mutex<u64>,
|
2016-10-31 22:29:08 +03:00
|
|
|
}
|
|
|
|
|
|
|
|
impl ProtocolV1 {
|
2016-12-11 06:11:49 +03:00
|
|
|
pub fn new() -> ProtocolV1 {
|
2016-10-31 22:29:08 +03:00
|
|
|
ProtocolV1 {
|
2016-12-11 06:11:49 +03:00
|
|
|
outbound_chan: RefCell::new(None),
|
|
|
|
sent_bytes: Arc::new(Mutex::new(0)),
|
|
|
|
received_bytes: Arc::new(Mutex::new(0)),
|
2016-11-06 02:31:45 +03:00
|
|
|
error_count: Mutex::new(0),
|
2016-10-31 22:29:08 +03:00
|
|
|
}
|
|
|
|
}
|
2016-10-25 07:35:10 +03:00
|
|
|
}
|
|
|
|
|
2016-10-29 22:36:45 +03:00
|
|
|
impl Protocol for ProtocolV1 {
|
2016-12-11 06:11:49 +03:00
|
|
|
fn handle(&self,
|
|
|
|
conn: TcpStream,
|
2016-12-16 01:57:04 +03:00
|
|
|
adapter: Arc<NetAdapter>)
|
2016-12-11 06:11:49 +03:00
|
|
|
-> Box<Future<Item = (), Error = ser::Error>> {
|
|
|
|
let (reader, writer) = conn.split();
|
|
|
|
|
|
|
|
// prepare the channel that will transmit data to the connection writer
|
|
|
|
let (tx, rx) = futures::sync::mpsc::unbounded();
|
|
|
|
{
|
|
|
|
let mut out_mut = self.outbound_chan.borrow_mut();
|
|
|
|
*out_mut = Some(tx.clone());
|
2016-10-25 07:35:10 +03:00
|
|
|
}
|
2016-12-11 06:11:49 +03:00
|
|
|
|
2016-12-12 00:04:52 +03:00
|
|
|
// setup the reading future, getting messages from the peer and processing them
|
2016-12-16 01:57:04 +03:00
|
|
|
let read_msg = self.read_msg(tx, reader, adapter).map(|_| ());
|
2016-12-12 00:04:52 +03:00
|
|
|
|
|
|
|
// setting the writing future, getting messages from our system and sending
|
|
|
|
// them out
|
|
|
|
let write_msg = self.write_msg(rx, writer).map(|_| ());
|
|
|
|
|
|
|
|
// select between our different futures and return them
|
|
|
|
Box::new(read_msg.select(write_msg).map(|_| ()).map_err(|(e, _)| e))
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Bytes sent and received by this peer to the remote peer.
|
|
|
|
fn transmitted_bytes(&self) -> (u64, u64) {
|
|
|
|
let sent = *self.sent_bytes.lock().unwrap();
|
|
|
|
let recv = *self.received_bytes.lock().unwrap();
|
|
|
|
(sent, recv)
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Sends a ping message to the remote peer. Will panic if handle has never
|
|
|
|
/// been called on this protocol.
|
|
|
|
fn send_ping(&self) -> Result<(), ser::Error> {
|
|
|
|
self.send_msg(Type::Ping, &Empty {})
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Serializes and sends a block to our remote peer
|
|
|
|
fn send_block(&self, b: &core::Block) -> Result<(), ser::Error> {
|
|
|
|
self.send_msg(Type::Block, b)
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Serializes and sends a transaction to our remote peer
|
|
|
|
fn send_transaction(&self, tx: &core::Transaction) -> Result<(), ser::Error> {
|
|
|
|
self.send_msg(Type::Transaction, tx)
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Close the connection to the remote peer
|
|
|
|
fn close(&self) {
|
|
|
|
// TODO some kind of shutdown signal
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl ProtocolV1 {
|
|
|
|
/// Prepares the future reading from the peer connection, parsing each
|
|
|
|
/// message and forwarding them appropriately based on their type
|
|
|
|
fn read_msg(&self,
|
2016-12-16 01:57:04 +03:00
|
|
|
sender: UnboundedSender<Vec<u8>>,
|
|
|
|
reader: ReadHalf<TcpStream>,
|
|
|
|
adapter: Arc<NetAdapter>)
|
2016-12-12 00:04:52 +03:00
|
|
|
-> Box<Future<Item = ReadHalf<TcpStream>, Error = ser::Error>> {
|
|
|
|
|
2016-12-11 06:11:49 +03:00
|
|
|
// infinite iterator stream so we repeat the message reading logic until the
|
|
|
|
// peer is stopped
|
|
|
|
let iter = stream::iter(iter::repeat(()).map(Ok::<(), ser::Error>));
|
|
|
|
|
|
|
|
// setup the reading future, getting messages from the peer and processing them
|
|
|
|
let recv_bytes = self.received_bytes.clone();
|
|
|
|
let read_msg = iter.fold(reader, move |reader, _| {
|
2016-12-16 01:57:04 +03:00
|
|
|
let mut sender_inner = sender.clone();
|
2016-12-12 00:04:52 +03:00
|
|
|
let recv_bytes = recv_bytes.clone();
|
2016-12-19 02:51:54 +03:00
|
|
|
let adapter = adapter.clone();
|
2016-12-12 00:04:52 +03:00
|
|
|
|
|
|
|
// first read the message header
|
2016-12-11 06:11:49 +03:00
|
|
|
read_exact(reader, vec![0u8; HEADER_LEN as usize])
|
|
|
|
.map_err(|e| ser::Error::IOErr(e))
|
|
|
|
.and_then(move |(reader, buf)| {
|
|
|
|
let header = try!(ser::deserialize::<MsgHeader>(&mut &buf[..]));
|
|
|
|
Ok((reader, header))
|
|
|
|
})
|
2016-12-16 01:57:04 +03:00
|
|
|
.and_then(move |(reader, header)| {
|
2016-12-19 02:51:54 +03:00
|
|
|
// now that we have a size, proceed with the body
|
|
|
|
read_exact(reader, vec![0u8; header.msg_len as usize])
|
|
|
|
.map(|(reader, buf)| (reader, header, buf))
|
|
|
|
.map_err(|e| ser::Error::IOErr(e))
|
2016-12-16 01:57:04 +03:00
|
|
|
})
|
|
|
|
.map(move |(reader, header, buf)| {
|
|
|
|
// add the count of bytes received
|
2016-12-12 00:04:52 +03:00
|
|
|
let mut recv_bytes = recv_bytes.lock().unwrap();
|
|
|
|
*recv_bytes += header.serialized_len() + header.msg_len;
|
2016-12-11 06:11:49 +03:00
|
|
|
|
|
|
|
// and handle the different message types
|
2016-12-19 02:51:54 +03:00
|
|
|
if let Err(e) = handle_payload(adapter, &header, buf, &mut sender_inner) {
|
|
|
|
debug!("Invalid {:?} message: {}", header.msg_type, e);
|
|
|
|
}
|
2016-12-16 01:57:04 +03:00
|
|
|
|
2016-12-11 06:11:49 +03:00
|
|
|
reader
|
|
|
|
})
|
|
|
|
});
|
2016-12-12 00:04:52 +03:00
|
|
|
Box::new(read_msg)
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Prepares the future that gets message data produced by our system and
|
|
|
|
/// sends it to the peer connection
|
|
|
|
fn write_msg(&self,
|
|
|
|
rx: UnboundedReceiver<Vec<u8>>,
|
|
|
|
writer: WriteHalf<TcpStream>)
|
|
|
|
-> Box<Future<Item = WriteHalf<TcpStream>, Error = ser::Error>> {
|
2016-12-11 06:11:49 +03:00
|
|
|
|
|
|
|
let sent_bytes = self.sent_bytes.clone();
|
|
|
|
let send_data = rx.map(move |data| {
|
|
|
|
// add the count of bytes sent
|
|
|
|
let mut sent_bytes = sent_bytes.lock().unwrap();
|
|
|
|
*sent_bytes += data.len() as u64;
|
|
|
|
data
|
|
|
|
})
|
|
|
|
// write the data and make sure the future returns the right types
|
|
|
|
.fold(writer,
|
|
|
|
|writer, data| write_all(writer, data).map_err(|_| ()).map(|(writer, buf)| writer))
|
|
|
|
.map_err(|_| ser::Error::CorruptedData);
|
2016-12-12 00:04:52 +03:00
|
|
|
Box::new(send_data)
|
2016-10-25 07:35:10 +03:00
|
|
|
}
|
2016-10-30 18:24:19 +03:00
|
|
|
|
2016-12-12 00:04:52 +03:00
|
|
|
/// Utility function to send any Writeable. Handles adding the header and
|
|
|
|
/// serialization.
|
|
|
|
fn send_msg(&self, t: Type, body: &ser::Writeable) -> Result<(), ser::Error> {
|
|
|
|
let mut body_data = vec![];
|
|
|
|
try!(ser::serialize(&mut body_data, body));
|
|
|
|
let mut data = vec![];
|
|
|
|
try!(ser::serialize(&mut data, &MsgHeader::new(t, body_data.len() as u64)));
|
|
|
|
data.append(&mut body_data);
|
2016-12-11 06:11:49 +03:00
|
|
|
|
|
|
|
let mut msg_send = self.outbound_chan.borrow_mut();
|
|
|
|
if let Err(e) = msg_send.deref_mut().as_mut().unwrap().send(data) {
|
|
|
|
warn!("Couldn't send message to remote peer: {}", e);
|
|
|
|
}
|
2016-11-01 20:42:33 +03:00
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
}
|
2016-12-16 01:57:04 +03:00
|
|
|
|
2016-12-19 02:51:54 +03:00
|
|
|
fn handle_payload(adapter: Arc<NetAdapter>,
|
|
|
|
header: &MsgHeader,
|
|
|
|
buf: Vec<u8>,
|
|
|
|
sender: &mut UnboundedSender<Vec<u8>>)
|
|
|
|
-> Result<(), ser::Error> {
|
|
|
|
match header.msg_type {
|
|
|
|
Type::Ping => {
|
|
|
|
let data = try!(ser::ser_vec(&MsgHeader::new(Type::Pong, 0)));
|
|
|
|
sender.send(data);
|
|
|
|
}
|
|
|
|
Type::Pong => {}
|
|
|
|
Type::Transaction => {
|
|
|
|
let tx = try!(ser::deserialize::<core::Transaction>(&mut &buf[..]));
|
|
|
|
adapter.transaction_received(tx);
|
|
|
|
}
|
|
|
|
Type::Block => {
|
|
|
|
let b = try!(ser::deserialize::<core::Block>(&mut &buf[..]));
|
|
|
|
adapter.block_received(b);
|
|
|
|
}
|
|
|
|
_ => {
|
|
|
|
debug!("unknown message type {:?}", header.msg_type);
|
|
|
|
}
|
|
|
|
};
|
|
|
|
Ok(())
|
2016-12-16 01:57:04 +03:00
|
|
|
}
|