feat: remove expired peers from the storage (#1794)

* Initial expired peers removal
* Stop expired peers
* Simplify peer removal and remove only Defunct peers
* Make seed to check for expired peers every hour
* Get rid of unused vector of peers to remove
* Make peer deletion predicate closure immutable
This commit is contained in:
eupn 2018-10-22 23:59:40 +03:00 committed by Ignotus Peverell
parent 5f2e8db092
commit 8f42f7306c
4 changed files with 79 additions and 3 deletions

View file

@ -69,6 +69,12 @@ pub const TESTNET3_INITIAL_DIFFICULTY: u64 = 30000;
/// we're sure this peer is a stuck node, and we will kick out such kind of stuck peers. /// we're sure this peer is a stuck node, and we will kick out such kind of stuck peers.
pub const STUCK_PEER_KICK_TIME: i64 = 2 * 3600 * 1000; pub const STUCK_PEER_KICK_TIME: i64 = 2 * 3600 * 1000;
/// If a peer's last seen time is 2 weeks ago we will forget such kind of defunct peers.
const PEER_EXPIRATION_DAYS: i64 = 7 * 2;
/// Constant that expresses defunct peer timeout in seconds to be used in checks.
pub const PEER_EXPIRATION_REMOVE_TIME: i64 = PEER_EXPIRATION_DAYS * 24 * 3600;
/// Testnet 4 initial block difficulty /// Testnet 4 initial block difficulty
/// 1_000 times natural scale factor for cuckatoo29 /// 1_000 times natural scale factor for cuckatoo29
pub const TESTNET4_INITIAL_DIFFICULTY: u64 = 1_000 * (2 << (29 - 24)) * 29; pub const TESTNET4_INITIAL_DIFFICULTY: u64 = 1_000 * (2 << (29 - 24)) * 29;

View file

@ -21,8 +21,10 @@ use util::RwLock;
use rand::{thread_rng, Rng}; use rand::{thread_rng, Rng};
use chrono::prelude::*; use chrono::prelude::*;
use chrono::Duration;
use core::core; use core::core;
use core::core::hash::{Hash, Hashed}; use core::core::hash::{Hash, Hashed};
use core::global;
use core::pow::Difficulty; use core::pow::Difficulty;
use peer::Peer; use peer::Peer;
@ -67,6 +69,7 @@ impl Peers {
flags: State::Healthy, flags: State::Healthy,
last_banned: 0, last_banned: 0,
ban_reason: ReasonForBan::None, ban_reason: ReasonForBan::None,
last_connected: Utc::now().timestamp(),
}; };
addr = peer.info.addr.clone(); addr = peer.info.addr.clone();
} }
@ -469,6 +472,31 @@ impl Peers {
pub fn enough_peers(&self) -> bool { pub fn enough_peers(&self) -> bool {
self.connected_peers().len() >= self.config.peer_min_preferred_count() as usize self.connected_peers().len() >= self.config.peer_min_preferred_count() as usize
} }
/// Removes those peers that seem to have expired
pub fn remove_expired(&self) {
let now = Utc::now();
// Delete defunct peers from storage
let _ = self.store.delete_peers(|peer| {
let diff = now - Utc.timestamp(peer.last_connected, 0);
let should_remove = peer.flags == State::Defunct
&& diff > Duration::seconds(global::PEER_EXPIRATION_REMOVE_TIME);
if should_remove {
debug!(
"removing peer {:?}: last connected {} days {} hours {} minutes ago.",
peer.addr,
diff.num_days(),
diff.num_hours(),
diff.num_minutes()
);
}
should_remove
});
}
} }
impl ChainAdapter for Peers { impl ChainAdapter for Peers {
@ -603,6 +631,7 @@ impl NetAdapter for Peers {
flags: State::Healthy, flags: State::Healthy,
last_banned: 0, last_banned: 0,
ban_reason: ReasonForBan::None, ban_reason: ReasonForBan::None,
last_connected: Utc::now().timestamp(),
}; };
if let Err(e) = self.save_peer(&peer) { if let Err(e) = self.save_peer(&peer) {
error!("Could not save received peer address: {:?}", e); error!("Could not save received peer address: {:?}", e);

View file

@ -57,6 +57,8 @@ pub struct PeerData {
pub last_banned: i64, pub last_banned: i64,
/// The reason for the ban /// The reason for the ban
pub ban_reason: ReasonForBan, pub ban_reason: ReasonForBan,
/// Time when we last connected to this peer.
pub last_connected: i64,
} }
impl Writeable for PeerData { impl Writeable for PeerData {
@ -68,7 +70,8 @@ impl Writeable for PeerData {
[write_bytes, &self.user_agent], [write_bytes, &self.user_agent],
[write_u8, self.flags as u8], [write_u8, self.flags as u8],
[write_i64, self.last_banned], [write_i64, self.last_banned],
[write_i32, self.ban_reason as i32] [write_i32, self.ban_reason as i32],
[write_i64, self.last_connected]
); );
Ok(()) Ok(())
} }
@ -77,12 +80,13 @@ impl Writeable for PeerData {
impl Readable for PeerData { impl Readable for PeerData {
fn read(reader: &mut Reader) -> Result<PeerData, ser::Error> { fn read(reader: &mut Reader) -> Result<PeerData, ser::Error> {
let addr = SockAddr::read(reader)?; let addr = SockAddr::read(reader)?;
let (capab, ua, fl, lb, br) = let (capab, ua, fl, lb, br, lc) =
ser_multiread!(reader, read_u32, read_vec, read_u8, read_i64, read_i32); ser_multiread!(reader, read_u32, read_vec, read_u8, read_i64, read_i32, read_i64);
let user_agent = String::from_utf8(ua).map_err(|_| ser::Error::CorruptedData)?; let user_agent = String::from_utf8(ua).map_err(|_| ser::Error::CorruptedData)?;
let capabilities = Capabilities::from_bits(capab).ok_or(ser::Error::CorruptedData)?; let capabilities = Capabilities::from_bits(capab).ok_or(ser::Error::CorruptedData)?;
let last_banned = lb; let last_banned = lb;
let ban_reason = ReasonForBan::from_i32(br).ok_or(ser::Error::CorruptedData)?; let ban_reason = ReasonForBan::from_i32(br).ok_or(ser::Error::CorruptedData)?;
match State::from_u8(fl) { match State::from_u8(fl) {
Some(flags) => Ok(PeerData { Some(flags) => Ok(PeerData {
addr: addr.0, addr: addr.0,
@ -91,6 +95,7 @@ impl Readable for PeerData {
flags: flags, flags: flags,
last_banned: last_banned, last_banned: last_banned,
ban_reason: ban_reason, ban_reason: ban_reason,
last_connected: lc,
}), }),
None => Err(ser::Error::CorruptedData), None => Err(ser::Error::CorruptedData),
} }
@ -171,6 +176,33 @@ impl PeerStore {
batch.put_ser(&peer_key(peer.addr)[..], &peer)?; batch.put_ser(&peer_key(peer.addr)[..], &peer)?;
batch.commit() batch.commit()
} }
/// Deletes peers from the storage that satisfy some condition `predicate`
pub fn delete_peers<F>(&self, predicate: F) -> Result<(), Error>
where
F: Fn(&PeerData) -> bool,
{
let mut to_remove = vec![];
for x in self.all_peers() {
if predicate(&x) {
to_remove.push(x)
}
}
// Delete peers in single batch
if !to_remove.is_empty() {
let batch = self.db.batch()?;
for peer in to_remove {
batch.delete(&peer_key(peer.addr)[..])?;
}
batch.commit()?;
}
Ok(())
}
} }
fn peer_key(peer_addr: SocketAddr) -> Vec<u8> { fn peer_key(peer_addr: SocketAddr) -> Vec<u8> {

View file

@ -59,10 +59,18 @@ pub fn connect_and_monitor(
); );
let mut prev = MIN_DATE.and_hms(0, 0, 0); let mut prev = MIN_DATE.and_hms(0, 0, 0);
let mut prev_expire_check = MIN_DATE.and_hms(0, 0, 0);
let mut prev_ping = Utc::now(); let mut prev_ping = Utc::now();
let mut start_attempt = 0; let mut start_attempt = 0;
while !stop.load(Ordering::Relaxed) { while !stop.load(Ordering::Relaxed) {
// Check for and remove expired peers from the storage
if Utc::now() - prev_expire_check > Duration::hours(1) {
peers.remove_expired();
prev_expire_check = Utc::now();
}
// make several attempts to get peers as quick as possible // make several attempts to get peers as quick as possible
// with exponential backoff // with exponential backoff
if Utc::now() - prev > Duration::seconds(cmp::min(20, 1 << start_attempt)) { if Utc::now() - prev > Duration::seconds(cmp::min(20, 1 << start_attempt)) {
@ -110,6 +118,7 @@ fn monitor_peers(
let mut healthy_count = 0; let mut healthy_count = 0;
let mut banned_count = 0; let mut banned_count = 0;
let mut defuncts = vec![]; let mut defuncts = vec![];
for x in peers.all_peers() { for x in peers.all_peers() {
match x.flags { match x.flags {
p2p::State::Banned => { p2p::State::Banned => {