Regularly ping all peers to check liveness

Ping all our peers every 20 sec. If no pong arrives within a
reasonable time (5 sec), disconnect from that peer.
This commit is contained in:
Ignotus Peverell 2017-11-19 20:33:45 -05:00
parent c1be4e2113
commit 8ce494536e
No known key found for this signature in database
GPG key ID: 99CD25F39F8F8211
3 changed files with 34 additions and 5 deletions

View file

@ -268,7 +268,7 @@ impl TimeoutConnection {
let expects = Arc::new(Mutex::new(vec![]));
// Decorates the handler to remove the "subscription" from the expected
// responses. We got our replies, so no timeout should occur.
// responses. We got our replies, so no timeout should occur.
let exp = expects.clone();
let (conn, fut) = Connection::listen(conn, move |sender, header: MsgHeader, data| {
let msg_type = header.msg_type;
@ -293,8 +293,9 @@ impl TimeoutConnection {
.interval(Duration::new(2, 0))
.fold((), move |_, _| {
let exp = exp.lock().unwrap();
for &(_, _, t) in exp.deref() {
if Instant::now() - t > Duration::new(2, 0) {
for &(ty, h, t) in exp.deref() {
if Instant::now() - t > Duration::new(5, 0) {
trace!(LOGGER, "Too long: {:?} {:?}", ty, h);
return Err(TimerError::TooLong);
}
}

View file

@ -93,6 +93,7 @@ impl Peer {
let addr = self.info.addr;
let state = self.state.clone();
let adapter = Arc::new(self.tracking_adapter.clone());
Box::new(self.proto.handle(conn, adapter).then(move |res| {
// handle disconnection, standard disconnections aren't considered an error
let mut state = state.write().unwrap();

View file

@ -28,6 +28,7 @@ use futures::future::{self, IntoFuture};
use rand::{self, Rng};
use tokio_core::net::{TcpListener, TcpStream};
use tokio_core::reactor;
use tokio_timer::Timer;
use core::core;
use core::core::hash::Hash;
@ -101,7 +102,7 @@ impl Server {
// main peer acceptance future handling handshake
let hp = h.clone();
let peers = socket.incoming().map_err(From::from).map(move |(conn, _)| {
let peers_listen = socket.incoming().map_err(From::from).map(move |(conn, _)| {
let adapter = adapter.clone();
let total_diff = adapter.total_difficulty();
let peers = peers.clone();
@ -119,7 +120,7 @@ impl Server {
// spawn each peer future to its own task
let hs = h.clone();
let server = peers.for_each(move |peer| {
let server = peers_listen.for_each(move |peer| {
hs.spawn(peer.then(|res| {
match res {
Err(e) => info!(LOGGER, "Client error: {:?}", e),
@ -136,9 +137,24 @@ impl Server {
let mut stop_mut = self.stop.borrow_mut();
*stop_mut = Some(stop);
}
// timer to regularly check on our peers by pinging them
let peers_inner = self.peers.clone();
let peers_timer = Timer::default()
.interval(Duration::new(20, 0))
.fold((), move |_, _| {
check_peers(peers_inner.clone());
Ok(())
});
Box::new(
server
.select(stop_rx.map_err(|_| Error::ConnectionClose))
.then(|res| match res {
Ok((_, _)) => Ok(()),
Err((e, _)) => Err(e),
})
.select(peers_timer.map_err(|_| Error::Timeout))
.then(|res| match res {
Ok((_, _)) => Ok(()),
Err((e, _)) => Err(e),
@ -328,6 +344,17 @@ where
Box::new(peer_add)
}
// Liveness test: sends a ping to every peer in the map that currently reports
// itself as connected. A pong is always expected in reply; a peer that fails
// to answer ends up disconnected.
fn check_peers(peers: Arc<RwLock<HashMap<SocketAddr, Arc<Peer>>>>) {
    // Read access is enough since the map itself is not modified here.
    // Panics if the lock has been poisoned.
    let guard = peers.read().unwrap();
    for peer in guard.values().filter(|p| p.is_connected()) {
        // Best effort: the result of the send is intentionally discarded.
        let _ = peer.send_ping();
    }
}
// Adds a timeout to a future
fn with_timeout<T: 'static>(
fut: Box<Future<Item = Result<T, ()>, Error = Error>>,