diff --git a/core/src/global.rs b/core/src/global.rs index dc0c9a0a7..b65747038 100644 --- a/core/src/global.rs +++ b/core/src/global.rs @@ -69,6 +69,12 @@ pub const TESTNET3_INITIAL_DIFFICULTY: u64 = 30000; /// we're sure this peer is a stuck node, and we will kick out such kind of stuck peers. pub const STUCK_PEER_KICK_TIME: i64 = 2 * 3600 * 1000; +/// If a peer's last seen time is 2 weeks ago we will forget such kind of defunct peers. +const PEER_EXPIRATION_DAYS: i64 = 7 * 2; + +/// Constant that expresses defunct peer timeout in seconds to be used in checks. +pub const PEER_EXPIRATION_REMOVE_TIME: i64 = PEER_EXPIRATION_DAYS * 24 * 3600; + /// Testnet 4 initial block difficulty /// 1_000 times natural scale factor for cuckatoo29 pub const TESTNET4_INITIAL_DIFFICULTY: u64 = 1_000 * (2 << (29 - 24)) * 29; diff --git a/p2p/src/peers.rs b/p2p/src/peers.rs index a6c0e8087..a6cba0e64 100644 --- a/p2p/src/peers.rs +++ b/p2p/src/peers.rs @@ -21,8 +21,10 @@ use util::RwLock; use rand::{thread_rng, Rng}; use chrono::prelude::*; +use chrono::Duration; use core::core; use core::core::hash::{Hash, Hashed}; +use core::global; use core::pow::Difficulty; use peer::Peer; @@ -67,6 +69,7 @@ impl Peers { flags: State::Healthy, last_banned: 0, ban_reason: ReasonForBan::None, + last_connected: Utc::now().timestamp(), }; addr = peer.info.addr.clone(); } @@ -469,6 +472,31 @@ impl Peers { pub fn enough_peers(&self) -> bool { self.connected_peers().len() >= self.config.peer_min_preferred_count() as usize } + + /// Removes those peers that seem to have expired + pub fn remove_expired(&self) { + let now = Utc::now(); + + // Delete defunct peers from storage + let _ = self.store.delete_peers(|peer| { + let diff = now - Utc.timestamp(peer.last_connected, 0); + + let should_remove = peer.flags == State::Defunct + && diff > Duration::seconds(global::PEER_EXPIRATION_REMOVE_TIME); + + if should_remove { + debug!( + "removing peer {:?}: last connected {} days {} hours {} minutes ago.", + peer.addr, + diff.num_days(), + diff.num_hours(), + diff.num_minutes() + ); + } + + should_remove + }); + } } impl ChainAdapter for Peers { @@ -603,6 +631,7 @@ impl NetAdapter for Peers { flags: State::Healthy, last_banned: 0, ban_reason: ReasonForBan::None, + last_connected: Utc::now().timestamp(), }; if let Err(e) = self.save_peer(&peer) { error!("Could not save received peer address: {:?}", e); diff --git a/p2p/src/store.rs b/p2p/src/store.rs index f27d05c6c..1ca8a2bcb 100644 --- a/p2p/src/store.rs +++ b/p2p/src/store.rs @@ -57,6 +57,8 @@ pub struct PeerData { pub last_banned: i64, /// The reason for the ban pub ban_reason: ReasonForBan, + /// Time when we last connected to this peer. + pub last_connected: i64, } impl Writeable for PeerData { @@ -68,7 +70,8 @@ impl Writeable for PeerData { [write_bytes, &self.user_agent], [write_u8, self.flags as u8], [write_i64, self.last_banned], - [write_i32, self.ban_reason as i32] + [write_i32, self.ban_reason as i32], + [write_i64, self.last_connected] ); Ok(()) } @@ -77,12 +80,13 @@ impl Writeable for PeerData { impl Readable for PeerData { fn read(reader: &mut Reader) -> Result { let addr = SockAddr::read(reader)?; - let (capab, ua, fl, lb, br) = - ser_multiread!(reader, read_u32, read_vec, read_u8, read_i64, read_i32); + let (capab, ua, fl, lb, br, lc) = + ser_multiread!(reader, read_u32, read_vec, read_u8, read_i64, read_i32, read_i64); let user_agent = String::from_utf8(ua).map_err(|_| ser::Error::CorruptedData)?; let capabilities = Capabilities::from_bits(capab).ok_or(ser::Error::CorruptedData)?; let last_banned = lb; let ban_reason = ReasonForBan::from_i32(br).ok_or(ser::Error::CorruptedData)?; + match State::from_u8(fl) { Some(flags) => Ok(PeerData { addr: addr.0, @@ -91,6 +95,7 @@ impl Readable for PeerData { flags: flags, last_banned: last_banned, ban_reason: ban_reason, + last_connected: lc, }), None => Err(ser::Error::CorruptedData), } @@ -171,6 +176,33 @@ impl PeerStore { batch.put_ser(&peer_key(peer.addr)[..], &peer)?; batch.commit() } + + /// Deletes peers from the storage that satisfy some condition `predicate` + pub fn delete_peers(&self, predicate: F) -> Result<(), Error> + where + F: Fn(&PeerData) -> bool, + { + let mut to_remove = vec![]; + + for x in self.all_peers() { + if predicate(&x) { + to_remove.push(x) + } + } + + // Delete peers in single batch + if !to_remove.is_empty() { + let batch = self.db.batch()?; + + for peer in to_remove { + batch.delete(&peer_key(peer.addr)[..])?; + } + + batch.commit()?; + } + + Ok(()) + } } fn peer_key(peer_addr: SocketAddr) -> Vec { diff --git a/servers/src/grin/seed.rs b/servers/src/grin/seed.rs index 923450f88..32fca7c76 100644 --- a/servers/src/grin/seed.rs +++ b/servers/src/grin/seed.rs @@ -59,10 +59,18 @@ pub fn connect_and_monitor( ); let mut prev = MIN_DATE.and_hms(0, 0, 0); + let mut prev_expire_check = MIN_DATE.and_hms(0, 0, 0); let mut prev_ping = Utc::now(); let mut start_attempt = 0; while !stop.load(Ordering::Relaxed) { + // Check for and remove expired peers from the storage + if Utc::now() - prev_expire_check > Duration::hours(1) { + peers.remove_expired(); + + prev_expire_check = Utc::now(); + } + // make several attempts to get peers as quick as possible // with exponential backoff if Utc::now() - prev > Duration::seconds(cmp::min(20, 1 << start_attempt)) { @@ -110,6 +118,7 @@ fn monitor_peers( let mut healthy_count = 0; let mut banned_count = 0; let mut defuncts = vec![]; + for x in peers.all_peers() { match x.flags { p2p::State::Banned => {