From ed73db671f5b681c55e13819d33c18d21e4028c1 Mon Sep 17 00:00:00 2001 From: Gary Yu Date: Tue, 4 Sep 2018 16:52:11 +0800 Subject: [PATCH] fix for unstable travis-ci test on servers module (#1434) * fix for unstable travis-ci test on servers module * skip the PeerWithSelf connection request on sending side * logs and comments change; and remove an unnecessary unwrap() in test --- p2p/src/serv.rs | 31 ++++++++++++-- servers/src/grin/seed.rs | 77 ++++++++++++++++++++++++++++------ servers/tests/framework/mod.rs | 2 +- servers/tests/simulnet.rs | 26 ++++++++++++ 4 files changed, 120 insertions(+), 16 deletions(-) diff --git a/p2p/src/serv.rs b/p2p/src/serv.rs index ee895209f..889bbf7a9 100644 --- a/p2p/src/serv.rs +++ b/p2p/src/serv.rs @@ -144,17 +144,35 @@ impl Server { /// we're already connected to the provided address. pub fn connect(&self, addr: &SocketAddr) -> Result>, Error> { if Peer::is_denied(&self.config, &addr) { - debug!(LOGGER, "Peer {} denied, not connecting.", addr); + debug!( + LOGGER, + "connect_peer: peer {} denied, not connecting.", addr + ); return Err(Error::ConnectionClose); } + // check ip and port to see if we are trying to connect to ourselves + // todo: this can't detect all cases of PeerWithSelf, for example config.host is '0.0.0.0' + // + if self.config.port == addr.port() + && (addr.ip().is_loopback() || addr.ip() == self.config.host) + { + return Err(Error::PeerWithSelf); + } + if let Some(p) = self.peers.get_connected_peer(addr) { // if we're already connected to the addr, just return the peer trace!(LOGGER, "connect_peer: already connected {}", addr); return Ok(p); } - trace!(LOGGER, "connect_peer: connecting to {}", addr); + trace!( + LOGGER, + "connect_peer: on {}:{}. connecting to {}", + self.config.host, + self.config.port, + addr + ); match TcpStream::connect_timeout(addr, Duration::from_secs(10)) { Ok(mut stream) => { let addr = SocketAddr::new(self.config.host, self.config.port); @@ -176,7 +194,14 @@ impl Server { Ok(added) } Err(e) => { - debug!(LOGGER, "Could not connect to {}: {:?}", addr, e); + debug!( + LOGGER, + "connect_peer: on {}:{}. Could not connect to {}: {:?}", + self.config.host, + self.config.port, + addr, + e + ); Err(Error::Connection(e)) } } diff --git a/servers/src/grin/seed.rs b/servers/src/grin/seed.rs index 75fc03249..8f87cf339 100644 --- a/servers/src/grin/seed.rs +++ b/servers/src/grin/seed.rs @@ -18,6 +18,7 @@ use chrono::prelude::Utc; use chrono::Duration; +use std::io; use std::net::{SocketAddr, ToSocketAddrs}; use std::str; use std::sync::atomic::{AtomicBool, Ordering}; @@ -138,8 +139,10 @@ fn monitor_peers( debug!( LOGGER, - "monitor_peers: {} connected ({} most_work). \ + "monitor_peers: on {}:{}, {} connected ({} most_work). \ all {} = {} healthy + {} banned + {} defunct", + config.host, + config.port, peers.peer_count(), peers.most_work_peers().len(), total_count, @@ -161,7 +164,10 @@ fn monitor_peers( let mut connected_peers: Vec = vec![]; for p in peers.connected_peers() { if let Ok(p) = p.try_read() { - debug!(LOGGER, "monitor_peers: ask {} for more peers", p.info.addr); + debug!( + LOGGER, + "monitor_peers: {}:{} ask {} for more peers", config.host, config.port, p.info.addr, + ); let _ = p.send_peer_request(capabilities); connected_peers.push(p.info.addr) } else { @@ -193,7 +199,10 @@ fn monitor_peers( config.peer_max_count() as usize, ); for p in new_peers.iter().filter(|p| !peers.is_known(&p.addr)) { - debug!(LOGGER, "monitor_peers: queue to soon try {}", p.addr); + debug!( + LOGGER, + "monitor_peers: on {}:{}, queue to soon try {}", config.host, config.port, p.addr, + ); tx.send(p.addr).unwrap(); } } @@ -266,18 +275,62 @@ fn listen_for_addrs( let _ = thread::Builder::new() .name("peer_connect".to_string()) .spawn(move || { - let connect_peer = p2p_c.connect(&addr); - match connect_peer { - Ok(p) => { - trace!(LOGGER, "connect_and_req: ok. attempting send_peer_request"); - if let Ok(p) = p.try_read() { - let _ = p.send_peer_request(capab); + let mut connect_retry_count = 0; + loop { + let connect_peer = p2p_c.connect(&addr); + match connect_peer { + Ok(p) => { + trace!( + LOGGER, + "connect_and_req: on {}:{}. connect to {} ok. attempting send_peer_request", + p2p_c.config.host, + p2p_c.config.port, + addr + ); + if let Ok(p) = p.try_read() { + let _ = p.send_peer_request(capab); + } + let _ = peers_c.update_state(addr, p2p::State::Healthy); + break; + } + Err(e) => { + debug!( + LOGGER, + "connect_and_req: on {}:{}. connect to {} is Defunct. {:?}", + p2p_c.config.host, + p2p_c.config.port, + addr, + e, + ); + let _ = peers_c.update_state(addr, p2p::State::Defunct); + + // don't retry if connection refused or PeerWithSelf + match e { + p2p::Error::Connection(io_err) => { + if io::ErrorKind::ConnectionRefused == io_err.kind() { + break; + } + } + p2p::Error::PeerWithSelf => break, + _ => continue, + } } } - Err(e) => { - debug!(LOGGER, "connect_and_req: {} is Defunct; {:?}", addr, e); - let _ = peers_c.update_state(addr, p2p::State::Defunct); + + // retry for 3 times + thread::sleep(time::Duration::from_secs(1)); + connect_retry_count += 1; + if connect_retry_count >= 3 { + break; } + debug!( + LOGGER, + "connect_and_req: on {}:{}. connect to {} retrying {}", + p2p_c.config.host, + p2p_c.config.port, + addr, + connect_retry_count, + ); } }); } diff --git a/servers/tests/framework/mod.rs b/servers/tests/framework/mod.rs index 898903226..db24b6515 100644 --- a/servers/tests/framework/mod.rs +++ b/servers/tests/framework/mod.rs @@ -225,7 +225,7 @@ impl LocalServerContainer { for p in &mut self.peer_list { println!("{} connecting to peer: {}", self.config.p2p_server_port, p); - s.connect_peer(p.parse().unwrap()).unwrap(); + let _ = s.connect_peer(p.parse().unwrap()); } if self.wallet_is_running { diff --git a/servers/tests/simulnet.rs b/servers/tests/simulnet.rs index 28cab7a26..2b0474ec5 100644 --- a/servers/tests/simulnet.rs +++ b/servers/tests/simulnet.rs @@ -223,6 +223,8 @@ fn simulate_block_propagation() { // monitor for a change of head on a different server and check whether // chain height has changed + let mut success = false; + let mut time_spent = 0; loop { let mut count = 0; for n in 0..5 { @@ -231,13 +233,22 @@ fn simulate_block_propagation() { } } if count == 5 { + success = true; break; } thread::sleep(time::Duration::from_millis(1_000)); + time_spent += 1; + if time_spent >= 60 { + break; + } } for n in 0..5 { servers[n].stop(); } + assert_eq!(true, success); + + // wait servers fully stop before start next automated test + thread::sleep(time::Duration::from_millis(1_000)); } /// Creates 2 different disconnected servers, mine a few blocks on one, connect @@ -263,8 +274,14 @@ fn simulate_full_sync() { let s1_header = s1.chain.head_header().unwrap(); // Wait for s2 to sync up to and including the header from s1. + let mut time_spent = 0; while s2.head().height < s1_header.height { thread::sleep(time::Duration::from_millis(1_000)); + time_spent += 1; + if time_spent >= 60 { + println!("sync fail. s2.head().height: {}, s1_header.height: {}", s2.head().height, s1_header.height); + break; + } } // Confirm both s1 and s2 see a consistent header at that height. @@ -274,6 +291,9 @@ fn simulate_full_sync() { // Stop our servers cleanly. s1.stop(); s2.stop(); + + // wait servers fully stop before start next automated test + thread::sleep(time::Duration::from_millis(1_000)); } /// Creates 2 different disconnected servers, mine a few blocks on one, connect @@ -322,6 +342,9 @@ fn simulate_fast_sync() { // Stop our servers cleanly. s1.stop(); s2.stop(); + + // wait servers fully stop before start next automated test + thread::sleep(time::Duration::from_millis(1_000)); } // #[test] @@ -361,6 +384,9 @@ fn simulate_fast_sync_double() { } s1.stop(); s2.stop(); + + // wait servers fully stop before start next automated test + thread::sleep(time::Duration::from_millis(1_000)); } pub fn create_wallet(