diff --git a/api/src/handlers/peers_api.rs b/api/src/handlers/peers_api.rs
index 9469ef273..65b8c6141 100644
--- a/api/src/handlers/peers_api.rs
+++ b/api/src/handlers/peers_api.rs
@@ -28,7 +28,7 @@ pub struct PeersAllHandler {
impl Handler for PeersAllHandler {
fn get(&self, _req: Request<Body>) -> ResponseFuture {
- let peers = &w_fut!(&self.peers).all_peers();
+ let peers = &w_fut!(&self.peers).all_peer_data();
json_response_pretty(&peers)
}
}
@@ -40,8 +40,9 @@ pub struct PeersConnectedHandler {
impl PeersConnectedHandler {
pub fn get_connected_peers(&self) -> Result<Vec<PeerInfoDisplay>, Error> {
let peers = w(&self.peers)?
- .connected_peers()
.iter()
+ .connected()
+ .into_iter()
.map(|p| p.info.clone().into())
.collect::<Vec<PeerInfoDisplay>>();
Ok(peers)
@@ -51,8 +52,9 @@ impl PeersConnectedHandler {
impl Handler for PeersConnectedHandler {
fn get(&self, _req: Request<Body>) -> ResponseFuture {
let peers: Vec<PeerInfoDisplay> = w_fut!(&self.peers)
- .connected_peers()
.iter()
+ .connected()
+ .into_iter()
.map(|p| p.info.clone().into())
.collect();
json_response(&peers)
@@ -77,7 +79,7 @@ impl PeerHandler {
})?;
return Ok(vec![peer_data]);
}
- let peers = w(&self.peers)?.all_peers();
+ let peers = w(&self.peers)?.all_peer_data();
Ok(peers)
}
diff --git a/api/src/handlers/server_api.rs b/api/src/handlers/server_api.rs
index 5b14557d0..4802bf839 100644
--- a/api/src/handlers/server_api.rs
+++ b/api/src/handlers/server_api.rs
@@ -21,6 +21,7 @@ use crate::types::*;
use crate::web::*;
use hyper::{Body, Request};
use serde_json::json;
+use std::convert::TryInto;
use std::sync::Weak;
// RESTful index of available api endpoints
@@ -54,7 +55,12 @@ impl StatusHandler {
let (api_sync_status, api_sync_info) = sync_status_to_api(sync_status);
Ok(Status::from_tip_and_peers(
head,
- w(&self.peers)?.peer_count(),
+ w(&self.peers)?
+ .iter()
+ .connected()
+ .count()
+ .try_into()
+ .unwrap(),
api_sync_status,
api_sync_info,
))
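Note on the conversion above (illustrative only; the helper name below is hypothetical): the adaptor chain counts peers as a usize, while Status::from_tip_and_peers still takes the u32 that peer_count() used to return, hence the try_into().unwrap(). A minimal sketch, assuming nothing beyond std:

    use std::convert::TryInto;

    // Convert a usize peer count into the u32 the API type expects.
    // unwrap() is acceptable here since a peer count will not realistically
    // approach u32::MAX.
    fn peer_count_u32(count: usize) -> u32 {
        count.try_into().unwrap()
    }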
diff --git a/p2p/src/peers.rs b/p2p/src/peers.rs
index e4efa6cca..9c6dd2c59 100644
--- a/p2p/src/peers.rs
+++ b/p2p/src/peers.rs
@@ -18,8 +18,7 @@ use std::fs::File;
use std::path::PathBuf;
use std::sync::Arc;
-use rand::seq::SliceRandom;
-use rand::thread_rng;
+use rand::prelude::*;
use crate::chain;
use crate::core::core;
@@ -105,139 +104,32 @@ impl Peers {
Ok(peers.contains_key(&addr))
}
- /// Get vec of peers we are currently connected to.
- pub fn connected_peers(&self) -> Vec<Arc<Peer>> {
+ /// Iterator over our current peers.
+ /// This allows us to hide try_read_for() behind a cleaner interface.
+ /// PeersIter lets us chain various adaptors for convenience.
+ pub fn iter(&self) -> PeersIter<impl Iterator<Item = Arc<Peer>>> {
let peers = match self.peers.try_read_for(LOCK_TIMEOUT) {
- Some(peers) => peers,
+ Some(peers) => peers.values().cloned().collect(),
None => {
error!("connected_peers: failed to get peers lock");
- return vec![];
+ vec![]
}
};
- let mut res = peers
- .values()
- .filter(|p| p.is_connected())
- .cloned()
- .collect::<Vec<_>>();
- res.shuffle(&mut thread_rng());
- res
- }
-
- /// Get vec of peers we currently have an outgoing connection with.
- pub fn outgoing_connected_peers(&self) -> Vec<Arc<Peer>> {
- self.connected_peers()
- .into_iter()
- .filter(|x| x.info.is_outbound())
- .collect()
- }
-
- /// Get vec of peers we currently have an incoming connection with.
- pub fn incoming_connected_peers(&self) -> Vec<Arc<Peer>> {
- self.connected_peers()
- .into_iter()
- .filter(|x| x.info.is_inbound())
- .collect()
+ PeersIter {
+ iter: peers.into_iter(),
+ }
}
/// Get a peer we're connected to by address.
pub fn get_connected_peer(&self, addr: PeerAddr) -> Option<Arc<Peer>> {
- let peers = match self.peers.try_read_for(LOCK_TIMEOUT) {
- Some(peers) => peers,
- None => {
- error!("get_connected_peer: failed to get peers lock");
- return None;
- }
- };
- peers.get(&addr).cloned()
+ self.iter().connected().by_addr(addr)
}
- /// Number of peers currently connected to.
- pub fn peer_count(&self) -> u32 {
- self.connected_peers().len() as u32
- }
-
- /// Number of outbound peers currently connected to.
- pub fn peer_outbound_count(&self) -> u32 {
- self.outgoing_connected_peers().len() as u32
- }
-
- /// Number of inbound peers currently connected to.
- pub fn peer_inbound_count(&self) -> u32 {
- self.incoming_connected_peers().len() as u32
- }
-
- // Return vec of connected peers that currently advertise more work
- // (total_difficulty) than we do.
- pub fn more_work_peers(&self) -> Result<Vec<Arc<Peer>>, chain::Error> {
- let peers = self.connected_peers();
- if peers.is_empty() {
- return Ok(vec![]);
- }
-
- let total_difficulty = self.total_difficulty()?;
-
- let mut max_peers = peers
- .into_iter()
- .filter(|x| x.info.total_difficulty() > total_difficulty)
- .collect::<Vec<_>>();
-
- max_peers.shuffle(&mut thread_rng());
- Ok(max_peers)
- }
-
- // Return number of connected peers that currently advertise more/same work
- // (total_difficulty) than/as we do.
- pub fn more_or_same_work_peers(&self) -> Result<usize, chain::Error> {
- let peers = self.connected_peers();
- if peers.is_empty() {
- return Ok(0);
- }
-
- let total_difficulty = self.total_difficulty()?;
-
- Ok(peers
- .iter()
- .filter(|x| x.info.total_difficulty() >= total_difficulty)
- .count())
- }
-
- /// Returns single random peer with more work than us.
- pub fn more_work_peer(&self) -> Option<Arc<Peer>> {
- match self.more_work_peers() {
- Ok(mut peers) => peers.pop(),
- Err(e) => {
- error!("failed to get more work peers: {:?}", e);
- None
- }
- }
- }
-
- /// Return vec of connected peers that currently have the most worked
- /// branch, showing the highest total difficulty.
- pub fn most_work_peers(&self) -> Vec<Arc<Peer>> {
- let peers = self.connected_peers();
- if peers.is_empty() {
- return vec![];
- }
-
- let max_total_difficulty = match peers.iter().map(|x| x.info.total_difficulty()).max() {
- Some(v) => v,
- None => return vec![],
- };
-
- let mut max_peers = peers
- .into_iter()
- .filter(|x| x.info.total_difficulty() == max_total_difficulty)
- .collect::<Vec<_>>();
-
- max_peers.shuffle(&mut thread_rng());
- max_peers
- }
-
- /// Returns single random peer with the most worked branch, showing the
- /// highest total difficulty.
- pub fn most_work_peer(&self) -> Option<Arc<Peer>> {
- self.most_work_peers().pop()
+ pub fn max_peer_difficulty(&self) -> Difficulty {
+ self.iter()
+ .connected()
+ .max_difficulty()
+ .unwrap_or(Difficulty::zero())
}
pub fn is_banned(&self, peer_addr: PeerAddr) -> bool {
@@ -286,7 +178,7 @@ impl Peers {
{
let mut count = 0;
- for p in self.connected_peers().iter() {
+ for p in self.iter().connected() {
match inner(&p) {
Ok(true) => count += 1,
Ok(false) => (),
@@ -353,7 +245,7 @@ impl Peers {
/// Ping all our connected peers. Always automatically expects a pong back
/// or disconnects. This acts as a liveness test.
pub fn check_all(&self, total_difficulty: Difficulty, height: u64) {
- for p in self.connected_peers().iter() {
+ for p in self.iter().connected() {
if let Err(e) = p.send_ping(total_difficulty, height) {
debug!("Error pinging peer {:?}: {:?}", &p.info.addr, e);
let mut peers = match self.peers.try_write_for(LOCK_TIMEOUT) {
@@ -370,13 +262,13 @@ impl Peers {
}
/// Iterator over all peers we know about (stored in our db).
- pub fn peers_iter(&self) -> Result<impl Iterator<Item = PeerData>, Error> {
+ pub fn peer_data_iter(&self) -> Result<impl Iterator<Item = PeerData>, Error> {
self.store.peers_iter().map_err(From::from)
}
- /// Convenience for reading all peers.
- pub fn all_peers(&self) -> Vec<PeerData> {
- self.peers_iter()
+ /// Convenience for reading all peer data from the db.
+ pub fn all_peer_data(&self) -> Vec<PeerData> {
+ self.peer_data_iter()
.map(|peers| peers.collect())
.unwrap_or(vec![])
}
@@ -427,14 +319,8 @@ impl Peers {
// build a list of peers to be cleaned up
{
- let peers = match self.peers.try_read_for(LOCK_TIMEOUT) {
- Some(peers) => peers,
- None => {
- error!("clean_peers: can't get peers lock");
- return;
- }
- };
- for peer in peers.values() {
+ for peer in self.iter() {
+ let ref peer: &Peer = peer.as_ref();
if peer.is_banned() {
debug!("clean_peers {:?}, peer banned", peer.info.addr);
rm.push(peer.info.addr.clone());
@@ -466,27 +352,34 @@ impl Peers {
}
}
+ // closure to build an iterator of our outbound peers
+ let outbound_peers = || self.iter().outbound().connected().into_iter();
+
// check here to make sure we don't have too many outgoing connections
- let excess_outgoing_count =
- (self.peer_outbound_count() as usize).saturating_sub(max_outbound_count);
+ // Preferred peers are never selected for removal here.
+ // Also choose outbound peers with lowest total difficulty to drop.
+ let excess_outgoing_count = outbound_peers().count().saturating_sub(max_outbound_count);
if excess_outgoing_count > 0 {
- let mut addrs: Vec<_> = self
- .outgoing_connected_peers()
- .iter()
- .filter(|x| !preferred_peers.contains(&x.info.addr))
+ let mut peer_infos: Vec<_> = outbound_peers()
+ .map(|x| x.info.clone())
+ .filter(|x| !preferred_peers.contains(&x.addr))
+ .collect();
+ peer_infos.sort_unstable_by_key(|x| x.total_difficulty());
+ let mut addrs = peer_infos
+ .into_iter()
+ .map(|x| x.addr)
.take(excess_outgoing_count)
- .map(|x| x.info.addr)
.collect();
rm.append(&mut addrs);
}
+ // closure to build an iterator of our inbound peers
+ let inbound_peers = || self.iter().inbound().connected().into_iter();
+
// check here to make sure we don't have too many incoming connections
- let excess_incoming_count =
- (self.peer_inbound_count() as usize).saturating_sub(max_inbound_count);
+ let excess_incoming_count = inbound_peers().count().saturating_sub(max_inbound_count);
if excess_incoming_count > 0 {
- let mut addrs: Vec<_> = self
- .incoming_connected_peers()
- .iter()
+ let mut addrs: Vec<_> = inbound_peers()
.filter(|x| !preferred_peers.contains(&x.info.addr))
.take(excess_incoming_count)
.map(|x| x.info.addr)
@@ -522,7 +415,8 @@ impl Peers {
/// We have enough outbound connected peers
pub fn enough_outbound_peers(&self) -> bool {
- self.peer_outbound_count() >= self.config.peer_min_preferred_outbound_count()
+ self.iter().outbound().connected().count()
+ >= self.config.peer_min_preferred_outbound_count() as usize
}
/// Removes those peers that seem to have expired
@@ -780,3 +674,86 @@ impl NetAdapter for Peers {
}
}
}
+
+pub struct PeersIter<I> {
+ iter: I,
+}
+
+impl<I: Iterator> IntoIterator for PeersIter<I> {
+ type Item = I::Item;
+ type IntoIter = I;
+
+ fn into_iter(self) -> Self::IntoIter {
+ self.iter.into_iter()
+ }
+}
+
+impl<I: Iterator<Item = Arc<Peer>>> PeersIter<I> {
+ /// Filter peers that are currently connected.
+ /// Note: This adaptor takes a read lock internally.
+ /// So if we are chaining adaptors then defer this toward the end of the chain.
+ pub fn connected(self) -> PeersIter<impl Iterator<Item = Arc<Peer>>> {
+ PeersIter {
+ iter: self.iter.filter(|p| p.is_connected()),
+ }
+ }
+
+ /// Filter inbound peers.
+ pub fn inbound(self) -> PeersIter<impl Iterator<Item = Arc<Peer>>> {
+ PeersIter {
+ iter: self.iter.filter(|p| p.info.is_inbound()),
+ }
+ }
+
+ /// Filter outbound peers.
+ pub fn outbound(self) -> PeersIter<impl Iterator<Item = Arc<Peer>>> {
+ PeersIter {
+ iter: self.iter.filter(|p| p.info.is_outbound()),
+ }
+ }
+
+ /// Filter peers with the provided difficulty comparison fn.
+ ///
+ /// with_difficulty(|x| x > diff)
+ ///
+ /// Note: This adaptor takes a read lock internally for each peer.
+ /// So if we are chaining adaptors then defer this toward the end of the chain.
+ pub fn with_difficulty<F>(self, f: F) -> PeersIter<impl Iterator<Item = Arc<Peer>>>
+ where
+ F: Fn(Difficulty) -> bool,
+ {
+ PeersIter {
+ iter: self.iter.filter(move |p| f(p.info.total_difficulty())),
+ }
+ }
+
+ /// Filter peers that support the provided capabilities.
+ pub fn with_capabilities(
+ self,
+ cap: Capabilities,
+ ) -> PeersIter<impl Iterator<Item = Arc<Peer>>> {
+ PeersIter {
+ iter: self.iter.filter(move |p| p.info.capabilities.contains(cap)),
+ }
+ }
+
+ pub fn by_addr(&mut self, addr: PeerAddr) -> Option<Arc<Peer>> {
+ self.iter.find(|p| p.info.addr == addr)
+ }
+
+ /// Choose a random peer from the current (filtered) peers.
+ pub fn choose_random(self) -> Option<Arc<Peer>> {
+ let mut rng = rand::thread_rng();
+ self.iter.choose(&mut rng)
+ }
+
+ /// Find the max difficulty of the current (filtered) peers.
+ pub fn max_difficulty(self) -> Option {
+ self.iter.map(|p| p.info.total_difficulty()).max()
+ }
+
+ /// Count the current (filtered) peers.
+ pub fn count(self) -> usize {
+ self.iter.count()
+ }
+}
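For reviewers, a minimal sketch of how these adaptors compose (illustrative only; "peers" is a Peers instance and "min_diff" is a stand-in difficulty threshold, neither defined in this patch):

    // Random outbound, connected peer advertising the PEER_LIST capability and
    // at least min_diff total difficulty. The adaptors that inspect per-peer
    // state (with_difficulty, connected) sit late in the chain, per the notes
    // in the doc comments above.
    let relay: Option<Arc<Peer>> = peers
        .iter()
        .outbound()
        .with_capabilities(Capabilities::PEER_LIST)
        .with_difficulty(|x| x >= min_diff)
        .connected()
        .choose_random();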
diff --git a/p2p/src/serv.rs b/p2p/src/serv.rs
index ff2a5c34c..3211e285a 100644
--- a/p2p/src/serv.rs
+++ b/p2p/src/serv.rs
@@ -233,7 +233,7 @@ impl Server {
/// different sets of peers themselves. In addition, it prevents potential
/// duplicate connections, malicious or not.
fn check_undesirable(&self, stream: &TcpStream) -> bool {
- if self.peers.peer_inbound_count()
+ if self.peers.iter().inbound().connected().count() as u32
>= self.config.peer_max_inbound_count() + self.config.peer_listener_buffer_count()
{
debug!("Accepting new connection will exceed peer limit, refusing connection.");
diff --git a/p2p/src/types.rs b/p2p/src/types.rs
index e7a2d33d3..f8929d5ce 100644
--- a/p2p/src/types.rs
+++ b/p2p/src/types.rs
@@ -266,6 +266,7 @@ pub struct P2PConfig {
/// The list of seed nodes, if using Seeding as a seed type
pub seeds: Option<Vec<PeerAddr>>,
+ /// TODO: Rethink this. We need to separate what *we* advertise vs. who we connect to.
/// Capabilities exposed by this node, also conditions which other peers this
/// node will have an affinity toward when connecting.
pub capabilities: Capabilities,
diff --git a/p2p/tests/peer_handshake.rs b/p2p/tests/peer_handshake.rs
index d34e6cd91..342613884 100644
--- a/p2p/tests/peer_handshake.rs
+++ b/p2p/tests/peer_handshake.rs
@@ -97,5 +97,5 @@ fn peer_handshake() {
let server_peer = server.peers.get_connected_peer(my_addr).unwrap();
assert_eq!(server_peer.info.total_difficulty(), Difficulty::min());
- assert!(server.peers.peer_count() > 0);
+ assert!(server.peers.iter().connected().count() > 0);
}
diff --git a/servers/src/common/types.rs b/servers/src/common/types.rs
index 4f86d51cd..5f6ff1479 100644
--- a/servers/src/common/types.rs
+++ b/servers/src/common/types.rs
@@ -345,11 +345,11 @@ impl DandelionEpoch {
/// Choose a new outbound stem relay peer.
pub fn next_epoch(&mut self, peers: &Arc<p2p::Peers>) {
self.start_time = Some(Utc::now().timestamp());
- self.relay_peer = peers.outgoing_connected_peers().first().cloned();
+ self.relay_peer = peers.iter().outbound().connected().choose_random();
// If stem_probability == 90 then we stem 90% of the time.
- let mut rng = rand::thread_rng();
let stem_probability = self.config.stem_probability;
+ let mut rng = rand::thread_rng();
self.is_stem = rng.gen_range(0, 100) < stem_probability;
let addr = self.relay_peer.clone().map(|p| p.info.addr);
@@ -386,7 +386,7 @@ impl DandelionEpoch {
}
if update_relay {
- self.relay_peer = peers.outgoing_connected_peers().first().cloned();
+ self.relay_peer = peers.iter().outbound().connected().choose_random();
info!(
"DandelionEpoch: relay_peer: new peer chosen: {:?}",
self.relay_peer.clone().map(|p| p.info.addr)
diff --git a/servers/src/grin/seed.rs b/servers/src/grin/seed.rs
index 7caa7e783..ed5a682e4 100644
--- a/servers/src/grin/seed.rs
+++ b/servers/src/grin/seed.rs
@@ -151,7 +151,7 @@ fn monitor_peers(
let mut banned_count = 0;
let mut defuncts = vec![];
- for x in peers.all_peers().into_iter() {
+ for x in peers.all_peer_data().into_iter() {
match x.flags {
p2p::State::Banned => {
let interval = Utc::now().timestamp() - x.last_banned;
@@ -174,13 +174,23 @@ fn monitor_peers(
total_count += 1;
}
+ let peers_count = peers.iter().connected().count();
+
+ let max_diff = peers.max_peer_difficulty();
+ let most_work_count = peers
+ .iter()
+ .outbound()
+ .with_difficulty(|x| x >= max_diff)
+ .connected()
+ .count();
+
debug!(
"monitor_peers: on {}:{}, {} connected ({} most_work). \
all {} = {} healthy + {} banned + {} defunct",
config.host,
config.port,
- peers.peer_count(),
- peers.most_work_peers().len(),
+ peers_count,
+ most_work_count,
total_count,
healthy_count,
banned_count,
@@ -198,10 +208,14 @@ fn monitor_peers(
return;
}
- // loop over connected peers
+ // loop over connected peers that can provide peer lists
// ask them for their list of peers
let mut connected_peers: Vec<PeerAddr> = vec![];
- for p in peers.connected_peers() {
+ for p in peers
+ .iter()
+ .with_capabilities(p2p::Capabilities::PEER_LIST)
+ .connected()
+ {
trace!(
"monitor_peers: {}:{} ask {} for more peers",
config.host,
@@ -331,9 +345,10 @@ fn listen_for_addrs(
.name("peer_connect".to_string())
.spawn(move || match p2p_c.connect(addr) {
Ok(p) => {
- if p.send_peer_request(capab).is_ok() {
- let _ = peers_c.update_state(addr, p2p::State::Healthy);
+ if p.info.capabilities.contains(p2p::Capabilities::PEER_LIST) {
+ let _ = p.send_peer_request(capab);
}
+ let _ = peers_c.update_state(addr, p2p::State::Healthy);
}
Err(_) => {
let _ = peers_c.update_state(addr, p2p::State::Defunct);
diff --git a/servers/src/grin/server.rs b/servers/src/grin/server.rs
index f6cf069ca..c5b187130 100644
--- a/servers/src/grin/server.rs
+++ b/servers/src/grin/server.rs
@@ -16,11 +16,11 @@
//! the peer-to-peer server, the blockchain and the transaction pool) and acts
//! as a facade.
-use std::fs;
use std::fs::File;
use std::io::prelude::*;
use std::path::Path;
use std::sync::{mpsc, Arc};
+use std::{convert::TryInto, fs};
use std::{
thread::{self, JoinHandle},
time::{self, Duration},
@@ -356,7 +356,13 @@ impl Server {
/// Number of peers
pub fn peer_count(&self) -> u32 {
- self.p2p.peers.peer_count()
+ self.p2p
+ .peers
+ .iter()
+ .connected()
+ .count()
+ .try_into()
+ .unwrap()
}
/// Start a minimal "stratum" mining service on a separate thread
@@ -486,7 +492,8 @@ impl Server {
let peer_stats = self
.p2p
.peers
- .connected_peers()
+ .iter()
+ .connected()
.into_iter()
.map(|p| PeerStats::from_peer(&p))
.collect();
diff --git a/servers/src/grin/sync/body_sync.rs b/servers/src/grin/sync/body_sync.rs
index d77e634a0..65f7c417f 100644
--- a/servers/src/grin/sync/body_sync.rs
+++ b/servers/src/grin/sync/body_sync.rs
@@ -14,6 +14,7 @@
use chrono::prelude::{DateTime, Utc};
use chrono::Duration;
+use rand::prelude::*;
use std::cmp;
use std::sync::Arc;
@@ -89,7 +90,17 @@ impl BodySync {
hashes.reverse();
- let peers = self.peers.more_work_peers()?;
+ let head = self.chain.head()?;
+
+ // Find connected peers with strictly greater difficulty than us.
+ let peers: Vec<_> = self
+ .peers
+ .iter()
+ .outbound()
+ .with_difficulty(|x| x > head.total_difficulty)
+ .connected()
+ .into_iter()
+ .collect();
// if we have 5 peers to sync from then ask for 50 blocks total (peer_count *
// 10) max will be 80 if all 8 peers are advertising more work
@@ -125,9 +136,9 @@ impl BodySync {
self.blocks_requested = 0;
self.receive_timeout = Utc::now() + Duration::seconds(6);
- let mut peers_iter = peers.iter().cycle();
+ let mut rng = rand::thread_rng();
for hash in hashes_to_get.clone() {
- if let Some(peer) = peers_iter.next() {
+ if let Some(peer) = peers.choose(&mut rng) {
if let Err(e) = peer.send_block_request(*hash, chain::Options::SYNC) {
debug!("Skipped request to {}: {:?}", peer.info.addr, e);
peer.stop();
diff --git a/servers/src/grin/sync/header_sync.rs b/servers/src/grin/sync/header_sync.rs
index 6fec87e13..a55c425ec 100644
--- a/servers/src/grin/sync/header_sync.rs
+++ b/servers/src/grin/sync/header_sync.rs
@@ -19,7 +19,7 @@ use std::sync::Arc;
use crate::chain::{self, SyncState, SyncStatus};
use crate::common::types::Error;
use crate::core::core::hash::{Hash, Hashed};
-use crate::p2p::{self, types::ReasonForBan, Peer};
+use crate::p2p::{self, types::ReasonForBan, Capabilities, Peer};
pub struct HeaderSync {
sync_state: Arc<SyncState>,
@@ -170,10 +170,17 @@ impl HeaderSync {
fn header_sync(&mut self) -> Option<Arc<Peer>> {
if let Ok(header_head) = self.chain.header_head() {
- let difficulty = header_head.total_difficulty;
-
- if let Some(peer) = self.peers.most_work_peer() {
- if peer.info.total_difficulty() > difficulty {
+ let max_diff = self.peers.max_peer_difficulty();
+ let peer = self
+ .peers
+ .iter()
+ .outbound()
+ .with_capabilities(Capabilities::HEADER_HIST)
+ .with_difficulty(|x| x >= max_diff)
+ .connected()
+ .choose_random();
+ if let Some(peer) = peer {
+ if peer.info.total_difficulty() > header_head.total_difficulty {
return self.request_headers(peer);
}
}
diff --git a/servers/src/grin/sync/state_sync.rs b/servers/src/grin/sync/state_sync.rs
index e5ab4d30d..86bceca6d 100644
--- a/servers/src/grin/sync/state_sync.rs
+++ b/servers/src/grin/sync/state_sync.rs
@@ -19,7 +19,7 @@ use std::sync::Arc;
use crate::chain::{self, SyncState, SyncStatus};
use crate::core::core::hash::Hashed;
use crate::core::global;
-use crate::p2p::{self, Peer};
+use crate::p2p::{self, Capabilities, Peer};
/// Fast sync has 3 "states":
/// * syncing headers
@@ -158,7 +158,16 @@ impl StateSync {
let mut txhashset_height = header_head.height.saturating_sub(threshold);
txhashset_height = txhashset_height.saturating_sub(txhashset_height % archive_interval);
- if let Some(peer) = self.peers.most_work_peer() {
+ let max_diff = self.peers.max_peer_difficulty();
+ let peer = self
+ .peers
+ .iter()
+ .outbound()
+ .with_capabilities(Capabilities::TXHASHSET_HIST)
+ .with_difficulty(|x| x >= max_diff)
+ .connected()
+ .choose_random();
+ if let Some(peer) = peer {
// ask for txhashset at state_sync_threshold
let mut txhashset_head = self
.chain
diff --git a/servers/src/grin/sync/syncer.rs b/servers/src/grin/sync/syncer.rs
index a5e66fb95..4f71b48fd 100644
--- a/servers/src/grin/sync/syncer.rs
+++ b/servers/src/grin/sync/syncer.rs
@@ -79,7 +79,15 @@ impl SyncRunner {
if self.stop_state.is_stopped() {
break;
}
- let wp = self.peers.more_or_same_work_peers()?;
+ // Count peers with at least our difficulty.
+ let wp = self
+ .peers
+ .iter()
+ .outbound()
+ .with_difficulty(|x| x >= head.total_difficulty)
+ .connected()
+ .count();
+
// exit loop when:
// * we have more than MIN_PEERS more_or_same_work peers
// * we are synced already, e.g. grin was quickly restarted
@@ -221,7 +229,16 @@ impl SyncRunner {
fn needs_syncing(&self) -> Result<(bool, u64), chain::Error> {
let local_diff = self.chain.head()?.total_difficulty;
let mut is_syncing = self.sync_state.is_syncing();
- let peer = self.peers.most_work_peer();
+
+ // Find a peer with greatest known difficulty.
+ let max_diff = self.peers.max_peer_difficulty();
+ let peer = self
+ .peers
+ .iter()
+ .outbound()
+ .with_difficulty(|x| x >= max_diff)
+ .connected()
+ .choose_random();
let peer_info = if let Some(p) = peer {
p.info.clone()