maintain preferred peer connections (#3435)

This commit is contained in:
Antioch Peverell 2020-09-08 08:22:19 +01:00 committed by GitHub
parent 7dc94576bd
commit 655e080963
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 34 additions and 26 deletions

View file

@ -416,7 +416,12 @@ impl Peers {
/// Iterate over the peer list and prune all peers we have /// Iterate over the peer list and prune all peers we have
/// lost connection to or have been deemed problematic. /// lost connection to or have been deemed problematic.
/// Also avoid connected peer count getting too high. /// Also avoid connected peer count getting too high.
pub fn clean_peers(&self, max_inbound_count: usize, max_outbound_count: usize) { pub fn clean_peers(
&self,
max_inbound_count: usize,
max_outbound_count: usize,
preferred_peers: &[PeerAddr],
) {
let mut rm = vec![]; let mut rm = vec![];
// build a list of peers to be cleaned up // build a list of peers to be cleaned up
@ -464,12 +469,13 @@ impl Peers {
let excess_outgoing_count = let excess_outgoing_count =
(self.peer_outbound_count() as usize).saturating_sub(max_outbound_count); (self.peer_outbound_count() as usize).saturating_sub(max_outbound_count);
if excess_outgoing_count > 0 { if excess_outgoing_count > 0 {
let mut addrs = self let mut addrs: Vec<_> = self
.outgoing_connected_peers() .outgoing_connected_peers()
.iter() .iter()
.filter(|x| !preferred_peers.contains(&x.info.addr))
.take(excess_outgoing_count) .take(excess_outgoing_count)
.map(|x| x.info.addr) .map(|x| x.info.addr)
.collect::<Vec<_>>(); .collect();
rm.append(&mut addrs); rm.append(&mut addrs);
} }
@ -477,12 +483,13 @@ impl Peers {
let excess_incoming_count = let excess_incoming_count =
(self.peer_inbound_count() as usize).saturating_sub(max_inbound_count); (self.peer_inbound_count() as usize).saturating_sub(max_inbound_count);
if excess_incoming_count > 0 { if excess_incoming_count > 0 {
let mut addrs = self let mut addrs: Vec<_> = self
.incoming_connected_peers() .incoming_connected_peers()
.iter() .iter()
.filter(|x| !preferred_peers.contains(&x.info.addr))
.take(excess_incoming_count) .take(excess_incoming_count)
.map(|x| x.info.addr) .map(|x| x.info.addr)
.collect::<Vec<_>>(); .collect();
rm.append(&mut addrs); rm.append(&mut addrs);
} }

View file

@ -52,9 +52,11 @@ pub fn connect_and_monitor(
p2p_server: Arc<p2p::Server>, p2p_server: Arc<p2p::Server>,
capabilities: p2p::Capabilities, capabilities: p2p::Capabilities,
seed_list: Box<dyn Fn() -> Vec<PeerAddr> + Send>, seed_list: Box<dyn Fn() -> Vec<PeerAddr> + Send>,
preferred_peers: Option<Vec<PeerAddr>>, preferred_peers: &[PeerAddr],
stop_state: Arc<StopState>, stop_state: Arc<StopState>,
) -> std::io::Result<thread::JoinHandle<()>> { ) -> std::io::Result<thread::JoinHandle<()>> {
let preferred_peers = preferred_peers.to_vec();
thread::Builder::new() thread::Builder::new()
.name("seed".to_string()) .name("seed".to_string())
.spawn(move || { .spawn(move || {
@ -69,7 +71,7 @@ pub fn connect_and_monitor(
peers.clone(), peers.clone(),
tx.clone(), tx.clone(),
seed_list, seed_list,
preferred_peers.clone(), &preferred_peers,
); );
let mut prev = MIN_DATE.and_hms(0, 0, 0); let mut prev = MIN_DATE.and_hms(0, 0, 0);
@ -113,7 +115,7 @@ pub fn connect_and_monitor(
peers.clone(), peers.clone(),
p2p_server.config.clone(), p2p_server.config.clone(),
tx.clone(), tx.clone(),
preferred_peers.clone(), &preferred_peers,
); );
prev = Utc::now(); prev = Utc::now();
@ -141,7 +143,7 @@ fn monitor_peers(
peers: Arc<p2p::Peers>, peers: Arc<p2p::Peers>,
config: p2p::P2PConfig, config: p2p::P2PConfig,
tx: mpsc::Sender<PeerAddr>, tx: mpsc::Sender<PeerAddr>,
preferred_peers_list: Option<Vec<PeerAddr>>, preferred_peers: &[PeerAddr],
) { ) {
// regularly check if we need to acquire more peers and if so, gets // regularly check if we need to acquire more peers and if so, gets
// them from db // them from db
@ -189,6 +191,7 @@ fn monitor_peers(
peers.clean_peers( peers.clean_peers(
config.peer_max_inbound_count() as usize, config.peer_max_inbound_count() as usize,
config.peer_max_outbound_count() as usize, config.peer_max_outbound_count() as usize,
preferred_peers,
); );
if peers.enough_outbound_peers() { if peers.enough_outbound_peers() {
@ -209,16 +212,14 @@ fn monitor_peers(
connected_peers.push(p.info.addr) connected_peers.push(p.info.addr)
} }
// Attempt to connect to preferred peers if there is some // Attempt to connect to any preferred peers.
if let Some(preferred_peers) = preferred_peers_list { for p in preferred_peers {
for p in preferred_peers { if !connected_peers.is_empty() {
if !connected_peers.is_empty() { if !connected_peers.contains(p) {
if !connected_peers.contains(&p) { tx.send(*p).unwrap();
tx.send(p).unwrap();
}
} else {
tx.send(p).unwrap();
} }
} else {
tx.send(*p).unwrap();
} }
} }
@ -257,7 +258,7 @@ fn connect_to_seeds_and_preferred_peers(
peers: Arc<p2p::Peers>, peers: Arc<p2p::Peers>,
tx: mpsc::Sender<PeerAddr>, tx: mpsc::Sender<PeerAddr>,
seed_list: Box<dyn Fn() -> Vec<PeerAddr>>, seed_list: Box<dyn Fn() -> Vec<PeerAddr>>,
peers_preferred_list: Option<Vec<PeerAddr>>, peers_preferred: &[PeerAddr],
) { ) {
// check if we have some peers in db // check if we have some peers in db
// look for peers that are able to give us other peers (via PEER_LIST capability) // look for peers that are able to give us other peers (via PEER_LIST capability)
@ -270,11 +271,8 @@ fn connect_to_seeds_and_preferred_peers(
seed_list() seed_list()
}; };
// If we have preferred peers add them to the connection // If we have preferred peers add them to the initial list
match peers_preferred_list { peer_addrs.extend_from_slice(peers_preferred);
Some(mut peers_preferred) => peer_addrs.append(&mut peers_preferred),
None => trace!("No preferred peers"),
};
if peer_addrs.is_empty() { if peer_addrs.is_empty() {
warn!("No seeds were retrieved."); warn!("No seeds were retrieved.");

View file

@ -248,13 +248,16 @@ impl Server {
_ => unreachable!(), _ => unreachable!(),
}; };
let preferred_peers = config.p2p_config.peers_preferred.clone().map(|p| p.peers); let preferred_peers = match &config.p2p_config.peers_preferred {
Some(addrs) => addrs.peers.clone(),
None => vec![],
};
connect_thread = Some(seed::connect_and_monitor( connect_thread = Some(seed::connect_and_monitor(
p2p_server.clone(), p2p_server.clone(),
config.p2p_config.capabilities, config.p2p_config.capabilities,
seeder, seeder,
preferred_peers, &preferred_peers,
stop_state.clone(), stop_state.clone(),
)?); )?);
} }