fix: avoid duplicate connection (#2361)

* fix: avoid duplicate connection
* ignore the duplicate connecting to same peer within 10 seconds
* refactor: use hashmap instead of vector for connecting history
* remove the double checking for already connected peer on connect
* add previous connecting time to the log
* fix a mistake on shrink
* move the now() into the inter loop for accurate time
* change the minimum allowed inverval time from 10s to 30s
This commit is contained in:
Gary Yu 2019-01-15 03:44:35 +08:00 committed by Ignotus Peverell
parent d7be94fafb
commit c7bb5eab27

View file

@ -17,9 +17,10 @@
//! configurable with either no peers, a user-defined list or a preset
//! list of DNS records (the default).
use chrono::prelude::Utc;
use chrono::prelude::{DateTime, Utc};
use chrono::{Duration, MIN_DATE};
use rand::{thread_rng, Rng};
use std::collections::HashMap;
use std::net::{SocketAddr, ToSocketAddrs};
use std::sync::{mpsc, Arc};
use std::{cmp, str, thread, time};
@ -76,6 +77,8 @@ pub fn connect_and_monitor(
let mut prev_ping = Utc::now();
let mut start_attempt = 0;
let mut connecting_history: HashMap<SocketAddr, DateTime<Utc>> = HashMap::new();
loop {
if stop_state.lock().is_stopped() {
break;
@ -98,7 +101,13 @@ pub fn connect_and_monitor(
// with exponential backoff
if Utc::now() - prev > Duration::seconds(cmp::min(20, 1 << start_attempt)) {
// try to connect to any address sent to the channel
listen_for_addrs(peers.clone(), p2p_server.clone(), capabilities, &rx);
listen_for_addrs(
peers.clone(),
p2p_server.clone(),
capabilities,
&rx,
&mut connecting_history,
);
// monitor additional peers if we need to add more
monitor_peers(
@ -297,6 +306,7 @@ fn listen_for_addrs(
p2p: Arc<p2p::Server>,
capab: p2p::Capabilities,
rx: &mpsc::Receiver<SocketAddr>,
connecting_history: &mut HashMap<SocketAddr, DateTime<Utc>>,
) {
if peers.peer_count() >= p2p.config.peer_max_count() {
// clean the rx messages to avoid accumulating
@ -304,7 +314,24 @@ fn listen_for_addrs(
return;
}
let connect_min_interval = 30;
for addr in rx.try_iter() {
// ignore the duplicate connecting to same peer within 30 seconds
let now = Utc::now();
if let Some(last_connect_time) = connecting_history.get(&addr) {
if *last_connect_time + Duration::seconds(connect_min_interval) > now {
debug!(
"peer_connect: ignore a duplicate request to {}. previous connecting time: {}",
addr,
last_connect_time.format("%H:%M:%S%.3f").to_string(),
);
continue;
} else {
*connecting_history.get_mut(&addr).unwrap() = now;
}
}
connecting_history.insert(addr, now);
let peers_c = peers.clone();
let p2p_c = p2p.clone();
let _ = thread::Builder::new()
@ -319,6 +346,20 @@ fn listen_for_addrs(
}
});
}
// shrink the connecting history.
// put a threshold here to avoid frequent shrinking in every call
if connecting_history.len() > 100 {
let now = Utc::now();
let old: Vec<_> = connecting_history
.iter()
.filter(|&(_, t)| *t + Duration::seconds(connect_min_interval) < now)
.map(|(s, _)| s.clone())
.collect();
for addr in old {
connecting_history.remove(&addr);
}
}
}
pub fn dns_seeds() -> Box<dyn Fn() -> Vec<SocketAddr> + Send> {