From 631201358fef908bf031f2ee1a648b60950201b9 Mon Sep 17 00:00:00 2001 From: hashmap Date: Tue, 21 Aug 2018 00:32:13 +0200 Subject: [PATCH] Refactor and optimise peers.rs (#1389) * Use internal hashmap for count and contains methods instead of relying on connected_peers method which is expensive (creates vector from hashmap) * Reduce number of `clone()` * Refactor broadcast_xxx --- p2p/src/peers.rs | 150 ++++++++++++++++++----------------------------- p2p/src/serv.rs | 4 +- 2 files changed, 60 insertions(+), 94 deletions(-) diff --git a/p2p/src/peers.rs b/p2p/src/peers.rs index 777b6dc48..41c29d0f5 100644 --- a/p2p/src/peers.rs +++ b/p2p/src/peers.rs @@ -54,7 +54,7 @@ impl Peers { /// Adds the peer to our internal peer mapping. Note that the peer is still /// returned so the server can run it. - pub fn add_connected(&self, p: Peer) -> Arc> { + pub fn add_connected(&self, p: Peer) -> Result>, Error> { debug!(LOGGER, "Saving newly connected peer {}.", p.info.addr); let peer_data = PeerData { addr: p.info.addr, @@ -64,9 +64,7 @@ impl Peers { last_banned: 0, ban_reason: ReasonForBan::None, }; - if let Err(e) = self.save_peer(&peer_data) { - error!(LOGGER, "Could not save connected peer: {:?}", e); - } + self.save_peer(&peer_data)?; let addr = p.info.addr.clone(); let apeer = Arc::new(RwLock::new(p)); @@ -74,7 +72,7 @@ impl Peers { let mut peers = self.peers.write().unwrap(); peers.insert(addr, apeer.clone()); } - apeer.clone() + Ok(apeer) } // Update the dandelion relay @@ -106,7 +104,7 @@ impl Peers { } pub fn is_known(&self, addr: &SocketAddr) -> bool { - self.get_connected_peer(addr).is_some() + self.peers.read().unwrap().contains_key(addr) } /// Get vec of peers we are currently connected to. @@ -125,13 +123,11 @@ impl Peers { pub fn outgoing_connected_peers(&self) -> Vec>> { let peers = self.connected_peers(); let res = peers - .iter() + .into_iter() .filter(|x| match x.try_read() { Ok(peer) => peer.info.direction == Direction::Outbound, Err(_) => false, - }) - .cloned() - .collect::>(); + }).collect::>(); res } @@ -142,7 +138,7 @@ impl Peers { /// Number of peers we're currently connected to. pub fn peer_count(&self) -> u32 { - self.connected_peers().len() as u32 + self.peers.read().unwrap().len() as u32 } // Return vec of connected peers that currently advertise more work @@ -156,13 +152,11 @@ impl Peers { let total_difficulty = self.total_difficulty(); let mut max_peers = peers - .iter() + .into_iter() .filter(|x| match x.try_read() { Ok(peer) => peer.info.total_difficulty > total_difficulty, Err(_) => false, - }) - .cloned() - .collect::>(); + }).collect::>(); thread_rng().shuffle(&mut max_peers); max_peers @@ -179,16 +173,14 @@ impl Peers { let total_difficulty = self.total_difficulty(); let mut max_peers = peers - .iter() + .into_iter() .filter(|x| match x.try_read() { Ok(peer) => { peer.info.total_difficulty > total_difficulty && peer.info.capabilities.contains(Capabilities::FULL_HIST) } Err(_) => false, - }) - .cloned() - .collect::>(); + }).collect::>(); thread_rng().shuffle(&mut max_peers); max_peers @@ -196,18 +188,12 @@ impl Peers { /// Returns single random peer with more work than us. pub fn more_work_peer(&self) -> Option>> { - match self.more_work_peers().first() { - Some(x) => Some(x.clone()), - None => None, - } + self.more_work_peers().pop() } /// Returns single random archival peer with more work than us. pub fn more_work_archival_peer(&self) -> Option>> { - match self.more_work_archival_peers().first() { - Some(x) => Some(x.clone()), - None => None, - } + self.more_work_archival_peers().pop() } /// Return vec of connected peers that currently have the most worked @@ -223,18 +209,15 @@ impl Peers { .map(|x| match x.try_read() { Ok(peer) => peer.info.total_difficulty.clone(), Err(_) => Difficulty::zero(), - }) - .max() + }).max() .unwrap(); let mut max_peers = peers - .iter() + .into_iter() .filter(|x| match x.try_read() { Ok(peer) => peer.info.total_difficulty == max_total_difficulty, Err(_) => false, - }) - .cloned() - .collect::>(); + }).collect::>(); thread_rng().shuffle(&mut max_peers); max_peers @@ -243,10 +226,7 @@ impl Peers { /// Returns single random peer with the most worked branch, showing the /// highest total difficulty. pub fn most_work_peer(&self) -> Option>> { - match self.most_work_peers().first() { - Some(x) => Some(x.clone()), - None => None, - } + self.most_work_peers().pop() } pub fn is_banned(&self, peer_addr: SocketAddr) -> bool { @@ -260,11 +240,11 @@ impl Peers { /// Ban a peer, disconnecting it if we're currently connected pub fn ban_peer(&self, peer_addr: &SocketAddr, ban_reason: ReasonForBan) { - if let Err(e) = self.update_state(peer_addr.clone(), State::Banned) { + if let Err(e) = self.update_state(*peer_addr, State::Banned) { error!(LOGGER, "Couldn't ban {}: {:?}", peer_addr, e); } - if let Err(e) = self.update_last_banned(peer_addr.clone(), Utc::now().timestamp()) { + if let Err(e) = self.update_last_banned(*peer_addr, Utc::now().timestamp()) { error!( LOGGER, "Couldn't update last_banned time {}: {:?}", peer_addr, e @@ -283,10 +263,10 @@ impl Peers { /// Unban a peer, checks if it exists and banned then unban pub fn unban_peer(&self, peer_addr: &SocketAddr) { - match self.get_peer(peer_addr.clone()) { + match self.get_peer(*peer_addr) { Ok(_) => { - if self.is_banned(peer_addr.clone()) { - if let Err(e) = self.update_state(peer_addr.clone(), State::Healthy) { + if self.is_banned(*peer_addr) { + if let Err(e) = self.update_state(*peer_addr, State::Healthy) { error!(LOGGER, "Couldn't unban {}: {:?}", peer_addr, e) } } else { @@ -297,25 +277,33 @@ impl Peers { }; } - /// Broadcasts the provided block to PEER_PREFERRED_COUNT of our peers. - /// We may be connected to PEER_MAX_COUNT peers so we only - /// want to broadcast to a random subset of peers. - /// A peer implementation may drop the broadcast request - /// if it knows the remote peer already has the block. - pub fn broadcast_block(&self, b: &core::Block) { + fn broadcast(&self, obj_name: &str, f: F) -> u8 + where + F: Fn(&Peer) -> Result<(), Error>, + { let peers = self.connected_peers(); let preferred_peers = 8; let mut count = 0; for p in peers.iter().take(preferred_peers) { let p = p.read().unwrap(); if p.is_connected() { - if let Err(e) = p.send_block(b) { - debug!(LOGGER, "Error sending block to peer: {:?}", e); + if let Err(e) = f(&p) { + debug!(LOGGER, "Error sending {} to peer: {:?}", obj_name, e); } else { count += 1; } } } + count + } + + /// Broadcasts the provided block to PEER_PREFERRED_COUNT of our peers. + /// We may be connected to PEER_MAX_COUNT peers so we only + /// want to broadcast to a random subset of peers. + /// A peer implementation may drop the broadcast request + /// if it knows the remote peer already has the block. + pub fn broadcast_block(&self, b: &core::Block) { + let count = self.broadcast("block", |p| p.send_block(b)); debug!( LOGGER, "broadcast_block: {} @ {} [{}] was sent to {} peers.", @@ -326,20 +314,13 @@ impl Peers { ); } + /// Broadcasts the provided compact block to PEER_PREFERRED_COUNT of our peers. + /// We may be connected to PEER_MAX_COUNT peers so we only + /// want to broadcast to a random subset of peers. + /// A peer implementation may drop the broadcast request + /// if it knows the remote peer already has the block. pub fn broadcast_compact_block(&self, b: &core::CompactBlock) { - let peers = self.connected_peers(); - let preferred_peers = 8; - let mut count = 0; - for p in peers.iter().take(preferred_peers) { - let p = p.read().unwrap(); - if p.is_connected() { - if let Err(e) = p.send_compact_block(b) { - debug!(LOGGER, "Error sending compact block to peer: {:?}", e); - } else { - count += 1; - } - } - } + let count = self.broadcast("compact block", |p| p.send_compact_block(b)); debug!( LOGGER, "broadcast_compact_block: {}, {} at {}, to {} peers, done.", @@ -350,25 +331,13 @@ impl Peers { ); } - /// Broadcasts the provided block to PEER_PREFERRED_COUNT of our peers. + /// Broadcasts the provided header to PEER_PREFERRED_COUNT of our peers. /// We may be connected to PEER_MAX_COUNT peers so we only /// want to broadcast to a random subset of peers. /// A peer implementation may drop the broadcast request - /// if it knows the remote peer already has the block. + /// if it knows the remote peer already has the header. pub fn broadcast_header(&self, bh: &core::BlockHeader) { - let peers = self.connected_peers(); - let preferred_peers = 8; - let mut count = 0; - for p in peers.iter().take(preferred_peers) { - let p = p.read().unwrap(); - if p.is_connected() { - if let Err(e) = p.send_header(bh) { - debug!(LOGGER, "Error sending header to peer: {:?}", e); - } else { - count += 1; - } - } - } + let count = self.broadcast("header", |p| p.send_header(bh)); trace!( LOGGER, "broadcast_header: {}, {} at {}, to {} peers, done.", @@ -411,15 +380,13 @@ impl Peers { /// A peer implementation may drop the broadcast request /// if it knows the remote peer already has the transaction. pub fn broadcast_transaction(&self, tx: &core::Transaction) { - let peers = self.connected_peers(); - for p in peers.iter().take(8) { - let p = p.read().unwrap(); - if p.is_connected() { - if let Err(e) = p.send_transaction(tx) { - debug!(LOGGER, "Error sending transaction to peer: {:?}", e); - } - } - } + let count = self.broadcast("transaction", |p| p.send_transaction(tx)); + trace!( + LOGGER, + "broadcast_transaction: {}, to {} peers, done.", + tx.hash(), + count, + ); } /// Ping all our connected peers. Always automatically expects a pong back @@ -429,7 +396,7 @@ impl Peers { for p in peers_map.values() { let p = p.read().unwrap(); if p.is_connected() { - let _ = p.send_ping(total_difficulty.clone(), height); + let _ = p.send_ping(total_difficulty, height); } } } @@ -494,7 +461,7 @@ impl Peers { // now clean up peer map based on the list to remove { let mut peers = self.peers.write().unwrap(); - for p in rm.clone() { + for p in rm { let p = p.read().unwrap(); peers.remove(&p.info.addr); } @@ -502,7 +469,7 @@ impl Peers { // ensure we do not have too many connected peers let excess_count = { - let peer_count = self.peer_count().clone() as usize; + let peer_count = self.peer_count() as usize; if peer_count > max_count { peer_count - max_count } else { @@ -517,8 +484,7 @@ impl Peers { .map(|x| { let p = x.read().unwrap(); p.info.addr.clone() - }) - .collect::>() + }).collect::>() }; // now remove them taking a short-lived write lock each time diff --git a/p2p/src/serv.rs b/p2p/src/serv.rs index 55d66a60e..ee895209f 100644 --- a/p2p/src/serv.rs +++ b/p2p/src/serv.rs @@ -168,7 +168,7 @@ impl Server { &self.handshake, self.peers.clone(), )?; - let added = self.peers.add_connected(peer); + let added = self.peers.add_connected(peer)?; { let mut peer = added.write().unwrap(); peer.start(stream); @@ -193,7 +193,7 @@ impl Server { &self.handshake, self.peers.clone(), )?; - let added = self.peers.add_connected(peer); + let added = self.peers.add_connected(peer)?; let mut peer = added.write().unwrap(); peer.start(stream); Ok(())