more aggressive peering (#445) (#446)

* [WIP] aggressive peer connections (and logging)

* get peer list from each connected peer when low on peers

* clean up the try_read() calls and log consistent warning messages
(will clean these up later)

* clean up error logging (log at debug level)
AntiochP 2017-12-07 20:24:03 -05:00 committed by GitHub
parent fa91e4dc15
commit 10030a224a
3 changed files with 142 additions and 101 deletions
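
The heart of the change is the peer-monitoring loop in the seeder (first file below): every 30 seconds it logs peer counts, cleans up the peer map, and, whenever the connected count drops below PEER_PREFERRED_COUNT, asks each connected peer for its own peer list and queues healthy addresses from the local store for connection attempts. Below is a minimal, self-contained sketch of that control flow; the Peer and Server types and send_peer_request are stand-ins, not grin's actual p2p API.

use std::net::SocketAddr;
use std::sync::{Arc, RwLock};

const PEER_PREFERRED_COUNT: usize = 8;

struct Peer {
    addr: SocketAddr,
}

impl Peer {
    // stand-in for the real peer-list request message
    fn send_peer_request(&self) {
        println!("monitor_peers: asking {} for more peers", self.addr);
    }
}

// stand-in for the p2p server plus its peer store
struct Server {
    connected: Vec<Arc<RwLock<Peer>>>,
    db_peers: Vec<SocketAddr>,
}

impl Server {
    // one pass of the monitoring loop: ask around and queue new connections
    fn monitor_peers(&self, tx: &mut Vec<SocketAddr>) {
        // not enough peers, try to acquire more
        if self.connected.len() < PEER_PREFERRED_COUNT {
            // ask every connected peer for its peer list, skipping any
            // peer whose lock is currently held elsewhere
            for p in &self.connected {
                if let Ok(p) = p.try_read() {
                    p.send_peer_request();
                }
            }
            // queue healthy addresses from the local store for a
            // connection attempt (already-known peers are filtered on connect)
            for addr in &self.db_peers {
                tx.push(*addr);
            }
        }
    }
}

fn main() {
    let server = Server {
        connected: Vec::new(),
        db_peers: vec!["127.0.0.1:13414".parse().unwrap()],
    };
    let mut tx = Vec::new();
    server.monitor_peers(&mut tx);
    println!("queued {} connection attempt(s)", tx.len());
}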


@ -16,13 +16,10 @@
//! a mining worker implementation
//!
use rand::{thread_rng, Rng};
use std::cmp::min;
use std::net::SocketAddr;
use std::str::{self, FromStr};
use std::sync::Arc;
use std::time;
use std::time::Duration;
use cpupool;
use futures::{self, future, Future, Stream};
@ -80,15 +77,16 @@ impl Seeder {
tx: mpsc::UnboundedSender<SocketAddr>,
) -> Box<Future<Item = (), Error = String>> {
let p2p_server = self.p2p.clone();
let capabilities = self.capabilities.clone();
// now spawn a new future to regularly check if we need to acquire more peers
// and if so, gets them from db
let mon_loop = Timer::default()
.interval(time::Duration::from_secs(10))
.interval(time::Duration::from_secs(30))
.for_each(move |_| {
debug!(
LOGGER,
"monitoring_peers: {} / {} / {}",
"monitor_peers: {} / {} / {}",
p2p_server.most_work_peers().len(),
p2p_server.connected_peers().len(),
p2p_server.all_peers().len(),
@ -110,7 +108,7 @@ impl Seeder {
debug!(
LOGGER,
"monitoring_peers: all - {}, healthy - {}, banned - {}, defunct - {}",
"monitor_peers: all - {}, healthy - {}, banned - {}, defunct - {}",
all_peers.len(),
healthy_count,
banned_count,
@ -118,27 +116,44 @@ impl Seeder {
);
// maintenance step first, clean up p2p server peers
let _ = p2p_server.clean_peers();
{
p2p_server.clean_peers(PEER_PREFERRED_COUNT as usize);
}
// we don't have enough peers, getting more from db
// not enough peers, getting more from db
if p2p_server.peer_count() < PEER_PREFERRED_COUNT {
let mut peers = p2p_server.find_peers(
// loop over connected peers
// ask them for their list of peers
for p in p2p_server.connected_peers() {
if let Ok(p) = p.try_read() {
debug!(
LOGGER,
"monitor_peers: asking {} for more peers",
p.info.addr,
);
let _ = p.send_peer_request(capabilities);
} else {
warn!(
LOGGER,
"monitor_peers: failed to get read lock on peer",
);
}
}
// find some peers from our db
// and queue them up for a connection attempt
let peers = p2p_server.find_peers(
p2p::State::Healthy,
p2p::UNKNOWN,
(2 * PEER_MAX_COUNT) as usize,
100,
);
peers.retain(|p| !p2p_server.is_known(&p.addr));
if peers.len() > 0 {
for p in peers {
debug!(
LOGGER,
"Got {} peers from db, trying to connect.",
peers.len()
"monitor_peers: queueing up {} for connection (may be already connected)",
p.addr,
);
thread_rng().shuffle(&mut peers[..]);
let sz = min(PEER_PREFERRED_COUNT as usize, peers.len());
for p in &peers[0..sz] {
tx.unbounded_send(p.addr).unwrap();
}
tx.unbounded_send(p.addr).unwrap();
}
}
Ok(())
@ -166,29 +181,28 @@ impl Seeder {
let peers = p2p_server.find_peers(
p2p::State::Healthy,
p2p::FULL_HIST,
(2 * PEER_MAX_COUNT) as usize,
100,
);
Ok(peers)
})
.and_then(|mut peers| {
.and_then(|peers| {
// if so, get their addresses, otherwise use our seeds
if peers.len() > 3 {
thread_rng().shuffle(&mut peers[..]);
Box::new(future::ok(peers.iter().map(|p| p.addr).collect::<Vec<_>>()))
} else {
seed_list
}
})
.and_then(move |peer_addrs| {
// connect to this first set of addresses
let sz = min(PEER_PREFERRED_COUNT as usize, peer_addrs.len());
for addr in &peer_addrs[0..sz] {
debug!(LOGGER, "Connecting to seed: {}.", addr);
tx.unbounded_send(*addr).unwrap();
}
if peer_addrs.len() == 0 {
warn!(LOGGER, "No seeds were retrieved.");
}
// connect to this first set of addresses
for addr in peer_addrs {
tx.unbounded_send(addr).unwrap();
}
Ok(())
});
Box::new(seeder)
@ -285,29 +299,23 @@ fn connect_and_req(
h: reactor::Handle,
addr: SocketAddr,
) -> Box<Future<Item = (), Error = ()>> {
let connect_peer = p2p.connect_peer(addr, h).map_err(|_| ());
let timer = Timer::default();
let timeout = timer.timeout(connect_peer, Duration::from_secs(5));
let connect_peer = p2p.connect_peer(addr, h);
let p2p_server = p2p.clone();
let fut = timeout.then(move |p| {
let fut = connect_peer.then(move |p| {
match p {
Ok(Some(p)) => {
let p = p.read().unwrap();
let peer_result = p.send_peer_request(capab);
match peer_result {
Ok(()) => {}
Err(_) => {}
debug!(LOGGER, "connect_and_req: ok, will attempt send_peer_request");
if let Ok(p) = p.try_read() {
let _ = p.send_peer_request(capab);
}
}
Err(_) => {
let update_result = p2p_server.update_state(addr, p2p::State::Defunct);
match update_result {
Ok(()) => {}
Err(_) => {}
}
}
_ => {}
},
Ok(None) => {
debug!(LOGGER, "connect_and_req: ok but none inner (what does this mean?), {}", addr);
},
Err(e) => {
debug!(LOGGER, "connect_and_req: err - {:?}, {}, flagging as defunct", e, addr);
let _ = p2p_server.update_state(addr, p2p::State::Defunct);
},
}
Ok(())
});
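
connect_and_req drops its own five-second Timer wrapper (the socket connect gets a timeout inside the p2p server instead, see the last file below), handles all three outcomes of the connect future explicitly, and logs each at debug level; a failed connection flags the address as Defunct so the monitor loop stops retrying it. A rough sketch of that result handling, with stand-in types for the peer and the peer-state store:

use std::collections::HashMap;
use std::net::SocketAddr;

// stand-in for p2p::State
#[derive(Debug, Clone, Copy)]
enum State {
    Defunct,
}

// stand-in for the peer handle returned on a successful connection
struct Peer;

impl Peer {
    fn send_peer_request(&self) {
        println!("connect_and_req: ok, will attempt send_peer_request");
    }
}

// handle the three possible outcomes of a connection attempt
fn handle_connect_result(
    addr: SocketAddr,
    result: Result<Option<Peer>, String>,
    peer_states: &mut HashMap<SocketAddr, State>,
) {
    match result {
        Ok(Some(peer)) => {
            // connected: immediately ask the new peer for its peer list
            peer.send_peer_request();
        }
        Ok(None) => {
            // resolved without a peer; the commit itself notes this case
            // is unclear, so just log it
            println!("connect_and_req: ok but none inner, {}", addr);
        }
        Err(e) => {
            // connection failed: flag the address so we stop retrying it
            println!("connect_and_req: err - {:?}, {}, flagging as defunct", e, addr);
            peer_states.insert(addr, State::Defunct);
        }
    }
}

fn main() {
    let mut states = HashMap::new();
    let addr: SocketAddr = "10.0.0.1:13414".parse().unwrap();
    handle_connect_result(addr, Err("connection refused".to_string()), &mut states);
    println!("{:?}", states.get(&addr));
}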


@ -106,12 +106,14 @@ fn body_sync(
}
hashes.reverse();
// let peer_count = p2p_server.most_work_peers().len();
let peer_count = {
p2p_server.most_work_peers().len()
};
let hashes_to_get = hashes
.iter()
.filter(|x| !chain.get_block(&x).is_ok())
.take(10)
// .take(peer_count * 2)
.take(peer_count * 2)
.cloned()
.collect::<Vec<_>>();
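
In body_sync the number of block bodies requested per pass is no longer a fixed take(10); it now scales with the number of most-work peers (peer_count * 2), so a well-connected node pulls bodies more aggressively. A small stand-alone sketch of the selection, with hashes and the local-chain check simplified to plain strings and a set:

use std::collections::HashSet;

// pick which block hashes to request this pass: skip blocks we already
// have locally, and ask for roughly two blocks per most-work peer
fn hashes_to_get(hashes: &[String], have_locally: &HashSet<String>, peer_count: usize) -> Vec<String> {
    hashes
        .iter()
        .filter(|h| !have_locally.contains(*h))
        .take(peer_count * 2)
        .cloned()
        .collect()
}

fn main() {
    let hashes: Vec<String> = (0..20).map(|i| format!("hash{}", i)).collect();
    let mut have = HashSet::new();
    have.insert("hash0".to_string());
    // with 3 most-work peers we request 6 bodies this pass
    let to_get = hashes_to_get(&hashes, &have, 3);
    println!("{:?}", to_get);
}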
@ -162,26 +164,22 @@ fn request_headers(
chain: Arc<chain::Chain>,
) -> Result<(), Error> {
let locator = get_locator(chain)?;
match peer.try_read() {
Ok(peer) => {
debug!(
LOGGER,
"sync: request_headers: asking {} for headers, {:?}",
peer.info.addr,
locator,
);
let _ = peer.send_header_request(locator);
Ok(())
},
Err(_) => {
// not much we can do here, log and try again next time
warn!(
LOGGER,
"sync: request_headers: failed to get read lock on peer",
);
Ok(())
},
if let Ok(peer) = peer.try_read() {
debug!(
LOGGER,
"sync: request_headers: asking {} for headers, {:?}",
peer.info.addr,
locator,
);
let _ = peer.send_header_request(locator);
} else {
// not much we can do here, log and try again next time
debug!(
LOGGER,
"sync: request_headers: failed to get read lock on peer",
);
}
Ok(())
}
/// We build a locator based on sync_head.
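
request_headers keeps the non-blocking try_read on the peer lock but flattens the match into an if let and downgrades the contended-lock message from warn to debug, since the sync loop simply retries on the next tick. The same pattern in isolation, with a stand-in peer type and header request:

use std::sync::{Arc, RwLock};

// stand-in for a connected peer guarded by the p2p server's RwLock
struct Peer {
    addr: String,
}

impl Peer {
    fn send_header_request(&self, locator: Vec<u64>) -> Result<(), String> {
        println!("sync: request_headers: asking {} for headers, {:?}", self.addr, locator);
        Ok(())
    }
}

fn request_headers(peer: &Arc<RwLock<Peer>>, locator: Vec<u64>) -> Result<(), String> {
    // non-blocking: if another thread holds the lock, skip this round
    // rather than stalling the sync loop, and try again next time
    if let Ok(peer) = peer.try_read() {
        let _ = peer.send_header_request(locator);
    } else {
        // not much we can do here, log and try again next time
        println!("sync: request_headers: failed to get read lock on peer");
    }
    Ok(())
}

fn main() {
    let peer = Arc::new(RwLock::new(Peer { addr: "10.0.0.1:13414".into() }));
    request_headers(&peer, vec![100, 90, 80]).unwrap();
}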


@ -113,7 +113,7 @@ impl Server {
// main peer acceptance future handling handshake
let hp = h.clone();
let peers_listen = socket.incoming().map_err(From::from).map(move |(conn, _)| {
// aaaand.. reclone for the internal closures
let adapter = adapter.clone();
let store = store.clone();
@ -216,9 +216,12 @@ impl Server {
) -> Box<Future<Item = Option<Arc<RwLock<Peer>>>, Error = Error>> {
if let Some(p) = self.get_peer(&addr) {
// if we're already connected to the addr, just return the peer
debug!(LOGGER, "connect_peer: already connected {}", addr);
return Box::new(future::ok(Some(p)));
}
debug!(LOGGER, "connect_peer: connecting to {}", addr);
// cloneapalooza
let peers = self.peers.clone();
let handshake = self.handshake.clone();
@ -226,9 +229,17 @@ impl Server {
let capab = self.capabilities.clone();
let self_addr = SocketAddr::new(self.config.host, self.config.port);
let socket = TcpStream::connect(&addr, &h).map_err(|e| Error::Connection(e));
let timer = Timer::default();
let socket_connect = timer.timeout(
TcpStream::connect(&addr, &h),
Duration::from_secs(5),
).map_err(|e| {
debug!(LOGGER, "connect_peer: socket connect error - {:?}", e);
Error::Connection(e)
});
let h2 = h.clone();
let request = socket
let request = socket_connect
.and_then(move |socket| {
let peers = peers.clone();
let total_diff = adapter.clone().total_difficulty();
@ -272,28 +283,59 @@ impl Server {
}
/// Have the server iterate over its peer list and prune all peers we have
/// lost connection to or have been deemed problematic. The removed peers
/// are returned.
pub fn clean_peers(&self) -> Vec<Arc<RwLock<Peer>>> {
/// lost connection to or have been deemed problematic.
/// Also avoid connected peer count getting too high.
pub fn clean_peers(&self, desired_count: usize) {
let mut rm = vec![];
// build a list of peers to be cleaned up
for peer in self.connected_peers() {
let peer_inner = peer.read().unwrap();
if peer_inner.is_banned() || !peer_inner.is_connected() {
if peer_inner.is_banned() {
debug!(LOGGER, "cleaning {:?}, peer banned", peer_inner.info.addr);
rm.push(peer.clone());
} else if !peer_inner.is_connected() {
debug!(LOGGER, "cleaning {:?}, not connected", peer_inner.info.addr);
rm.push(peer.clone());
}
}
// now clean up peer map based on the list to remove
let mut peers = self.peers.write().unwrap();
for p in rm.clone() {
let p = p.read().unwrap();
peers.remove(&p.info.addr);
{
let mut peers = self.peers.write().unwrap();
for p in rm.clone() {
let p = p.read().unwrap();
peers.remove(&p.info.addr);
}
}
rm
// ensure we do not have too many connected peers
// really fighting with the double layer of rwlocks here...
let excess_count = {
let peer_count = self.peer_count().clone() as usize;
if peer_count > desired_count {
peer_count - desired_count
} else {
0
}
};
// map peers to addrs in a block to bound how long we keep the read lock for
let addrs = {
self.connected_peers().iter().map(|x| {
let p = x.read().unwrap();
p.info.addr.clone()
}).collect::<Vec<_>>()
};
// now remove them taking a short-lived write lock each time
// maybe better to take write lock once and remove them all?
for x in addrs
.iter()
.take(excess_count) {
let mut peers = self.peers.write().unwrap();
peers.remove(x);
}
}
/// Return vec of all peers that currently have the most worked branch,
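
clean_peers now takes a desired_count and, after dropping banned and disconnected peers, also trims any excess connections so the connected set cannot grow without bound, holding the peer map's locks only for short spans. A sketch of just the new trimming step, with the peer map reduced to a plain address map:

use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::RwLock;

// stand-in for the server's peer map (address -> peer handle)
struct Server {
    peers: RwLock<HashMap<SocketAddr, ()>>,
}

impl Server {
    // drop connections beyond desired_count, keeping each lock short-lived
    fn trim_excess_peers(&self, desired_count: usize) {
        // how many peers over the desired count are we?
        let excess_count = {
            let peer_count = self.peers.read().unwrap().len();
            if peer_count > desired_count {
                peer_count - desired_count
            } else {
                0
            }
        };
        // snapshot the addresses while holding only a short read lock
        let addrs: Vec<SocketAddr> = self.peers.read().unwrap().keys().cloned().collect();
        // remove the excess, taking a short write lock per removal
        for addr in addrs.iter().take(excess_count) {
            self.peers.write().unwrap().remove(addr);
        }
    }
}

fn main() {
    let mut map: HashMap<SocketAddr, ()> = HashMap::new();
    for i in 0..10u8 {
        map.insert(format!("10.0.0.{}:13414", i).parse().unwrap(), ());
    }
    let server = Server { peers: RwLock::new(map) };
    server.trim_excess_peers(8);
    println!("{} peers left", server.peers.read().unwrap().len());
}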
@ -342,23 +384,8 @@ impl Server {
}
/// Returns a random connected peer.
/// Only considers peers with at least our total_difficulty (ignores out of sync peers).
pub fn random_peer(&self) -> Option<Arc<RwLock<Peer>>> {
let difficulty = self.adapter.total_difficulty();
let peers = self
.connected_peers()
.iter()
.filter(|x| {
let peer = x.read().unwrap();
peer.is_connected() && peer.info.total_difficulty >= difficulty
})
.cloned()
.collect::<Vec<_>>();
if peers.len() == 0 {
return None;
}
let peers = self.connected_peers();
Some(thread_rng().choose(&peers).unwrap().clone())
}
@ -495,12 +522,20 @@ fn with_timeout<T: 'static>(
fut: Box<Future<Item = Result<T, ()>, Error = Error>>,
h: &reactor::Handle,
) -> Box<Future<Item = T, Error = Error>> {
let timeout = reactor::Timeout::new(Duration::new(5, 0), h).unwrap();
let timeout = reactor::Timeout::new(Duration::from_secs(5), h).unwrap();
let timed = fut.select(timeout.map(Err).from_err())
.then(|res| match res {
Ok((Ok(inner), _timeout)) => Ok(inner),
Ok((_, _accept)) => Err(Error::Timeout),
Err((e, _other)) => Err(e),
Ok((Ok(inner), _timeout)) => {
Ok(inner)
},
Ok((Err(inner), _accept)) => {
debug!(LOGGER, "with_timeout: ok but nested - {:?} (treating this as timeout)", inner);
Err(Error::Timeout)
},
Err((e, _other)) => {
debug!(LOGGER, "with_timeout: err - {:?} (treating this as an error)", e);
Err(e)
},
});
Box::new(timed)
}
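
with_timeout still races the wrapped future against a five-second timeout via select, but both failure arms are now logged at debug level before being mapped to errors (the nested Err arm cannot be distinguished from the timeout firing, which is why the extra logging helps). The futures-0.1 plumbing does not stand alone easily, so here is a rough thread-based illustration of the same give-up-after-five-seconds policy using only the standard library; the channel mechanism is mine, not grin's.

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

#[derive(Debug)]
enum Error {
    Timeout,
}

// run `work` on a worker thread and give it five seconds to produce a value;
// a nested failure and the timeout firing are both logged and both surfaced
// as Error::Timeout, mirroring the collapsed arms in the select-based original
fn with_timeout<T: Send + 'static>(
    work: impl FnOnce() -> Result<T, ()> + Send + 'static,
) -> Result<T, Error> {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        let _ = tx.send(work());
    });
    match rx.recv_timeout(Duration::from_secs(5)) {
        // worker finished successfully in time
        Ok(Ok(inner)) => Ok(inner),
        // worker finished in time but reported a failure
        Ok(Err(_)) => {
            println!("with_timeout: nested failure (treating this as timeout)");
            Err(Error::Timeout)
        }
        // worker did not answer within five seconds
        Err(e) => {
            println!("with_timeout: err - {:?} (treating this as an error)", e);
            Err(Error::Timeout)
        }
    }
}

fn main() {
    // a worker that takes too long; we get Error::Timeout back after five seconds
    let res: Result<u32, Error> = with_timeout(|| {
        thread::sleep(Duration::from_secs(10));
        Ok(42)
    });
    println!("{:?}", res);
}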