Refactoring and cleanup on seed ()

This commit is contained in:
Gary Yu 2018-09-15 07:09:25 +08:00 committed by Ignotus Peverell
parent e7eb26eed4
commit d3a5ee130f

View file

@ -17,16 +17,11 @@
//!
use chrono::prelude::Utc;
use chrono::Duration;
use std::io;
use chrono::{Duration, MIN_DATE};
use std::net::{SocketAddr, ToSocketAddrs};
use std::str;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{mpsc, Arc};
use std::thread;
use std::time;
use api;
use std::{cmp, io, str, thread, time};
use p2p;
use pool::DandelionConfig;
@ -64,18 +59,13 @@ pub fn connect_and_monitor(
preferred_peers.clone(),
);
let mut prev = Utc::now() - Duration::seconds(60);
let mut prev = MIN_DATE.and_hms(0, 0, 0);
let mut start_attempt = 0;
let start_wait_base: i64 = 2;
loop {
let current_time = Utc::now();
// make several attempts to get peers as quick as possible with
// exponential backoff
if (peers.peer_count() < p2p_server.config.peer_min_preferred_count()
&& current_time - prev > Duration::seconds(start_wait_base.pow(start_attempt)))
|| current_time - prev > Duration::seconds(20)
{
while !stop.load(Ordering::Relaxed) {
// make several attempts to get peers as quick as possible
// with exponential backoff
if Utc::now() - prev > Duration::seconds(cmp::min(20, 1 << start_attempt)) {
// try to connect to any address sent to the channel
listen_for_addrs(peers.clone(), p2p_server.clone(), capabilities, &rx);
@ -90,17 +80,11 @@ pub fn connect_and_monitor(
update_dandelion_relay(peers.clone(), dandelion_config.clone());
prev = current_time;
if start_attempt < 6 {
start_attempt += 1;
}
prev = Utc::now();
start_attempt = cmp::min(6, start_attempt + 1);
}
thread::sleep(time::Duration::from_secs(1));
if stop.load(Ordering::Relaxed) {
break;
}
}
});
}
@ -268,73 +252,47 @@ fn listen_for_addrs(
capab: p2p::Capabilities,
rx: &mpsc::Receiver<SocketAddr>,
) {
let pc = peers.peer_count();
if peers.peer_count() >= p2p.config.peer_max_count() {
// clean the rx messages to avoid accumulating
for _ in rx.try_iter() {}
return;
}
for addr in rx.try_iter() {
if pc < p2p.config.peer_max_count() {
let peers_c = peers.clone();
let p2p_c = p2p.clone();
let _ = thread::Builder::new()
.name("peer_connect".to_string())
.spawn(move || {
let mut connect_retry_count = 0;
loop {
let connect_peer = p2p_c.connect(&addr);
match connect_peer {
Ok(p) => {
trace!(
LOGGER,
"connect_and_req: on {}:{}. connect to {} ok. attempting send_peer_request",
p2p_c.config.host,
p2p_c.config.port,
addr
);
if let Ok(p) = p.try_read() {
let _ = p.send_peer_request(capab);
}
let _ = peers_c.update_state(addr, p2p::State::Healthy);
break;
let peers_c = peers.clone();
let p2p_c = p2p.clone();
let _ = thread::Builder::new()
.name("peer_connect".to_string())
.spawn(move || {
// connect and retry on fail, but for 3 times at most
for _ in 0..3 {
match p2p_c.connect(&addr) {
Ok(p) => {
if let Ok(p) = p.try_read() {
let _ = p.send_peer_request(capab);
}
Err(e) => {
debug!(
LOGGER,
"connect_and_req: on {}:{}. connect to {} is Defunct. {:?}",
p2p_c.config.host,
p2p_c.config.port,
addr,
e,
);
let _ = peers_c.update_state(addr, p2p::State::Defunct);
// don't retry if connection refused or PeerWithSelf
match e {
p2p::Error::Connection(io_err) => {
if io::ErrorKind::ConnectionRefused == io_err.kind() {
break;
}
}
p2p::Error::PeerWithSelf => break,
_ => (),
}
}
}
// retry for 3 times
thread::sleep(time::Duration::from_secs(1));
connect_retry_count += 1;
if connect_retry_count >= 3 {
let _ = peers_c.update_state(addr, p2p::State::Healthy);
break;
}
debug!(
LOGGER,
"connect_and_req: on {}:{}. connect to {} retrying {}",
p2p_c.config.host,
p2p_c.config.port,
addr,
connect_retry_count,
);
Err(e) => {
let _ = peers_c.update_state(addr, p2p::State::Defunct);
// don't retry if connection refused or PeerWithSelf
match e {
p2p::Error::Connection(io_err) => {
if io::ErrorKind::ConnectionRefused == io_err.kind() {
break;
}
}
p2p::Error::PeerWithSelf => break,
_ => (), // allow to retry on any other error
}
}
}
});
}
thread::sleep(time::Duration::from_secs(1));
}
});
}
}