Read block and transaction messages and forward to the adapter.

This commit is contained in:
Ignotus Peverell 2016-12-15 14:57:04 -08:00
parent d395b3d128
commit bc61c5dae7
No known key found for this signature in database
GPG key ID: 99CD25F39F8F8211
5 changed files with 51 additions and 26 deletions

View file

@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use futures::Future;
use tokio_core::net::TcpStream;
@ -55,7 +57,7 @@ impl Peer {
Box::new(hs_peer)
}
pub fn run(&self, conn: TcpStream, na: &NetAdapter) -> Box<Future<Item = (), Error = Error>> {
pub fn run(&self, conn: TcpStream, na: Arc<NetAdapter>) -> Box<Future<Item = (), Error = Error>> {
self.proto.handle(conn, na)
}

View file

@ -21,7 +21,7 @@ use futures;
use futures::{Stream, Future};
use futures::stream;
use futures::sync::mpsc::{UnboundedSender, UnboundedReceiver};
use tokio_core::io::{Io, WriteHalf, ReadHalf, write_all, read_exact, read_to_end};
use tokio_core::io::{Io, WriteHalf, ReadHalf, write_all, read_exact};
use tokio_core::net::TcpStream;
use core::core;
@ -56,7 +56,7 @@ impl ProtocolV1 {
impl Protocol for ProtocolV1 {
fn handle(&self,
conn: TcpStream,
adapter: &NetAdapter)
adapter: Arc<NetAdapter>)
-> Box<Future<Item = (), Error = ser::Error>> {
let (reader, writer) = conn.split();
@ -68,7 +68,7 @@ impl Protocol for ProtocolV1 {
}
// setup the reading future, getting messages from the peer and processing them
let read_msg = self.read_msg(tx, reader).map(|_| ());
let read_msg = self.read_msg(tx, reader, adapter).map(|_| ());
// setting the writing future, getting messages from our system and sending
// them out
@ -111,8 +111,9 @@ impl ProtocolV1 {
/// Prepares the future reading from the peer connection, parsing each
/// message and forwarding them appropriately based on their type
fn read_msg(&self,
tx: UnboundedSender<Vec<u8>>,
reader: ReadHalf<TcpStream>)
sender: UnboundedSender<Vec<u8>>,
reader: ReadHalf<TcpStream>,
adapter: Arc<NetAdapter>)
-> Box<Future<Item = ReadHalf<TcpStream>, Error = ser::Error>> {
// infinite iterator stream so we repeat the message reading logic until the
@ -122,8 +123,9 @@ impl ProtocolV1 {
// setup the reading future, getting messages from the peer and processing them
let recv_bytes = self.received_bytes.clone();
let read_msg = iter.fold(reader, move |reader, _| {
let mut tx_inner = tx.clone();
let mut sender_inner = sender.clone();
let recv_bytes = recv_bytes.clone();
let adapter = adapter.clone();
// first read the message header
read_exact(reader, vec![0u8; HEADER_LEN as usize])
@ -132,24 +134,20 @@ impl ProtocolV1 {
let header = try!(ser::deserialize::<MsgHeader>(&mut &buf[..]));
Ok((reader, header))
})
.map(move |(reader, header)| {
// add the count of bytes sent
.and_then(move |(reader, header)| {
// now that we have a size, proceed with the body
read_exact(reader, vec![0u8; header.msg_len as usize]).map(|(reader, buf)| { (reader, header, buf) }).map_err(|e| ser::Error::IOErr(e))
})
.map(move |(reader, header, buf)| {
// add the count of bytes received
let mut recv_bytes = recv_bytes.lock().unwrap();
*recv_bytes += header.serialized_len() + header.msg_len;
// and handle the different message types
match header.msg_type {
Type::Ping => {
let data = ser::ser_vec(&MsgHeader::new(Type::Pong, 0)).unwrap();
if let Err(e) = tx_inner.send(data) {
warn!("Couldn't send pong to remote peer: {}", e);
}
}
Type::Pong => {}
_ => {
error!("unknown message type {:?}", header.msg_type);
}
};
if let Err(e) = handle_payload(adapter, &header, buf, &mut sender_inner) {
debug!("Invalid {:?} message: {}", header.msg_type, e);
}
reader
})
});
@ -193,3 +191,25 @@ impl ProtocolV1 {
Ok(())
}
}
fn handle_payload(adapter: Arc<NetAdapter>, header: &MsgHeader, buf: Vec<u8>, sender: &mut UnboundedSender<Vec<u8>>) -> Result<(), ser::Error> {
match header.msg_type {
Type::Ping => {
let data = try!(ser::ser_vec(&MsgHeader::new(Type::Pong, 0)));
sender.send(data);
}
Type::Pong => {}
Type::Transaction => {
let tx = try!(ser::deserialize::<core::Transaction>(&mut &buf[..]));
adapter.transaction_received(tx);
}
Type::Block => {
let b = try!(ser::deserialize::<core::Block>(&mut &buf[..]));
adapter.block_received(b);
}
_ => {
debug!("unknown message type {:?}", header.msg_type);
}
};
Ok(())
}

View file

@ -33,6 +33,7 @@ use handshake::Handshake;
use peer::Peer;
use types::*;
/// A no-op network adapter used for testing.
pub struct DummyAdapter {}
impl NetAdapter for DummyAdapter {
fn transaction_received(&self, tx: core::Transaction) {}
@ -83,7 +84,7 @@ impl Server {
let timed_peer = with_timeout(Box::new(peer_accept), &hp);
// run the main peer protocol
timed_peer.and_then(|(conn, peer)| peer.clone().run(conn, &DummyAdapter {}))
timed_peer.and_then(|(conn, peer)| peer.clone().run(conn, Arc::new(DummyAdapter {})))
});
// spawn each peer future to its own task
@ -128,7 +129,7 @@ impl Server {
let peer_connect = add_to_peers(peers, Peer::connect(socket, &Handshake::new()));
with_timeout(Box::new(peer_connect), &h)
})
.and_then(|(socket, peer)| peer.run(socket, &DummyAdapter {}));
.and_then(|(socket, peer)| peer.run(socket, Arc::new(DummyAdapter {})));
Box::new(request)
}

View file

@ -13,6 +13,7 @@
// limitations under the License.
use std::net::{SocketAddr, IpAddr};
use std::sync::Arc;
use futures::Future;
use tokio_core::net::TcpStream;
@ -65,7 +66,7 @@ pub trait Protocol {
/// be known already, usually passed during construction. Will typically
/// block so needs to be called withing a coroutine. Should also be called
/// only once.
fn handle(&self, conn: TcpStream, na: &NetAdapter) -> Box<Future<Item = (), Error = Error>>;
fn handle(&self, conn: TcpStream, na: Arc<NetAdapter>) -> Box<Future<Item = (), Error = Error>>;
/// Sends a ping message to the remote peer.
fn send_ping(&self) -> Result<(), Error>;
@ -86,7 +87,7 @@ pub trait Protocol {
/// Bridge between the networking layer and the rest of the system. Handles the
/// forwarding or querying of blocks and transactions among other things.
pub trait NetAdapter {
/// A transaction has been received from one of our peers
/// A valid transaction has been received from one of our peers
fn transaction_received(&self, tx: core::Transaction);
/// A block has been received from one of our peers

View file

@ -19,6 +19,7 @@ extern crate futures;
extern crate tokio_core;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time;
use futures::future::Future;
@ -50,7 +51,7 @@ fn peer_handshake() {
socket.and_then(move |socket| {
Peer::connect(socket, &p2p::handshake::Handshake::new())
}).and_then(move |(socket, peer)| {
rhandle.spawn(peer.run(socket, &p2p::DummyAdapter {}).map_err(|e| {
rhandle.spawn(peer.run(socket, Arc::new(p2p::DummyAdapter {})).map_err(|e| {
panic!("Client run failed: {}", e);
}));
peer.send_ping().unwrap();