Cleaned up error handling in p2p module. Setting peer status on disconnect.

This commit is contained in:
Ignotus Peverell 2017-02-26 20:08:40 -08:00
parent 78b223b4f0
commit 7c72ccec7c
No known key found for this signature in database
GPG key ID: 99CD25F39F8F8211
13 changed files with 175 additions and 81 deletions

View file

@ -45,5 +45,6 @@ mod miner;
mod server; mod server;
mod seed; mod seed;
mod sync; mod sync;
mod types;
pub use server::{Server, ServerConfig, Seeding}; pub use server::{Server, ServerConfig, Seeding};

View file

@ -54,14 +54,14 @@ impl Seeder {
} }
pub fn connect_and_monitor(&self, pub fn connect_and_monitor(&self,
h: reactor::Handle, h: reactor::Handle,
seed_list: Box<Future<Item = Vec<SocketAddr>, Error = String>>) { seed_list: Box<Future<Item = Vec<SocketAddr>, Error = String>>) {
// open a channel with a listener that connects every peer address sent below // open a channel with a listener that connects every peer address sent below
// max peer count // max peer count
let (tx, rx) = futures::sync::mpsc::unbounded(); let (tx, rx) = futures::sync::mpsc::unbounded();
h.spawn(self.listen_for_addrs(h.clone(), rx)); h.spawn(self.listen_for_addrs(h.clone(), rx));
// check seeds and start monitoring connections // check seeds and start monitoring connections
let seeder = self.connect_to_seeds(tx.clone(), seed_list) let seeder = self.connect_to_seeds(tx.clone(), seed_list)
.join(self.monitor_peers(tx.clone())); .join(self.monitor_peers(tx.clone()));
@ -97,8 +97,8 @@ impl Seeder {
Box::new(mon_loop) Box::new(mon_loop)
} }
// Check if we have any pre-existing peer in db. If so, start with those, // Check if we have any pre-existing peer in db. If so, start with those,
// otherwise use the seeds provided. // otherwise use the seeds provided.
fn connect_to_seeds(&self, fn connect_to_seeds(&self,
tx: mpsc::UnboundedSender<SocketAddr>, tx: mpsc::UnboundedSender<SocketAddr>,
seed_list: Box<Future<Item = Vec<SocketAddr>, Error = String>>) seed_list: Box<Future<Item = Vec<SocketAddr>, Error = String>>)

View file

@ -28,6 +28,7 @@ use std::time::{Instant, Duration};
use core::core::hash::{Hash, Hashed}; use core::core::hash::{Hash, Hashed};
use chain; use chain;
use p2p; use p2p;
use types::Error;
pub struct Syncer { pub struct Syncer {
chain_store: Arc<chain::ChainStore>, chain_store: Arc<chain::ChainStore>,
@ -57,7 +58,7 @@ impl Syncer {
/// Checks the local chain state, comparing it with our peers and triggers /// Checks the local chain state, comparing it with our peers and triggers
/// syncing if required. /// syncing if required.
pub fn run(&self) -> Result<(), chain::Error> { pub fn run(&self) -> Result<(), Error> {
debug!("Starting syncer."); debug!("Starting syncer.");
let start = Instant::now(); let start = Instant::now();
loop { loop {
@ -114,7 +115,7 @@ impl Syncer {
/// Checks the gap between the header chain and the full block chain and /// Checks the gap between the header chain and the full block chain and
/// initializes the blocks_to_download structure with the missing full /// initializes the blocks_to_download structure with the missing full
/// blocks /// blocks
fn init_download(&self) -> Result<(), chain::Error> { fn init_download(&self) -> Result<(), Error> {
// compare the header's head to the full one to see what we're missing // compare the header's head to the full one to see what we're missing
let header_head = self.chain_store.get_header_head()?; let header_head = self.chain_store.get_header_head()?;
let full_head = self.chain_store.head()?; let full_head = self.chain_store.head()?;
@ -167,7 +168,7 @@ impl Syncer {
} }
/// Request some block headers from a peer to advance us /// Request some block headers from a peer to advance us
fn request_headers(&self) -> Result<(), chain::Error> { fn request_headers(&self) -> Result<(), Error> {
{ {
let mut last_header_req = self.last_header_req.lock().unwrap(); let mut last_header_req = self.last_header_req.lock().unwrap();
*last_header_req = Instant::now(); *last_header_req = Instant::now();
@ -201,7 +202,7 @@ impl Syncer {
/// Builds a vector of block hashes that should help the remote peer sending /// Builds a vector of block hashes that should help the remote peer sending
/// us the right block headers. /// us the right block headers.
fn get_locator(&self, tip: &chain::Tip) -> Result<Vec<Hash>, chain::Error> { fn get_locator(&self, tip: &chain::Tip) -> Result<Vec<Hash>, Error> {
// Prepare the heights we want as the latests height minus increasing powers // Prepare the heights we want as the latests height minus increasing powers
// of 2 up to max. // of 2 up to max.
let mut heights = vec![tip.height]; let mut heights = vec![tip.height];

43
grin/src/types.rs Normal file
View file

@ -0,0 +1,43 @@
// Copyright 2016 The Grin Developers
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::convert::From;
use chain;
use p2p;
use store;
pub enum Error {
Store(store::Error),
Chain(chain::Error),
P2P(p2p::Error),
}
impl From<chain::Error> for Error {
fn from(e: chain::Error) -> Error {
Error::Chain(e)
}
}
impl From<p2p::Error> for Error {
fn from(e: p2p::Error) -> Error {
Error::P2P(e)
}
}
impl From<store::Error> for Error {
fn from(e: store::Error) -> Error {
Error::Store(e)
}
}

View file

@ -32,6 +32,7 @@ use tokio_timer::{Timer, TimerError};
use core::core::hash::{Hash, ZERO_HASH}; use core::core::hash::{Hash, ZERO_HASH};
use core::ser; use core::ser;
use msg::*; use msg::*;
use types::Error;
/// Handler to provide to the connection, will be called back anytime a message /// Handler to provide to the connection, will be called back anytime a message
/// is received. The provided sender can be use to immediately send back /// is received. The provided sender can be use to immediately send back
@ -86,7 +87,7 @@ impl Connection {
/// itself. /// itself.
pub fn listen<F>(conn: TcpStream, pub fn listen<F>(conn: TcpStream,
handler: F) handler: F)
-> (Connection, Box<Future<Item = (), Error = ser::Error>>) -> (Connection, Box<Future<Item = (), Error = Error>>)
where F: Handler + 'static where F: Handler + 'static
{ {
@ -97,7 +98,7 @@ impl Connection {
// same for closing the connection // same for closing the connection
let (close_tx, close_rx) = futures::sync::mpsc::channel(1); let (close_tx, close_rx) = futures::sync::mpsc::channel(1);
let close_conn = close_rx.for_each(|_| Ok(())).map_err(|_| ser::Error::CorruptedData); let close_conn = close_rx.for_each(|_| Ok(())).map_err(|_| Error::ConnectionClose);
let me = Connection { let me = Connection {
outbound_chan: tx.clone(), outbound_chan: tx.clone(),
@ -128,10 +129,12 @@ impl Connection {
fn write_msg(&self, fn write_msg(&self,
rx: UnboundedReceiver<Vec<u8>>, rx: UnboundedReceiver<Vec<u8>>,
writer: WriteHalf<TcpStream>) writer: WriteHalf<TcpStream>)
-> Box<Future<Item = WriteHalf<TcpStream>, Error = ser::Error>> { -> Box<Future<Item = WriteHalf<TcpStream>, Error = Error>> {
let sent_bytes = self.sent_bytes.clone(); let sent_bytes = self.sent_bytes.clone();
let send_data = rx.map(move |data| { let send_data = rx
.map_err(|_| Error::ConnectionClose)
.map(move |data| {
// add the count of bytes sent // add the count of bytes sent
let mut sent_bytes = sent_bytes.lock().unwrap(); let mut sent_bytes = sent_bytes.lock().unwrap();
*sent_bytes += data.len() as u64; *sent_bytes += data.len() as u64;
@ -139,8 +142,7 @@ impl Connection {
}) })
// write the data and make sure the future returns the right types // write the data and make sure the future returns the right types
.fold(writer, .fold(writer,
|writer, data| write_all(writer, data).map_err(|_| ()).map(|(writer, buf)| writer)) |writer, data| write_all(writer, data).map_err(|e| Error::Connection(e)).map(|(writer, buf)| writer));
.map_err(|_| ser::Error::CorruptedData);
Box::new(send_data) Box::new(send_data)
} }
@ -150,13 +152,13 @@ impl Connection {
sender: UnboundedSender<Vec<u8>>, sender: UnboundedSender<Vec<u8>>,
reader: ReadHalf<TcpStream>, reader: ReadHalf<TcpStream>,
handler: F) handler: F)
-> Box<Future<Item = ReadHalf<TcpStream>, Error = ser::Error>> -> Box<Future<Item = ReadHalf<TcpStream>, Error = Error>>
where F: Handler + 'static where F: Handler + 'static
{ {
// infinite iterator stream so we repeat the message reading logic until the // infinite iterator stream so we repeat the message reading logic until the
// peer is stopped // peer is stopped
let iter = stream::iter(iter::repeat(()).map(Ok::<(), ser::Error>)); let iter = stream::iter(iter::repeat(()).map(Ok::<(), Error>));
// setup the reading future, getting messages from the peer and processing them // setup the reading future, getting messages from the peer and processing them
let recv_bytes = self.received_bytes.clone(); let recv_bytes = self.received_bytes.clone();
@ -169,7 +171,7 @@ impl Connection {
// first read the message header // first read the message header
read_exact(reader, vec![0u8; HEADER_LEN as usize]) read_exact(reader, vec![0u8; HEADER_LEN as usize])
.map_err(|e| ser::Error::IOErr(e)) .from_err()
.and_then(move |(reader, buf)| { .and_then(move |(reader, buf)| {
let header = try!(ser::deserialize::<MsgHeader>(&mut &buf[..])); let header = try!(ser::deserialize::<MsgHeader>(&mut &buf[..]));
Ok((reader, header)) Ok((reader, header))
@ -178,9 +180,9 @@ impl Connection {
// now that we have a size, proceed with the body // now that we have a size, proceed with the body
read_exact(reader, vec![0u8; header.msg_len as usize]) read_exact(reader, vec![0u8; header.msg_len as usize])
.map(|(reader, buf)| (reader, header, buf)) .map(|(reader, buf)| (reader, header, buf))
.map_err(|e| ser::Error::IOErr(e)) .from_err()
}) })
.map(move |(reader, header, buf)| { .and_then(move |(reader, header, buf)| {
// add the count of bytes received // add the count of bytes received
let mut recv_bytes = recv_bytes.lock().unwrap(); let mut recv_bytes = recv_bytes.lock().unwrap();
*recv_bytes += header.serialized_len() + header.msg_len; *recv_bytes += header.serialized_len() + header.msg_len;
@ -189,9 +191,10 @@ impl Connection {
let msg_type = header.msg_type; let msg_type = header.msg_type;
if let Err(e) = handler.handle(sender_inner.clone(), header, buf) { if let Err(e) = handler.handle(sender_inner.clone(), header, buf) {
debug!("Invalid {:?} message: {}", msg_type, e); debug!("Invalid {:?} message: {}", msg_type, e);
return Err(Error::Serialization(e));
} }
reader Ok(reader)
}) })
}); });
Box::new(read_msg) Box::new(read_msg)
@ -199,7 +202,7 @@ impl Connection {
/// Utility function to send any Writeable. Handles adding the header and /// Utility function to send any Writeable. Handles adding the header and
/// serialization. /// serialization.
pub fn send_msg(&self, t: Type, body: &ser::Writeable) -> Result<(), ser::Error> { pub fn send_msg(&self, t: Type, body: &ser::Writeable) -> Result<(), Error> {
let mut body_data = vec![]; let mut body_data = vec![];
try!(ser::serialize(&mut body_data, body)); try!(ser::serialize(&mut body_data, body));
@ -207,7 +210,7 @@ impl Connection {
try!(ser::serialize(&mut data, &MsgHeader::new(t, body_data.len() as u64))); try!(ser::serialize(&mut data, &MsgHeader::new(t, body_data.len() as u64)));
data.append(&mut body_data); data.append(&mut body_data);
self.outbound_chan.send(data).map_err(|_| ser::Error::CorruptedData) self.outbound_chan.send(data).map_err(|_| Error::ConnectionClose)
} }
/// Bytes sent and received by this peer to the remote peer. /// Bytes sent and received by this peer to the remote peer.
@ -230,7 +233,7 @@ impl TimeoutConnection {
/// Same as Connection /// Same as Connection
pub fn listen<F>(conn: TcpStream, pub fn listen<F>(conn: TcpStream,
handler: F) handler: F)
-> (TimeoutConnection, Box<Future<Item = (), Error = ser::Error>>) -> (TimeoutConnection, Box<Future<Item = (), Error = Error>>)
where F: Handler + 'static where F: Handler + 'static
{ {
@ -268,7 +271,7 @@ impl TimeoutConnection {
} }
Ok(()) Ok(())
}) })
.map_err(|_| ser::Error::CorruptedData); .from_err();
let me = TimeoutConnection { let me = TimeoutConnection {
underlying: conn, underlying: conn,
@ -284,7 +287,7 @@ impl TimeoutConnection {
rt: Type, rt: Type,
body: &ser::Writeable, body: &ser::Writeable,
expect_h: Option<(Hash)>) expect_h: Option<(Hash)>)
-> Result<(), ser::Error> { -> Result<(), Error> {
let sent = try!(self.underlying.send_msg(t, body)); let sent = try!(self.underlying.send_msg(t, body));
let mut expects = self.expected_responses.lock().unwrap(); let mut expects = self.expected_responses.lock().unwrap();
@ -293,7 +296,7 @@ impl TimeoutConnection {
} }
/// Same as Connection /// Same as Connection
pub fn send_msg(&self, t: Type, body: &ser::Writeable) -> Result<(), ser::Error> { pub fn send_msg(&self, t: Type, body: &ser::Writeable) -> Result<(), Error> {
self.underlying.send_msg(t, body) self.underlying.send_msg(t, body)
} }

View file

@ -21,8 +21,8 @@ use rand::Rng;
use rand::os::OsRng; use rand::os::OsRng;
use tokio_core::net::TcpStream; use tokio_core::net::TcpStream;
use core::ser::Error;
use core::core::target::Difficulty; use core::core::target::Difficulty;
use core::ser;
use msg::*; use msg::*;
use types::*; use types::*;
use protocol::ProtocolV1; use protocol::ProtocolV1;
@ -70,10 +70,10 @@ impl Handshake {
.and_then(|conn| read_msg::<Shake>(conn)) .and_then(|conn| read_msg::<Shake>(conn))
.and_then(|(conn, shake)| { .and_then(|(conn, shake)| {
if shake.version != 1 { if shake.version != 1 {
Err(Error::UnexpectedData { Err(Error::Serialization(ser::Error::UnexpectedData {
expected: vec![PROTOCOL_VERSION as u8], expected: vec![PROTOCOL_VERSION as u8],
received: vec![shake.version as u8], received: vec![shake.version as u8],
}) }))
} else { } else {
let peer_info = PeerInfo { let peer_info = PeerInfo {
capabilities: shake.capabilities, capabilities: shake.capabilities,
@ -101,19 +101,19 @@ impl Handshake {
Box::new(read_msg::<Hand>(conn) Box::new(read_msg::<Hand>(conn)
.and_then(move |(conn, hand)| { .and_then(move |(conn, hand)| {
if hand.version != 1 { if hand.version != 1 {
return Err(Error::UnexpectedData { return Err(Error::Serialization(ser::Error::UnexpectedData {
expected: vec![PROTOCOL_VERSION as u8], expected: vec![PROTOCOL_VERSION as u8],
received: vec![hand.version as u8], received: vec![hand.version as u8],
}); }));
} }
{ {
// check the nonce to see if we could be trying to connect to ourselves // check the nonce to see if we could be trying to connect to ourselves
let nonces = nonces.read().unwrap(); let nonces = nonces.read().unwrap();
if nonces.contains(&hand.nonce) { if nonces.contains(&hand.nonce) {
return Err(Error::UnexpectedData { return Err(Error::Serialization(ser::Error::UnexpectedData {
expected: vec![], expected: vec![],
received: vec![], received: vec![],
}); }));
} }
} }
// all good, keep peer info // all good, keep peer info

View file

@ -50,5 +50,5 @@ mod types;
pub use server::{Server, DummyAdapter}; pub use server::{Server, DummyAdapter};
pub use peer::Peer; pub use peer::Peer;
pub use types::{P2PConfig, NetAdapter, MAX_LOCATORS, MAX_BLOCK_HEADERS, MAX_PEER_ADDRS, pub use types::{P2PConfig, NetAdapter, MAX_LOCATORS, MAX_BLOCK_HEADERS, MAX_PEER_ADDRS,
Capabilities, UNKNOWN, FULL_NODE, FULL_HIST, PeerInfo}; Capabilities, UNKNOWN, FULL_NODE, FULL_HIST, PeerInfo, Error};
pub use store::{PeerStore, PeerData, State}; pub use store::{PeerStore, PeerData, State};

View file

@ -68,23 +68,23 @@ enum_from_primitive! {
/// Future combinator to read any message where the body is a Readable. Reads /// Future combinator to read any message where the body is a Readable. Reads
/// the header first, handles its validation and then reads the Readable body, /// the header first, handles its validation and then reads the Readable body,
/// allocating buffers of the right size. /// allocating buffers of the right size.
pub fn read_msg<T>(conn: TcpStream) -> Box<Future<Item = (TcpStream, T), Error = ser::Error>> pub fn read_msg<T>(conn: TcpStream) -> Box<Future<Item = (TcpStream, T), Error = Error>>
where T: Readable<T> + 'static where T: Readable<T> + 'static
{ {
let read_header = read_exact(conn, vec![0u8; HEADER_LEN as usize]) let read_header = read_exact(conn, vec![0u8; HEADER_LEN as usize])
.map_err(|e| ser::Error::IOErr(e)) .from_err()
.and_then(|(reader, buf)| { .and_then(|(reader, buf)| {
let header = try!(ser::deserialize::<MsgHeader>(&mut &buf[..])); let header = try!(ser::deserialize::<MsgHeader>(&mut &buf[..]));
if header.msg_len > MAX_MSG_LEN { if header.msg_len > MAX_MSG_LEN {
// TODO add additional restrictions on a per-message-type basis to avoid 20MB // TODO add additional restrictions on a per-message-type basis to avoid 20MB
// pings // pings
return Err(ser::Error::TooLargeReadErr); return Err(Error::Serialization(ser::Error::TooLargeReadErr));
} }
Ok((reader, header)) Ok((reader, header))
}); });
let read_msg = read_header.and_then(|(reader, header)| { let read_msg = read_header.and_then(|(reader, header)| {
read_exact(reader, vec![0u8; header.msg_len as usize]).map_err(|e| ser::Error::IOErr(e)) read_exact(reader, vec![0u8; header.msg_len as usize]).from_err()
}) })
.and_then(|(reader, buf)| { .and_then(|(reader, buf)| {
let body = try!(ser::deserialize(&mut &buf[..])); let body = try!(ser::deserialize(&mut &buf[..]));
@ -99,7 +99,7 @@ pub fn read_msg<T>(conn: TcpStream) -> Box<Future<Item = (TcpStream, T), Error =
pub fn write_msg<T>(conn: TcpStream, pub fn write_msg<T>(conn: TcpStream,
msg: T, msg: T,
msg_type: Type) msg_type: Type)
-> Box<Future<Item = TcpStream, Error = ser::Error>> -> Box<Future<Item = TcpStream, Error = Error>>
where T: Writeable + 'static where T: Writeable + 'static
{ {
let write_msg = ok((conn)).and_then(move |conn| { let write_msg = ok((conn)).and_then(move |conn| {
@ -116,7 +116,7 @@ pub fn write_msg<T>(conn: TcpStream,
write_all(conn, header_buf) write_all(conn, header_buf)
.and_then(|(conn, _)| write_all(conn, body_buf)) .and_then(|(conn, _)| write_all(conn, body_buf))
.map(|(conn, _)| conn) .map(|(conn, _)| conn)
.map_err(|e| ser::Error::IOErr(e)) .from_err()
}); });
Box::new(write_msg) Box::new(write_msg)
} }

View file

@ -13,7 +13,7 @@
// limitations under the License. // limitations under the License.
use std::net::SocketAddr; use std::net::SocketAddr;
use std::sync::Arc; use std::sync::{RwLock, Arc};
use futures::Future; use futures::Future;
use tokio_core::net::TcpStream; use tokio_core::net::TcpStream;
@ -21,13 +21,19 @@ use tokio_core::net::TcpStream;
use core::core; use core::core;
use core::core::hash::Hash; use core::core::hash::Hash;
use core::core::target::Difficulty; use core::core::target::Difficulty;
use core::ser::Error;
use handshake::Handshake; use handshake::Handshake;
use types::*; use types::*;
enum State {
Connected,
Disconnected,
Banned,
}
pub struct Peer { pub struct Peer {
pub info: PeerInfo, pub info: PeerInfo,
proto: Box<Protocol>, proto: Box<Protocol>,
state: Arc<RwLock<State>>,
} }
unsafe impl Sync for Peer {} unsafe impl Sync for Peer {}
@ -47,6 +53,7 @@ impl Peer {
Peer { Peer {
info: info, info: info,
proto: Box::new(proto), proto: Box::new(proto),
state: Arc::new(RwLock::new(State::Connected)),
})) }))
}); });
Box::new(connect_peer) Box::new(connect_peer)
@ -64,6 +71,7 @@ impl Peer {
Peer { Peer {
info: info, info: info,
proto: Box::new(proto), proto: Box::new(proto),
state: Arc::new(RwLock::new(State::Connected)),
})) }))
}); });
Box::new(hs_peer) Box::new(hs_peer)
@ -77,9 +85,26 @@ impl Peer {
-> Box<Future<Item = (), Error = Error>> { -> Box<Future<Item = (), Error = Error>> {
let addr = self.info.addr; let addr = self.info.addr;
Box::new(self.proto.handle(conn, na).and_then(move |_| { let state = self.state.clone();
info!("Client {} disconnected.", addr); Box::new(self.proto.handle(conn, na).then(move |res| {
Ok(()) let mut state = state.write().unwrap();
match res {
Ok(res) => {
*state = State::Disconnected;
info!("Client {} disconnected.", addr);
Ok(())
}
Err(Error::Serialization(e)) => {
*state = State::Banned;
info!("Client {} corrupted, ban.", addr);
Err(Error::Serialization(e))
}
Err(_) => {
*state = State::Disconnected;
info!("Client {} connection lost.", addr);
Ok(())
}
}
})) }))
} }

View file

@ -48,7 +48,7 @@ impl Protocol for ProtocolV1 {
fn handle(&self, fn handle(&self,
conn: TcpStream, conn: TcpStream,
adapter: Arc<NetAdapter>) adapter: Arc<NetAdapter>)
-> Box<Future<Item = (), Error = ser::Error>> { -> Box<Future<Item = (), Error = Error>> {
let (conn, listener) = TimeoutConnection::listen(conn, move |sender, header, data| { let (conn, listener) = TimeoutConnection::listen(conn, move |sender, header, data| {
let adapt = adapter.as_ref(); let adapt = adapter.as_ref();
@ -67,32 +67,32 @@ impl Protocol for ProtocolV1 {
/// Sends a ping message to the remote peer. Will panic if handle has never /// Sends a ping message to the remote peer. Will panic if handle has never
/// been called on this protocol. /// been called on this protocol.
fn send_ping(&self) -> Result<(), ser::Error> { fn send_ping(&self) -> Result<(), Error> {
self.send_request(Type::Ping, Type::Pong, &Empty {}, None) self.send_request(Type::Ping, Type::Pong, &Empty {}, None)
} }
/// Serializes and sends a block to our remote peer /// Serializes and sends a block to our remote peer
fn send_block(&self, b: &core::Block) -> Result<(), ser::Error> { fn send_block(&self, b: &core::Block) -> Result<(), Error> {
self.send_msg(Type::Block, b) self.send_msg(Type::Block, b)
} }
/// Serializes and sends a transaction to our remote peer /// Serializes and sends a transaction to our remote peer
fn send_transaction(&self, tx: &core::Transaction) -> Result<(), ser::Error> { fn send_transaction(&self, tx: &core::Transaction) -> Result<(), Error> {
self.send_msg(Type::Transaction, tx) self.send_msg(Type::Transaction, tx)
} }
fn send_header_request(&self, locator: Vec<Hash>) -> Result<(), ser::Error> { fn send_header_request(&self, locator: Vec<Hash>) -> Result<(), Error> {
self.send_request(Type::GetHeaders, self.send_request(Type::GetHeaders,
Type::Headers, Type::Headers,
&Locator { hashes: locator }, &Locator { hashes: locator },
None) None)
} }
fn send_block_request(&self, h: Hash) -> Result<(), ser::Error> { fn send_block_request(&self, h: Hash) -> Result<(), Error> {
self.send_request(Type::GetBlock, Type::Block, &h, Some(h)) self.send_request(Type::GetBlock, Type::Block, &h, Some(h))
} }
fn send_peer_request(&self, capab: Capabilities) -> Result<(), ser::Error> { fn send_peer_request(&self, capab: Capabilities) -> Result<(), Error> {
self.send_request(Type::GetPeerAddrs, self.send_request(Type::GetPeerAddrs,
Type::PeerAddrs, Type::PeerAddrs,
&GetPeerAddrs { capabilities: capab }, &GetPeerAddrs { capabilities: capab },
@ -106,7 +106,7 @@ impl Protocol for ProtocolV1 {
} }
impl ProtocolV1 { impl ProtocolV1 {
fn send_msg(&self, t: Type, body: &ser::Writeable) -> Result<(), ser::Error> { fn send_msg(&self, t: Type, body: &ser::Writeable) -> Result<(), Error> {
self.conn.borrow().send_msg(t, body) self.conn.borrow().send_msg(t, body)
} }
@ -115,7 +115,7 @@ impl ProtocolV1 {
rt: Type, rt: Type,
body: &ser::Writeable, body: &ser::Writeable,
expect_resp: Option<Hash>) expect_resp: Option<Hash>)
-> Result<(), ser::Error> { -> Result<(), Error> {
self.conn.borrow().send_request(t, rt, body, expect_resp) self.conn.borrow().send_request(t, rt, body, expect_resp)
} }
} }

View file

@ -31,7 +31,6 @@ use tokio_core::reactor;
use core::core; use core::core;
use core::core::hash::Hash; use core::core::hash::Hash;
use core::core::target::Difficulty; use core::core::target::Difficulty;
use core::ser::Error;
use handshake::Handshake; use handshake::Handshake;
use peer::Peer; use peer::Peer;
use types::*; use types::*;
@ -98,18 +97,17 @@ impl Server {
// main peer acceptance future handling handshake // main peer acceptance future handling handshake
let hp = h.clone(); let hp = h.clone();
let peers = socket.incoming().map_err(|e| Error::IOErr(e)).map(move |(conn, addr)| { let peers = socket.incoming().map_err(From::from).map(move |(conn, addr)| {
let adapter = adapter.clone(); let adapter = adapter.clone();
let total_diff = adapter.total_difficulty(); let total_diff = adapter.total_difficulty();
let peers = peers.clone(); let peers = peers.clone();
// accept the peer and add it to the server map // accept the peer and add it to the server map
let peer_accept = add_to_peers(peers, let accept = Peer::accept(conn, capab, total_diff, &hs.clone());
adapter.clone(), let added = add_to_peers(peers, adapter.clone(), accept);
Peer::accept(conn, capab, total_diff, &hs.clone()));
// wire in a future to timeout the accept after 5 secs // wire in a future to timeout the accept after 5 secs
let timed_peer = with_timeout(Box::new(peer_accept), &hp); let timed_peer = with_timeout(Box::new(added), &hp);
// run the main peer protocol // run the main peer protocol
timed_peer.and_then(move |(conn, peer)| peer.clone().run(conn, adapter)) timed_peer.and_then(move |(conn, peer)| peer.clone().run(conn, adapter))
@ -120,7 +118,7 @@ impl Server {
let server = peers.for_each(move |peer| { let server = peers.for_each(move |peer| {
hs.spawn(peer.then(|res| { hs.spawn(peer.then(|res| {
match res { match res {
Err(e) => info!("Client error: {}", e), Err(e) => info!("Client error: {:?}", e),
_ => {} _ => {}
} }
futures::finished(()) futures::finished(())
@ -134,7 +132,7 @@ impl Server {
let mut stop_mut = self.stop.borrow_mut(); let mut stop_mut = self.stop.borrow_mut();
*stop_mut = Some(stop); *stop_mut = Some(stop);
} }
Box::new(server.select(stop_rx.map_err(|_| Error::CorruptedData)).then(|res| { Box::new(server.select(stop_rx.map_err(|_| Error::ConnectionClose)).then(|res| {
match res { match res {
Ok((_, _)) => Ok(()), Ok((_, _)) => Ok(()),
Err((e, _)) => Err(e), Err((e, _)) => Err(e),
@ -165,7 +163,7 @@ impl Server {
debug!("{} connecting to {}", self_addr, addr); debug!("{} connecting to {}", self_addr, addr);
let socket = TcpStream::connect(&addr, &h).map_err(|e| Error::IOErr(e)); let socket = TcpStream::connect(&addr, &h).map_err(|e| Error::Connection(e));
let h2 = h.clone(); let h2 = h.clone();
let request = socket.and_then(move |socket| { let request = socket.and_then(move |socket| {
let peers = peers.clone(); let peers = peers.clone();
@ -173,14 +171,10 @@ impl Server {
// connect to the peer and add it to the server map, wiring it a timeout for // connect to the peer and add it to the server map, wiring it a timeout for
// the handhake // the handhake
let peer_connect = add_to_peers(peers, let connect =
adapter1, Peer::connect(socket, capab, total_diff, self_addr, &Handshake::new());
Peer::connect(socket, let added = add_to_peers(peers, adapter1, connect);
capab, with_timeout(Box::new(added), &h)
total_diff,
self_addr,
&Handshake::new()));
with_timeout(Box::new(peer_connect), &h)
}) })
.and_then(move |(socket, peer)| { .and_then(move |(socket, peer)| {
h2.spawn(peer.run(socket, adapter2).map_err(|e| { h2.spawn(peer.run(socket, adapter2).map_err(|e| {
@ -226,7 +220,7 @@ impl Server {
let peers = self.peers.write().unwrap(); let peers = self.peers.write().unwrap();
for p in peers.deref() { for p in peers.deref() {
if let Err(e) = p.send_block(b) { if let Err(e) = p.send_block(b) {
debug!("Error sending block to peer: {}", e); debug!("Error sending block to peer: {:?}", e);
} }
} }
} }
@ -268,11 +262,11 @@ fn with_timeout<T: 'static>(fut: Box<Future<Item = Result<T, ()>, Error = Error>
h: &reactor::Handle) h: &reactor::Handle)
-> Box<Future<Item = T, Error = Error>> { -> Box<Future<Item = T, Error = Error>> {
let timeout = reactor::Timeout::new(Duration::new(5, 0), h).unwrap(); let timeout = reactor::Timeout::new(Duration::new(5, 0), h).unwrap();
let timed = fut.select(timeout.map(Err).map_err(|e| Error::IOErr(e))) let timed = fut.select(timeout.map(Err).from_err())
.then(|res| { .then(|res| {
match res { match res {
Ok((Ok(inner), _timeout)) => Ok(inner), Ok((Ok(inner), _timeout)) => Ok(inner),
Ok((_, _accept)) => Err(Error::TooLargeReadErr), Ok((_, _accept)) => Err(Error::Timeout),
Err((e, _other)) => Err(e), Err((e, _other)) => Err(e),
} }
}); });

View file

@ -12,16 +12,19 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
use std::convert::From;
use std::io;
use std::net::{SocketAddr, IpAddr}; use std::net::{SocketAddr, IpAddr};
use std::sync::Arc; use std::sync::Arc;
use futures::Future; use futures::Future;
use tokio_core::net::TcpStream; use tokio_core::net::TcpStream;
use tokio_timer::TimerError;
use core::core; use core::core;
use core::core::hash::Hash; use core::core::hash::Hash;
use core::core::target::Difficulty; use core::core::target::Difficulty;
use core::ser::Error; use core::ser;
/// Maximum number of hashes in a block header locator request /// Maximum number of hashes in a block header locator request
pub const MAX_LOCATORS: u32 = 10; pub const MAX_LOCATORS: u32 = 10;
@ -35,6 +38,30 @@ pub const MAX_BLOCK_BODIES: u32 = 16;
/// Maximum number of peer addresses a peer should ever send /// Maximum number of peer addresses a peer should ever send
pub const MAX_PEER_ADDRS: u32 = 256; pub const MAX_PEER_ADDRS: u32 = 256;
#[derive(Debug)]
pub enum Error {
Serialization(ser::Error),
Connection(io::Error),
ConnectionClose,
Timeout,
}
impl From<ser::Error> for Error {
fn from(e: ser::Error) -> Error {
Error::Serialization(e)
}
}
impl From<io::Error> for Error {
fn from(e: io::Error) -> Error {
Error::Connection(e)
}
}
impl From<TimerError> for Error {
fn from(e: TimerError) -> Error {
Error::Timeout
}
}
/// Configuration for the peer-to-peer server. /// Configuration for the peer-to-peer server.
#[derive(Debug, Clone, Copy)] #[derive(Debug, Clone, Copy)]
pub struct P2PConfig { pub struct P2PConfig {

View file

@ -47,18 +47,18 @@ fn peer_handshake() {
let rhandle = handle.clone(); let rhandle = handle.clone();
let timeout = reactor::Timeout::new(time::Duration::new(1, 0), &handle).unwrap(); let timeout = reactor::Timeout::new(time::Duration::new(1, 0), &handle).unwrap();
let timeout_send = reactor::Timeout::new(time::Duration::new(2, 0), &handle).unwrap(); let timeout_send = reactor::Timeout::new(time::Duration::new(2, 0), &handle).unwrap();
handle.spawn(timeout.map_err(|e| ser::Error::IOErr(e)).and_then(move |_| { handle.spawn(timeout.from_err().and_then(move |_| {
let p2p_conf = p2p::P2PConfig::default(); let p2p_conf = p2p::P2PConfig::default();
let addr = SocketAddr::new(p2p_conf.host, p2p_conf.port); let addr = SocketAddr::new(p2p_conf.host, p2p_conf.port);
let socket = TcpStream::connect(&addr, &phandle).map_err(|e| ser::Error::IOErr(e)); let socket = TcpStream::connect(&addr, &phandle).map_err(|e| p2p::Error::Connection(e));
socket.and_then(move |socket| { socket.and_then(move |socket| {
Peer::connect(socket, p2p::UNKNOWN, Difficulty::one(), my_addr, &p2p::handshake::Handshake::new()) Peer::connect(socket, p2p::UNKNOWN, Difficulty::one(), my_addr, &p2p::handshake::Handshake::new())
}).and_then(move |(socket, peer)| { }).and_then(move |(socket, peer)| {
rhandle.spawn(peer.run(socket, net_adapter.clone()).map_err(|e| { rhandle.spawn(peer.run(socket, net_adapter.clone()).map_err(|e| {
panic!("Client run failed: {}", e); panic!("Client run failed: {:?}", e);
})); }));
peer.send_ping().unwrap(); peer.send_ping().unwrap();
timeout_send.map_err(|e| ser::Error::IOErr(e)).map(|_| peer) timeout_send.from_err().map(|_| peer)
}).and_then(|peer| { }).and_then(|peer| {
let (sent, recv) = peer.transmitted_bytes(); let (sent, recv) = peer.transmitted_bytes();
assert!(sent > 0); assert!(sent > 0);
@ -70,7 +70,7 @@ fn peer_handshake() {
Ok(()) Ok(())
}) })
}).map_err(|e| { }).map_err(|e| {
panic!("Client connection failed: {}", e); panic!("Client connection failed: {:?}", e);
})); }));
evtlp.run(run_server).unwrap(); evtlp.run(run_server).unwrap();