From 7c72ccec7cc4b1bfece069977b22f5be6d04e3a3 Mon Sep 17 00:00:00 2001 From: Ignotus Peverell Date: Sun, 26 Feb 2017 20:08:40 -0800 Subject: [PATCH] Cleaned up error handling in p2p module. Setting peer status on disconnect. --- grin/src/lib.rs | 1 + grin/src/seed.rs | 10 ++++----- grin/src/sync.rs | 9 ++++---- grin/src/types.rs | 43 +++++++++++++++++++++++++++++++++++++ p2p/src/conn.rs | 39 +++++++++++++++++---------------- p2p/src/handshake.rs | 14 ++++++------ p2p/src/lib.rs | 2 +- p2p/src/msg.rs | 12 +++++------ p2p/src/peer.rs | 35 +++++++++++++++++++++++++----- p2p/src/protocol.rs | 18 ++++++++-------- p2p/src/server.rs | 34 ++++++++++++----------------- p2p/src/types.rs | 29 ++++++++++++++++++++++++- p2p/tests/peer_handshake.rs | 10 ++++----- 13 files changed, 175 insertions(+), 81 deletions(-) create mode 100644 grin/src/types.rs diff --git a/grin/src/lib.rs b/grin/src/lib.rs index 7b197001d..13487115b 100644 --- a/grin/src/lib.rs +++ b/grin/src/lib.rs @@ -45,5 +45,6 @@ mod miner; mod server; mod seed; mod sync; +mod types; pub use server::{Server, ServerConfig, Seeding}; diff --git a/grin/src/seed.rs b/grin/src/seed.rs index 77c5a56d0..f6ed4fe92 100644 --- a/grin/src/seed.rs +++ b/grin/src/seed.rs @@ -54,14 +54,14 @@ impl Seeder { } pub fn connect_and_monitor(&self, - h: reactor::Handle, - seed_list: Box, Error = String>>) { + h: reactor::Handle, + seed_list: Box, Error = String>>) { // open a channel with a listener that connects every peer address sent below // max peer count let (tx, rx) = futures::sync::mpsc::unbounded(); h.spawn(self.listen_for_addrs(h.clone(), rx)); - // check seeds and start monitoring connections + // check seeds and start monitoring connections let seeder = self.connect_to_seeds(tx.clone(), seed_list) .join(self.monitor_peers(tx.clone())); @@ -97,8 +97,8 @@ impl Seeder { Box::new(mon_loop) } - // Check if we have any pre-existing peer in db. If so, start with those, - // otherwise use the seeds provided. + // Check if we have any pre-existing peer in db. If so, start with those, + // otherwise use the seeds provided. fn connect_to_seeds(&self, tx: mpsc::UnboundedSender, seed_list: Box, Error = String>>) diff --git a/grin/src/sync.rs b/grin/src/sync.rs index 6efa49a70..92324a51d 100644 --- a/grin/src/sync.rs +++ b/grin/src/sync.rs @@ -28,6 +28,7 @@ use std::time::{Instant, Duration}; use core::core::hash::{Hash, Hashed}; use chain; use p2p; +use types::Error; pub struct Syncer { chain_store: Arc, @@ -57,7 +58,7 @@ impl Syncer { /// Checks the local chain state, comparing it with our peers and triggers /// syncing if required. - pub fn run(&self) -> Result<(), chain::Error> { + pub fn run(&self) -> Result<(), Error> { debug!("Starting syncer."); let start = Instant::now(); loop { @@ -114,7 +115,7 @@ impl Syncer { /// Checks the gap between the header chain and the full block chain and /// initializes the blocks_to_download structure with the missing full /// blocks - fn init_download(&self) -> Result<(), chain::Error> { + fn init_download(&self) -> Result<(), Error> { // compare the header's head to the full one to see what we're missing let header_head = self.chain_store.get_header_head()?; let full_head = self.chain_store.head()?; @@ -167,7 +168,7 @@ impl Syncer { } /// Request some block headers from a peer to advance us - fn request_headers(&self) -> Result<(), chain::Error> { + fn request_headers(&self) -> Result<(), Error> { { let mut last_header_req = self.last_header_req.lock().unwrap(); *last_header_req = Instant::now(); @@ -201,7 +202,7 @@ impl Syncer { /// Builds a vector of block hashes that should help the remote peer sending /// us the right block headers. - fn get_locator(&self, tip: &chain::Tip) -> Result, chain::Error> { + fn get_locator(&self, tip: &chain::Tip) -> Result, Error> { // Prepare the heights we want as the latests height minus increasing powers // of 2 up to max. let mut heights = vec![tip.height]; diff --git a/grin/src/types.rs b/grin/src/types.rs new file mode 100644 index 000000000..3a93d97c3 --- /dev/null +++ b/grin/src/types.rs @@ -0,0 +1,43 @@ +// Copyright 2016 The Grin Developers +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::convert::From; + +use chain; +use p2p; +use store; + +pub enum Error { + Store(store::Error), + Chain(chain::Error), + P2P(p2p::Error), +} + +impl From for Error { + fn from(e: chain::Error) -> Error { + Error::Chain(e) + } +} + +impl From for Error { + fn from(e: p2p::Error) -> Error { + Error::P2P(e) + } +} + +impl From for Error { + fn from(e: store::Error) -> Error { + Error::Store(e) + } +} diff --git a/p2p/src/conn.rs b/p2p/src/conn.rs index a7ff9f3d5..cdecda818 100644 --- a/p2p/src/conn.rs +++ b/p2p/src/conn.rs @@ -32,6 +32,7 @@ use tokio_timer::{Timer, TimerError}; use core::core::hash::{Hash, ZERO_HASH}; use core::ser; use msg::*; +use types::Error; /// Handler to provide to the connection, will be called back anytime a message /// is received. The provided sender can be use to immediately send back @@ -86,7 +87,7 @@ impl Connection { /// itself. pub fn listen(conn: TcpStream, handler: F) - -> (Connection, Box>) + -> (Connection, Box>) where F: Handler + 'static { @@ -97,7 +98,7 @@ impl Connection { // same for closing the connection let (close_tx, close_rx) = futures::sync::mpsc::channel(1); - let close_conn = close_rx.for_each(|_| Ok(())).map_err(|_| ser::Error::CorruptedData); + let close_conn = close_rx.for_each(|_| Ok(())).map_err(|_| Error::ConnectionClose); let me = Connection { outbound_chan: tx.clone(), @@ -128,10 +129,12 @@ impl Connection { fn write_msg(&self, rx: UnboundedReceiver>, writer: WriteHalf) - -> Box, Error = ser::Error>> { + -> Box, Error = Error>> { let sent_bytes = self.sent_bytes.clone(); - let send_data = rx.map(move |data| { + let send_data = rx + .map_err(|_| Error::ConnectionClose) + .map(move |data| { // add the count of bytes sent let mut sent_bytes = sent_bytes.lock().unwrap(); *sent_bytes += data.len() as u64; @@ -139,8 +142,7 @@ impl Connection { }) // write the data and make sure the future returns the right types .fold(writer, - |writer, data| write_all(writer, data).map_err(|_| ()).map(|(writer, buf)| writer)) - .map_err(|_| ser::Error::CorruptedData); + |writer, data| write_all(writer, data).map_err(|e| Error::Connection(e)).map(|(writer, buf)| writer)); Box::new(send_data) } @@ -150,13 +152,13 @@ impl Connection { sender: UnboundedSender>, reader: ReadHalf, handler: F) - -> Box, Error = ser::Error>> + -> Box, Error = Error>> where F: Handler + 'static { // infinite iterator stream so we repeat the message reading logic until the // peer is stopped - let iter = stream::iter(iter::repeat(()).map(Ok::<(), ser::Error>)); + let iter = stream::iter(iter::repeat(()).map(Ok::<(), Error>)); // setup the reading future, getting messages from the peer and processing them let recv_bytes = self.received_bytes.clone(); @@ -169,7 +171,7 @@ impl Connection { // first read the message header read_exact(reader, vec![0u8; HEADER_LEN as usize]) - .map_err(|e| ser::Error::IOErr(e)) + .from_err() .and_then(move |(reader, buf)| { let header = try!(ser::deserialize::(&mut &buf[..])); Ok((reader, header)) @@ -178,9 +180,9 @@ impl Connection { // now that we have a size, proceed with the body read_exact(reader, vec![0u8; header.msg_len as usize]) .map(|(reader, buf)| (reader, header, buf)) - .map_err(|e| ser::Error::IOErr(e)) + .from_err() }) - .map(move |(reader, header, buf)| { + .and_then(move |(reader, header, buf)| { // add the count of bytes received let mut recv_bytes = recv_bytes.lock().unwrap(); *recv_bytes += header.serialized_len() + header.msg_len; @@ -189,9 +191,10 @@ impl Connection { let msg_type = header.msg_type; if let Err(e) = handler.handle(sender_inner.clone(), header, buf) { debug!("Invalid {:?} message: {}", msg_type, e); + return Err(Error::Serialization(e)); } - reader + Ok(reader) }) }); Box::new(read_msg) @@ -199,7 +202,7 @@ impl Connection { /// Utility function to send any Writeable. Handles adding the header and /// serialization. - pub fn send_msg(&self, t: Type, body: &ser::Writeable) -> Result<(), ser::Error> { + pub fn send_msg(&self, t: Type, body: &ser::Writeable) -> Result<(), Error> { let mut body_data = vec![]; try!(ser::serialize(&mut body_data, body)); @@ -207,7 +210,7 @@ impl Connection { try!(ser::serialize(&mut data, &MsgHeader::new(t, body_data.len() as u64))); data.append(&mut body_data); - self.outbound_chan.send(data).map_err(|_| ser::Error::CorruptedData) + self.outbound_chan.send(data).map_err(|_| Error::ConnectionClose) } /// Bytes sent and received by this peer to the remote peer. @@ -230,7 +233,7 @@ impl TimeoutConnection { /// Same as Connection pub fn listen(conn: TcpStream, handler: F) - -> (TimeoutConnection, Box>) + -> (TimeoutConnection, Box>) where F: Handler + 'static { @@ -268,7 +271,7 @@ impl TimeoutConnection { } Ok(()) }) - .map_err(|_| ser::Error::CorruptedData); + .from_err(); let me = TimeoutConnection { underlying: conn, @@ -284,7 +287,7 @@ impl TimeoutConnection { rt: Type, body: &ser::Writeable, expect_h: Option<(Hash)>) - -> Result<(), ser::Error> { + -> Result<(), Error> { let sent = try!(self.underlying.send_msg(t, body)); let mut expects = self.expected_responses.lock().unwrap(); @@ -293,7 +296,7 @@ impl TimeoutConnection { } /// Same as Connection - pub fn send_msg(&self, t: Type, body: &ser::Writeable) -> Result<(), ser::Error> { + pub fn send_msg(&self, t: Type, body: &ser::Writeable) -> Result<(), Error> { self.underlying.send_msg(t, body) } diff --git a/p2p/src/handshake.rs b/p2p/src/handshake.rs index 03a1c7f97..dc4ee8cff 100644 --- a/p2p/src/handshake.rs +++ b/p2p/src/handshake.rs @@ -21,8 +21,8 @@ use rand::Rng; use rand::os::OsRng; use tokio_core::net::TcpStream; -use core::ser::Error; use core::core::target::Difficulty; +use core::ser; use msg::*; use types::*; use protocol::ProtocolV1; @@ -70,10 +70,10 @@ impl Handshake { .and_then(|conn| read_msg::(conn)) .and_then(|(conn, shake)| { if shake.version != 1 { - Err(Error::UnexpectedData { + Err(Error::Serialization(ser::Error::UnexpectedData { expected: vec![PROTOCOL_VERSION as u8], received: vec![shake.version as u8], - }) + })) } else { let peer_info = PeerInfo { capabilities: shake.capabilities, @@ -101,19 +101,19 @@ impl Handshake { Box::new(read_msg::(conn) .and_then(move |(conn, hand)| { if hand.version != 1 { - return Err(Error::UnexpectedData { + return Err(Error::Serialization(ser::Error::UnexpectedData { expected: vec![PROTOCOL_VERSION as u8], received: vec![hand.version as u8], - }); + })); } { // check the nonce to see if we could be trying to connect to ourselves let nonces = nonces.read().unwrap(); if nonces.contains(&hand.nonce) { - return Err(Error::UnexpectedData { + return Err(Error::Serialization(ser::Error::UnexpectedData { expected: vec![], received: vec![], - }); + })); } } // all good, keep peer info diff --git a/p2p/src/lib.rs b/p2p/src/lib.rs index b2ead4cf4..ddcde0d77 100644 --- a/p2p/src/lib.rs +++ b/p2p/src/lib.rs @@ -50,5 +50,5 @@ mod types; pub use server::{Server, DummyAdapter}; pub use peer::Peer; pub use types::{P2PConfig, NetAdapter, MAX_LOCATORS, MAX_BLOCK_HEADERS, MAX_PEER_ADDRS, - Capabilities, UNKNOWN, FULL_NODE, FULL_HIST, PeerInfo}; + Capabilities, UNKNOWN, FULL_NODE, FULL_HIST, PeerInfo, Error}; pub use store::{PeerStore, PeerData, State}; diff --git a/p2p/src/msg.rs b/p2p/src/msg.rs index da54a8bcf..d9859a075 100644 --- a/p2p/src/msg.rs +++ b/p2p/src/msg.rs @@ -68,23 +68,23 @@ enum_from_primitive! { /// Future combinator to read any message where the body is a Readable. Reads /// the header first, handles its validation and then reads the Readable body, /// allocating buffers of the right size. -pub fn read_msg(conn: TcpStream) -> Box> +pub fn read_msg(conn: TcpStream) -> Box> where T: Readable + 'static { let read_header = read_exact(conn, vec![0u8; HEADER_LEN as usize]) - .map_err(|e| ser::Error::IOErr(e)) + .from_err() .and_then(|(reader, buf)| { let header = try!(ser::deserialize::(&mut &buf[..])); if header.msg_len > MAX_MSG_LEN { // TODO add additional restrictions on a per-message-type basis to avoid 20MB // pings - return Err(ser::Error::TooLargeReadErr); + return Err(Error::Serialization(ser::Error::TooLargeReadErr)); } Ok((reader, header)) }); let read_msg = read_header.and_then(|(reader, header)| { - read_exact(reader, vec![0u8; header.msg_len as usize]).map_err(|e| ser::Error::IOErr(e)) + read_exact(reader, vec![0u8; header.msg_len as usize]).from_err() }) .and_then(|(reader, buf)| { let body = try!(ser::deserialize(&mut &buf[..])); @@ -99,7 +99,7 @@ pub fn read_msg(conn: TcpStream) -> Box(conn: TcpStream, msg: T, msg_type: Type) - -> Box> + -> Box> where T: Writeable + 'static { let write_msg = ok((conn)).and_then(move |conn| { @@ -116,7 +116,7 @@ pub fn write_msg(conn: TcpStream, write_all(conn, header_buf) .and_then(|(conn, _)| write_all(conn, body_buf)) .map(|(conn, _)| conn) - .map_err(|e| ser::Error::IOErr(e)) + .from_err() }); Box::new(write_msg) } diff --git a/p2p/src/peer.rs b/p2p/src/peer.rs index e3a59de62..11d3c9eea 100644 --- a/p2p/src/peer.rs +++ b/p2p/src/peer.rs @@ -13,7 +13,7 @@ // limitations under the License. use std::net::SocketAddr; -use std::sync::Arc; +use std::sync::{RwLock, Arc}; use futures::Future; use tokio_core::net::TcpStream; @@ -21,13 +21,19 @@ use tokio_core::net::TcpStream; use core::core; use core::core::hash::Hash; use core::core::target::Difficulty; -use core::ser::Error; use handshake::Handshake; use types::*; +enum State { + Connected, + Disconnected, + Banned, +} + pub struct Peer { pub info: PeerInfo, proto: Box, + state: Arc>, } unsafe impl Sync for Peer {} @@ -47,6 +53,7 @@ impl Peer { Peer { info: info, proto: Box::new(proto), + state: Arc::new(RwLock::new(State::Connected)), })) }); Box::new(connect_peer) @@ -64,6 +71,7 @@ impl Peer { Peer { info: info, proto: Box::new(proto), + state: Arc::new(RwLock::new(State::Connected)), })) }); Box::new(hs_peer) @@ -77,9 +85,26 @@ impl Peer { -> Box> { let addr = self.info.addr; - Box::new(self.proto.handle(conn, na).and_then(move |_| { - info!("Client {} disconnected.", addr); - Ok(()) + let state = self.state.clone(); + Box::new(self.proto.handle(conn, na).then(move |res| { + let mut state = state.write().unwrap(); + match res { + Ok(res) => { + *state = State::Disconnected; + info!("Client {} disconnected.", addr); + Ok(()) + } + Err(Error::Serialization(e)) => { + *state = State::Banned; + info!("Client {} corrupted, ban.", addr); + Err(Error::Serialization(e)) + } + Err(_) => { + *state = State::Disconnected; + info!("Client {} connection lost.", addr); + Ok(()) + } + } })) } diff --git a/p2p/src/protocol.rs b/p2p/src/protocol.rs index 28ff3d49f..c6e9cc72d 100644 --- a/p2p/src/protocol.rs +++ b/p2p/src/protocol.rs @@ -48,7 +48,7 @@ impl Protocol for ProtocolV1 { fn handle(&self, conn: TcpStream, adapter: Arc) - -> Box> { + -> Box> { let (conn, listener) = TimeoutConnection::listen(conn, move |sender, header, data| { let adapt = adapter.as_ref(); @@ -67,32 +67,32 @@ impl Protocol for ProtocolV1 { /// Sends a ping message to the remote peer. Will panic if handle has never /// been called on this protocol. - fn send_ping(&self) -> Result<(), ser::Error> { + fn send_ping(&self) -> Result<(), Error> { self.send_request(Type::Ping, Type::Pong, &Empty {}, None) } /// Serializes and sends a block to our remote peer - fn send_block(&self, b: &core::Block) -> Result<(), ser::Error> { + fn send_block(&self, b: &core::Block) -> Result<(), Error> { self.send_msg(Type::Block, b) } /// Serializes and sends a transaction to our remote peer - fn send_transaction(&self, tx: &core::Transaction) -> Result<(), ser::Error> { + fn send_transaction(&self, tx: &core::Transaction) -> Result<(), Error> { self.send_msg(Type::Transaction, tx) } - fn send_header_request(&self, locator: Vec) -> Result<(), ser::Error> { + fn send_header_request(&self, locator: Vec) -> Result<(), Error> { self.send_request(Type::GetHeaders, Type::Headers, &Locator { hashes: locator }, None) } - fn send_block_request(&self, h: Hash) -> Result<(), ser::Error> { + fn send_block_request(&self, h: Hash) -> Result<(), Error> { self.send_request(Type::GetBlock, Type::Block, &h, Some(h)) } - fn send_peer_request(&self, capab: Capabilities) -> Result<(), ser::Error> { + fn send_peer_request(&self, capab: Capabilities) -> Result<(), Error> { self.send_request(Type::GetPeerAddrs, Type::PeerAddrs, &GetPeerAddrs { capabilities: capab }, @@ -106,7 +106,7 @@ impl Protocol for ProtocolV1 { } impl ProtocolV1 { - fn send_msg(&self, t: Type, body: &ser::Writeable) -> Result<(), ser::Error> { + fn send_msg(&self, t: Type, body: &ser::Writeable) -> Result<(), Error> { self.conn.borrow().send_msg(t, body) } @@ -115,7 +115,7 @@ impl ProtocolV1 { rt: Type, body: &ser::Writeable, expect_resp: Option) - -> Result<(), ser::Error> { + -> Result<(), Error> { self.conn.borrow().send_request(t, rt, body, expect_resp) } } diff --git a/p2p/src/server.rs b/p2p/src/server.rs index b8f43e421..f4d610a67 100644 --- a/p2p/src/server.rs +++ b/p2p/src/server.rs @@ -31,7 +31,6 @@ use tokio_core::reactor; use core::core; use core::core::hash::Hash; use core::core::target::Difficulty; -use core::ser::Error; use handshake::Handshake; use peer::Peer; use types::*; @@ -98,18 +97,17 @@ impl Server { // main peer acceptance future handling handshake let hp = h.clone(); - let peers = socket.incoming().map_err(|e| Error::IOErr(e)).map(move |(conn, addr)| { + let peers = socket.incoming().map_err(From::from).map(move |(conn, addr)| { let adapter = adapter.clone(); let total_diff = adapter.total_difficulty(); let peers = peers.clone(); // accept the peer and add it to the server map - let peer_accept = add_to_peers(peers, - adapter.clone(), - Peer::accept(conn, capab, total_diff, &hs.clone())); + let accept = Peer::accept(conn, capab, total_diff, &hs.clone()); + let added = add_to_peers(peers, adapter.clone(), accept); // wire in a future to timeout the accept after 5 secs - let timed_peer = with_timeout(Box::new(peer_accept), &hp); + let timed_peer = with_timeout(Box::new(added), &hp); // run the main peer protocol timed_peer.and_then(move |(conn, peer)| peer.clone().run(conn, adapter)) @@ -120,7 +118,7 @@ impl Server { let server = peers.for_each(move |peer| { hs.spawn(peer.then(|res| { match res { - Err(e) => info!("Client error: {}", e), + Err(e) => info!("Client error: {:?}", e), _ => {} } futures::finished(()) @@ -134,7 +132,7 @@ impl Server { let mut stop_mut = self.stop.borrow_mut(); *stop_mut = Some(stop); } - Box::new(server.select(stop_rx.map_err(|_| Error::CorruptedData)).then(|res| { + Box::new(server.select(stop_rx.map_err(|_| Error::ConnectionClose)).then(|res| { match res { Ok((_, _)) => Ok(()), Err((e, _)) => Err(e), @@ -165,7 +163,7 @@ impl Server { debug!("{} connecting to {}", self_addr, addr); - let socket = TcpStream::connect(&addr, &h).map_err(|e| Error::IOErr(e)); + let socket = TcpStream::connect(&addr, &h).map_err(|e| Error::Connection(e)); let h2 = h.clone(); let request = socket.and_then(move |socket| { let peers = peers.clone(); @@ -173,14 +171,10 @@ impl Server { // connect to the peer and add it to the server map, wiring it a timeout for // the handhake - let peer_connect = add_to_peers(peers, - adapter1, - Peer::connect(socket, - capab, - total_diff, - self_addr, - &Handshake::new())); - with_timeout(Box::new(peer_connect), &h) + let connect = + Peer::connect(socket, capab, total_diff, self_addr, &Handshake::new()); + let added = add_to_peers(peers, adapter1, connect); + with_timeout(Box::new(added), &h) }) .and_then(move |(socket, peer)| { h2.spawn(peer.run(socket, adapter2).map_err(|e| { @@ -226,7 +220,7 @@ impl Server { let peers = self.peers.write().unwrap(); for p in peers.deref() { if let Err(e) = p.send_block(b) { - debug!("Error sending block to peer: {}", e); + debug!("Error sending block to peer: {:?}", e); } } } @@ -268,11 +262,11 @@ fn with_timeout(fut: Box, Error = Error> h: &reactor::Handle) -> Box> { let timeout = reactor::Timeout::new(Duration::new(5, 0), h).unwrap(); - let timed = fut.select(timeout.map(Err).map_err(|e| Error::IOErr(e))) + let timed = fut.select(timeout.map(Err).from_err()) .then(|res| { match res { Ok((Ok(inner), _timeout)) => Ok(inner), - Ok((_, _accept)) => Err(Error::TooLargeReadErr), + Ok((_, _accept)) => Err(Error::Timeout), Err((e, _other)) => Err(e), } }); diff --git a/p2p/src/types.rs b/p2p/src/types.rs index 7e420eed1..5513adc71 100644 --- a/p2p/src/types.rs +++ b/p2p/src/types.rs @@ -12,16 +12,19 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::convert::From; +use std::io; use std::net::{SocketAddr, IpAddr}; use std::sync::Arc; use futures::Future; use tokio_core::net::TcpStream; +use tokio_timer::TimerError; use core::core; use core::core::hash::Hash; use core::core::target::Difficulty; -use core::ser::Error; +use core::ser; /// Maximum number of hashes in a block header locator request pub const MAX_LOCATORS: u32 = 10; @@ -35,6 +38,30 @@ pub const MAX_BLOCK_BODIES: u32 = 16; /// Maximum number of peer addresses a peer should ever send pub const MAX_PEER_ADDRS: u32 = 256; +#[derive(Debug)] +pub enum Error { + Serialization(ser::Error), + Connection(io::Error), + ConnectionClose, + Timeout, +} + +impl From for Error { + fn from(e: ser::Error) -> Error { + Error::Serialization(e) + } +} +impl From for Error { + fn from(e: io::Error) -> Error { + Error::Connection(e) + } +} +impl From for Error { + fn from(e: TimerError) -> Error { + Error::Timeout + } +} + /// Configuration for the peer-to-peer server. #[derive(Debug, Clone, Copy)] pub struct P2PConfig { diff --git a/p2p/tests/peer_handshake.rs b/p2p/tests/peer_handshake.rs index 6c85566f4..28cad305f 100644 --- a/p2p/tests/peer_handshake.rs +++ b/p2p/tests/peer_handshake.rs @@ -47,18 +47,18 @@ fn peer_handshake() { let rhandle = handle.clone(); let timeout = reactor::Timeout::new(time::Duration::new(1, 0), &handle).unwrap(); let timeout_send = reactor::Timeout::new(time::Duration::new(2, 0), &handle).unwrap(); - handle.spawn(timeout.map_err(|e| ser::Error::IOErr(e)).and_then(move |_| { + handle.spawn(timeout.from_err().and_then(move |_| { let p2p_conf = p2p::P2PConfig::default(); let addr = SocketAddr::new(p2p_conf.host, p2p_conf.port); - let socket = TcpStream::connect(&addr, &phandle).map_err(|e| ser::Error::IOErr(e)); + let socket = TcpStream::connect(&addr, &phandle).map_err(|e| p2p::Error::Connection(e)); socket.and_then(move |socket| { Peer::connect(socket, p2p::UNKNOWN, Difficulty::one(), my_addr, &p2p::handshake::Handshake::new()) }).and_then(move |(socket, peer)| { rhandle.spawn(peer.run(socket, net_adapter.clone()).map_err(|e| { - panic!("Client run failed: {}", e); + panic!("Client run failed: {:?}", e); })); peer.send_ping().unwrap(); - timeout_send.map_err(|e| ser::Error::IOErr(e)).map(|_| peer) + timeout_send.from_err().map(|_| peer) }).and_then(|peer| { let (sent, recv) = peer.transmitted_bytes(); assert!(sent > 0); @@ -70,7 +70,7 @@ fn peer_handshake() { Ok(()) }) }).map_err(|e| { - panic!("Client connection failed: {}", e); + panic!("Client connection failed: {:?}", e); })); evtlp.run(run_server).unwrap();