Flexible peer filtering (#3458)

* first pass at peers iter cleanup

* more flexible peer with diff lookups

* PeersIter with impl Iterator

* sync against outbound peers
reorder peers filters so expensive filters come later

* filter peers by capabilities during sync

* prefer outbound peers with high total difficulty

* with_difficulty now takes a fn to allow more flexible comparisons based on difficulty

* rename peers_iter() -> iter()
This commit is contained in:
Antioch Peverell 2020-10-27 12:36:00 +00:00 committed by GitHub
parent cea546ceb8
commit 25fcefada2
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
13 changed files with 235 additions and 183 deletions

View file

@ -28,7 +28,7 @@ pub struct PeersAllHandler {
impl Handler for PeersAllHandler {
fn get(&self, _req: Request<Body>) -> ResponseFuture {
let peers = &w_fut!(&self.peers).all_peers();
let peers = &w_fut!(&self.peers).all_peer_data();
json_response_pretty(&peers)
}
}
@ -40,8 +40,9 @@ pub struct PeersConnectedHandler {
impl PeersConnectedHandler {
pub fn get_connected_peers(&self) -> Result<Vec<PeerInfoDisplay>, Error> {
let peers = w(&self.peers)?
.connected_peers()
.iter()
.connected()
.into_iter()
.map(|p| p.info.clone().into())
.collect::<Vec<PeerInfoDisplay>>();
Ok(peers)
@ -51,8 +52,9 @@ impl PeersConnectedHandler {
impl Handler for PeersConnectedHandler {
fn get(&self, _req: Request<Body>) -> ResponseFuture {
let peers: Vec<PeerInfoDisplay> = w_fut!(&self.peers)
.connected_peers()
.iter()
.connected()
.into_iter()
.map(|p| p.info.clone().into())
.collect();
json_response(&peers)
@ -77,7 +79,7 @@ impl PeerHandler {
})?;
return Ok(vec![peer_data]);
}
let peers = w(&self.peers)?.all_peers();
let peers = w(&self.peers)?.all_peer_data();
Ok(peers)
}

View file

@ -21,6 +21,7 @@ use crate::types::*;
use crate::web::*;
use hyper::{Body, Request};
use serde_json::json;
use std::convert::TryInto;
use std::sync::Weak;
// RESTful index of available api endpoints
@ -54,7 +55,12 @@ impl StatusHandler {
let (api_sync_status, api_sync_info) = sync_status_to_api(sync_status);
Ok(Status::from_tip_and_peers(
head,
w(&self.peers)?.peer_count(),
w(&self.peers)?
.iter()
.connected()
.count()
.try_into()
.unwrap(),
api_sync_status,
api_sync_info,
))

View file

@ -18,8 +18,7 @@ use std::fs::File;
use std::path::PathBuf;
use std::sync::Arc;
use rand::seq::SliceRandom;
use rand::thread_rng;
use rand::prelude::*;
use crate::chain;
use crate::core::core;
@ -105,139 +104,32 @@ impl Peers {
Ok(peers.contains_key(&addr))
}
/// Get vec of peers we are currently connected to.
pub fn connected_peers(&self) -> Vec<Arc<Peer>> {
/// Iterator over our current peers.
/// This allows us to hide try_read_for() behind a cleaner interface.
/// PeersIter lets us chain various adaptors for convenience.
pub fn iter(&self) -> PeersIter<impl Iterator<Item = Arc<Peer>>> {
let peers = match self.peers.try_read_for(LOCK_TIMEOUT) {
Some(peers) => peers,
Some(peers) => peers.values().cloned().collect(),
None => {
error!("connected_peers: failed to get peers lock");
return vec![];
vec![]
}
};
let mut res = peers
.values()
.filter(|p| p.is_connected())
.cloned()
.collect::<Vec<_>>();
res.shuffle(&mut thread_rng());
res
}
/// Get vec of peers we currently have an outgoing connection with.
pub fn outgoing_connected_peers(&self) -> Vec<Arc<Peer>> {
self.connected_peers()
.into_iter()
.filter(|x| x.info.is_outbound())
.collect()
}
/// Get vec of peers we currently have an incoming connection with.
pub fn incoming_connected_peers(&self) -> Vec<Arc<Peer>> {
self.connected_peers()
.into_iter()
.filter(|x| x.info.is_inbound())
.collect()
PeersIter {
iter: peers.into_iter(),
}
}
/// Get a peer we're connected to by address.
pub fn get_connected_peer(&self, addr: PeerAddr) -> Option<Arc<Peer>> {
let peers = match self.peers.try_read_for(LOCK_TIMEOUT) {
Some(peers) => peers,
None => {
error!("get_connected_peer: failed to get peers lock");
return None;
}
};
peers.get(&addr).cloned()
self.iter().connected().by_addr(addr)
}
/// Number of peers currently connected to.
pub fn peer_count(&self) -> u32 {
self.connected_peers().len() as u32
}
/// Number of outbound peers currently connected to.
pub fn peer_outbound_count(&self) -> u32 {
self.outgoing_connected_peers().len() as u32
}
/// Number of inbound peers currently connected to.
pub fn peer_inbound_count(&self) -> u32 {
self.incoming_connected_peers().len() as u32
}
// Return vec of connected peers that currently advertise more work
// (total_difficulty) than we do.
pub fn more_work_peers(&self) -> Result<Vec<Arc<Peer>>, chain::Error> {
let peers = self.connected_peers();
if peers.is_empty() {
return Ok(vec![]);
}
let total_difficulty = self.total_difficulty()?;
let mut max_peers = peers
.into_iter()
.filter(|x| x.info.total_difficulty() > total_difficulty)
.collect::<Vec<_>>();
max_peers.shuffle(&mut thread_rng());
Ok(max_peers)
}
// Return number of connected peers that currently advertise more/same work
// (total_difficulty) than/as we do.
pub fn more_or_same_work_peers(&self) -> Result<usize, chain::Error> {
let peers = self.connected_peers();
if peers.is_empty() {
return Ok(0);
}
let total_difficulty = self.total_difficulty()?;
Ok(peers
.iter()
.filter(|x| x.info.total_difficulty() >= total_difficulty)
.count())
}
/// Returns single random peer with more work than us.
pub fn more_work_peer(&self) -> Option<Arc<Peer>> {
match self.more_work_peers() {
Ok(mut peers) => peers.pop(),
Err(e) => {
error!("failed to get more work peers: {:?}", e);
None
}
}
}
/// Return vec of connected peers that currently have the most worked
/// branch, showing the highest total difficulty.
pub fn most_work_peers(&self) -> Vec<Arc<Peer>> {
let peers = self.connected_peers();
if peers.is_empty() {
return vec![];
}
let max_total_difficulty = match peers.iter().map(|x| x.info.total_difficulty()).max() {
Some(v) => v,
None => return vec![],
};
let mut max_peers = peers
.into_iter()
.filter(|x| x.info.total_difficulty() == max_total_difficulty)
.collect::<Vec<_>>();
max_peers.shuffle(&mut thread_rng());
max_peers
}
/// Returns single random peer with the most worked branch, showing the
/// highest total difficulty.
pub fn most_work_peer(&self) -> Option<Arc<Peer>> {
self.most_work_peers().pop()
pub fn max_peer_difficulty(&self) -> Difficulty {
self.iter()
.connected()
.max_difficulty()
.unwrap_or(Difficulty::zero())
}
pub fn is_banned(&self, peer_addr: PeerAddr) -> bool {
@ -286,7 +178,7 @@ impl Peers {
{
let mut count = 0;
for p in self.connected_peers().iter() {
for p in self.iter().connected() {
match inner(&p) {
Ok(true) => count += 1,
Ok(false) => (),
@ -353,7 +245,7 @@ impl Peers {
/// Ping all our connected peers. Always automatically expects a pong back
/// or disconnects. This acts as a liveness test.
pub fn check_all(&self, total_difficulty: Difficulty, height: u64) {
for p in self.connected_peers().iter() {
for p in self.iter().connected() {
if let Err(e) = p.send_ping(total_difficulty, height) {
debug!("Error pinging peer {:?}: {:?}", &p.info.addr, e);
let mut peers = match self.peers.try_write_for(LOCK_TIMEOUT) {
@ -370,13 +262,13 @@ impl Peers {
}
/// Iterator over all peers we know about (stored in our db).
pub fn peers_iter(&self) -> Result<impl Iterator<Item = PeerData>, Error> {
pub fn peer_data_iter(&self) -> Result<impl Iterator<Item = PeerData>, Error> {
self.store.peers_iter().map_err(From::from)
}
/// Convenience for reading all peers.
pub fn all_peers(&self) -> Vec<PeerData> {
self.peers_iter()
/// Convenience for reading all peer data from the db.
pub fn all_peer_data(&self) -> Vec<PeerData> {
self.peer_data_iter()
.map(|peers| peers.collect())
.unwrap_or(vec![])
}
@ -427,14 +319,8 @@ impl Peers {
// build a list of peers to be cleaned up
{
let peers = match self.peers.try_read_for(LOCK_TIMEOUT) {
Some(peers) => peers,
None => {
error!("clean_peers: can't get peers lock");
return;
}
};
for peer in peers.values() {
for peer in self.iter() {
let ref peer: &Peer = peer.as_ref();
if peer.is_banned() {
debug!("clean_peers {:?}, peer banned", peer.info.addr);
rm.push(peer.info.addr.clone());
@ -466,27 +352,34 @@ impl Peers {
}
}
// closure to build an iterator of our inbound peers
let outbound_peers = || self.iter().outbound().connected().into_iter();
// check here to make sure we don't have too many outgoing connections
let excess_outgoing_count =
(self.peer_outbound_count() as usize).saturating_sub(max_outbound_count);
// Preferred peers are treated preferentially here.
// Also choose outbound peers with lowest total difficulty to drop.
let excess_outgoing_count = outbound_peers().count().saturating_sub(max_outbound_count);
if excess_outgoing_count > 0 {
let mut addrs: Vec<_> = self
.outgoing_connected_peers()
.iter()
.filter(|x| !preferred_peers.contains(&x.info.addr))
let mut peer_infos: Vec<_> = outbound_peers()
.map(|x| x.info.clone())
.filter(|x| !preferred_peers.contains(&x.addr))
.collect();
peer_infos.sort_unstable_by_key(|x| x.total_difficulty());
let mut addrs = peer_infos
.into_iter()
.map(|x| x.addr)
.take(excess_outgoing_count)
.map(|x| x.info.addr)
.collect();
rm.append(&mut addrs);
}
// closure to build an iterator of our inbound peers
let inbound_peers = || self.iter().inbound().connected().into_iter();
// check here to make sure we don't have too many incoming connections
let excess_incoming_count =
(self.peer_inbound_count() as usize).saturating_sub(max_inbound_count);
let excess_incoming_count = inbound_peers().count().saturating_sub(max_inbound_count);
if excess_incoming_count > 0 {
let mut addrs: Vec<_> = self
.incoming_connected_peers()
.iter()
let mut addrs: Vec<_> = inbound_peers()
.filter(|x| !preferred_peers.contains(&x.info.addr))
.take(excess_incoming_count)
.map(|x| x.info.addr)
@ -522,7 +415,8 @@ impl Peers {
/// We have enough outbound connected peers
pub fn enough_outbound_peers(&self) -> bool {
self.peer_outbound_count() >= self.config.peer_min_preferred_outbound_count()
self.iter().outbound().connected().count()
>= self.config.peer_min_preferred_outbound_count() as usize
}
/// Removes those peers that seem to have expired
@ -780,3 +674,86 @@ impl NetAdapter for Peers {
}
}
}
pub struct PeersIter<I> {
iter: I,
}
impl<I: Iterator> IntoIterator for PeersIter<I> {
type Item = I::Item;
type IntoIter = I;
fn into_iter(self) -> Self::IntoIter {
self.iter.into_iter()
}
}
impl<I: Iterator<Item = Arc<Peer>>> PeersIter<I> {
/// Filter peers that are currently connected.
/// Note: This adaptor takes a read lock internally.
/// So if we are chaining adaptors then defer this toward the end of the chain.
pub fn connected(self) -> PeersIter<impl Iterator<Item = Arc<Peer>>> {
PeersIter {
iter: self.iter.filter(|p| p.is_connected()),
}
}
/// Filter inbound peers.
pub fn inbound(self) -> PeersIter<impl Iterator<Item = Arc<Peer>>> {
PeersIter {
iter: self.iter.filter(|p| p.info.is_inbound()),
}
}
/// Filter outbound peers.
pub fn outbound(self) -> PeersIter<impl Iterator<Item = Arc<Peer>>> {
PeersIter {
iter: self.iter.filter(|p| p.info.is_outbound()),
}
}
/// Filter peers with the provided difficulty comparison fn.
///
/// with_difficulty(|x| x > diff)
///
/// Note: This adaptor takes a read lock internally for each peer.
/// So if we are chaining adaptors then put this toward later in the chain.
pub fn with_difficulty<F>(self, f: F) -> PeersIter<impl Iterator<Item = Arc<Peer>>>
where
F: Fn(Difficulty) -> bool,
{
PeersIter {
iter: self.iter.filter(move |p| f(p.info.total_difficulty())),
}
}
/// Filter peers that support the provided capabilities.
pub fn with_capabilities(
self,
cap: Capabilities,
) -> PeersIter<impl Iterator<Item = Arc<Peer>>> {
PeersIter {
iter: self.iter.filter(move |p| p.info.capabilities.contains(cap)),
}
}
pub fn by_addr(&mut self, addr: PeerAddr) -> Option<Arc<Peer>> {
self.iter.find(|p| p.info.addr == addr)
}
/// Choose a random peer from the current (filtered) peers.
pub fn choose_random(self) -> Option<Arc<Peer>> {
let mut rng = rand::thread_rng();
self.iter.choose(&mut rng)
}
/// Find the max difficulty of the current (filtered) peers.
pub fn max_difficulty(self) -> Option<Difficulty> {
self.iter.map(|p| p.info.total_difficulty()).max()
}
/// Count the current (filtered) peers.
pub fn count(self) -> usize {
self.iter.count()
}
}

View file

@ -233,7 +233,7 @@ impl Server {
/// different sets of peers themselves. In addition, it prevent potential
/// duplicate connections, malicious or not.
fn check_undesirable(&self, stream: &TcpStream) -> bool {
if self.peers.peer_inbound_count()
if self.peers.iter().inbound().connected().count() as u32
>= self.config.peer_max_inbound_count() + self.config.peer_listener_buffer_count()
{
debug!("Accepting new connection will exceed peer limit, refusing connection.");

View file

@ -266,6 +266,7 @@ pub struct P2PConfig {
/// The list of seed nodes, if using Seeding as a seed type
pub seeds: Option<PeerAddrs>,
/// TODO: Rethink this. We need to separate what *we* advertise vs. who we connect to.
/// Capabilities expose by this node, also conditions which other peers this
/// node will have an affinity toward when connection.
pub capabilities: Capabilities,

View file

@ -97,5 +97,5 @@ fn peer_handshake() {
let server_peer = server.peers.get_connected_peer(my_addr).unwrap();
assert_eq!(server_peer.info.total_difficulty(), Difficulty::min());
assert!(server.peers.peer_count() > 0);
assert!(server.peers.iter().connected().count() > 0);
}

View file

@ -345,11 +345,11 @@ impl DandelionEpoch {
/// Choose a new outbound stem relay peer.
pub fn next_epoch(&mut self, peers: &Arc<p2p::Peers>) {
self.start_time = Some(Utc::now().timestamp());
self.relay_peer = peers.outgoing_connected_peers().first().cloned();
self.relay_peer = peers.iter().outbound().connected().choose_random();
// If stem_probability == 90 then we stem 90% of the time.
let mut rng = rand::thread_rng();
let stem_probability = self.config.stem_probability;
let mut rng = rand::thread_rng();
self.is_stem = rng.gen_range(0, 100) < stem_probability;
let addr = self.relay_peer.clone().map(|p| p.info.addr);
@ -386,7 +386,7 @@ impl DandelionEpoch {
}
if update_relay {
self.relay_peer = peers.outgoing_connected_peers().first().cloned();
self.relay_peer = peers.iter().outbound().connected().choose_random();
info!(
"DandelionEpoch: relay_peer: new peer chosen: {:?}",
self.relay_peer.clone().map(|p| p.info.addr)

View file

@ -151,7 +151,7 @@ fn monitor_peers(
let mut banned_count = 0;
let mut defuncts = vec![];
for x in peers.all_peers().into_iter() {
for x in peers.all_peer_data().into_iter() {
match x.flags {
p2p::State::Banned => {
let interval = Utc::now().timestamp() - x.last_banned;
@ -174,13 +174,23 @@ fn monitor_peers(
total_count += 1;
}
let peers_count = peers.iter().connected().count();
let max_diff = peers.max_peer_difficulty();
let most_work_count = peers
.iter()
.outbound()
.with_difficulty(|x| x >= max_diff)
.connected()
.count();
debug!(
"monitor_peers: on {}:{}, {} connected ({} most_work). \
all {} = {} healthy + {} banned + {} defunct",
config.host,
config.port,
peers.peer_count(),
peers.most_work_peers().len(),
peers_count,
most_work_count,
total_count,
healthy_count,
banned_count,
@ -198,10 +208,14 @@ fn monitor_peers(
return;
}
// loop over connected peers
// loop over connected peers that can provide peer lists
// ask them for their list of peers
let mut connected_peers: Vec<PeerAddr> = vec![];
for p in peers.connected_peers() {
for p in peers
.iter()
.with_capabilities(p2p::Capabilities::PEER_LIST)
.connected()
{
trace!(
"monitor_peers: {}:{} ask {} for more peers",
config.host,
@ -331,9 +345,10 @@ fn listen_for_addrs(
.name("peer_connect".to_string())
.spawn(move || match p2p_c.connect(addr) {
Ok(p) => {
if p.send_peer_request(capab).is_ok() {
let _ = peers_c.update_state(addr, p2p::State::Healthy);
if p.info.capabilities.contains(p2p::Capabilities::PEER_LIST) {
let _ = p.send_peer_request(capab);
}
let _ = peers_c.update_state(addr, p2p::State::Healthy);
}
Err(_) => {
let _ = peers_c.update_state(addr, p2p::State::Defunct);

View file

@ -16,11 +16,11 @@
//! the peer-to-peer server, the blockchain and the transaction pool) and acts
//! as a facade.
use std::fs;
use std::fs::File;
use std::io::prelude::*;
use std::path::Path;
use std::sync::{mpsc, Arc};
use std::{convert::TryInto, fs};
use std::{
thread::{self, JoinHandle},
time::{self, Duration},
@ -356,7 +356,13 @@ impl Server {
/// Number of peers
pub fn peer_count(&self) -> u32 {
self.p2p.peers.peer_count()
self.p2p
.peers
.iter()
.connected()
.count()
.try_into()
.unwrap()
}
/// Start a minimal "stratum" mining service on a separate thread
@ -486,7 +492,8 @@ impl Server {
let peer_stats = self
.p2p
.peers
.connected_peers()
.iter()
.connected()
.into_iter()
.map(|p| PeerStats::from_peer(&p))
.collect();

View file

@ -14,6 +14,7 @@
use chrono::prelude::{DateTime, Utc};
use chrono::Duration;
use rand::prelude::*;
use std::cmp;
use std::sync::Arc;
@ -89,7 +90,17 @@ impl BodySync {
hashes.reverse();
let peers = self.peers.more_work_peers()?;
let head = self.chain.head()?;
// Find connected peers with strictly greater difficulty than us.
let peers: Vec<_> = self
.peers
.iter()
.outbound()
.with_difficulty(|x| x > head.total_difficulty)
.connected()
.into_iter()
.collect();
// if we have 5 peers to sync from then ask for 50 blocks total (peer_count *
// 10) max will be 80 if all 8 peers are advertising more work
@ -125,9 +136,9 @@ impl BodySync {
self.blocks_requested = 0;
self.receive_timeout = Utc::now() + Duration::seconds(6);
let mut peers_iter = peers.iter().cycle();
let mut rng = rand::thread_rng();
for hash in hashes_to_get.clone() {
if let Some(peer) = peers_iter.next() {
if let Some(peer) = peers.choose(&mut rng) {
if let Err(e) = peer.send_block_request(*hash, chain::Options::SYNC) {
debug!("Skipped request to {}: {:?}", peer.info.addr, e);
peer.stop();

View file

@ -19,7 +19,7 @@ use std::sync::Arc;
use crate::chain::{self, SyncState, SyncStatus};
use crate::common::types::Error;
use crate::core::core::hash::{Hash, Hashed};
use crate::p2p::{self, types::ReasonForBan, Peer};
use crate::p2p::{self, types::ReasonForBan, Capabilities, Peer};
pub struct HeaderSync {
sync_state: Arc<SyncState>,
@ -170,10 +170,17 @@ impl HeaderSync {
fn header_sync(&mut self) -> Option<Arc<Peer>> {
if let Ok(header_head) = self.chain.header_head() {
let difficulty = header_head.total_difficulty;
if let Some(peer) = self.peers.most_work_peer() {
if peer.info.total_difficulty() > difficulty {
let max_diff = self.peers.max_peer_difficulty();
let peer = self
.peers
.iter()
.outbound()
.with_capabilities(Capabilities::HEADER_HIST)
.with_difficulty(|x| x >= max_diff)
.connected()
.choose_random();
if let Some(peer) = peer {
if peer.info.total_difficulty() > header_head.total_difficulty {
return self.request_headers(peer);
}
}

View file

@ -19,7 +19,7 @@ use std::sync::Arc;
use crate::chain::{self, SyncState, SyncStatus};
use crate::core::core::hash::Hashed;
use crate::core::global;
use crate::p2p::{self, Peer};
use crate::p2p::{self, Capabilities, Peer};
/// Fast sync has 3 "states":
/// * syncing headers
@ -158,7 +158,16 @@ impl StateSync {
let mut txhashset_height = header_head.height.saturating_sub(threshold);
txhashset_height = txhashset_height.saturating_sub(txhashset_height % archive_interval);
if let Some(peer) = self.peers.most_work_peer() {
let max_diff = self.peers.max_peer_difficulty();
let peer = self
.peers
.iter()
.outbound()
.with_capabilities(Capabilities::TXHASHSET_HIST)
.with_difficulty(|x| x >= max_diff)
.connected()
.choose_random();
if let Some(peer) = peer {
// ask for txhashset at state_sync_threshold
let mut txhashset_head = self
.chain

View file

@ -79,7 +79,15 @@ impl SyncRunner {
if self.stop_state.is_stopped() {
break;
}
let wp = self.peers.more_or_same_work_peers()?;
// Count peers with at least our difficulty.
let wp = self
.peers
.iter()
.outbound()
.with_difficulty(|x| x >= head.total_difficulty)
.connected()
.count();
// exit loop when:
// * we have more than MIN_PEERS more_or_same_work peers
// * we are synced already, e.g. grin was quickly restarted
@ -221,7 +229,16 @@ impl SyncRunner {
fn needs_syncing(&self) -> Result<(bool, u64), chain::Error> {
let local_diff = self.chain.head()?.total_difficulty;
let mut is_syncing = self.sync_state.is_syncing();
let peer = self.peers.most_work_peer();
// Find a peer with greatest known difficulty.
let max_diff = self.peers.max_peer_difficulty();
let peer = self
.peers
.iter()
.outbound()
.with_difficulty(|x| x >= max_diff)
.connected()
.choose_random();
let peer_info = if let Some(p) = peer {
p.info.clone()