diff --git a/config/src/comments.rs b/config/src/comments.rs index b0e9f7f37..1262b12e9 100644 --- a/config/src/comments.rs +++ b/config/src/comments.rs @@ -274,18 +274,12 @@ fn comments() -> HashMap { #how long a banned peer should stay banned #ban_window = 10800 -#maximum number of inbound peer connections -#peer_max_inbound_count = 128 +#maximum number of peers +#peer_max_count = 125 -#maximum number of outbound peer connections -#peer_max_outbound_count = 8 - -#preferred minimum number of outbound peers (we'll actively keep trying to add peers -#until we get to at least this number) -#peer_min_preferred_outbound_count = 8 - -#amount of incoming connections temporarily allowed to exceed peer_max_inbound_count -#peer_listener_buffer_count = 8 +#preferred minimum number of peers (we'll actively keep trying to add peers +#until we get to at least this number +#peer_min_preferred_count = 8 # 15 = Bit flags for FULL_NODE #This structure needs to be changed internally, to make it more configurable diff --git a/p2p/src/peers.rs b/p2p/src/peers.rs index 4b5f16d5a..5c1ec247a 100644 --- a/p2p/src/peers.rs +++ b/p2p/src/peers.rs @@ -126,7 +126,6 @@ impl Peers { res } - /// Get vec of peers we currently have an outgoing connection with. pub fn outgoing_connected_peers(&self) -> Vec> { self.connected_peers() .into_iter() @@ -134,14 +133,6 @@ impl Peers { .collect() } - /// Get vec of peers we currently have an incoming connection with. - pub fn incoming_connected_peers(&self) -> Vec> { - self.connected_peers() - .into_iter() - .filter(|x| x.info.is_inbound()) - .collect() - } - /// Get a peer we're connected to by address. pub fn get_connected_peer(&self, addr: PeerAddr) -> Option> { let peers = match self.peers.try_read_for(LOCK_TIMEOUT) { @@ -164,11 +155,6 @@ impl Peers { self.outgoing_connected_peers().len() as u32 } - /// Number of inbound peers currently connected to. - pub fn peer_inbound_count(&self) -> u32 { - self.incoming_connected_peers().len() as u32 - } - // Return vec of connected peers that currently advertise more work // (total_difficulty) than we do. pub fn more_work_peers(&self) -> Result>, chain::Error> { @@ -338,8 +324,7 @@ impl Peers { /// A peer implementation may drop the broadcast request /// if it knows the remote peer already has the block. pub fn broadcast_compact_block(&self, b: &core::CompactBlock) { - let num_peers = - self.config.peer_max_inbound_count() + self.config.peer_max_outbound_count(); + let num_peers = self.config.peer_max_count(); let count = self.broadcast("compact block", num_peers, |p| p.send_compact_block(b)); debug!( "broadcast_compact_block: {}, {} at {}, to {} peers, done.", @@ -356,7 +341,7 @@ impl Peers { /// A peer implementation may drop the broadcast request /// if it knows the remote peer already has the header. pub fn broadcast_header(&self, bh: &core::BlockHeader) { - let num_peers = self.config.peer_min_preferred_outbound_count(); + let num_peers = self.config.peer_min_preferred_count(); let count = self.broadcast("header", num_peers, |p| p.send_header(bh)); debug!( "broadcast_header: {}, {} at {}, to {} peers, done.", @@ -373,8 +358,7 @@ impl Peers { /// A peer implementation may drop the broadcast request /// if it knows the remote peer already has the transaction. pub fn broadcast_transaction(&self, tx: &core::Transaction) { - let num_peers = - self.config.peer_max_inbound_count() + self.config.peer_max_outbound_count(); + let num_peers = self.config.peer_max_count(); let count = self.broadcast("transaction", num_peers, |p| p.send_transaction(tx)); debug!( "broadcast_transaction: {} to {} peers, done.", @@ -449,7 +433,7 @@ impl Peers { /// Iterate over the peer list and prune all peers we have /// lost connection to or have been deemed problematic. /// Also avoid connected peer count getting too high. - pub fn clean_peers(&self, max_inbound_count: usize, max_outbound_count: usize) { + pub fn clean_peers(&self, max_count: usize) { let mut rm = vec![]; // build a list of peers to be cleaned up @@ -493,27 +477,16 @@ impl Peers { } } - // check here to make sure we don't have too many outgoing connections - let excess_outgoing_count = - (self.peer_outbound_count() as usize).saturating_sub(max_outbound_count); - if excess_outgoing_count > 0 { + // ensure we do not still have too many connected peers + let excess_count = (self.peer_count() as usize) + .saturating_sub(rm.len()) + .saturating_sub(max_count); + if excess_count > 0 { + // map peers to addrs in a block to bound how long we keep the read lock for let mut addrs = self - .outgoing_connected_peers() + .connected_peers() .iter() - .take(excess_outgoing_count) - .map(|x| x.info.addr.clone()) - .collect::>(); - rm.append(&mut addrs); - } - - // check here to make sure we don't have too many incoming connections - let excess_incoming_count = - (self.peer_inbound_count() as usize).saturating_sub(max_inbound_count); - if excess_incoming_count > 0 { - let mut addrs = self - .incoming_connected_peers() - .iter() - .take(excess_incoming_count) + .take(excess_count) .map(|x| x.info.addr.clone()) .collect::>(); rm.append(&mut addrs); @@ -545,9 +518,14 @@ impl Peers { } } - /// We have enough outbound connected peers - pub fn enough_outbound_peers(&self) -> bool { - self.peer_outbound_count() >= self.config.peer_min_preferred_outbound_count() + pub fn enough_peers(&self) -> bool { + self.peer_count() >= self.config.peer_min_preferred_count() + } + + /// We have enough peers, both total connected and outbound connected + pub fn healthy_peers_mix(&self) -> bool { + self.enough_peers() + && self.peer_outbound_count() >= self.config.peer_min_preferred_count() / 2 } /// Removes those peers that seem to have expired diff --git a/p2p/src/serv.rs b/p2p/src/serv.rs index a8a48ec07..7da4eebba 100644 --- a/p2p/src/serv.rs +++ b/p2p/src/serv.rs @@ -87,10 +87,6 @@ impl Server { let peer_addr = PeerAddr(peer_addr); if self.check_undesirable(&stream) { - // Shutdown the incoming TCP connection if it is not desired - if let Err(e) = stream.shutdown(Shutdown::Both) { - debug!("Error shutting down conn: {:?}", e); - } continue; } match self.handle_new_peer(stream) { @@ -198,34 +194,30 @@ impl Server { Ok(()) } - /// Checks whether there's any reason we don't want to accept an incoming peer - /// connection. There can be a few of them: - /// 1. Accepting the peer connection would exceed the configured maximum allowed - /// inbound peer count. Note that seed nodes may wish to increase the default - /// value for PEER_LISTENER_BUFFER_COUNT to help with network bootstrapping. - /// A default buffer of 8 peers is allowed to help with network growth. - /// 2. The peer has been previously banned and the ban period hasn't + /// Checks whether there's any reason we don't want to accept a peer + /// connection. There can be a couple of them: + /// 1. The peer has been previously banned and the ban period hasn't /// expired yet. - /// 3. We're already connected to a peer at the same IP. While there are + /// 2. We're already connected to a peer at the same IP. While there are /// many reasons multiple peers can legitimately share identical IP /// addresses (NAT), network distribution is improved if they choose /// different sets of peers themselves. In addition, it prevent potential /// duplicate connections, malicious or not. fn check_undesirable(&self, stream: &TcpStream) -> bool { - if self.peers.peer_inbound_count() - >= self.config.peer_max_inbound_count() + self.config.peer_listener_buffer_count() - { - debug!("Accepting new connection will exceed peer limit, refusing connection."); - return true; - } if let Ok(peer_addr) = stream.peer_addr() { let peer_addr = PeerAddr(peer_addr); if self.peers.is_banned(peer_addr) { debug!("Peer {} banned, refusing connection.", peer_addr); + if let Err(e) = stream.shutdown(Shutdown::Both) { + debug!("Error shutting down conn: {:?}", e); + } return true; } if self.peers.is_known(peer_addr) { debug!("Peer {} already known, refusing connection.", peer_addr); + if let Err(e) = stream.shutdown(Shutdown::Both) { + debug!("Error shutting down conn: {:?}", e); + } return true; } } diff --git a/p2p/src/types.rs b/p2p/src/types.rs index bcd92a4d7..735d90c0f 100644 --- a/p2p/src/types.rs +++ b/p2p/src/types.rs @@ -48,18 +48,11 @@ pub const MAX_LOCATORS: u32 = 20; /// How long a banned peer should be banned for const BAN_WINDOW: i64 = 10800; -/// The max inbound peer count -const PEER_MAX_INBOUND_COUNT: u32 = 128; +/// The max peer count +const PEER_MAX_COUNT: u32 = 125; -/// The max outbound peer count -const PEER_MAX_OUTBOUND_COUNT: u32 = 8; - -/// The min preferred outbound peer count -const PEER_MIN_PREFERRED_OUTBOUND_COUNT: u32 = 8; - -/// The peer listener buffer count. Allows temporarily accepting more connections -/// than allowed by PEER_MAX_INBOUND_COUNT to encourage network bootstrapping. -const PEER_LISTENER_BUFFER_COUNT: u32 = 8; +/// min preferred peer count +const PEER_MIN_PREFERRED_COUNT: u32 = 8; #[derive(Debug)] pub enum Error { @@ -236,13 +229,9 @@ pub struct P2PConfig { pub ban_window: Option, - pub peer_max_inbound_count: Option, + pub peer_max_count: Option, - pub peer_max_outbound_count: Option, - - pub peer_min_preferred_outbound_count: Option, - - pub peer_listener_buffer_count: Option, + pub peer_min_preferred_count: Option, pub dandelion_peer: Option, } @@ -261,10 +250,8 @@ impl Default for P2PConfig { peers_deny: None, peers_preferred: None, ban_window: None, - peer_max_inbound_count: None, - peer_max_outbound_count: None, - peer_min_preferred_outbound_count: None, - peer_listener_buffer_count: None, + peer_max_count: None, + peer_min_preferred_count: None, dandelion_peer: None, } } @@ -281,35 +268,19 @@ impl P2PConfig { } } - /// return maximum inbound peer connections count - pub fn peer_max_inbound_count(&self) -> u32 { - match self.peer_max_inbound_count { + /// return peer_max_count + pub fn peer_max_count(&self) -> u32 { + match self.peer_max_count { Some(n) => n, - None => PEER_MAX_INBOUND_COUNT, + None => PEER_MAX_COUNT, } } - /// return maximum outbound peer connections count - pub fn peer_max_outbound_count(&self) -> u32 { - match self.peer_max_outbound_count { + /// return peer_preferred_count + pub fn peer_min_preferred_count(&self) -> u32 { + match self.peer_min_preferred_count { Some(n) => n, - None => PEER_MAX_OUTBOUND_COUNT, - } - } - - /// return minimum preferred outbound peer count - pub fn peer_min_preferred_outbound_count(&self) -> u32 { - match self.peer_min_preferred_outbound_count { - Some(n) => n, - None => PEER_MIN_PREFERRED_OUTBOUND_COUNT, - } - } - - /// return peer buffer count for listener - pub fn peer_listener_buffer_count(&self) -> u32 { - match self.peer_listener_buffer_count { - Some(n) => n, - None => PEER_LISTENER_BUFFER_COUNT, + None => PEER_MIN_PREFERRED_COUNT, } } } @@ -427,10 +398,6 @@ impl PeerInfo { self.direction == Direction::Outbound } - pub fn is_inbound(&self) -> bool { - self.direction == Direction::Inbound - } - /// The current height of the peer. pub fn height(&self) -> u64 { self.live_info.read().height diff --git a/servers/src/grin/seed.rs b/servers/src/grin/seed.rs index 3ad3fa28a..5ccd9af99 100644 --- a/servers/src/grin/seed.rs +++ b/servers/src/grin/seed.rs @@ -185,12 +185,9 @@ fn monitor_peers( ); // maintenance step first, clean up p2p server peers - peers.clean_peers( - config.peer_max_inbound_count() as usize, - config.peer_max_outbound_count() as usize, - ); + peers.clean_peers(config.peer_max_count() as usize); - if peers.enough_outbound_peers() { + if peers.healthy_peers_mix() { return; } @@ -233,7 +230,7 @@ fn monitor_peers( let new_peers = peers.find_peers( p2p::State::Healthy, p2p::Capabilities::UNKNOWN, - config.peer_max_outbound_count() as usize, + config.peer_max_count() as usize, ); for p in new_peers.iter().filter(|p| !peers.is_known(p.addr)) { @@ -299,18 +296,15 @@ fn listen_for_addrs( let addrs: Vec = rx.try_iter().collect(); // If we have a healthy number of outbound peers then we are done here. - if peers.enough_outbound_peers() { + if peers.peer_count() > peers.peer_outbound_count() && peers.healthy_peers_mix() { return; } - // Try to connect to (up to max outbound peers) peer addresses. + // Try to connect to (up to max peers) peer addresses. // Note: We drained the rx queue earlier to keep it under control. // Even if there are many addresses to try we will only try a bounded number of them. let connect_min_interval = 30; - for addr in addrs - .into_iter() - .take(p2p.config.peer_max_outbound_count() as usize) - { + for addr in addrs.into_iter().take(p2p.config.peer_max_count() as usize) { // ignore the duplicate connecting to same peer within 30 seconds let now = Utc::now(); if let Some(last_connect_time) = connecting_history.get(&addr) { diff --git a/servers/src/grin/sync/syncer.rs b/servers/src/grin/sync/syncer.rs index 81552b78a..1564b4e8a 100644 --- a/servers/src/grin/sync/syncer.rs +++ b/servers/src/grin/sync/syncer.rs @@ -86,7 +86,7 @@ impl SyncRunner { // * timeout if wp > MIN_PEERS || (wp == 0 - && self.peers.enough_outbound_peers() + && self.peers.enough_peers() && head.total_difficulty > Difficulty::zero()) || n > wait_secs {