From edc6c6257769a4fa1b2fe5453899244b94383c26 Mon Sep 17 00:00:00 2001 From: Ignotus Peverell Date: Tue, 1 Nov 2016 10:42:33 -0700 Subject: [PATCH] Fixed message serialization following changes in core. Sending of block and transaction. --- core/src/ser.rs | 6 +++++ p2p/src/msg.rs | 55 ++++++++++++++++++++++------------------ p2p/src/peer.rs | 15 +++++++++-- p2p/src/protocol.rs | 62 ++++++++++++++++++++++++++++++--------------- p2p/src/server.rs | 37 ++++++++++++++++++++++----- p2p/src/types.rs | 13 +++++++--- 6 files changed, 132 insertions(+), 56 deletions(-) diff --git a/core/src/ser.rs b/core/src/ser.rs index 54ed99255..fbf0a2406 100644 --- a/core/src/ser.rs +++ b/core/src/ser.rs @@ -303,6 +303,12 @@ impl<'a> AsFixedBytes for &'a [u8] { } } +impl<'a> AsFixedBytes for String { + fn as_fixed_bytes(&self) -> &[u8] { + self.as_bytes() + } +} + impl AsFixedBytes for ::core::hash::Hash { fn as_fixed_bytes(&self) -> &[u8] { self.to_slice() diff --git a/p2p/src/msg.rs b/p2p/src/msg.rs index 8921d7c0b..7b3116e45 100644 --- a/p2p/src/msg.rs +++ b/p2p/src/msg.rs @@ -37,16 +37,18 @@ pub enum ErrCodes { /// Types of messages enum_from_primitive! { -#[derive(Clone, Copy)] -pub enum Type { - Error, - Hand, - Shake, - Ping, - Pong, - GetPeerAddrs, - PeerAddrs, -} + #[derive(Clone, Copy)] + pub enum Type { + Error, + Hand, + Shake, + Ping, + Pong, + GetPeerAddrs, + PeerAddrs, + Block, + Transaction, + } } /// Header of any protocol message, used to identify incoming messages. @@ -67,8 +69,10 @@ impl MsgHeader { Type::from_u8(self.msg_type as u8).is_some() } - /// Serialized length of the header in bytes - pub fn serialized_len(&self) -> u64 { 3 } + /// Serialized length of the header in bytes + pub fn serialized_len(&self) -> u64 { + 3 + } } impl Writeable for MsgHeader { @@ -86,9 +90,14 @@ impl Readable for MsgHeader { try!(reader.expect_u8(MAGIC[0])); try!(reader.expect_u8(MAGIC[1])); let t = try!(reader.read_u8()); - match Type::from_u8(t) { - Some(ty) => Ok(MsgHeader {magic: MAGIC, msg_type: ty}), - None => Err(ser::Error::CorruptedData) + match Type::from_u8(t) { + Some(ty) => { + Ok(MsgHeader { + magic: MAGIC, + msg_type: ty, + }) + } + None => Err(ser::Error::CorruptedData), } } } @@ -118,7 +127,7 @@ impl Writeable for Hand { [write_u64, self.nonce]); self.sender_addr.write(writer); self.receiver_addr.write(writer); - writer.write_bytes(self.user_agent.as_bytes()) + writer.write_bytes(&self.user_agent) } } @@ -157,7 +166,7 @@ impl Writeable for Shake { ser_multiwrite!(writer, [write_u32, self.version], [write_u32, self.capabilities.bits()], - [write_bytes, self.user_agent.as_bytes()]); + [write_bytes, &self.user_agent]); Ok(()) } } @@ -182,7 +191,7 @@ pub struct GetPeerAddrs { } impl Writeable for GetPeerAddrs { - fn write(&self, writer: &mut Writer) -> Option { + fn write(&self, writer: &mut Writer) -> Result<(), ser::Error> { writer.write_u32(self.capabilities.bits()) } } @@ -202,12 +211,12 @@ pub struct PeerAddrs { } impl Writeable for PeerAddrs { - fn write(&self, writer: &mut Writer) -> Option { - try_o!(writer.write_u32(self.peers.len() as u32)); + fn write(&self, writer: &mut Writer) -> Result<(), ser::Error> { + try!(writer.write_u32(self.peers.len() as u32)); for p in &self.peers { p.write(writer); } - None + Ok(()) } } @@ -234,9 +243,7 @@ pub struct PeerError { impl Writeable for PeerError { fn write(&self, writer: &mut Writer) -> Result<(), ser::Error> { - ser_multiwrite!(writer, - [write_u32, self.code], - [write_bytes, self.message.as_bytes()]); + ser_multiwrite!(writer, [write_u32, self.code], [write_bytes, &self.message]); Ok(()) } } diff --git a/p2p/src/peer.rs b/p2p/src/peer.rs index bd9f97349..99c9b4b0d 100644 --- a/p2p/src/peer.rs +++ b/p2p/src/peer.rs @@ -14,6 +14,7 @@ use mioco::tcp::TcpStream; +use core::core; use core::ser::Error; use handshake::Handshake; use types::*; @@ -43,14 +44,24 @@ impl Peer { }) } - pub fn run(&self, na: &NetAdapter) -> Option { + pub fn run(&self, na: &NetAdapter) -> Result<(), Error> { self.proto.handle(na) } - pub fn send_ping(&self) -> Option { + pub fn send_ping(&self) -> Result<(), Error> { self.proto.send_ping() } + pub fn send_block(&self, b: &core::Block) -> Result<(), Error> { + // TODO don't send if we already got the block from peer + self.proto.send_block(b) + } + + pub fn send_transaction(&self, tx: &core::Transaction) -> Result<(), Error> { + // TODO don't relay if we already got the tx from peer + self.proto.send_transaction(tx) + } + pub fn transmitted_bytes(&self) -> (u64, u64) { self.proto.transmitted_bytes() } diff --git a/p2p/src/protocol.rs b/p2p/src/protocol.rs index 41651c433..fb6d803f9 100644 --- a/p2p/src/protocol.rs +++ b/p2p/src/protocol.rs @@ -13,17 +13,16 @@ // limitations under the License. use std::cell::RefCell; -use std::io::{Read, Write}; +use std::io::Write; use std::ops::{Deref, DerefMut}; -use std::rc::Rc; use std::sync::Mutex; use mioco; use mioco::sync::mpsc::{sync_channel, SyncSender}; use mioco::tcp::{TcpStream, Shutdown}; +use core::core; use core::ser; -use handshake::Handshake; use msg::*; use types::*; @@ -39,12 +38,12 @@ pub struct ProtocolV1 { // Used both to count the amount of data sent and lock writing to the conn. We can't wrap conn with // the lock as we're always listening to receive. sent_bytes: Mutex, - // Bytes we've received. + // Bytes we've received. received_bytes: Mutex, } impl ProtocolV1 { - /// Creates a new protocol v1 + /// Creates a new protocol v1 pub fn new(conn: TcpStream) -> ProtocolV1 { ProtocolV1 { conn: RefCell::new(conn), @@ -62,7 +61,7 @@ impl Protocol for ProtocolV1 { /// to send. Must be called before any interaction with a protocol instance /// and should only be called once. Will block so also needs to be called /// within a coroutine. - fn handle(&self, server: &NetAdapter) -> Option { + fn handle(&self, server: &NetAdapter) -> Result<(), ser::Error> { // setup channels so we can switch between reads, writes and close let (msg_send, msg_recv) = sync_channel(10); let (stop_send, stop_recv) = sync_channel(1); @@ -75,23 +74,23 @@ impl Protocol for ProtocolV1 { let mut conn = self.conn.borrow_mut(); loop { - // main select loop, switches between listening, sending or stopping + // main select loop, switches between listening, sending or stopping select!( r:conn => { - // deser the header ot get the message type - let header = try_to_o!(ser::deserialize::(conn.deref_mut())); + // deser the header ot get the message type + let header = try!(ser::deserialize::(conn.deref_mut())); if !header.acceptable() { continue; } let recv = header.serialized_len(); - // check the message and hopefully do what's expected + // check the message and hopefully do what's expected match header.msg_type { Type::Ping => { - // respond with pong - let data = try_to_o!(ser::ser_vec(&MsgHeader::new(Type::Pong))); + // respond with pong + let data = try!(ser::ser_vec(&MsgHeader::new(Type::Pong))); let mut sent_bytes = self.sent_bytes.lock().unwrap(); *sent_bytes += data.len() as u64; - try_to_o!(conn.deref_mut().write_all(&data[..]).map_err(&ser::Error::IOErr)); + try!(conn.deref_mut().write_all(&data[..]).map_err(&ser::Error::IOErr)); }, Type::Pong => {}, _ => error!("uncaught unknown"), @@ -100,17 +99,17 @@ impl Protocol for ProtocolV1 { *received += recv; }, r:msg_recv => { - // relay a message originated from the rest of the local system + // relay a message originated from the rest of the local system let data = &msg_recv.recv().unwrap()[..]; let mut sent_bytes = self.sent_bytes.lock().unwrap(); *sent_bytes += data.len() as u64; - try_to_o!(conn.deref_mut().write_all(data).map_err(&ser::Error::IOErr)); + try!(conn.deref_mut().write_all(data).map_err(&ser::Error::IOErr)); }, r:stop_recv => { - // shuts the connection don and end the loop + // shuts the connection don and end the loop stop_recv.recv(); conn.shutdown(Shutdown::Both); - return None; + return Ok(()); } ); } @@ -118,17 +117,25 @@ impl Protocol for ProtocolV1 { /// Sends a ping message to the remote peer. Will panic if handle has never /// been called on this protocol. - fn send_ping(&self) -> Option { - let data = try_to_o!(ser::ser_vec(&MsgHeader::new(Type::Ping))); + fn send_ping(&self) -> Result<(), ser::Error> { + let data = try!(ser::ser_vec(&MsgHeader::new(Type::Ping))); let msg_send = self.msg_send.borrow(); msg_send.as_ref().unwrap().send(data); - None + Ok(()) + } + + fn send_block(&self, b: &core::Block) -> Result<(), ser::Error> { + self.send_msg(Type::Block, b) + } + + fn send_transaction(&self, tx: &core::Transaction) -> Result<(), ser::Error> { + self.send_msg(Type::Transaction, tx) } fn transmitted_bytes(&self) -> (u64, u64) { let sent = *self.sent_bytes.lock().unwrap().deref(); let received = *self.received_bytes.lock().unwrap().deref(); - (sent, received) + (sent, received) } fn close(&self) { @@ -136,3 +143,16 @@ impl Protocol for ProtocolV1 { stop_send.as_ref().unwrap().send(0); } } + +impl ProtocolV1 { + /// Helper function to avoid boilerplate, builds a header followed by the + /// Writeable body and send the whole thing. + fn send_msg(&self, t: Type, body: &ser::Writeable) -> Result<(), ser::Error> { + let mut data = Vec::new(); + try!(ser::serialize(&mut data, &MsgHeader::new(t))); + try!(ser::serialize(&mut data, body)); + let msg_send = self.msg_send.borrow(); + msg_send.as_ref().unwrap().send(data); + Ok(()) + } +} diff --git a/p2p/src/server.rs b/p2p/src/server.rs index 3bd93286e..5b32ff469 100644 --- a/p2p/src/server.rs +++ b/p2p/src/server.rs @@ -26,11 +26,14 @@ use mioco; use mioco::sync::mpsc::{sync_channel, SyncSender}; use mioco::tcp::{TcpListener, TcpStream}; +use core::core; use core::ser::Error; use handshake::Handshake; use peer::Peer; use types::*; +/// Default address for peer-to-peer connections, placeholder until better +/// configuration is in place. pub const DEFAULT_LISTEN_ADDR: &'static str = "127.0.0.1:3414"; // replace with some config lookup or something @@ -53,7 +56,7 @@ unsafe impl Send for Server {} // TODO TLS impl Server { - /// Creates a new idle p2p server with no peers + /// Creates a new idle p2p server with no peers pub fn new() -> Server { Server { peers: RwLock::new(Vec::new()), @@ -89,7 +92,7 @@ impl Server { } mioco::spawn(move || -> io::Result<()> { - if let Some(err) = wpeer.run(&DummyAdapter{}) { + if let Err(err) = wpeer.run(&DummyAdapter{}) { error!("{:?}", err); } Ok(()) @@ -103,7 +106,29 @@ impl Server { } } - /// Stops the server. Disconnect from all peers at the same time. + /// Asks all the peers to relay the provided block. A peer may choose to + /// ignore the relay request if it has knowledge that the remote peer + /// already knows the block. + pub fn relay_block(&self, b: &core::Block) -> Result<(), Error> { + let peers = self.peers.write().unwrap(); + for p in peers.deref() { + try!(p.send_block(b)); + } + Ok(()) + } + + /// Asks all the peers to relay the provided transaction. A peer may choose + /// to ignore the relay request if it has knowledge that the remote peer + /// already knows the transaction. + pub fn relay_transaction(&self, tx: &core::Transaction) -> Result<(), Error> { + let peers = self.peers.write().unwrap(); + for p in peers.deref() { + try!(p.send_transaction(tx)); + } + Ok(()) + } + + /// Stops the server. Disconnect from all peers at the same time. pub fn stop(&self) { let peers = self.peers.write().unwrap(); for p in peers.deref() { @@ -120,7 +145,7 @@ impl Server { Peer::accept(tcp_client, &Handshake::new()) } - pub fn peers_count(&self) -> u32 { - self.peers.read().unwrap().len() as u32 - } + pub fn peers_count(&self) -> u32 { + self.peers.read().unwrap().len() as u32 + } } diff --git a/p2p/src/types.rs b/p2p/src/types.rs index 0ba8522b1..893260ce5 100644 --- a/p2p/src/types.rs +++ b/p2p/src/types.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::net::SocketAddr; +use core::core; use core::ser::Error; bitflags! { @@ -42,13 +43,19 @@ pub trait Protocol { /// be known already, usually passed during construction. Will typically /// block so needs to be called withing a coroutine. Should also be called /// only once. - fn handle(&self, na: &NetAdapter) -> Option; + fn handle(&self, na: &NetAdapter) -> Result<(), Error>; /// Sends a ping message to the remote peer. - fn send_ping(&self) -> Option; + fn send_ping(&self) -> Result<(), Error>; + + /// Relays a block to the remote peer. + fn send_block(&self, b: &core::Block) -> Result<(), Error>; + + /// Relays a transaction to the remote peer. + fn send_transaction(&self, tx: &core::Transaction) -> Result<(), Error>; /// How many bytes have been sent/received to/from the remote peer. - fn transmitted_bytes(&self) -> (u64, u64); + fn transmitted_bytes(&self) -> (u64, u64); /// Close the connection to the remote peer. fn close(&self);