Maintain min outbound peers (min_preferred_peers / 2) (#2417)

* outbound connection count

* rustfmt

* display outbound peer count

* rustfmt

* wip - allow connections to exceed max when trying to create new outcound connections
any in excess of total max will be cleaned up subsequently

* fix

* we care about connected outbound peer count when deciding to connect to new peers
This commit is contained in:
Antioch Peverell 2019-01-19 16:13:09 +00:00 committed by GitHub
parent 74422efa5b
commit ef5d83817e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 39 additions and 9 deletions

View file

@ -171,7 +171,7 @@ impl Peers {
self.peers.read().get(addr).map(|p| p.clone()) self.peers.read().get(addr).map(|p| p.clone())
} }
/// Number of peers we're currently connected to. /// Number of peers currently connected to.
pub fn peer_count(&self) -> u32 { pub fn peer_count(&self) -> u32 {
self.peers self.peers
.read() .read()
@ -180,6 +180,15 @@ impl Peers {
.count() as u32 .count() as u32
} }
/// Number of outbound peers currently connected to.
pub fn peer_outbound_count(&self) -> u32 {
self.peers
.read()
.values()
.filter(|x| x.is_connected() && x.info.is_outbound())
.count() as u32
}
// Return vec of connected peers that currently advertise more work // Return vec of connected peers that currently advertise more work
// (total_difficulty) than we do. // (total_difficulty) than we do.
pub fn more_work_peers(&self) -> Vec<Arc<Peer>> { pub fn more_work_peers(&self) -> Vec<Arc<Peer>> {

View file

@ -273,6 +273,10 @@ impl PeerInfo {
self.live_info.read().total_difficulty self.live_info.read().total_difficulty
} }
pub fn is_outbound(&self) -> bool {
self.direction == Direction::Outbound
}
/// The current height of the peer. /// The current height of the peer.
pub fn height(&self) -> u64 { pub fn height(&self) -> u64 {
self.live_info.read().height self.live_info.read().height

View file

@ -186,8 +186,10 @@ fn monitor_peers(
// maintenance step first, clean up p2p server peers // maintenance step first, clean up p2p server peers
peers.clean_peers(config.peer_max_count() as usize); peers.clean_peers(config.peer_max_count() as usize);
// not enough peers, getting more from db // We have enough peers, both total connected and outbound connected so we are good.
if peers.peer_count() >= config.peer_min_preferred_count() { if peers.peer_count() >= config.peer_min_preferred_count()
&& peers.peer_outbound_count() >= config.peer_min_preferred_count() / 2
{
return; return;
} }
@ -309,14 +311,22 @@ fn listen_for_addrs(
rx: &mpsc::Receiver<SocketAddr>, rx: &mpsc::Receiver<SocketAddr>,
connecting_history: &mut HashMap<SocketAddr, DateTime<Utc>>, connecting_history: &mut HashMap<SocketAddr, DateTime<Utc>>,
) { ) {
if peers.peer_count() >= p2p.config.peer_max_count() { // Pull everything currently on the queue off the queue.
// clean the rx messages to avoid accumulating // Does not block so addrs may be empty.
for _ in rx.try_iter() {} // We will take(max_peers) from this later but we want to drain the rx queue
// here to prevent it backing up.
let addrs: Vec<SocketAddr> = rx.try_iter().collect();
// If we have a healthy number of outbound peers then we are done here.
if peers.peer_outbound_count() >= p2p.config.peer_min_preferred_count() / 2 {
return; return;
} }
// Try to connect to (up to max peers) peer addresses.
// Note: We drained the rx queue earlier to keep it under control.
// Even if there are many addresses to try we will only try a bounded number of them.
let connect_min_interval = 30; let connect_min_interval = 30;
for addr in rx.try_iter() { for addr in addrs.into_iter().take(p2p.config.peer_max_count() as usize) {
// ignore the duplicate connecting to same peer within 30 seconds // ignore the duplicate connecting to same peer within 30 seconds
let now = Utc::now(); let now = Utc::now();
if let Some(last_connect_time) = connecting_history.get(&addr) { if let Some(last_connect_time) = connecting_history.get(&addr) {

View file

@ -135,7 +135,6 @@ impl TUIStatusListener for TUIPeerView {
LinearLayout::new(Orientation::Vertical) LinearLayout::new(Orientation::Vertical)
.child( .child(
LinearLayout::new(Orientation::Horizontal) LinearLayout::new(Orientation::Horizontal)
.child(TextView::new("Total Peers: "))
.child(TextView::new(" ").with_id("peers_total")), .child(TextView::new(" ").with_id("peers_total")),
) )
.child( .child(
@ -179,7 +178,15 @@ impl TUIStatusListener for TUIPeerView {
}, },
); );
let _ = c.call_on_id("peers_total", |t: &mut TextView| { let _ = c.call_on_id("peers_total", |t: &mut TextView| {
t.set_content(stats.peer_stats.len().to_string()); t.set_content(format!(
"Total Peers: {} (Outbound: {})",
stats.peer_stats.len(),
stats
.peer_stats
.iter()
.filter(|x| x.direction == "Outbound")
.count(),
));
}); });
let _ = c.call_on_id("longest_work_peer", |t: &mut TextView| { let _ = c.call_on_id("longest_work_peer", |t: &mut TextView| {
t.set_content(lp_str); t.set_content(lp_str);