diff --git a/chain/src/chain.rs b/chain/src/chain.rs index 081202d03..419f37e83 100644 --- a/chain/src/chain.rs +++ b/chain/src/chain.rs @@ -211,6 +211,11 @@ impl Chain { } } + pub fn is_orphan(&self, hash: &Hash) -> bool { + let orphans = self.orphans.lock().unwrap(); + orphans.iter().any(|&(_, ref x)| x.hash() == hash.clone()) + } + /// Pop orphans out of the queue and check if we can now accept them. fn check_orphans(&self) { // first check how many we have to retry, unfort. we can't extend the lock diff --git a/grin/src/seed.rs b/grin/src/seed.rs index 5ab7971b8..9de05704c 100644 --- a/grin/src/seed.rs +++ b/grin/src/seed.rs @@ -115,7 +115,7 @@ impl Seeder { // maintenance step first, clean up p2p server peers { - peers.clean_peers(PEER_PREFERRED_COUNT as usize); + peers.clean_peers(PEER_MAX_COUNT as usize); } // not enough peers, getting more from db diff --git a/grin/src/sync.rs b/grin/src/sync.rs index 1571ff426..93eb8425b 100644 --- a/grin/src/sync.rs +++ b/grin/src/sync.rs @@ -119,7 +119,11 @@ fn body_sync(peers: Peers, chain: Arc) { let hashes_to_get = hashes .iter() - .filter(|x| !chain.get_block(&x).is_ok()) + .filter(|x| { + // only ask for blocks that we have not yet processed + // either successfully stored or in our orphan list + !chain.get_block(x).is_ok() && !chain.is_orphan(x) + }) .take(block_count) .cloned() .collect::>(); diff --git a/p2p/src/conn.rs b/p2p/src/conn.rs index 9b7e5c7dd..8d0d0400d 100644 --- a/p2p/src/conn.rs +++ b/p2p/src/conn.rs @@ -16,7 +16,6 @@ //! or receiving data from the TCP socket, as well as dealing with timeouts. use std::iter; -use std::ops::Deref; use std::sync::{Arc, Mutex}; use std::time::{Duration, Instant}; @@ -268,8 +267,14 @@ impl Connection { /// a timeout. pub struct TimeoutConnection { underlying: Connection, + expected_responses: Arc>>, +} - expected_responses: Arc, Instant)>>>, +#[derive(Debug, Clone)] +struct InFlightRequest { + msg_type: Type, + hash: Option, + time: Instant, } impl TimeoutConnection { @@ -282,7 +287,7 @@ impl TimeoutConnection { where F: Handler + 'static, { - let expects = Arc::new(Mutex::new(vec![])); + let expects: Arc>> = Arc::new(Mutex::new(vec![])); // Decorates the handler to remove the "subscription" from the expected // responses. We got our replies, so no timeout should occur. @@ -294,10 +299,14 @@ impl TimeoutConnection { let mut expects = exp.lock().unwrap(); let filtered = expects .iter() - .filter(|&&(typ, h, _): &&(Type, Option, Instant)| { - msg_type != typ || h.is_some() && recv_h != h + .filter(|x| { + let res = x.msg_type != msg_type || x.hash.is_some() && x.hash != recv_h; + if res { + trace!(LOGGER, "timeout_conn: received: {:?}, {:?}", x.msg_type, x.hash); + } + res }) - .map(|&x| x) + .cloned() .collect::>(); *expects = filtered; @@ -310,9 +319,11 @@ impl TimeoutConnection { .interval(Duration::new(2, 0)) .fold((), move |_, _| { let exp = exp.lock().unwrap(); - for &(ty, h, t) in exp.deref() { - if Instant::now() - t > Duration::new(5, 0) { - trace!(LOGGER, "Too long: {:?} {:?}", ty, h); + trace!(LOGGER, "timeout_conn: currently registered: {:?}", exp.len()); + + for x in exp.iter() { + if Instant::now() - x.time > Duration::new(5, 0) { + trace!(LOGGER, "timeout_conn: timeout: {:?}, {:?}", x.msg_type, x.hash); return Err(TimerError::TooLong); } } @@ -332,6 +343,8 @@ impl TimeoutConnection { /// Sends a request and registers a timer on the provided message type and /// optionally the hash of the sent data. + /// Skips the request if we have already sent the same response and + /// we are still waiting for a response (not yet timed out). pub fn send_request( &self, t: Type, @@ -339,10 +352,40 @@ impl TimeoutConnection { body: &W, expect_h: Option<(Hash)>, ) -> Result<(), Error> { + { + let expects = self.expected_responses.lock().unwrap(); + let existing = expects.iter().find(|x| { + x.msg_type == rt && x.hash == expect_h + }); + if let Some(x) = existing { + trace!( + LOGGER, + "timeout_conn: in_flight: {:?}, {:?} (skipping)", + x.msg_type, + x.hash, + ); + return Ok(()) + } + } + let _sent = try!(self.underlying.send_msg(t, body)); - let mut expects = self.expected_responses.lock().unwrap(); - expects.push((rt, expect_h, Instant::now())); + { + let mut expects = self.expected_responses.lock().unwrap(); + let req = InFlightRequest { + msg_type: rt, + hash: expect_h, + time: Instant::now(), + }; + trace!( + LOGGER, + "timeout_conn: registering: {:?}, {:?}", + req.msg_type, + req.hash, + ); + expects.push(req); + } + Ok(()) } diff --git a/p2p/src/peers.rs b/p2p/src/peers.rs index 892d68b7f..67e1d9f65 100644 --- a/p2p/src/peers.rs +++ b/p2p/src/peers.rs @@ -236,7 +236,7 @@ impl Peers { /// Iterate over the peer list and prune all peers we have /// lost connection to or have been deemed problematic. /// Also avoid connected peer count getting too high. - pub fn clean_peers(&self, desired_count: usize) { + pub fn clean_peers(&self, max_count: usize) { let mut rm = vec![]; // build a list of peers to be cleaned up @@ -261,11 +261,10 @@ impl Peers { } // ensure we do not have too many connected peers - // really fighting with the double layer of rwlocks here... let excess_count = { let peer_count = self.peer_count().clone() as usize; - if peer_count > desired_count { - peer_count - desired_count + if peer_count > max_count { + peer_count - max_count } else { 0 }