2017-01-30 02:52:01 +03:00
|
|
|
// Copyright 2016 The Grin Developers
|
|
|
|
//
|
|
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
|
|
// you may not use this file except in compliance with the License.
|
|
|
|
// You may obtain a copy of the License at
|
|
|
|
//
|
|
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
//
|
|
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
|
// See the License for the specific language governing permissions and
|
|
|
|
// limitations under the License.
|
|
|
|
|
2017-02-02 06:05:17 +03:00
|
|
|
//! Provides a connection wrapper that handles the lower level tasks in sending
|
2017-03-08 04:00:34 +03:00
|
|
|
//! or receiving data from the TCP socket, as well as dealing with timeouts.
|
2017-01-30 02:52:01 +03:00
|
|
|
|
|
|
|
use std::iter;
|
2017-02-02 06:05:17 +03:00
|
|
|
use std::ops::Deref;
|
2017-11-01 02:32:33 +03:00
|
|
|
use std::sync::{Arc, Mutex};
|
|
|
|
use std::time::{Duration, Instant};
|
2017-01-30 02:52:01 +03:00
|
|
|
|
|
|
|
use futures;
|
2017-12-14 00:30:59 +03:00
|
|
|
use futures::{Future, Stream};
|
|
|
|
use futures::stream;
|
2017-11-01 02:32:33 +03:00
|
|
|
use futures::sync::mpsc::{Sender, UnboundedReceiver, UnboundedSender};
|
2017-01-30 02:52:01 +03:00
|
|
|
use tokio_core::net::TcpStream;
|
2017-08-10 03:54:10 +03:00
|
|
|
use tokio_io::{AsyncRead, AsyncWrite};
|
|
|
|
use tokio_io::io::{read_exact, write_all};
|
2017-02-02 06:05:17 +03:00
|
|
|
use tokio_timer::{Timer, TimerError};
|
2017-01-30 02:52:01 +03:00
|
|
|
|
2017-08-10 03:54:10 +03:00
|
|
|
use core::core::hash::Hash;
|
2017-01-30 02:52:01 +03:00
|
|
|
use core::ser;
|
|
|
|
use msg::*;
|
2017-02-27 07:08:40 +03:00
|
|
|
use types::Error;
|
2017-06-18 02:15:46 +03:00
|
|
|
use rate_limit::*;
|
2017-10-12 19:56:44 +03:00
|
|
|
use util::LOGGER;
|
2017-01-30 02:52:01 +03:00
|
|
|
|
2017-02-02 06:05:17 +03:00
|
|
|
/// Handler to provide to the connection, will be called back anytime a message
|
|
|
|
/// is received. The provided sender can be use to immediately send back
|
|
|
|
/// another message.
|
2017-01-30 02:52:01 +03:00
|
|
|
pub trait Handler: Sync + Send {
|
2017-02-02 06:05:17 +03:00
|
|
|
/// Handle function to implement to process incoming messages. A sender to
|
|
|
|
/// reply immediately as well as the message header and its unparsed body
|
|
|
|
/// are provided.
|
2017-10-26 20:48:51 +03:00
|
|
|
fn handle(
|
|
|
|
&self,
|
|
|
|
sender: UnboundedSender<Vec<u8>>,
|
|
|
|
header: MsgHeader,
|
|
|
|
body: Vec<u8>,
|
|
|
|
) -> Result<Option<Hash>, ser::Error>;
|
2017-01-30 02:52:01 +03:00
|
|
|
}
|
|
|
|
|
|
|
|
impl<F> Handler for F
|
2017-10-26 20:48:51 +03:00
|
|
|
where
|
|
|
|
F: Fn(UnboundedSender<Vec<u8>>, MsgHeader, Vec<u8>)
|
2017-11-01 02:32:33 +03:00
|
|
|
-> Result<Option<Hash>, ser::Error>,
|
2017-10-26 20:48:51 +03:00
|
|
|
F: Sync + Send,
|
2017-02-02 06:05:17 +03:00
|
|
|
{
|
2017-10-26 20:48:51 +03:00
|
|
|
fn handle(
|
|
|
|
&self,
|
|
|
|
sender: UnboundedSender<Vec<u8>>,
|
|
|
|
header: MsgHeader,
|
|
|
|
body: Vec<u8>,
|
|
|
|
) -> Result<Option<Hash>, ser::Error> {
|
2017-02-02 06:05:17 +03:00
|
|
|
self(sender, header, body)
|
|
|
|
}
|
2017-01-30 02:52:01 +03:00
|
|
|
}
|
|
|
|
|
2017-02-02 06:05:17 +03:00
|
|
|
/// A higher level connection wrapping the TcpStream. Maintains the amount of
|
|
|
|
/// data transmitted and deals with the low-level task of sending and
|
|
|
|
/// receiving data, parsing message headers and timeouts.
|
2017-08-10 03:54:10 +03:00
|
|
|
#[allow(dead_code)]
|
2017-01-30 02:52:01 +03:00
|
|
|
pub struct Connection {
|
|
|
|
// Channel to push bytes to the remote peer
|
|
|
|
outbound_chan: UnboundedSender<Vec<u8>>,
|
|
|
|
|
|
|
|
// Close the connection with the remote peer
|
|
|
|
close_chan: Sender<()>,
|
|
|
|
|
|
|
|
// Bytes we've sent.
|
|
|
|
sent_bytes: Arc<Mutex<u64>>,
|
|
|
|
|
|
|
|
// Bytes we've received.
|
|
|
|
received_bytes: Arc<Mutex<u64>>,
|
|
|
|
|
|
|
|
// Counter for read errors.
|
|
|
|
error_count: Mutex<u64>,
|
|
|
|
}
|
|
|
|
|
|
|
|
impl Connection {
|
2017-02-02 06:05:17 +03:00
|
|
|
/// Start listening on the provided connection and wraps it. Does not hang
|
|
|
|
/// the current thread, instead just returns a future and the Connection
|
|
|
|
/// itself.
|
2017-10-26 20:48:51 +03:00
|
|
|
pub fn listen<F>(
|
|
|
|
conn: TcpStream,
|
|
|
|
handler: F,
|
|
|
|
) -> (Connection, Box<Future<Item = (), Error = Error>>)
|
|
|
|
where
|
|
|
|
F: Handler + 'static,
|
2017-02-02 06:05:17 +03:00
|
|
|
{
|
2017-01-30 02:52:01 +03:00
|
|
|
let (reader, writer) = conn.split();
|
|
|
|
|
2017-06-18 02:15:46 +03:00
|
|
|
// Set Max Read to 12 Mb/s
|
|
|
|
let reader = ThrottledReader::new(reader, 12_000_000);
|
|
|
|
// Set Max Write to 12 Mb/s
|
|
|
|
let writer = ThrottledWriter::new(writer, 12_000_000);
|
|
|
|
|
2017-01-30 02:52:01 +03:00
|
|
|
// prepare the channel that will transmit data to the connection writer
|
|
|
|
let (tx, rx) = futures::sync::mpsc::unbounded();
|
|
|
|
|
|
|
|
// same for closing the connection
|
|
|
|
let (close_tx, close_rx) = futures::sync::mpsc::channel(1);
|
2017-11-01 02:32:33 +03:00
|
|
|
let close_conn = close_rx
|
|
|
|
.for_each(|_| Ok(()))
|
|
|
|
.map_err(|_| Error::ConnectionClose);
|
2017-01-30 02:52:01 +03:00
|
|
|
|
|
|
|
let me = Connection {
|
|
|
|
outbound_chan: tx.clone(),
|
|
|
|
close_chan: close_tx,
|
|
|
|
sent_bytes: Arc::new(Mutex::new(0)),
|
|
|
|
received_bytes: Arc::new(Mutex::new(0)),
|
|
|
|
error_count: Mutex::new(0),
|
|
|
|
};
|
|
|
|
|
|
|
|
// setup the reading future, getting messages from the peer and processing them
|
2017-12-14 00:30:59 +03:00
|
|
|
let read_msg = me.read_msg(tx, reader, handler).map(|_| ());
|
2017-01-30 02:52:01 +03:00
|
|
|
|
2017-12-14 00:30:59 +03:00
|
|
|
// setting the writing future, getting messages from our system and sending
|
|
|
|
// them out
|
2017-01-30 02:52:01 +03:00
|
|
|
let write_msg = me.write_msg(rx, writer).map(|_| ());
|
|
|
|
|
|
|
|
// select between our different futures and return them
|
2017-09-29 21:44:25 +03:00
|
|
|
let fut = Box::new(
|
|
|
|
close_conn
|
|
|
|
.select(read_msg.select(write_msg).map(|_| ()).map_err(|(e, _)| e))
|
2017-02-02 06:05:17 +03:00
|
|
|
.map(|_| ())
|
2017-09-29 21:44:25 +03:00
|
|
|
.map_err(|(e, _)| e),
|
|
|
|
);
|
2017-01-30 02:52:01 +03:00
|
|
|
|
2017-02-02 06:05:17 +03:00
|
|
|
(me, fut)
|
2017-01-30 02:52:01 +03:00
|
|
|
}
|
|
|
|
|
|
|
|
/// Prepares the future that gets message data produced by our system and
|
|
|
|
/// sends it to the peer connection
|
2017-10-26 20:48:51 +03:00
|
|
|
fn write_msg<W>(
|
|
|
|
&self,
|
|
|
|
rx: UnboundedReceiver<Vec<u8>>,
|
|
|
|
writer: W,
|
|
|
|
) -> Box<Future<Item = W, Error = Error>>
|
|
|
|
where
|
|
|
|
W: AsyncWrite + 'static,
|
2017-06-18 02:15:46 +03:00
|
|
|
{
|
2017-01-30 02:52:01 +03:00
|
|
|
let sent_bytes = self.sent_bytes.clone();
|
2017-02-27 07:08:40 +03:00
|
|
|
let send_data = rx
|
|
|
|
.map_err(|_| Error::ConnectionClose)
|
2017-12-14 00:30:59 +03:00
|
|
|
.map(move |data| {
|
|
|
|
// add the count of bytes sent
|
2017-01-30 02:52:01 +03:00
|
|
|
let mut sent_bytes = sent_bytes.lock().unwrap();
|
|
|
|
*sent_bytes += data.len() as u64;
|
|
|
|
data
|
|
|
|
})
|
2017-12-14 00:30:59 +03:00
|
|
|
// write the data and make sure the future returns the right types
|
2017-02-28 01:17:53 +03:00
|
|
|
.fold(writer, |writer, data| {
|
2017-12-14 00:30:59 +03:00
|
|
|
write_all(writer, data).map_err(|e| Error::Connection(e)).map(|(writer, _)| writer)
|
|
|
|
});
|
2017-01-30 02:52:01 +03:00
|
|
|
Box::new(send_data)
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Prepares the future reading from the peer connection, parsing each
|
|
|
|
/// message and forwarding them appropriately based on their type
|
2017-10-26 20:48:51 +03:00
|
|
|
fn read_msg<F, R>(
|
|
|
|
&self,
|
|
|
|
sender: UnboundedSender<Vec<u8>>,
|
|
|
|
reader: R,
|
|
|
|
handler: F,
|
|
|
|
) -> Box<Future<Item = R, Error = Error>>
|
|
|
|
where
|
|
|
|
F: Handler + 'static,
|
2017-12-14 00:30:59 +03:00
|
|
|
R: AsyncRead + 'static,
|
2017-02-02 06:05:17 +03:00
|
|
|
{
|
2017-01-30 02:52:01 +03:00
|
|
|
// infinite iterator stream so we repeat the message reading logic until the
|
2017-12-14 00:30:59 +03:00
|
|
|
// peer is stopped
|
2017-10-02 00:34:14 +03:00
|
|
|
let iter = stream::iter_ok(iter::repeat(()).map(Ok::<(), Error>));
|
2017-01-30 02:52:01 +03:00
|
|
|
|
|
|
|
// setup the reading future, getting messages from the peer and processing them
|
|
|
|
let recv_bytes = self.received_bytes.clone();
|
2017-02-02 06:05:17 +03:00
|
|
|
let handler = Arc::new(handler);
|
2017-01-30 02:52:01 +03:00
|
|
|
|
2017-12-14 00:30:59 +03:00
|
|
|
let read_msg = iter.fold(reader, move |reader, _| {
|
2017-01-30 02:52:01 +03:00
|
|
|
let recv_bytes = recv_bytes.clone();
|
2017-02-02 06:05:17 +03:00
|
|
|
let handler = handler.clone();
|
|
|
|
let sender_inner = sender.clone();
|
2017-01-30 02:52:01 +03:00
|
|
|
|
|
|
|
// first read the message header
|
|
|
|
read_exact(reader, vec![0u8; HEADER_LEN as usize])
|
2017-02-27 07:08:40 +03:00
|
|
|
.from_err()
|
2017-01-30 02:52:01 +03:00
|
|
|
.and_then(move |(reader, buf)| {
|
|
|
|
let header = try!(ser::deserialize::<MsgHeader>(&mut &buf[..]));
|
|
|
|
Ok((reader, header))
|
|
|
|
})
|
|
|
|
.and_then(move |(reader, header)| {
|
|
|
|
// now that we have a size, proceed with the body
|
|
|
|
read_exact(reader, vec![0u8; header.msg_len as usize])
|
|
|
|
.map(|(reader, buf)| (reader, header, buf))
|
2017-02-27 07:08:40 +03:00
|
|
|
.from_err()
|
2017-01-30 02:52:01 +03:00
|
|
|
})
|
2017-02-27 07:08:40 +03:00
|
|
|
.and_then(move |(reader, header, buf)| {
|
2017-01-30 02:52:01 +03:00
|
|
|
// add the count of bytes received
|
|
|
|
let mut recv_bytes = recv_bytes.lock().unwrap();
|
|
|
|
*recv_bytes += header.serialized_len() + header.msg_len;
|
|
|
|
|
2017-12-14 00:30:59 +03:00
|
|
|
// and handle the different message types
|
|
|
|
let msg_type = header.msg_type;
|
|
|
|
if let Err(e) = handler.handle(sender_inner.clone(), header, buf) {
|
|
|
|
debug!(LOGGER, "Invalid {:?} message: {}", msg_type, e);
|
|
|
|
return Err(Error::Serialization(e));
|
|
|
|
}
|
2017-01-30 02:52:01 +03:00
|
|
|
|
2017-12-14 00:30:59 +03:00
|
|
|
Ok(reader)
|
2017-01-30 02:52:01 +03:00
|
|
|
})
|
|
|
|
});
|
|
|
|
Box::new(read_msg)
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Utility function to send any Writeable. Handles adding the header and
|
|
|
|
/// serialization.
|
2017-04-25 04:47:36 +03:00
|
|
|
pub fn send_msg<W: ser::Writeable>(&self, t: Type, body: &W) -> Result<(), Error> {
|
2017-01-30 02:52:01 +03:00
|
|
|
let mut body_data = vec![];
|
|
|
|
try!(ser::serialize(&mut body_data, body));
|
|
|
|
let mut data = vec![];
|
2017-10-26 20:48:51 +03:00
|
|
|
try!(ser::serialize(
|
|
|
|
&mut data,
|
|
|
|
&MsgHeader::new(t, body_data.len() as u64),
|
|
|
|
));
|
2017-01-30 02:52:01 +03:00
|
|
|
data.append(&mut body_data);
|
|
|
|
|
2017-11-01 02:32:33 +03:00
|
|
|
self.outbound_chan
|
|
|
|
.unbounded_send(data)
|
|
|
|
.map_err(|_| Error::ConnectionClose)
|
2017-01-30 02:52:01 +03:00
|
|
|
}
|
|
|
|
|
|
|
|
/// Bytes sent and received by this peer to the remote peer.
|
|
|
|
pub fn transmitted_bytes(&self) -> (u64, u64) {
|
|
|
|
let sent = *self.sent_bytes.lock().unwrap();
|
|
|
|
let recv = *self.received_bytes.lock().unwrap();
|
|
|
|
(sent, recv)
|
|
|
|
}
|
|
|
|
}
|
2017-02-02 06:05:17 +03:00
|
|
|
|
|
|
|
/// Connection wrapper that handles a request/response oriented interaction with
|
|
|
|
/// a timeout.
|
|
|
|
pub struct TimeoutConnection {
|
|
|
|
underlying: Connection,
|
|
|
|
|
2017-02-19 05:42:34 +03:00
|
|
|
expected_responses: Arc<Mutex<Vec<(Type, Option<Hash>, Instant)>>>,
|
2017-02-02 06:05:17 +03:00
|
|
|
}
|
|
|
|
|
|
|
|
impl TimeoutConnection {
|
|
|
|
/// Same as Connection
|
2017-10-26 20:48:51 +03:00
|
|
|
pub fn listen<F>(
|
|
|
|
conn: TcpStream,
|
|
|
|
handler: F,
|
|
|
|
) -> (TimeoutConnection, Box<Future<Item = (), Error = Error>>)
|
|
|
|
where
|
|
|
|
F: Handler + 'static,
|
2017-02-02 06:05:17 +03:00
|
|
|
{
|
|
|
|
let expects = Arc::new(Mutex::new(vec![]));
|
|
|
|
|
|
|
|
// Decorates the handler to remove the "subscription" from the expected
|
2017-11-20 04:33:45 +03:00
|
|
|
// responses. We got our replies, so no timeout should occur.
|
2017-02-02 06:05:17 +03:00
|
|
|
let exp = expects.clone();
|
2017-12-14 00:30:59 +03:00
|
|
|
let (conn, fut) = Connection::listen(conn, move |sender, header: MsgHeader, data| {
|
2017-02-02 06:05:17 +03:00
|
|
|
let msg_type = header.msg_type;
|
|
|
|
let recv_h = try!(handler.handle(sender, header, data));
|
|
|
|
|
|
|
|
let mut expects = exp.lock().unwrap();
|
2017-09-29 21:44:25 +03:00
|
|
|
let filtered = expects
|
|
|
|
.iter()
|
2017-02-19 05:42:34 +03:00
|
|
|
.filter(|&&(typ, h, _): &&(Type, Option<Hash>, Instant)| {
|
|
|
|
msg_type != typ || h.is_some() && recv_h != h
|
2017-02-08 00:52:17 +03:00
|
|
|
})
|
2017-02-02 06:05:17 +03:00
|
|
|
.map(|&x| x)
|
|
|
|
.collect::<Vec<_>>();
|
|
|
|
*expects = filtered;
|
|
|
|
|
|
|
|
Ok(recv_h)
|
|
|
|
});
|
|
|
|
|
|
|
|
// Registers a timer with the event loop to regularly check for timeouts.
|
|
|
|
let exp = expects.clone();
|
|
|
|
let timer = Timer::default()
|
|
|
|
.interval(Duration::new(2, 0))
|
|
|
|
.fold((), move |_, _| {
|
|
|
|
let exp = exp.lock().unwrap();
|
2017-11-20 04:33:45 +03:00
|
|
|
for &(ty, h, t) in exp.deref() {
|
|
|
|
if Instant::now() - t > Duration::new(5, 0) {
|
|
|
|
trace!(LOGGER, "Too long: {:?} {:?}", ty, h);
|
2017-02-02 06:05:17 +03:00
|
|
|
return Err(TimerError::TooLong);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
Ok(())
|
|
|
|
})
|
2017-02-27 07:08:40 +03:00
|
|
|
.from_err();
|
2017-02-02 06:05:17 +03:00
|
|
|
|
|
|
|
let me = TimeoutConnection {
|
|
|
|
underlying: conn,
|
|
|
|
expected_responses: expects,
|
|
|
|
};
|
2017-10-26 20:48:51 +03:00
|
|
|
(
|
|
|
|
me,
|
|
|
|
Box::new(fut.select(timer).map(|_| ()).map_err(|(e1, _)| e1)),
|
|
|
|
)
|
2017-02-02 06:05:17 +03:00
|
|
|
}
|
|
|
|
|
|
|
|
/// Sends a request and registers a timer on the provided message type and
|
|
|
|
/// optionally the hash of the sent data.
|
2017-10-26 20:48:51 +03:00
|
|
|
pub fn send_request<W: ser::Writeable>(
|
|
|
|
&self,
|
|
|
|
t: Type,
|
|
|
|
rt: Type,
|
|
|
|
body: &W,
|
|
|
|
expect_h: Option<(Hash)>,
|
|
|
|
) -> Result<(), Error> {
|
2017-08-10 03:54:10 +03:00
|
|
|
let _sent = try!(self.underlying.send_msg(t, body));
|
2017-02-02 06:05:17 +03:00
|
|
|
|
|
|
|
let mut expects = self.expected_responses.lock().unwrap();
|
2017-02-19 05:42:34 +03:00
|
|
|
expects.push((rt, expect_h, Instant::now()));
|
2017-02-02 06:05:17 +03:00
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Same as Connection
|
2017-04-07 08:54:54 +03:00
|
|
|
pub fn send_msg<W: ser::Writeable>(&self, t: Type, body: &W) -> Result<(), Error> {
|
2017-02-02 06:05:17 +03:00
|
|
|
self.underlying.send_msg(t, body)
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Same as Connection
|
|
|
|
pub fn transmitted_bytes(&self) -> (u64, u64) {
|
|
|
|
self.underlying.transmitted_bytes()
|
|
|
|
}
|
|
|
|
}
|