[testnet1] De-Duplicate expected responses in TimeoutConnection (#484) (#485)

* timeout connection already tracks "expected" responses
use this to deduplicate requests and do not ask a peer for the same thing again
(until either success or timeout)

* do not ask for orphan blocks repeatedly
allow more than preferred number of peers (clean if we exceed max number)
This commit is contained in:
AntiochP 2017-12-14 12:23:35 -05:00 committed by GitHub
parent 38741c91e2
commit b17e42fb42
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 68 additions and 17 deletions

View file

@ -211,6 +211,11 @@ impl Chain {
} }
} }
pub fn is_orphan(&self, hash: &Hash) -> bool {
let orphans = self.orphans.lock().unwrap();
orphans.iter().any(|&(_, ref x)| x.hash() == hash.clone())
}
/// Pop orphans out of the queue and check if we can now accept them. /// Pop orphans out of the queue and check if we can now accept them.
fn check_orphans(&self) { fn check_orphans(&self) {
// first check how many we have to retry, unfort. we can't extend the lock // first check how many we have to retry, unfort. we can't extend the lock

View file

@ -115,7 +115,7 @@ impl Seeder {
// maintenance step first, clean up p2p server peers // maintenance step first, clean up p2p server peers
{ {
peers.clean_peers(PEER_PREFERRED_COUNT as usize); peers.clean_peers(PEER_MAX_COUNT as usize);
} }
// not enough peers, getting more from db // not enough peers, getting more from db

View file

@ -119,7 +119,11 @@ fn body_sync(peers: Peers, chain: Arc<chain::Chain>) {
let hashes_to_get = hashes let hashes_to_get = hashes
.iter() .iter()
.filter(|x| !chain.get_block(&x).is_ok()) .filter(|x| {
// only ask for blocks that we have not yet processed
// either successfully stored or in our orphan list
!chain.get_block(x).is_ok() && !chain.is_orphan(x)
})
.take(block_count) .take(block_count)
.cloned() .cloned()
.collect::<Vec<_>>(); .collect::<Vec<_>>();

View file

@ -16,7 +16,6 @@
//! or receiving data from the TCP socket, as well as dealing with timeouts. //! or receiving data from the TCP socket, as well as dealing with timeouts.
use std::iter; use std::iter;
use std::ops::Deref;
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
@ -268,8 +267,14 @@ impl Connection {
/// a timeout. /// a timeout.
pub struct TimeoutConnection { pub struct TimeoutConnection {
underlying: Connection, underlying: Connection,
expected_responses: Arc<Mutex<Vec<InFlightRequest>>>,
}
expected_responses: Arc<Mutex<Vec<(Type, Option<Hash>, Instant)>>>, #[derive(Debug, Clone)]
struct InFlightRequest {
msg_type: Type,
hash: Option<Hash>,
time: Instant,
} }
impl TimeoutConnection { impl TimeoutConnection {
@ -282,7 +287,7 @@ impl TimeoutConnection {
where where
F: Handler + 'static, F: Handler + 'static,
{ {
let expects = Arc::new(Mutex::new(vec![])); let expects: Arc<Mutex<Vec<InFlightRequest>>> = Arc::new(Mutex::new(vec![]));
// Decorates the handler to remove the "subscription" from the expected // Decorates the handler to remove the "subscription" from the expected
// responses. We got our replies, so no timeout should occur. // responses. We got our replies, so no timeout should occur.
@ -294,10 +299,14 @@ impl TimeoutConnection {
let mut expects = exp.lock().unwrap(); let mut expects = exp.lock().unwrap();
let filtered = expects let filtered = expects
.iter() .iter()
.filter(|&&(typ, h, _): &&(Type, Option<Hash>, Instant)| { .filter(|x| {
msg_type != typ || h.is_some() && recv_h != h let res = x.msg_type != msg_type || x.hash.is_some() && x.hash != recv_h;
if res {
trace!(LOGGER, "timeout_conn: received: {:?}, {:?}", x.msg_type, x.hash);
}
res
}) })
.map(|&x| x) .cloned()
.collect::<Vec<_>>(); .collect::<Vec<_>>();
*expects = filtered; *expects = filtered;
@ -310,9 +319,11 @@ impl TimeoutConnection {
.interval(Duration::new(2, 0)) .interval(Duration::new(2, 0))
.fold((), move |_, _| { .fold((), move |_, _| {
let exp = exp.lock().unwrap(); let exp = exp.lock().unwrap();
for &(ty, h, t) in exp.deref() { trace!(LOGGER, "timeout_conn: currently registered: {:?}", exp.len());
if Instant::now() - t > Duration::new(5, 0) {
trace!(LOGGER, "Too long: {:?} {:?}", ty, h); for x in exp.iter() {
if Instant::now() - x.time > Duration::new(5, 0) {
trace!(LOGGER, "timeout_conn: timeout: {:?}, {:?}", x.msg_type, x.hash);
return Err(TimerError::TooLong); return Err(TimerError::TooLong);
} }
} }
@ -332,6 +343,8 @@ impl TimeoutConnection {
/// Sends a request and registers a timer on the provided message type and /// Sends a request and registers a timer on the provided message type and
/// optionally the hash of the sent data. /// optionally the hash of the sent data.
/// Skips the request if we have already sent the same response and
/// we are still waiting for a response (not yet timed out).
pub fn send_request<W: ser::Writeable>( pub fn send_request<W: ser::Writeable>(
&self, &self,
t: Type, t: Type,
@ -339,10 +352,40 @@ impl TimeoutConnection {
body: &W, body: &W,
expect_h: Option<(Hash)>, expect_h: Option<(Hash)>,
) -> Result<(), Error> { ) -> Result<(), Error> {
{
let expects = self.expected_responses.lock().unwrap();
let existing = expects.iter().find(|x| {
x.msg_type == rt && x.hash == expect_h
});
if let Some(x) = existing {
trace!(
LOGGER,
"timeout_conn: in_flight: {:?}, {:?} (skipping)",
x.msg_type,
x.hash,
);
return Ok(())
}
}
let _sent = try!(self.underlying.send_msg(t, body)); let _sent = try!(self.underlying.send_msg(t, body));
let mut expects = self.expected_responses.lock().unwrap(); {
expects.push((rt, expect_h, Instant::now())); let mut expects = self.expected_responses.lock().unwrap();
let req = InFlightRequest {
msg_type: rt,
hash: expect_h,
time: Instant::now(),
};
trace!(
LOGGER,
"timeout_conn: registering: {:?}, {:?}",
req.msg_type,
req.hash,
);
expects.push(req);
}
Ok(()) Ok(())
} }

View file

@ -236,7 +236,7 @@ impl Peers {
/// Iterate over the peer list and prune all peers we have /// Iterate over the peer list and prune all peers we have
/// lost connection to or have been deemed problematic. /// lost connection to or have been deemed problematic.
/// Also avoid connected peer count getting too high. /// Also avoid connected peer count getting too high.
pub fn clean_peers(&self, desired_count: usize) { pub fn clean_peers(&self, max_count: usize) {
let mut rm = vec![]; let mut rm = vec![];
// build a list of peers to be cleaned up // build a list of peers to be cleaned up
@ -261,11 +261,10 @@ impl Peers {
} }
// ensure we do not have too many connected peers // ensure we do not have too many connected peers
// really fighting with the double layer of rwlocks here...
let excess_count = { let excess_count = {
let peer_count = self.peer_count().clone() as usize; let peer_count = self.peer_count().clone() as usize;
if peer_count > desired_count { if peer_count > max_count {
peer_count - desired_count peer_count - max_count
} else { } else {
0 0
} }