diff --git a/p2p/src/peers.rs b/p2p/src/peers.rs
index 0a094a4ef..449ae355b 100644
--- a/p2p/src/peers.rs
+++ b/p2p/src/peers.rs
@@ -171,7 +171,7 @@ impl Peers {
 		self.peers.read().get(addr).map(|p| p.clone())
 	}
 
-	/// Number of peers we're currently connected to.
+	/// Number of peers currently connected to.
 	pub fn peer_count(&self) -> u32 {
 		self.peers
 			.read()
@@ -180,6 +180,15 @@ impl Peers {
 			.count() as u32
 	}
 
+	/// Number of outbound peers currently connected to.
+	pub fn peer_outbound_count(&self) -> u32 {
+		self.peers
+			.read()
+			.values()
+			.filter(|x| x.is_connected() && x.info.is_outbound())
+			.count() as u32
+	}
+
 	// Return vec of connected peers that currently advertise more work
 	// (total_difficulty) than we do.
 	pub fn more_work_peers(&self) -> Vec<Arc<Peer>> {
diff --git a/p2p/src/types.rs b/p2p/src/types.rs
index 91361682d..60c6eef5c 100644
--- a/p2p/src/types.rs
+++ b/p2p/src/types.rs
@@ -273,6 +273,10 @@ impl PeerInfo {
 		self.live_info.read().total_difficulty
 	}
 
+	pub fn is_outbound(&self) -> bool {
+		self.direction == Direction::Outbound
+	}
+
 	/// The current height of the peer.
 	pub fn height(&self) -> u64 {
 		self.live_info.read().height
diff --git a/servers/src/grin/seed.rs b/servers/src/grin/seed.rs
index eca7b9eb7..5af7df96e 100644
--- a/servers/src/grin/seed.rs
+++ b/servers/src/grin/seed.rs
@@ -186,8 +186,10 @@ fn monitor_peers(
 	// maintenance step first, clean up p2p server peers
 	peers.clean_peers(config.peer_max_count() as usize);
 
-	// not enough peers, getting more from db
-	if peers.peer_count() >= config.peer_min_preferred_count() {
+	// We have enough peers, both total connected and outbound connected so we are good.
+	if peers.peer_count() >= config.peer_min_preferred_count()
+		&& peers.peer_outbound_count() >= config.peer_min_preferred_count() / 2
+	{
 		return;
 	}
 
@@ -309,14 +311,22 @@ fn listen_for_addrs(
 	rx: &mpsc::Receiver<PeerAddr>,
 	connecting_history: &mut HashMap<PeerAddr, DateTime<Utc>>,
 ) {
-	if peers.peer_count() >= p2p.config.peer_max_count() {
-		// clean the rx messages to avoid accumulating
-		for _ in rx.try_iter() {}
+	// Pull everything currently on the queue off the queue.
+	// Does not block so addrs may be empty.
+	// We will take(max_peers) from this later but we want to drain the rx queue
+	// here to prevent it backing up.
+	let addrs: Vec<PeerAddr> = rx.try_iter().collect();
+
+	// If we have a healthy number of outbound peers then we are done here.
+	if peers.peer_outbound_count() >= p2p.config.peer_min_preferred_count() / 2 {
 		return;
 	}
 
+	// Try to connect to (up to max peers) peer addresses.
+	// Note: We drained the rx queue earlier to keep it under control.
+	// Even if there are many addresses to try we will only try a bounded number of them.
 	let connect_min_interval = 30;
-	for addr in rx.try_iter() {
+	for addr in addrs.into_iter().take(p2p.config.peer_max_count() as usize) {
 		// ignore the duplicate connecting to same peer within 30 seconds
 		let now = Utc::now();
 		if let Some(last_connect_time) = connecting_history.get(&addr) {
diff --git a/src/bin/tui/peers.rs b/src/bin/tui/peers.rs
index 8833dd5d1..ddff969c2 100644
--- a/src/bin/tui/peers.rs
+++ b/src/bin/tui/peers.rs
@@ -135,7 +135,6 @@ impl TUIStatusListener for TUIPeerView {
 		LinearLayout::new(Orientation::Vertical)
 			.child(
 				LinearLayout::new(Orientation::Horizontal)
-					.child(TextView::new("Total Peers: "))
 					.child(TextView::new(" ").with_id("peers_total")),
 			)
 			.child(
@@ -179,7 +178,15 @@ impl TUIStatusListener for TUIPeerView {
 			},
 		);
 		let _ = c.call_on_id("peers_total", |t: &mut TextView| {
-			t.set_content(stats.peer_stats.len().to_string());
+			t.set_content(format!(
+				"Total Peers: {} (Outbound: {})",
+				stats.peer_stats.len(),
+				stats
+					.peer_stats
+					.iter()
+					.filter(|x| x.direction == "Outbound")
+					.count(),
+			));
 		});
 		let _ = c.call_on_id("longest_work_peer", |t: &mut TextView| {
 			t.set_content(lp_str);