diff --git a/grin/src/seed.rs b/grin/src/seed.rs index f216c1fb8..80578e723 100644 --- a/grin/src/seed.rs +++ b/grin/src/seed.rs @@ -16,13 +16,10 @@ //! a mining worker implementation //! -use rand::{thread_rng, Rng}; -use std::cmp::min; use std::net::SocketAddr; use std::str::{self, FromStr}; use std::sync::Arc; use std::time; -use std::time::Duration; use cpupool; use futures::{self, future, Future, Stream}; @@ -80,15 +77,16 @@ impl Seeder { tx: mpsc::UnboundedSender, ) -> Box> { let p2p_server = self.p2p.clone(); + let capabilities = self.capabilities.clone(); // now spawn a new future to regularly check if we need to acquire more peers // and if so, gets them from db let mon_loop = Timer::default() - .interval(time::Duration::from_secs(10)) + .interval(time::Duration::from_secs(30)) .for_each(move |_| { debug!( LOGGER, - "monitoring_peers: {} / {} / {}", + "monitor_peers: {} / {} / {}", p2p_server.most_work_peers().len(), p2p_server.connected_peers().len(), p2p_server.all_peers().len(), @@ -110,7 +108,7 @@ impl Seeder { debug!( LOGGER, - "monitoring_peers: all - {}, healthy - {}, banned - {}, defunct - {}", + "monitor_peers: all - {}, healthy - {}, banned - {}, defunct - {}", all_peers.len(), healthy_count, banned_count, @@ -118,27 +116,44 @@ impl Seeder { ); // maintenance step first, clean up p2p server peers - let _ = p2p_server.clean_peers(); + { + p2p_server.clean_peers(PEER_PREFERRED_COUNT as usize); + } - // we don't have enough peers, getting more from db + // not enough peers, getting more from db if p2p_server.peer_count() < PEER_PREFERRED_COUNT { - let mut peers = p2p_server.find_peers( + // loop over connected peers + // ask them for their list of peers + for p in p2p_server.connected_peers() { + if let Ok(p) = p.try_read() { + debug!( + LOGGER, + "monitor_peers: asking {} for more peers", + p.info.addr, + ); + let _ = p.send_peer_request(capabilities); + } else { + warn!( + LOGGER, + "monitor_peers: failed to get read lock on peer", + ); + } + } + + // find some peers from our db + // and queue them up for a connection attempt + let peers = p2p_server.find_peers( p2p::State::Healthy, p2p::UNKNOWN, - (2 * PEER_MAX_COUNT) as usize, + 100, ); - peers.retain(|p| !p2p_server.is_known(&p.addr)); - if peers.len() > 0 { + for p in peers { debug!( LOGGER, - "Got {} peers from db, trying to connect.", - peers.len() + "monitor_peers: queueing up {} for connection (may be already connected)", + p.addr, ); - thread_rng().shuffle(&mut peers[..]); - let sz = min(PEER_PREFERRED_COUNT as usize, peers.len()); - for p in &peers[0..sz] { - tx.unbounded_send(p.addr).unwrap(); - } + tx.unbounded_send(p.addr).unwrap(); } } Ok(()) @@ -166,29 +181,28 @@ impl Seeder { let peers = p2p_server.find_peers( p2p::State::Healthy, p2p::FULL_HIST, - (2 * PEER_MAX_COUNT) as usize, + 100, ); Ok(peers) }) - .and_then(|mut peers| { + .and_then(|peers| { // if so, get their addresses, otherwise use our seeds if peers.len() > 3 { - thread_rng().shuffle(&mut peers[..]); Box::new(future::ok(peers.iter().map(|p| p.addr).collect::>())) } else { seed_list } }) .and_then(move |peer_addrs| { - // connect to this first set of addresses - let sz = min(PEER_PREFERRED_COUNT as usize, peer_addrs.len()); - for addr in &peer_addrs[0..sz] { - debug!(LOGGER, "Connecting to seed: {}.", addr); - tx.unbounded_send(*addr).unwrap(); - } if peer_addrs.len() == 0 { warn!(LOGGER, "No seeds were retrieved."); } + + // connect to this first set of addresses + for addr in peer_addrs { + tx.unbounded_send(addr).unwrap(); + } + Ok(()) }); Box::new(seeder) @@ -285,29 +299,23 @@ fn connect_and_req( h: reactor::Handle, addr: SocketAddr, ) -> Box> { - let connect_peer = p2p.connect_peer(addr, h).map_err(|_| ()); - let timer = Timer::default(); - let timeout = timer.timeout(connect_peer, Duration::from_secs(5)); + let connect_peer = p2p.connect_peer(addr, h); let p2p_server = p2p.clone(); - - let fut = timeout.then(move |p| { + let fut = connect_peer.then(move |p| { match p { Ok(Some(p)) => { - let p = p.read().unwrap(); - let peer_result = p.send_peer_request(capab); - match peer_result { - Ok(()) => {} - Err(_) => {} + debug!(LOGGER, "connect_and_req: ok, will attempt send_peer_request"); + if let Ok(p) = p.try_read() { + let _ = p.send_peer_request(capab); } - } - Err(_) => { - let update_result = p2p_server.update_state(addr, p2p::State::Defunct); - match update_result { - Ok(()) => {} - Err(_) => {} - } - } - _ => {} + }, + Ok(None) => { + debug!(LOGGER, "connect_and_req: ok but none inner (what does this mean?), {}", addr); + }, + Err(e) => { + debug!(LOGGER, "connect_and_req: err - {:?}, {}, flagging as defunct", e, addr); + let _ = p2p_server.update_state(addr, p2p::State::Defunct); + }, } Ok(()) }); diff --git a/grin/src/sync.rs b/grin/src/sync.rs index d285e5ddd..4e0963ee6 100644 --- a/grin/src/sync.rs +++ b/grin/src/sync.rs @@ -106,12 +106,14 @@ fn body_sync( } hashes.reverse(); - // let peer_count = p2p_server.most_work_peers().len(); + let peer_count = { + p2p_server.most_work_peers().len() + }; + let hashes_to_get = hashes .iter() .filter(|x| !chain.get_block(&x).is_ok()) - .take(10) - // .take(peer_count * 2) + .take(peer_count * 2) .cloned() .collect::>(); @@ -162,26 +164,22 @@ fn request_headers( chain: Arc, ) -> Result<(), Error> { let locator = get_locator(chain)?; - match peer.try_read() { - Ok(peer) => { - debug!( - LOGGER, - "sync: request_headers: asking {} for headers, {:?}", - peer.info.addr, - locator, - ); - let _ = peer.send_header_request(locator); - Ok(()) - }, - Err(_) => { - // not much we can do here, log and try again next time - warn!( - LOGGER, - "sync: request_headers: failed to get read lock on peer", - ); - Ok(()) - }, + if let Ok(peer) = peer.try_read() { + debug!( + LOGGER, + "sync: request_headers: asking {} for headers, {:?}", + peer.info.addr, + locator, + ); + let _ = peer.send_header_request(locator); + } else { + // not much we can do here, log and try again next time + debug!( + LOGGER, + "sync: request_headers: failed to get read lock on peer", + ); } + Ok(()) } /// We build a locator based on sync_head. diff --git a/p2p/src/server.rs b/p2p/src/server.rs index 49361ce9b..04ff4b064 100644 --- a/p2p/src/server.rs +++ b/p2p/src/server.rs @@ -113,7 +113,7 @@ impl Server { // main peer acceptance future handling handshake let hp = h.clone(); let peers_listen = socket.incoming().map_err(From::from).map(move |(conn, _)| { - + // aaaand.. reclone for the internal closures let adapter = adapter.clone(); let store = store.clone(); @@ -216,9 +216,12 @@ impl Server { ) -> Box>>, Error = Error>> { if let Some(p) = self.get_peer(&addr) { // if we're already connected to the addr, just return the peer + debug!(LOGGER, "connect_peer: already connected {}", addr); return Box::new(future::ok(Some(p))); } + debug!(LOGGER, "connect_peer: connecting to {}", addr); + // cloneapalooza let peers = self.peers.clone(); let handshake = self.handshake.clone(); @@ -226,9 +229,17 @@ impl Server { let capab = self.capabilities.clone(); let self_addr = SocketAddr::new(self.config.host, self.config.port); - let socket = TcpStream::connect(&addr, &h).map_err(|e| Error::Connection(e)); + let timer = Timer::default(); + let socket_connect = timer.timeout( + TcpStream::connect(&addr, &h), + Duration::from_secs(5), + ).map_err(|e| { + debug!(LOGGER, "connect_peer: socket connect error - {:?}", e); + Error::Connection(e) + }); + let h2 = h.clone(); - let request = socket + let request = socket_connect .and_then(move |socket| { let peers = peers.clone(); let total_diff = adapter.clone().total_difficulty(); @@ -272,28 +283,59 @@ impl Server { } /// Have the server iterate over its peer list and prune all peers we have - /// lost connection to or have been deemed problematic. The removed peers - /// are returned. - pub fn clean_peers(&self) -> Vec>> { + /// lost connection to or have been deemed problematic. + /// Also avoid connected peer count getting too high. + pub fn clean_peers(&self, desired_count: usize) { let mut rm = vec![]; // build a list of peers to be cleaned up for peer in self.connected_peers() { let peer_inner = peer.read().unwrap(); - if peer_inner.is_banned() || !peer_inner.is_connected() { + if peer_inner.is_banned() { + debug!(LOGGER, "cleaning {:?}, peer banned", peer_inner.info.addr); + rm.push(peer.clone()); + } else if !peer_inner.is_connected() { debug!(LOGGER, "cleaning {:?}, not connected", peer_inner.info.addr); rm.push(peer.clone()); } } // now clean up peer map based on the list to remove - let mut peers = self.peers.write().unwrap(); - for p in rm.clone() { - let p = p.read().unwrap(); - peers.remove(&p.info.addr); + { + let mut peers = self.peers.write().unwrap(); + for p in rm.clone() { + let p = p.read().unwrap(); + peers.remove(&p.info.addr); + } } - rm + // ensure we do not have too many connected peers + // really fighting with the double layer of rwlocks here... + let excess_count = { + let peer_count = self.peer_count().clone() as usize; + if peer_count > desired_count { + peer_count - desired_count + } else { + 0 + } + }; + + // map peers to addrs in a block to bound how long we keep the read lock for + let addrs = { + self.connected_peers().iter().map(|x| { + let p = x.read().unwrap(); + p.info.addr.clone() + }).collect::>() + }; + + // now remove them taking a short-lived write lock each time + // maybe better to take write lock once and remove them all? + for x in addrs + .iter() + .take(excess_count) { + let mut peers = self.peers.write().unwrap(); + peers.remove(x); + } } /// Return vec of all peers that currently have the most worked branch, @@ -342,23 +384,8 @@ impl Server { } /// Returns a random connected peer. - /// Only considers peers with at least our total_difficulty (ignores out of sync peers). pub fn random_peer(&self) -> Option>> { - let difficulty = self.adapter.total_difficulty(); - - let peers = self - .connected_peers() - .iter() - .filter(|x| { - let peer = x.read().unwrap(); - peer.is_connected() && peer.info.total_difficulty >= difficulty - }) - .cloned() - .collect::>(); - - if peers.len() == 0 { - return None; - } + let peers = self.connected_peers(); Some(thread_rng().choose(&peers).unwrap().clone()) } @@ -495,12 +522,20 @@ fn with_timeout( fut: Box, Error = Error>>, h: &reactor::Handle, ) -> Box> { - let timeout = reactor::Timeout::new(Duration::new(5, 0), h).unwrap(); + let timeout = reactor::Timeout::new(Duration::from_secs(5), h).unwrap(); let timed = fut.select(timeout.map(Err).from_err()) .then(|res| match res { - Ok((Ok(inner), _timeout)) => Ok(inner), - Ok((_, _accept)) => Err(Error::Timeout), - Err((e, _other)) => Err(e), + Ok((Ok(inner), _timeout)) => { + Ok(inner) + }, + Ok((Err(inner), _accept)) => { + debug!(LOGGER, "with_timeout: ok but nested - {:?} (treating this as timeout)", inner); + Err(Error::Timeout) + }, + Err((e, _other)) => { + debug!(LOGGER, "with_timeout: err - {:?} (treating this as an error)", e); + Err(e) + }, }); Box::new(timed) }