mirror of
https://github.com/mimblewimble/grin.git
synced 2025-01-21 03:21:08 +03:00
Various cleanup and reorganization.
This commit is contained in:
parent
4b5c010b05
commit
7bea7341ab
4 changed files with 122 additions and 63 deletions
|
@ -50,6 +50,7 @@ impl Handshake {
|
||||||
pub fn connect(&self,
|
pub fn connect(&self,
|
||||||
conn: TcpStream)
|
conn: TcpStream)
|
||||||
-> Box<Future<Item = (TcpStream, ProtocolV1, PeerInfo), Error = Error>> {
|
-> Box<Future<Item = (TcpStream, ProtocolV1, PeerInfo), Error = Error>> {
|
||||||
|
// prepare the first part of the hanshake
|
||||||
let nonce = self.next_nonce();
|
let nonce = self.next_nonce();
|
||||||
let hand = Hand {
|
let hand = Hand {
|
||||||
version: PROTOCOL_VERSION,
|
version: PROTOCOL_VERSION,
|
||||||
|
@ -59,10 +60,10 @@ impl Handshake {
|
||||||
receiver_addr: SockAddr(conn.peer_addr().unwrap()),
|
receiver_addr: SockAddr(conn.peer_addr().unwrap()),
|
||||||
user_agent: USER_AGENT.to_string(),
|
user_agent: USER_AGENT.to_string(),
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// write and read the handshake response
|
||||||
Box::new(write_msg(conn, hand, Type::Hand)
|
Box::new(write_msg(conn, hand, Type::Hand)
|
||||||
.and_then(|conn| {
|
.and_then(|conn| read_msg::<Shake>(conn))
|
||||||
read_msg::<Shake>(conn)
|
|
||||||
})
|
|
||||||
.and_then(|(conn, shake)| {
|
.and_then(|(conn, shake)| {
|
||||||
if shake.version != 1 {
|
if shake.version != 1 {
|
||||||
Err(Error::UnexpectedData {
|
Err(Error::UnexpectedData {
|
||||||
|
|
|
@ -100,10 +100,12 @@ pub fn write_msg<T>(conn: TcpStream,
|
||||||
let mut body_buf = vec![];
|
let mut body_buf = vec![];
|
||||||
ser::serialize(&mut body_buf, &msg);
|
ser::serialize(&mut body_buf, &msg);
|
||||||
|
|
||||||
// build and send the header using the body size
|
// build and serialize the header using the body size
|
||||||
let mut header_buf = vec![];
|
let mut header_buf = vec![];
|
||||||
let blen = body_buf.len() as u64;
|
let blen = body_buf.len() as u64;
|
||||||
ser::serialize(&mut header_buf, &MsgHeader::new(msg_type, blen));
|
ser::serialize(&mut header_buf, &MsgHeader::new(msg_type, blen));
|
||||||
|
|
||||||
|
// send the whole thing
|
||||||
write_all(conn, header_buf)
|
write_all(conn, header_buf)
|
||||||
.and_then(|(conn, _)| write_all(conn, body_buf))
|
.and_then(|(conn, _)| write_all(conn, body_buf))
|
||||||
.map(|(conn, _)| conn)
|
.map(|(conn, _)| conn)
|
||||||
|
|
|
@ -20,8 +20,8 @@ use std::sync::{Mutex, Arc};
|
||||||
use futures;
|
use futures;
|
||||||
use futures::{Stream, Future};
|
use futures::{Stream, Future};
|
||||||
use futures::stream;
|
use futures::stream;
|
||||||
use futures::sync::mpsc::UnboundedSender;
|
use futures::sync::mpsc::{UnboundedSender, UnboundedReceiver};
|
||||||
use tokio_core::io::{Io, write_all, read_exact, read_to_end};
|
use tokio_core::io::{Io, WriteHalf, ReadHalf, write_all, read_exact, read_to_end};
|
||||||
use tokio_core::net::TcpStream;
|
use tokio_core::net::TcpStream;
|
||||||
|
|
||||||
use core::core;
|
use core::core;
|
||||||
|
@ -67,6 +67,54 @@ impl Protocol for ProtocolV1 {
|
||||||
*out_mut = Some(tx.clone());
|
*out_mut = Some(tx.clone());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// setup the reading future, getting messages from the peer and processing them
|
||||||
|
let read_msg = self.read_msg(tx, reader).map(|_| ());
|
||||||
|
|
||||||
|
// setting the writing future, getting messages from our system and sending
|
||||||
|
// them out
|
||||||
|
let write_msg = self.write_msg(rx, writer).map(|_| ());
|
||||||
|
|
||||||
|
// select between our different futures and return them
|
||||||
|
Box::new(read_msg.select(write_msg).map(|_| ()).map_err(|(e, _)| e))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Bytes sent and received by this peer to the remote peer.
|
||||||
|
fn transmitted_bytes(&self) -> (u64, u64) {
|
||||||
|
let sent = *self.sent_bytes.lock().unwrap();
|
||||||
|
let recv = *self.received_bytes.lock().unwrap();
|
||||||
|
(sent, recv)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Sends a ping message to the remote peer. Will panic if handle has never
|
||||||
|
/// been called on this protocol.
|
||||||
|
fn send_ping(&self) -> Result<(), ser::Error> {
|
||||||
|
self.send_msg(Type::Ping, &Empty {})
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Serializes and sends a block to our remote peer
|
||||||
|
fn send_block(&self, b: &core::Block) -> Result<(), ser::Error> {
|
||||||
|
self.send_msg(Type::Block, b)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Serializes and sends a transaction to our remote peer
|
||||||
|
fn send_transaction(&self, tx: &core::Transaction) -> Result<(), ser::Error> {
|
||||||
|
self.send_msg(Type::Transaction, tx)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Close the connection to the remote peer
|
||||||
|
fn close(&self) {
|
||||||
|
// TODO some kind of shutdown signal
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ProtocolV1 {
|
||||||
|
/// Prepares the future reading from the peer connection, parsing each
|
||||||
|
/// message and forwarding them appropriately based on their type
|
||||||
|
fn read_msg(&self,
|
||||||
|
tx: UnboundedSender<Vec<u8>>,
|
||||||
|
reader: ReadHalf<TcpStream>)
|
||||||
|
-> Box<Future<Item = ReadHalf<TcpStream>, Error = ser::Error>> {
|
||||||
|
|
||||||
// infinite iterator stream so we repeat the message reading logic until the
|
// infinite iterator stream so we repeat the message reading logic until the
|
||||||
// peer is stopped
|
// peer is stopped
|
||||||
let iter = stream::iter(iter::repeat(()).map(Ok::<(), ser::Error>));
|
let iter = stream::iter(iter::repeat(()).map(Ok::<(), ser::Error>));
|
||||||
|
@ -75,18 +123,19 @@ impl Protocol for ProtocolV1 {
|
||||||
let recv_bytes = self.received_bytes.clone();
|
let recv_bytes = self.received_bytes.clone();
|
||||||
let read_msg = iter.fold(reader, move |reader, _| {
|
let read_msg = iter.fold(reader, move |reader, _| {
|
||||||
let mut tx_inner = tx.clone();
|
let mut tx_inner = tx.clone();
|
||||||
let recv_bytes = recv_bytes.clone();
|
let recv_bytes = recv_bytes.clone();
|
||||||
|
|
||||||
|
// first read the message header
|
||||||
read_exact(reader, vec![0u8; HEADER_LEN as usize])
|
read_exact(reader, vec![0u8; HEADER_LEN as usize])
|
||||||
.map_err(|e| ser::Error::IOErr(e))
|
.map_err(|e| ser::Error::IOErr(e))
|
||||||
.and_then(move |(reader, buf)| {
|
.and_then(move |(reader, buf)| {
|
||||||
// first read the message header
|
|
||||||
let header = try!(ser::deserialize::<MsgHeader>(&mut &buf[..]));
|
let header = try!(ser::deserialize::<MsgHeader>(&mut &buf[..]));
|
||||||
Ok((reader, header))
|
Ok((reader, header))
|
||||||
})
|
})
|
||||||
.map(move |(reader, header)| {
|
.map(move |(reader, header)| {
|
||||||
// add the count of bytes sent
|
// add the count of bytes sent
|
||||||
let mut recv_bytes = recv_bytes.lock().unwrap();
|
let mut recv_bytes = recv_bytes.lock().unwrap();
|
||||||
*recv_bytes += header.serialized_len() + header.msg_len;
|
*recv_bytes += header.serialized_len() + header.msg_len;
|
||||||
|
|
||||||
// and handle the different message types
|
// and handle the different message types
|
||||||
match header.msg_type {
|
match header.msg_type {
|
||||||
|
@ -104,9 +153,16 @@ impl Protocol for ProtocolV1 {
|
||||||
reader
|
reader
|
||||||
})
|
})
|
||||||
});
|
});
|
||||||
|
Box::new(read_msg)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Prepares the future that gets message data produced by our system and
|
||||||
|
/// sends it to the peer connection
|
||||||
|
fn write_msg(&self,
|
||||||
|
rx: UnboundedReceiver<Vec<u8>>,
|
||||||
|
writer: WriteHalf<TcpStream>)
|
||||||
|
-> Box<Future<Item = WriteHalf<TcpStream>, Error = ser::Error>> {
|
||||||
|
|
||||||
// setting the writing future, getting messages from our system and sending
|
|
||||||
// them out
|
|
||||||
let sent_bytes = self.sent_bytes.clone();
|
let sent_bytes = self.sent_bytes.clone();
|
||||||
let send_data = rx.map(move |data| {
|
let send_data = rx.map(move |data| {
|
||||||
// add the count of bytes sent
|
// add the count of bytes sent
|
||||||
|
@ -117,43 +173,23 @@ impl Protocol for ProtocolV1 {
|
||||||
// write the data and make sure the future returns the right types
|
// write the data and make sure the future returns the right types
|
||||||
.fold(writer,
|
.fold(writer,
|
||||||
|writer, data| write_all(writer, data).map_err(|_| ()).map(|(writer, buf)| writer))
|
|writer, data| write_all(writer, data).map_err(|_| ()).map(|(writer, buf)| writer))
|
||||||
.map(|_| ())
|
|
||||||
.map_err(|_| ser::Error::CorruptedData);
|
.map_err(|_| ser::Error::CorruptedData);
|
||||||
|
Box::new(send_data)
|
||||||
// select between our different futures and return them
|
|
||||||
Box::new(read_msg.map(|_| ()).select(send_data).map(|_| ()).map_err(|(e, _)| e))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Bytes sent and received by this peer to the remote peer.
|
/// Utility function to send any Writeable. Handles adding the header and
|
||||||
fn transmitted_bytes(&self) -> (u64, u64) {
|
/// serialization.
|
||||||
let sent = *self.sent_bytes.lock().unwrap();
|
fn send_msg(&self, t: Type, body: &ser::Writeable) -> Result<(), ser::Error> {
|
||||||
let recv = *self.received_bytes.lock().unwrap();
|
let mut body_data = vec![];
|
||||||
(sent, recv)
|
try!(ser::serialize(&mut body_data, body));
|
||||||
}
|
let mut data = vec![];
|
||||||
|
try!(ser::serialize(&mut data, &MsgHeader::new(t, body_data.len() as u64)));
|
||||||
|
data.append(&mut body_data);
|
||||||
|
|
||||||
/// Sends a ping message to the remote peer. Will panic if handle has never
|
|
||||||
/// been called on this protocol.
|
|
||||||
fn send_ping(&self) -> Result<(), ser::Error> {
|
|
||||||
let data = try!(ser::ser_vec(&MsgHeader::new(Type::Ping, 0)));
|
|
||||||
let mut msg_send = self.outbound_chan.borrow_mut();
|
let mut msg_send = self.outbound_chan.borrow_mut();
|
||||||
if let Err(e) = msg_send.deref_mut().as_mut().unwrap().send(data) {
|
if let Err(e) = msg_send.deref_mut().as_mut().unwrap().send(data) {
|
||||||
warn!("Couldn't send message to remote peer: {}", e);
|
warn!("Couldn't send message to remote peer: {}", e);
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Serializes and sends a block to our remote peer
|
|
||||||
fn send_block(&self, b: &core::Block) -> Result<(), ser::Error> {
|
|
||||||
unimplemented!();
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Serializes and sends a transaction to our remote peer
|
|
||||||
fn send_transaction(&self, tx: &core::Transaction) -> Result<(), ser::Error> {
|
|
||||||
unimplemented!();
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Close the connection to the remote peer
|
|
||||||
fn close(&self) {
|
|
||||||
// TODO some kind of shutdown signal
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -23,6 +23,7 @@ use std::time::Duration;
|
||||||
|
|
||||||
use futures;
|
use futures;
|
||||||
use futures::{Future, Stream};
|
use futures::{Future, Stream};
|
||||||
|
use futures::future::IntoFuture;
|
||||||
use tokio_core::net::{TcpListener, TcpStream};
|
use tokio_core::net::{TcpListener, TcpStream};
|
||||||
use tokio_core::reactor;
|
use tokio_core::reactor;
|
||||||
|
|
||||||
|
@ -74,24 +75,12 @@ impl Server {
|
||||||
let hp = h.clone();
|
let hp = h.clone();
|
||||||
let peers = socket.incoming().map_err(|e| Error::IOErr(e)).map(move |(conn, addr)| {
|
let peers = socket.incoming().map_err(|e| Error::IOErr(e)).map(move |(conn, addr)| {
|
||||||
let peers = peers.clone();
|
let peers = peers.clone();
|
||||||
|
|
||||||
// accept the peer and add it to the server map
|
// accept the peer and add it to the server map
|
||||||
let peer_accept = Peer::accept(conn, &hs.clone()).map(move |(conn, peer)| {
|
let peer_accept = add_to_peers(peers, Peer::accept(conn, &hs.clone()));
|
||||||
let apeer = Arc::new(peer);
|
|
||||||
let mut peers = peers.write().unwrap();
|
|
||||||
peers.push(apeer.clone());
|
|
||||||
Ok((conn, apeer))
|
|
||||||
});
|
|
||||||
|
|
||||||
// wire in a future to timeout the accept after 5 secs
|
// wire in a future to timeout the accept after 5 secs
|
||||||
let timeout = reactor::Timeout::new(Duration::new(5, 0), &hp).unwrap();
|
let timed_peer = with_timeout(Box::new(peer_accept), &hp);
|
||||||
let timed_peer = peer_accept.select(timeout.map(Err).map_err(|e| Error::IOErr(e)))
|
|
||||||
.then(|res| {
|
|
||||||
match res {
|
|
||||||
Ok((Ok((conn, p)), _timeout)) => Ok((conn, p)),
|
|
||||||
Ok((_, _accept)) => Err(Error::TooLargeReadErr),
|
|
||||||
Err((e, _other)) => Err(e),
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
// run the main peer protocol
|
// run the main peer protocol
|
||||||
timed_peer.and_then(|(conn, peer)| peer.clone().run(conn, &DummyAdapter {}))
|
timed_peer.and_then(|(conn, peer)| peer.clone().run(conn, &DummyAdapter {}))
|
||||||
|
@ -124,6 +113,7 @@ impl Server {
|
||||||
}))
|
}))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Asks the server to connect to a new peer.
|
||||||
pub fn connect_peer(&self,
|
pub fn connect_peer(&self,
|
||||||
addr: SocketAddr,
|
addr: SocketAddr,
|
||||||
h: reactor::Handle)
|
h: reactor::Handle)
|
||||||
|
@ -132,12 +122,11 @@ impl Server {
|
||||||
let peers = self.peers.clone();
|
let peers = self.peers.clone();
|
||||||
let request = socket.and_then(move |socket| {
|
let request = socket.and_then(move |socket| {
|
||||||
let peers = peers.clone();
|
let peers = peers.clone();
|
||||||
Peer::connect(socket, &Handshake::new()).map(move |(conn, peer)| {
|
|
||||||
let apeer = Arc::new(peer);
|
// connect to the peer and add it to the server map, wiring it a timeout for
|
||||||
let mut peers = peers.write().unwrap();
|
// the handhake
|
||||||
peers.push(apeer.clone());
|
let peer_connect = add_to_peers(peers, Peer::connect(socket, &Handshake::new()));
|
||||||
(conn, apeer)
|
with_timeout(Box::new(peer_connect), &h)
|
||||||
})
|
|
||||||
})
|
})
|
||||||
.and_then(|(socket, peer)| peer.run(socket, &DummyAdapter {}));
|
.and_then(|(socket, peer)| peer.run(socket, &DummyAdapter {}));
|
||||||
Box::new(request)
|
Box::new(request)
|
||||||
|
@ -152,3 +141,34 @@ impl Server {
|
||||||
self.stop.into_inner().unwrap().complete(());
|
self.stop.into_inner().unwrap().complete(());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Adds the peer built by the provided future in the peers map
|
||||||
|
fn add_to_peers<A>(peers: Arc<RwLock<Vec<Arc<Peer>>>>,
|
||||||
|
peer_fut: A)
|
||||||
|
-> Box<Future<Item = Result<(TcpStream, Arc<Peer>), ()>, Error = Error>>
|
||||||
|
where A: IntoFuture<Item = (TcpStream, Peer), Error = Error> + 'static
|
||||||
|
{
|
||||||
|
let peer_add = peer_fut.into_future().map(move |(conn, peer)| {
|
||||||
|
let apeer = Arc::new(peer);
|
||||||
|
let mut peers = peers.write().unwrap();
|
||||||
|
peers.push(apeer.clone());
|
||||||
|
Ok((conn, apeer))
|
||||||
|
});
|
||||||
|
Box::new(peer_add)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Adds a timeout to a future
|
||||||
|
fn with_timeout<T: 'static>(fut: Box<Future<Item = Result<T, ()>, Error = Error>>,
|
||||||
|
h: &reactor::Handle)
|
||||||
|
-> Box<Future<Item = T, Error = Error>> {
|
||||||
|
let timeout = reactor::Timeout::new(Duration::new(5, 0), h).unwrap();
|
||||||
|
let timed = fut.select(timeout.map(Err).map_err(|e| Error::IOErr(e)))
|
||||||
|
.then(|res| {
|
||||||
|
match res {
|
||||||
|
Ok((Ok(inner), _timeout)) => Ok(inner),
|
||||||
|
Ok((_, _accept)) => Err(Error::TooLargeReadErr),
|
||||||
|
Err((e, _other)) => Err(e),
|
||||||
|
}
|
||||||
|
});
|
||||||
|
Box::new(timed)
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in a new issue