From 67d2a04e9270a4a4fb81e30f09febdd0ab4b4f49 Mon Sep 17 00:00:00 2001 From: hashmap Date: Tue, 28 Aug 2018 11:37:08 +0200 Subject: [PATCH] Fix non-atomic peer update --- p2p/src/peers.rs | 17 ++--------------- p2p/src/store.rs | 28 ++++++++++++++++------------ 2 files changed, 18 insertions(+), 27 deletions(-) diff --git a/p2p/src/peers.rs b/p2p/src/peers.rs index 7d34ea734..b4b931b57 100644 --- a/p2p/src/peers.rs +++ b/p2p/src/peers.rs @@ -109,7 +109,8 @@ impl Peers { /// Get vec of peers we are currently connected to. pub fn connected_peers(&self) -> Vec>> { - let mut res = self.peers + let mut res = self + .peers .read() .unwrap() .values() @@ -254,13 +255,6 @@ impl Peers { error!(LOGGER, "Couldn't ban {}: {:?}", peer_addr, e); } - if let Err(e) = self.update_last_banned(*peer_addr, Utc::now().timestamp()) { - error!( - LOGGER, - "Couldn't update last_banned time {}: {:?}", peer_addr, e - ); - } - if let Some(peer) = self.get_connected_peer(peer_addr) { debug!(LOGGER, "Banning peer {}", peer_addr); // setting peer status will get it removed at the next clean_peer @@ -443,13 +437,6 @@ impl Peers { .map_err(From::from) } - /// Updates the last banned time of a peer in store - pub fn update_last_banned(&self, peer_addr: SocketAddr, last_banned: i64) -> Result<(), Error> { - self.store - .update_last_banned(peer_addr, last_banned) - .map_err(From::from) - } - /// Iterate over the peer list and prune all peers we have /// lost connection to or have been deemed problematic. /// Also avoid connected peer count getting too high. diff --git a/p2p/src/store.rs b/p2p/src/store.rs index 744210d33..4166f0fef 100644 --- a/p2p/src/store.rs +++ b/p2p/src/store.rs @@ -14,6 +14,7 @@ //! Storage implementation for peer data. +use chrono::Utc; use num::FromPrimitive; use rand::{thread_rng, Rng}; use std::net::SocketAddr; @@ -137,7 +138,8 @@ impl PeerStore { } pub fn find_peers(&self, state: State, cap: Capabilities, count: usize) -> Vec { - let mut peers = self.db + let mut peers = self + .db .iter::(&to_key(PEER_PREFIX, &mut "".to_string().into_bytes())) .unwrap() .filter(|p| p.flags == state && p.capabilities.contains(cap)) @@ -154,19 +156,21 @@ impl PeerStore { } /// Convenience method to load a peer data, update its status and save it - /// back. + /// back. If new state is Banned its last banned time will be updated too. pub fn update_state(&self, peer_addr: SocketAddr, new_state: State) -> Result<(), Error> { - let mut peer = self.get_peer(peer_addr)?; - peer.flags = new_state; - self.save_peer(&peer) - } + let batch = self.db.batch()?; - /// Convenience method to load a peer data, update its last banned time and - /// save it back. - pub fn update_last_banned(&self, peer_addr: SocketAddr, last_banned: i64) -> Result<(), Error> { - let mut peer = self.get_peer(peer_addr)?; - peer.last_banned = last_banned; - self.save_peer(&peer) + let mut peer = option_to_not_found( + batch.get_ser::(&peer_key(peer_addr)[..]), + &format!("Peer at address: {}", peer_addr), + )?; + peer.flags = new_state; + if new_state == State::Banned { + peer.last_banned = Utc::now().timestamp(); + } + + batch.put_ser(&peer_key(peer.addr)[..], &peer)?; + batch.commit() } }