From d900f0b9342db226b642618efed5827b02059eef Mon Sep 17 00:00:00 2001 From: Ignotus Peverell Date: Mon, 27 Feb 2017 14:17:53 -0800 Subject: [PATCH] P2P server cleanup of disconnected peers. Mark those that don't respect the protocol as banned. --- grin/src/seed.rs | 13 ++++++++++++- p2p/src/conn.rs | 5 +++-- p2p/src/peer.rs | 14 ++++++++++++++ p2p/src/server.rs | 26 +++++++++++++++++++++++--- p2p/src/store.rs | 26 +++++++++++++++++++++++++- store/src/lib.rs | 10 ++++++---- 6 files changed, 83 insertions(+), 11 deletions(-) diff --git a/grin/src/seed.rs b/grin/src/seed.rs index f6ed4fe92..b71bb4fb8 100644 --- a/grin/src/seed.rs +++ b/grin/src/seed.rs @@ -79,6 +79,18 @@ impl Seeder { let mon_loop = Timer::default() .interval(time::Duration::from_secs(10)) .for_each(move |_| { + + // maintenance step first, clean up p2p server peers and mark bans + // if needed + let disconnected = p2p_server.clean_peers(); + for p in disconnected { + if p.is_banned() { + debug!("Marking peer {} as banned.", p.info.addr); + peer_store.update_state(p.info.addr, p2p::State::Banned); + } + } + + // we don't have enough peers, getting more from db if p2p_server.peer_count() < PEER_PREFERRED_COUNT { let mut peers = peer_store.find_peers(p2p::State::Healthy, p2p::UNKNOWN, @@ -91,7 +103,6 @@ impl Seeder { } } Ok(()) - // TODO clean disconnected server peers }) .map_err(|e| e.to_string()); Box::new(mon_loop) diff --git a/p2p/src/conn.rs b/p2p/src/conn.rs index cdecda818..f86e3cc37 100644 --- a/p2p/src/conn.rs +++ b/p2p/src/conn.rs @@ -141,8 +141,9 @@ impl Connection { data }) // write the data and make sure the future returns the right types - .fold(writer, - |writer, data| write_all(writer, data).map_err(|e| Error::Connection(e)).map(|(writer, buf)| writer)); + .fold(writer, |writer, data| { + write_all(writer, data).map_err(|e| Error::Connection(e)).map(|(writer, buf)| writer) + }); Box::new(send_data) } diff --git a/p2p/src/peer.rs b/p2p/src/peer.rs index 11d3c9eea..dce0a0ff5 100644 --- a/p2p/src/peer.rs +++ b/p2p/src/peer.rs @@ -24,6 +24,7 @@ use core::core::target::Difficulty; use handshake::Handshake; use types::*; +#[derive(Debug, Clone, Copy, PartialEq, Eq)] enum State { Connected, Disconnected, @@ -87,6 +88,7 @@ impl Peer { let addr = self.info.addr; let state = self.state.clone(); Box::new(self.proto.handle(conn, na).then(move |res| { + // handle disconnection, standard disconnections aren't considered an error let mut state = state.write().unwrap(); match res { Ok(res) => { @@ -108,6 +110,18 @@ impl Peer { })) } + /// Whether this peer is still connected. + pub fn is_connected(&self) -> bool { + let state = self.state.read().unwrap(); + *state == State::Connected + } + + /// Whether this peer has been banned. + pub fn is_banned(&self) -> bool { + let state = self.state.read().unwrap(); + *state == State::Banned + } + /// Bytes sent and received by this peer to the remote peer. pub fn transmitted_bytes(&self) -> (u64, u64) { self.proto.transmitted_bytes() diff --git a/p2p/src/server.rs b/p2p/src/server.rs index f4d610a67..b3c112daa 100644 --- a/p2p/src/server.rs +++ b/p2p/src/server.rs @@ -186,6 +186,24 @@ impl Server { Box::new(request) } + /// Have the server iterate over its peer list and prune all peers we have + /// lost connection to or have been deemed problematic. The removed peers + /// are returned. + pub fn clean_peers(&self) -> Vec> { + let mut peers = self.peers.write().unwrap(); + + let (keep, rm) = peers.iter().fold((vec![], vec![]), |mut acc, ref p| { + if p.clone().is_connected() { + acc.0.push((*p).clone()); + } else { + acc.1.push((*p).clone()); + } + acc + }); + *peers = keep; + rm + } + /// Returns the peer with the most worked branch, showing the highest total /// difficulty. pub fn most_work_peer(&self) -> Option> { @@ -195,7 +213,7 @@ impl Server { } let mut res = peers[0].clone(); for p in peers.deref() { - if res.info.total_difficulty < p.info.total_difficulty { + if p.is_connected() && res.info.total_difficulty < p.info.total_difficulty { res = (*p).clone(); } } @@ -219,8 +237,10 @@ impl Server { pub fn broadcast_block(&self, b: &core::Block) { let peers = self.peers.write().unwrap(); for p in peers.deref() { - if let Err(e) = p.send_block(b) { - debug!("Error sending block to peer: {:?}", e); + if p.is_connected() { + if let Err(e) = p.send_block(b) { + debug!("Error sending block to peer: {:?}", e); + } } } } diff --git a/p2p/src/store.rs b/p2p/src/store.rs index c628fd202..a87300825 100644 --- a/p2p/src/store.rs +++ b/p2p/src/store.rs @@ -18,7 +18,7 @@ use std::net::SocketAddr; use num::FromPrimitive; use core::ser::{self, Readable, Writeable, Reader, Writer}; -use grin_store::{self, Error, to_key}; +use grin_store::{self, Error, to_key, option_to_not_found}; use msg::SockAddr; use types::Capabilities; @@ -36,10 +36,16 @@ enum_from_primitive! { } } +/// Data stored for any given peer we've encountered. pub struct PeerData { + /// Network address of the peer. pub addr: SocketAddr, + /// What capabilities the peer advertises. Unknown until a successful + /// connection. pub capabilities: Capabilities, + /// The peer user agent. pub user_agent: String, + /// State the peer has been detected with. pub flags: State, } @@ -74,11 +80,13 @@ impl Readable for PeerData { } } +/// Storage facility for peer data. pub struct PeerStore { db: grin_store::Store, } impl PeerStore { + /// Instantiates a new peer store under the provided root path. pub fn new(root_path: String) -> Result { let db = grin_store::Store::open(format!("{}/{}", root_path, STORE_SUBPATH).as_str())?; Ok(PeerStore { db: db }) @@ -89,6 +97,10 @@ impl PeerStore { p) } + fn get_peer(&self, peer_addr: SocketAddr) -> Result { + option_to_not_found(self.db.get_ser(&peer_key(peer_addr)[..])) + } + pub fn exists_peer(&self, peer_addr: SocketAddr) -> Result { self.db.exists(&to_key(PEER_PREFIX, &mut format!("{}", peer_addr).into_bytes())[..]) } @@ -111,4 +123,16 @@ impl PeerStore { } peers } + + /// Convenience method to load a peer data, update its status and save it + /// back. + pub fn update_state(&self, peer_addr: SocketAddr, new_state: State) -> Result<(), Error> { + let mut peer = self.get_peer(peer_addr)?; + peer.flags = new_state; + self.save_peer(&peer) + } +} + +fn peer_key(peer_addr: SocketAddr) -> Vec { + to_key(PEER_PREFIX, &mut format!("{}", peer_addr).into_bytes()) } diff --git a/store/src/lib.rs b/store/src/lib.rs index 791d10d83..47c406bce 100644 --- a/store/src/lib.rs +++ b/store/src/lib.rs @@ -217,10 +217,12 @@ impl Iterator for SerIterator } /// Build a db key from a prefix and a byte vector identifier. -pub fn to_key(prefix: u8, id: &mut Vec) -> &mut Vec { - id.insert(0, SEP); - id.insert(0, prefix); - id +pub fn to_key(prefix: u8, k: &mut Vec) -> Vec { + let mut res = Vec::with_capacity(k.len() + 2); + res.push(prefix); + res.push(SEP); + res.append(k); + res } /// Build a db key from a prefix and a numeric identifier.