Improve checking for p2p connection limits (#2985)

* Add check for p2p connection limits

* Simplify undesirable connection shutdown

* Make inbound and outbound connections more explicit

* Cleanup inbound and outbound connections

* Cleanup an outbound peers check

* Rename healthy_peers_mix to enough_outbound_peers
This commit is contained in:
j01tz 2019-08-21 04:43:09 -07:00 committed by Antioch Peverell
parent ea023387bf
commit 24f0a52437
6 changed files with 133 additions and 58 deletions

View file

@ -274,12 +274,18 @@ fn comments() -> HashMap<String, String> {
#how long a banned peer should stay banned
#ban_window = 10800
#maximum number of peers
#peer_max_count = 125
#maximum number of inbound peer connections
#peer_max_inbound_count = 128
#preferred minimum number of peers (we'll actively keep trying to add peers
#until we get to at least this number
#peer_min_preferred_count = 8
#maximum number of outbound peer connections
#peer_max_outbound_count = 8
#preferred minimum number of outbound peers (we'll actively keep trying to add peers
#until we get to at least this number)
#peer_min_preferred_outbound_count = 8
#amount of incoming connections temporarily allowed to exceed peer_max_inbound_count
#peer_listener_buffer_count = 8
# 15 = Bit flags for FULL_NODE
#This structure needs to be changed internally, to make it more configurable

View file

@ -126,6 +126,7 @@ impl Peers {
res
}
/// Get vec of peers we currently have an outgoing connection with.
pub fn outgoing_connected_peers(&self) -> Vec<Arc<Peer>> {
self.connected_peers()
.into_iter()
@ -133,6 +134,14 @@ impl Peers {
.collect()
}
/// Get vec of peers we currently have an incoming connection with.
pub fn incoming_connected_peers(&self) -> Vec<Arc<Peer>> {
self.connected_peers()
.into_iter()
.filter(|x| x.info.is_inbound())
.collect()
}
/// Get a peer we're connected to by address.
pub fn get_connected_peer(&self, addr: PeerAddr) -> Option<Arc<Peer>> {
let peers = match self.peers.try_read_for(LOCK_TIMEOUT) {
@ -155,6 +164,11 @@ impl Peers {
self.outgoing_connected_peers().len() as u32
}
/// Number of inbound peers currently connected to.
pub fn peer_inbound_count(&self) -> u32 {
self.incoming_connected_peers().len() as u32
}
// Return vec of connected peers that currently advertise more work
// (total_difficulty) than we do.
pub fn more_work_peers(&self) -> Result<Vec<Arc<Peer>>, chain::Error> {
@ -324,7 +338,8 @@ impl Peers {
/// A peer implementation may drop the broadcast request
/// if it knows the remote peer already has the block.
pub fn broadcast_compact_block(&self, b: &core::CompactBlock) {
let num_peers = self.config.peer_max_count();
let num_peers =
self.config.peer_max_inbound_count() + self.config.peer_max_outbound_count();
let count = self.broadcast("compact block", num_peers, |p| p.send_compact_block(b));
debug!(
"broadcast_compact_block: {}, {} at {}, to {} peers, done.",
@ -341,7 +356,7 @@ impl Peers {
/// A peer implementation may drop the broadcast request
/// if it knows the remote peer already has the header.
pub fn broadcast_header(&self, bh: &core::BlockHeader) {
let num_peers = self.config.peer_min_preferred_count();
let num_peers = self.config.peer_min_preferred_outbound_count();
let count = self.broadcast("header", num_peers, |p| p.send_header(bh));
debug!(
"broadcast_header: {}, {} at {}, to {} peers, done.",
@ -358,7 +373,8 @@ impl Peers {
/// A peer implementation may drop the broadcast request
/// if it knows the remote peer already has the transaction.
pub fn broadcast_transaction(&self, tx: &core::Transaction) {
let num_peers = self.config.peer_max_count();
let num_peers =
self.config.peer_max_inbound_count() + self.config.peer_max_outbound_count();
let count = self.broadcast("transaction", num_peers, |p| p.send_transaction(tx));
debug!(
"broadcast_transaction: {} to {} peers, done.",
@ -433,7 +449,7 @@ impl Peers {
/// Iterate over the peer list and prune all peers we have
/// lost connection to or have been deemed problematic.
/// Also avoid connected peer count getting too high.
pub fn clean_peers(&self, max_count: usize) {
pub fn clean_peers(&self, max_inbound_count: usize, max_outbound_count: usize) {
let mut rm = vec![];
// build a list of peers to be cleaned up
@ -477,16 +493,27 @@ impl Peers {
}
}
// ensure we do not still have too many connected peers
let excess_count = (self.peer_count() as usize)
.saturating_sub(rm.len())
.saturating_sub(max_count);
if excess_count > 0 {
// map peers to addrs in a block to bound how long we keep the read lock for
// check here to make sure we don't have too many outgoing connections
let excess_outgoing_count =
(self.peer_outbound_count() as usize).saturating_sub(max_outbound_count);
if excess_outgoing_count > 0 {
let mut addrs = self
.connected_peers()
.outgoing_connected_peers()
.iter()
.take(excess_count)
.take(excess_outgoing_count)
.map(|x| x.info.addr.clone())
.collect::<Vec<_>>();
rm.append(&mut addrs);
}
// check here to make sure we don't have too many incoming connections
let excess_incoming_count =
(self.peer_inbound_count() as usize).saturating_sub(max_inbound_count);
if excess_incoming_count > 0 {
let mut addrs = self
.incoming_connected_peers()
.iter()
.take(excess_incoming_count)
.map(|x| x.info.addr.clone())
.collect::<Vec<_>>();
rm.append(&mut addrs);
@ -518,14 +545,9 @@ impl Peers {
}
}
pub fn enough_peers(&self) -> bool {
self.peer_count() >= self.config.peer_min_preferred_count()
}
/// We have enough peers, both total connected and outbound connected
pub fn healthy_peers_mix(&self) -> bool {
self.enough_peers()
&& self.peer_outbound_count() >= self.config.peer_min_preferred_count() / 2
/// We have enough outbound connected peers
pub fn enough_outbound_peers(&self) -> bool {
self.peer_outbound_count() >= self.config.peer_min_preferred_outbound_count()
}
/// Removes those peers that seem to have expired

View file

@ -87,6 +87,10 @@ impl Server {
let peer_addr = PeerAddr(peer_addr);
if self.check_undesirable(&stream) {
// Shutdown the incoming TCP connection if it is not desired
if let Err(e) = stream.shutdown(Shutdown::Both) {
debug!("Error shutting down conn: {:?}", e);
}
continue;
}
match self.handle_new_peer(stream) {
@ -194,30 +198,34 @@ impl Server {
Ok(())
}
/// Checks whether there's any reason we don't want to accept a peer
/// connection. There can be a couple of them:
/// 1. The peer has been previously banned and the ban period hasn't
/// Checks whether there's any reason we don't want to accept an incoming peer
/// connection. There can be a few of them:
/// 1. Accepting the peer connection would exceed the configured maximum allowed
/// inbound peer count. Note that seed nodes may wish to increase the default
/// value for PEER_LISTENER_BUFFER_COUNT to help with network bootstrapping.
/// A default buffer of 8 peers is allowed to help with network growth.
/// 2. The peer has been previously banned and the ban period hasn't
/// expired yet.
/// 2. We're already connected to a peer at the same IP. While there are
/// 3. We're already connected to a peer at the same IP. While there are
/// many reasons multiple peers can legitimately share identical IP
/// addresses (NAT), network distribution is improved if they choose
/// different sets of peers themselves. In addition, it prevent potential
/// duplicate connections, malicious or not.
fn check_undesirable(&self, stream: &TcpStream) -> bool {
if self.peers.peer_inbound_count()
>= self.config.peer_max_inbound_count() + self.config.peer_listener_buffer_count()
{
debug!("Accepting new connection will exceed peer limit, refusing connection.");
return true;
}
if let Ok(peer_addr) = stream.peer_addr() {
let peer_addr = PeerAddr(peer_addr);
if self.peers.is_banned(peer_addr) {
debug!("Peer {} banned, refusing connection.", peer_addr);
if let Err(e) = stream.shutdown(Shutdown::Both) {
debug!("Error shutting down conn: {:?}", e);
}
return true;
}
if self.peers.is_known(peer_addr) {
debug!("Peer {} already known, refusing connection.", peer_addr);
if let Err(e) = stream.shutdown(Shutdown::Both) {
debug!("Error shutting down conn: {:?}", e);
}
return true;
}
}

View file

@ -48,11 +48,18 @@ pub const MAX_LOCATORS: u32 = 20;
/// How long a banned peer should be banned for
const BAN_WINDOW: i64 = 10800;
/// The max peer count
const PEER_MAX_COUNT: u32 = 125;
/// The max inbound peer count
const PEER_MAX_INBOUND_COUNT: u32 = 128;
/// min preferred peer count
const PEER_MIN_PREFERRED_COUNT: u32 = 8;
/// The max outbound peer count
const PEER_MAX_OUTBOUND_COUNT: u32 = 8;
/// The min preferred outbound peer count
const PEER_MIN_PREFERRED_OUTBOUND_COUNT: u32 = 8;
/// The peer listener buffer count. Allows temporarily accepting more connections
/// than allowed by PEER_MAX_INBOUND_COUNT to encourage network bootstrapping.
const PEER_LISTENER_BUFFER_COUNT: u32 = 8;
#[derive(Debug)]
pub enum Error {
@ -229,9 +236,13 @@ pub struct P2PConfig {
pub ban_window: Option<i64>,
pub peer_max_count: Option<u32>,
pub peer_max_inbound_count: Option<u32>,
pub peer_min_preferred_count: Option<u32>,
pub peer_max_outbound_count: Option<u32>,
pub peer_min_preferred_outbound_count: Option<u32>,
pub peer_listener_buffer_count: Option<u32>,
pub dandelion_peer: Option<PeerAddr>,
}
@ -250,8 +261,10 @@ impl Default for P2PConfig {
peers_deny: None,
peers_preferred: None,
ban_window: None,
peer_max_count: None,
peer_min_preferred_count: None,
peer_max_inbound_count: None,
peer_max_outbound_count: None,
peer_min_preferred_outbound_count: None,
peer_listener_buffer_count: None,
dandelion_peer: None,
}
}
@ -268,19 +281,35 @@ impl P2PConfig {
}
}
/// return peer_max_count
pub fn peer_max_count(&self) -> u32 {
match self.peer_max_count {
/// return maximum inbound peer connections count
pub fn peer_max_inbound_count(&self) -> u32 {
match self.peer_max_inbound_count {
Some(n) => n,
None => PEER_MAX_COUNT,
None => PEER_MAX_INBOUND_COUNT,
}
}
/// return peer_preferred_count
pub fn peer_min_preferred_count(&self) -> u32 {
match self.peer_min_preferred_count {
/// return maximum outbound peer connections count
pub fn peer_max_outbound_count(&self) -> u32 {
match self.peer_max_outbound_count {
Some(n) => n,
None => PEER_MIN_PREFERRED_COUNT,
None => PEER_MAX_OUTBOUND_COUNT,
}
}
/// return minimum preferred outbound peer count
pub fn peer_min_preferred_outbound_count(&self) -> u32 {
match self.peer_min_preferred_outbound_count {
Some(n) => n,
None => PEER_MIN_PREFERRED_OUTBOUND_COUNT,
}
}
/// return peer buffer count for listener
pub fn peer_listener_buffer_count(&self) -> u32 {
match self.peer_listener_buffer_count {
Some(n) => n,
None => PEER_LISTENER_BUFFER_COUNT,
}
}
}
@ -398,6 +427,10 @@ impl PeerInfo {
self.direction == Direction::Outbound
}
pub fn is_inbound(&self) -> bool {
self.direction == Direction::Inbound
}
/// The current height of the peer.
pub fn height(&self) -> u64 {
self.live_info.read().height

View file

@ -185,9 +185,12 @@ fn monitor_peers(
);
// maintenance step first, clean up p2p server peers
peers.clean_peers(config.peer_max_count() as usize);
peers.clean_peers(
config.peer_max_inbound_count() as usize,
config.peer_max_outbound_count() as usize,
);
if peers.healthy_peers_mix() {
if peers.enough_outbound_peers() {
return;
}
@ -230,7 +233,7 @@ fn monitor_peers(
let new_peers = peers.find_peers(
p2p::State::Healthy,
p2p::Capabilities::UNKNOWN,
config.peer_max_count() as usize,
config.peer_max_outbound_count() as usize,
);
for p in new_peers.iter().filter(|p| !peers.is_known(p.addr)) {
@ -296,15 +299,18 @@ fn listen_for_addrs(
let addrs: Vec<PeerAddr> = rx.try_iter().collect();
// If we have a healthy number of outbound peers then we are done here.
if peers.peer_count() > peers.peer_outbound_count() && peers.healthy_peers_mix() {
if peers.enough_outbound_peers() {
return;
}
// Try to connect to (up to max peers) peer addresses.
// Try to connect to (up to max outbound peers) peer addresses.
// Note: We drained the rx queue earlier to keep it under control.
// Even if there are many addresses to try we will only try a bounded number of them.
let connect_min_interval = 30;
for addr in addrs.into_iter().take(p2p.config.peer_max_count() as usize) {
for addr in addrs
.into_iter()
.take(p2p.config.peer_max_outbound_count() as usize)
{
// ignore the duplicate connecting to same peer within 30 seconds
let now = Utc::now();
if let Some(last_connect_time) = connecting_history.get(&addr) {

View file

@ -86,7 +86,7 @@ impl SyncRunner {
// * timeout
if wp > MIN_PEERS
|| (wp == 0
&& self.peers.enough_peers()
&& self.peers.enough_outbound_peers()
&& head.total_difficulty > Difficulty::zero())
|| n > wait_secs
{