From efe414bf07f9256a205b330ea41e8d9ed5e21ff5 Mon Sep 17 00:00:00 2001 From: Ignotus Peverell Date: Thu, 26 Oct 2017 17:48:51 +0000 Subject: [PATCH] No send back for blocks and transactions (#212) * Do not send txs and blocks to senders. Keeps a ring buffer of transaction and blocks hashes that a peer has received. Do not send what we've already received. * Test fix and fmt --- p2p/src/conn.rs | 111 ++++++++++++++----------- p2p/src/handshake.rs | 24 +++--- p2p/src/msg.rs | 31 ++++--- p2p/src/peer.rs | 159 ++++++++++++++++++++++++++++-------- p2p/src/protocol.rs | 47 +++++------ p2p/src/rate_limit.rs | 10 ++- p2p/src/server.rs | 60 ++++++++------ p2p/src/store.rs | 24 +++--- p2p/src/types.rs | 2 +- p2p/tests/peer_handshake.rs | 3 +- 10 files changed, 306 insertions(+), 165 deletions(-) diff --git a/p2p/src/conn.rs b/p2p/src/conn.rs index b8c349672..584086350 100644 --- a/p2p/src/conn.rs +++ b/p2p/src/conn.rs @@ -43,22 +43,26 @@ pub trait Handler: Sync + Send { /// Handle function to implement to process incoming messages. A sender to /// reply immediately as well as the message header and its unparsed body /// are provided. - fn handle(&self, - sender: UnboundedSender>, - header: MsgHeader, - body: Vec) - -> Result, ser::Error>; + fn handle( + &self, + sender: UnboundedSender>, + header: MsgHeader, + body: Vec, + ) -> Result, ser::Error>; } impl Handler for F - where F: Fn(UnboundedSender>, MsgHeader, Vec) -> Result, ser::Error>, - F: Sync + Send +where + F: Fn(UnboundedSender>, MsgHeader, Vec) + -> Result, ser::Error>, + F: Sync + Send, { - fn handle(&self, - sender: UnboundedSender>, - header: MsgHeader, - body: Vec) - -> Result, ser::Error> { + fn handle( + &self, + sender: UnboundedSender>, + header: MsgHeader, + body: Vec, + ) -> Result, ser::Error> { self(sender, header, body) } } @@ -88,10 +92,12 @@ impl Connection { /// Start listening on the provided connection and wraps it. Does not hang /// the current thread, instead just returns a future and the Connection /// itself. - pub fn listen(conn: TcpStream, - handler: F) - -> (Connection, Box>) - where F: Handler + 'static + pub fn listen( + conn: TcpStream, + handler: F, + ) -> (Connection, Box>) + where + F: Handler + 'static, { let (reader, writer) = conn.split(); @@ -106,9 +112,9 @@ impl Connection { // same for closing the connection let (close_tx, close_rx) = futures::sync::mpsc::channel(1); - let close_conn = close_rx - .for_each(|_| Ok(())) - .map_err(|_| Error::ConnectionClose); + let close_conn = close_rx.for_each(|_| Ok(())).map_err( + |_| Error::ConnectionClose, + ); let me = Connection { outbound_chan: tx.clone(), @@ -138,11 +144,13 @@ impl Connection { /// Prepares the future that gets message data produced by our system and /// sends it to the peer connection - fn write_msg(&self, - rx: UnboundedReceiver>, - writer: W) - -> Box> - where W: AsyncWrite + 'static + fn write_msg( + &self, + rx: UnboundedReceiver>, + writer: W, + ) -> Box> + where + W: AsyncWrite + 'static, { let sent_bytes = self.sent_bytes.clone(); @@ -163,13 +171,15 @@ impl Connection { /// Prepares the future reading from the peer connection, parsing each /// message and forwarding them appropriately based on their type - fn read_msg(&self, - sender: UnboundedSender>, - reader: R, - handler: F) - -> Box> - where F: Handler + 'static, - R: AsyncRead + 'static + fn read_msg( + &self, + sender: UnboundedSender>, + reader: R, + handler: F, + ) -> Box> + where + F: Handler + 'static, + R: AsyncRead + 'static, { // infinite iterator stream so we repeat the message reading logic until the @@ -223,12 +233,15 @@ impl Connection { let mut body_data = vec![]; try!(ser::serialize(&mut body_data, body)); let mut data = vec![]; - try!(ser::serialize(&mut data, &MsgHeader::new(t, body_data.len() as u64))); + try!(ser::serialize( + &mut data, + &MsgHeader::new(t, body_data.len() as u64), + )); data.append(&mut body_data); - self.outbound_chan - .unbounded_send(data) - .map_err(|_| Error::ConnectionClose) + self.outbound_chan.unbounded_send(data).map_err(|_| { + Error::ConnectionClose + }) } /// Bytes sent and received by this peer to the remote peer. @@ -249,10 +262,12 @@ pub struct TimeoutConnection { impl TimeoutConnection { /// Same as Connection - pub fn listen(conn: TcpStream, - handler: F) - -> (TimeoutConnection, Box>) - where F: Handler + 'static + pub fn listen( + conn: TcpStream, + handler: F, + ) -> (TimeoutConnection, Box>) + where + F: Handler + 'static, { let expects = Arc::new(Mutex::new(vec![])); @@ -296,17 +311,21 @@ impl TimeoutConnection { underlying: conn, expected_responses: expects, }; - (me, Box::new(fut.select(timer).map(|_| ()).map_err(|(e1, _)| e1))) + ( + me, + Box::new(fut.select(timer).map(|_| ()).map_err(|(e1, _)| e1)), + ) } /// Sends a request and registers a timer on the provided message type and /// optionally the hash of the sent data. - pub fn send_request(&self, - t: Type, - rt: Type, - body: &W, - expect_h: Option<(Hash)>) - -> Result<(), Error> { + pub fn send_request( + &self, + t: Type, + rt: Type, + body: &W, + expect_h: Option<(Hash)>, + ) -> Result<(), Error> { let _sent = try!(self.underlying.send_msg(t, body)); let mut expects = self.expected_responses.lock().unwrap(); diff --git a/p2p/src/handshake.rs b/p2p/src/handshake.rs index 16ad6cd5c..e674cda79 100644 --- a/p2p/src/handshake.rs +++ b/p2p/src/handshake.rs @@ -48,12 +48,13 @@ impl Handshake { } /// Handles connecting to a new remote peer, starting the version handshake. - pub fn connect(&self, - capab: Capabilities, - total_difficulty: Difficulty, - self_addr: SocketAddr, - conn: TcpStream) - -> Box> { + pub fn connect( + &self, + capab: Capabilities, + total_difficulty: Difficulty, + self_addr: SocketAddr, + conn: TcpStream, + ) -> Box> { // prepare the first part of the hanshake let nonce = self.next_nonce(); let hand = Hand { @@ -95,11 +96,12 @@ impl Handshake { /// Handles receiving a connection from a new remote peer that started the /// version handshake. - pub fn handshake(&self, - capab: Capabilities, - total_difficulty: Difficulty, - conn: TcpStream) - -> Box> { + pub fn handshake( + &self, + capab: Capabilities, + total_difficulty: Difficulty, + conn: TcpStream, + ) -> Box> { let nonces = self.nonces.clone(); Box::new( read_msg::(conn) diff --git a/p2p/src/msg.rs b/p2p/src/msg.rs index cd9ed92fc..2641650e0 100644 --- a/p2p/src/msg.rs +++ b/p2p/src/msg.rs @@ -70,7 +70,8 @@ enum_from_primitive! { /// the header first, handles its validation and then reads the Readable body, /// allocating buffers of the right size. pub fn read_msg(conn: TcpStream) -> Box> - where T: Readable + 'static +where + T: Readable + 'static, { let read_header = read_exact(conn, vec![0u8; HEADER_LEN as usize]) .from_err() @@ -98,11 +99,13 @@ pub fn read_msg(conn: TcpStream) -> Box(conn: TcpStream, - msg: T, - msg_type: Type) - -> Box> - where T: Writeable + 'static +pub fn write_msg( + conn: TcpStream, + msg: T, + msg_type: Type, +) -> Box> +where + T: Writeable + 'static, { let write_msg = ok((conn)).and_then(move |conn| { // prepare the body first so we know its serialized length @@ -223,7 +226,9 @@ impl Readable for Hand { let receiver_addr = try!(SockAddr::read(reader)); let ua = try!(reader.read_vec()); let user_agent = try!(String::from_utf8(ua).map_err(|_| ser::Error::CorruptedData)); - let capabilities = try!(Capabilities::from_bits(capab).ok_or(ser::Error::CorruptedData)); + let capabilities = try!(Capabilities::from_bits(capab).ok_or( + ser::Error::CorruptedData, + )); Ok(Hand { version: version, capabilities: capabilities, @@ -270,7 +275,9 @@ impl Readable for Shake { let total_diff = try!(Difficulty::read(reader)); let ua = try!(reader.read_vec()); let user_agent = try!(String::from_utf8(ua).map_err(|_| ser::Error::CorruptedData)); - let capabilities = try!(Capabilities::from_bits(capab).ok_or(ser::Error::CorruptedData)); + let capabilities = try!(Capabilities::from_bits(capab).ok_or( + ser::Error::CorruptedData, + )); Ok(Shake { version: version, capabilities: capabilities, @@ -295,7 +302,9 @@ impl Writeable for GetPeerAddrs { impl Readable for GetPeerAddrs { fn read(reader: &mut Reader) -> Result { let capab = try!(reader.read_u32()); - let capabilities = try!(Capabilities::from_bits(capab).ok_or(ser::Error::CorruptedData)); + let capabilities = try!(Capabilities::from_bits(capab).ok_or( + ser::Error::CorruptedData, + )); Ok(GetPeerAddrs { capabilities: capabilities }) } } @@ -352,7 +361,9 @@ impl Writeable for PeerError { impl Readable for PeerError { fn read(reader: &mut Reader) -> Result { let (code, msg) = ser_multiread!(reader, read_u32, read_vec); - let message = try!(String::from_utf8(msg).map_err(|_| ser::Error::CorruptedData)); + let message = try!(String::from_utf8(msg).map_err( + |_| ser::Error::CorruptedData, + )); Ok(PeerError { code: code, message: message, diff --git a/p2p/src/peer.rs b/p2p/src/peer.rs index 893587d87..813adba78 100644 --- a/p2p/src/peer.rs +++ b/p2p/src/peer.rs @@ -19,12 +19,14 @@ use futures::Future; use tokio_core::net::TcpStream; use core::core; -use core::core::hash::Hash; +use core::core::hash::{Hash, Hashed}; use core::core::target::Difficulty; use handshake::Handshake; use types::*; use util::LOGGER; +const MAX_TRACK_SIZE: usize = 30; + #[derive(Debug, Clone, Copy, PartialEq, Eq)] enum State { Connected, @@ -36,59 +38,66 @@ pub struct Peer { pub info: PeerInfo, proto: Box, state: Arc>, + // set of all hashes known to this peer (so no need to send) + tracking_adapter: TrackingAdapter, } unsafe impl Sync for Peer {} unsafe impl Send for Peer {} impl Peer { + // Only accept and connect can be externally used to build a peer + fn new(info: PeerInfo, proto: Box, na: Arc) -> Peer { + Peer { + info: info, + proto: proto, + state: Arc::new(RwLock::new(State::Connected)), + tracking_adapter: TrackingAdapter::new(na), + } + } + /// Initiates the handshake with another peer. - pub fn connect(conn: TcpStream, - capab: Capabilities, - total_difficulty: Difficulty, - self_addr: SocketAddr, - hs: &Handshake) - -> Box> { + pub fn connect( + conn: TcpStream, + capab: Capabilities, + total_difficulty: Difficulty, + self_addr: SocketAddr, + hs: &Handshake, + na: Arc, + ) -> Box> { let connect_peer = hs.connect(capab, total_difficulty, self_addr, conn) .and_then(|(conn, proto, info)| { - Ok((conn, - Peer { - info: info, - proto: Box::new(proto), - state: Arc::new(RwLock::new(State::Connected)), - })) + Ok((conn, Peer::new(info, Box::new(proto), na))) }); Box::new(connect_peer) } /// Accept a handshake initiated by another peer. - pub fn accept(conn: TcpStream, - capab: Capabilities, - total_difficulty: Difficulty, - hs: &Handshake) - -> Box> { - let hs_peer = hs.handshake(capab, total_difficulty, conn) - .and_then(|(conn, proto, info)| { - Ok((conn, - Peer { - info: info, - proto: Box::new(proto), - state: Arc::new(RwLock::new(State::Connected)), - })) - }); + pub fn accept( + conn: TcpStream, + capab: Capabilities, + total_difficulty: Difficulty, + hs: &Handshake, + na: Arc, + ) -> Box> { + let hs_peer = hs.handshake(capab, total_difficulty, conn).and_then( + |(conn, + proto, + info)| { + Ok((conn, Peer::new(info, Box::new(proto), na))) + }, + ); Box::new(hs_peer) } /// Main peer loop listening for messages and forwarding to the rest of the /// system. - pub fn run(&self, - conn: TcpStream, - na: Arc) - -> Box> { + pub fn run(&self, conn: TcpStream) -> Box> { let addr = self.info.addr; let state = self.state.clone(); - Box::new(self.proto.handle(conn, na).then(move |res| { + let adapter = Arc::new(self.tracking_adapter.clone()); + Box::new(self.proto.handle(conn, adapter).then(move |res| { // handle disconnection, standard disconnections aren't considered an error let mut state = state.write().unwrap(); match res { @@ -135,15 +144,21 @@ impl Peer { /// Sends the provided block to the remote peer. The request may be dropped /// if the remote peer is known to already have the block. pub fn send_block(&self, b: &core::Block) -> Result<(), Error> { - // TODO do not send if the peer sent us the block in the first place - self.proto.send_block(b) + if !self.tracking_adapter.has(b.hash()) { + self.proto.send_block(b) + } else { + Ok(()) + } } /// Sends the provided transaction to the remote peer. The request may be /// dropped if the remote peer is known to already have the transaction. pub fn send_transaction(&self, tx: &core::Transaction) -> Result<(), Error> { - // TODO do not send if the peer sent us the tx in the first place - self.proto.send_transaction(tx) + if !self.tracking_adapter.has(tx.hash()) { + self.proto.send_transaction(tx) + } else { + Ok(()) + } } pub fn send_header_request(&self, locator: Vec) -> Result<(), Error> { @@ -169,3 +184,75 @@ impl Peer { self.proto.close(); } } + +/// Adapter implementation that forwards everything to an underlying adapter +/// but keeps track of the block and transaction hashes that were received. +#[derive(Clone)] +struct TrackingAdapter { + adapter: Arc, + known: Arc>>, +} + +impl TrackingAdapter { + fn new(adapter: Arc) -> TrackingAdapter { + TrackingAdapter { + adapter: adapter, + known: Arc::new(RwLock::new(vec![])), + } + } + + fn has(&self, hash: Hash) -> bool { + let known = self.known.read().unwrap(); + // may become too slow, an ordered set (by timestamp for eviction) may + // end up being a better choice + known.contains(&hash) + } + + fn push(&self, hash: Hash) { + let mut known = self.known.write().unwrap(); + if known.len() > MAX_TRACK_SIZE { + known.truncate(MAX_TRACK_SIZE); + } + known.insert(0, hash); + } +} + +impl NetAdapter for TrackingAdapter { + fn total_difficulty(&self) -> Difficulty { + self.adapter.total_difficulty() + } + + fn transaction_received(&self, tx: core::Transaction) { + self.push(tx.hash()); + self.adapter.transaction_received(tx) + } + + fn block_received(&self, b: core::Block) { + self.push(b.hash()); + self.adapter.block_received(b) + } + + fn headers_received(&self, bh: Vec) { + self.adapter.headers_received(bh) + } + + fn locate_headers(&self, locator: Vec) -> Vec { + self.adapter.locate_headers(locator) + } + + fn get_block(&self, h: Hash) -> Option { + self.adapter.get_block(h) + } + + fn find_peer_addrs(&self, capab: Capabilities) -> Vec { + self.adapter.find_peer_addrs(capab) + } + + fn peer_addrs_received(&self, addrs: Vec) { + self.adapter.peer_addrs_received(addrs) + } + + fn peer_connected(&self, pi: &PeerInfo) { + self.adapter.peer_connected(pi) + } +} diff --git a/p2p/src/protocol.rs b/p2p/src/protocol.rs index 8d4270d0a..39777bec8 100644 --- a/p2p/src/protocol.rs +++ b/p2p/src/protocol.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::sync::{Mutex, Arc}; +use std::sync::Arc; use futures::Future; use futures::sync::mpsc::UnboundedSender; @@ -30,25 +30,21 @@ use util::OneTime; #[allow(dead_code)] pub struct ProtocolV1 { conn: OneTime, - - expected_responses: Mutex>, } impl ProtocolV1 { pub fn new() -> ProtocolV1 { - ProtocolV1 { - conn: OneTime::new(), - expected_responses: Mutex::new(vec![]), - } + ProtocolV1 { conn: OneTime::new() } } } impl Protocol for ProtocolV1 { /// Sets up the protocol reading, writing and closing logic. - fn handle(&self, - conn: TcpStream, - adapter: Arc) - -> Box> { + fn handle( + &self, + conn: TcpStream, + adapter: Arc, + ) -> Box> { let (conn, listener) = TimeoutConnection::listen(conn, move |sender, header, data| { let adapt = adapter.as_ref(); @@ -114,21 +110,23 @@ impl ProtocolV1 { self.conn.borrow().send_msg(t, body) } - fn send_request(&self, - t: Type, - rt: Type, - body: &W, - expect_resp: Option) - -> Result<(), Error> { + fn send_request( + &self, + t: Type, + rt: Type, + body: &W, + expect_resp: Option, + ) -> Result<(), Error> { self.conn.borrow().send_request(t, rt, body, expect_resp) } } -fn handle_payload(adapter: &NetAdapter, - sender: UnboundedSender>, - header: MsgHeader, - buf: Vec) - -> Result, ser::Error> { +fn handle_payload( + adapter: &NetAdapter, + sender: UnboundedSender>, + header: MsgHeader, + buf: Vec, +) -> Result, ser::Error> { match header.msg_type { Type::Ping => { let data = ser::ser_vec(&MsgHeader::new(Type::Pong, 0))?; @@ -171,7 +169,10 @@ fn handle_payload(adapter: &NetAdapter, // serialize and send all the headers over let mut body_data = vec![]; - try!(ser::serialize(&mut body_data, &Headers { headers: headers })); + try!(ser::serialize( + &mut body_data, + &Headers { headers: headers }, + )); let mut data = vec![]; try!(ser::serialize( &mut data, diff --git a/p2p/src/rate_limit.rs b/p2p/src/rate_limit.rs index 8beac90f5..88cae3614 100644 --- a/p2p/src/rate_limit.rs +++ b/p2p/src/rate_limit.rs @@ -77,7 +77,10 @@ impl io::Read for ThrottledReader { // Check if Allowed if self.allowed < 1 { - return Err(io::Error::new(io::ErrorKind::WouldBlock, "Reached Allowed Read Limit")); + return Err(io::Error::new( + io::ErrorKind::WouldBlock, + "Reached Allowed Read Limit", + )); } // Read Max Allowed @@ -155,7 +158,10 @@ impl io::Write for ThrottledWriter { // Check if Allowed if self.allowed < 1 { - return Err(io::Error::new(io::ErrorKind::WouldBlock, "Reached Allowed Write Limit")); + return Err(io::Error::new( + io::ErrorKind::WouldBlock, + "Reached Allowed Write Limit", + )); } // Write max allowed diff --git a/p2p/src/server.rs b/p2p/src/server.rs index 99b1d373e..0aab932ab 100644 --- a/p2p/src/server.rs +++ b/p2p/src/server.rs @@ -104,14 +104,14 @@ impl Server { let peers = peers.clone(); // accept the peer and add it to the server map - let accept = Peer::accept(conn, capab, total_diff, &hs.clone()); - let added = add_to_peers(peers, adapter.clone(), accept); + let accept = Peer::accept(conn, capab, total_diff, &hs.clone(), adapter.clone()); + let added = add_to_peers(peers, adapter, accept); // wire in a future to timeout the accept after 5 secs let timed_peer = with_timeout(Box::new(added), &hp); // run the main peer protocol - timed_peer.and_then(move |(conn, peer)| peer.clone().run(conn, adapter)) + timed_peer.and_then(move |(conn, peer)| peer.clone().run(conn)) }); // spawn each peer future to its own task @@ -144,10 +144,11 @@ impl Server { } /// Asks the server to connect to a new peer. - pub fn connect_peer(&self, - addr: SocketAddr, - h: reactor::Handle) - -> Box>, Error = Error>> { + pub fn connect_peer( + &self, + addr: SocketAddr, + h: reactor::Handle, + ) -> Box>, Error = Error>> { if let Some(p) = self.get_peer(addr) { // if we're already connected to the addr, just return the peer return Box::new(future::ok(Some(p))); @@ -159,8 +160,7 @@ impl Server { // cloneapalooza let peers = self.peers.clone(); - let adapter1 = self.adapter.clone(); - let adapter2 = self.adapter.clone(); + let adapter = self.adapter.clone(); let capab = self.capabilities.clone(); let self_addr = SocketAddr::new(self.config.host, self.config.port); @@ -171,17 +171,23 @@ impl Server { let request = socket .and_then(move |socket| { let peers = peers.clone(); - let total_diff = adapter1.clone().total_difficulty(); + let total_diff = adapter.clone().total_difficulty(); // connect to the peer and add it to the server map, wiring it a timeout for // the handhake - let connect = - Peer::connect(socket, capab, total_diff, self_addr, &Handshake::new()); - let added = add_to_peers(peers, adapter1, connect); + let connect = Peer::connect( + socket, + capab, + total_diff, + self_addr, + &Handshake::new(), + adapter.clone(), + ); + let added = add_to_peers(peers, adapter, connect); with_timeout(Box::new(added), &h) }) .and_then(move |(socket, peer)| { - h2.spawn(peer.run(socket, adapter2).map_err(|e| { + h2.spawn(peer.run(socket).map_err(|e| { error!(LOGGER, "Peer error: {:?}", e); () })); @@ -300,11 +306,13 @@ impl Server { } // Adds the peer built by the provided future in the peers map -fn add_to_peers(peers: Arc>>>, - adapter: Arc, - peer_fut: A) - -> Box), ()>, Error = Error>> - where A: IntoFuture + 'static +fn add_to_peers( + peers: Arc>>>, + adapter: Arc, + peer_fut: A, +) -> Box), ()>, Error = Error>> +where + A: IntoFuture + 'static, { let peer_add = peer_fut.into_future().map(move |(conn, peer)| { adapter.peer_connected(&peer.info); @@ -317,15 +325,17 @@ fn add_to_peers(peers: Arc>>>, } // Adds a timeout to a future -fn with_timeout(fut: Box, Error = Error>>, - h: &reactor::Handle) - -> Box> { +fn with_timeout( + fut: Box, Error = Error>>, + h: &reactor::Handle, +) -> Box> { let timeout = reactor::Timeout::new(Duration::new(5, 0), h).unwrap(); - let timed = fut.select(timeout.map(Err).from_err()) - .then(|res| match res { + let timed = fut.select(timeout.map(Err).from_err()).then( + |res| match res { Ok((Ok(inner), _timeout)) => Ok(inner), Ok((_, _accept)) => Err(Error::Timeout), Err((e, _other)) => Err(e), - }); + }, + ); Box::new(timed) } diff --git a/p2p/src/store.rs b/p2p/src/store.rs index eb4296ca1..754aa0497 100644 --- a/p2p/src/store.rs +++ b/p2p/src/store.rs @@ -67,10 +67,10 @@ impl Readable for PeerData { fn read(reader: &mut Reader) -> Result { let addr = SockAddr::read(reader)?; let (capab, ua, fl) = ser_multiread!(reader, read_u32, read_vec, read_u8); - let user_agent = String::from_utf8(ua) - .map_err(|_| ser::Error::CorruptedData)?; - let capabilities = Capabilities::from_bits(capab) - .ok_or(ser::Error::CorruptedData)?; + let user_agent = String::from_utf8(ua).map_err(|_| ser::Error::CorruptedData)?; + let capabilities = Capabilities::from_bits(capab).ok_or( + ser::Error::CorruptedData, + )?; match State::from_u8(fl) { Some(flags) => { Ok(PeerData { @@ -109,18 +109,22 @@ impl PeerStore { } pub fn exists_peer(&self, peer_addr: SocketAddr) -> Result { - self.db - .exists(&to_key(PEER_PREFIX, &mut format!("{}", peer_addr).into_bytes())[..]) + self.db.exists( + &to_key(PEER_PREFIX, &mut format!("{}", peer_addr).into_bytes())[..], + ) } pub fn delete_peer(&self, peer_addr: SocketAddr) -> Result<(), Error> { - self.db - .delete(&to_key(PEER_PREFIX, &mut format!("{}", peer_addr).into_bytes())[..]) + self.db.delete( + &to_key(PEER_PREFIX, &mut format!("{}", peer_addr).into_bytes())[..], + ) } pub fn find_peers(&self, state: State, cap: Capabilities, count: usize) -> Vec { - let peers_iter = self.db - .iter::(&to_key(PEER_PREFIX, &mut "".to_string().into_bytes())); + let peers_iter = self.db.iter::(&to_key( + PEER_PREFIX, + &mut "".to_string().into_bytes(), + )); let mut peers = Vec::with_capacity(count); for p in peers_iter { if p.flags == state && p.capabilities.contains(cap) { diff --git a/p2p/src/types.rs b/p2p/src/types.rs index 284b59eb7..4c1a07c6c 100644 --- a/p2p/src/types.rs +++ b/p2p/src/types.rs @@ -118,7 +118,7 @@ pub trait Protocol { /// block so needs to be called withing a coroutine. Should also be called /// only once. fn handle(&self, conn: TcpStream, na: Arc) - -> Box>; + -> Box>; /// Sends a ping message to the remote peer. fn send_ping(&self) -> Result<(), Error>; diff --git a/p2p/tests/peer_handshake.rs b/p2p/tests/peer_handshake.rs index f6b52fbef..f9b715a2d 100644 --- a/p2p/tests/peer_handshake.rs +++ b/p2p/tests/peer_handshake.rs @@ -61,10 +61,11 @@ fn peer_handshake() { Difficulty::one(), my_addr, &p2p::handshake::Handshake::new(), + net_adapter.clone(), ) }) .and_then(move |(socket, peer)| { - rhandle.spawn(peer.run(socket, net_adapter.clone()).map_err(|e| { + rhandle.spawn(peer.run(socket).map_err(|e| { panic!("Client run failed: {:?}", e); })); peer.send_ping().unwrap();