fix for unstable travis-ci test on servers module (#1434)

* fix for unstable travis-ci test on servers module

* skip the PeerWithSelf connection request on sending side

* logs and comments change; and remove an unnecessary unwrap() in test
This commit is contained in:
Gary Yu 2018-09-04 16:52:11 +08:00 committed by hashmap
parent 0172a3e020
commit ed73db671f
4 changed files with 120 additions and 16 deletions

View file

@ -144,17 +144,35 @@ impl Server {
/// we're already connected to the provided address. /// we're already connected to the provided address.
pub fn connect(&self, addr: &SocketAddr) -> Result<Arc<RwLock<Peer>>, Error> { pub fn connect(&self, addr: &SocketAddr) -> Result<Arc<RwLock<Peer>>, Error> {
if Peer::is_denied(&self.config, &addr) { if Peer::is_denied(&self.config, &addr) {
debug!(LOGGER, "Peer {} denied, not connecting.", addr); debug!(
LOGGER,
"connect_peer: peer {} denied, not connecting.", addr
);
return Err(Error::ConnectionClose); return Err(Error::ConnectionClose);
} }
// check ip and port to see if we are trying to connect to ourselves
// todo: this can't detect all cases of PeerWithSelf, for example config.host is '0.0.0.0'
//
if self.config.port == addr.port()
&& (addr.ip().is_loopback() || addr.ip() == self.config.host)
{
return Err(Error::PeerWithSelf);
}
if let Some(p) = self.peers.get_connected_peer(addr) { if let Some(p) = self.peers.get_connected_peer(addr) {
// if we're already connected to the addr, just return the peer // if we're already connected to the addr, just return the peer
trace!(LOGGER, "connect_peer: already connected {}", addr); trace!(LOGGER, "connect_peer: already connected {}", addr);
return Ok(p); return Ok(p);
} }
trace!(LOGGER, "connect_peer: connecting to {}", addr); trace!(
LOGGER,
"connect_peer: on {}:{}. connecting to {}",
self.config.host,
self.config.port,
addr
);
match TcpStream::connect_timeout(addr, Duration::from_secs(10)) { match TcpStream::connect_timeout(addr, Duration::from_secs(10)) {
Ok(mut stream) => { Ok(mut stream) => {
let addr = SocketAddr::new(self.config.host, self.config.port); let addr = SocketAddr::new(self.config.host, self.config.port);
@ -176,7 +194,14 @@ impl Server {
Ok(added) Ok(added)
} }
Err(e) => { Err(e) => {
debug!(LOGGER, "Could not connect to {}: {:?}", addr, e); debug!(
LOGGER,
"connect_peer: on {}:{}. Could not connect to {}: {:?}",
self.config.host,
self.config.port,
addr,
e
);
Err(Error::Connection(e)) Err(Error::Connection(e))
} }
} }

View file

@ -18,6 +18,7 @@
use chrono::prelude::Utc; use chrono::prelude::Utc;
use chrono::Duration; use chrono::Duration;
use std::io;
use std::net::{SocketAddr, ToSocketAddrs}; use std::net::{SocketAddr, ToSocketAddrs};
use std::str; use std::str;
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
@ -138,8 +139,10 @@ fn monitor_peers(
debug!( debug!(
LOGGER, LOGGER,
"monitor_peers: {} connected ({} most_work). \ "monitor_peers: on {}:{}, {} connected ({} most_work). \
all {} = {} healthy + {} banned + {} defunct", all {} = {} healthy + {} banned + {} defunct",
config.host,
config.port,
peers.peer_count(), peers.peer_count(),
peers.most_work_peers().len(), peers.most_work_peers().len(),
total_count, total_count,
@ -161,7 +164,10 @@ fn monitor_peers(
let mut connected_peers: Vec<SocketAddr> = vec![]; let mut connected_peers: Vec<SocketAddr> = vec![];
for p in peers.connected_peers() { for p in peers.connected_peers() {
if let Ok(p) = p.try_read() { if let Ok(p) = p.try_read() {
debug!(LOGGER, "monitor_peers: ask {} for more peers", p.info.addr); debug!(
LOGGER,
"monitor_peers: {}:{} ask {} for more peers", config.host, config.port, p.info.addr,
);
let _ = p.send_peer_request(capabilities); let _ = p.send_peer_request(capabilities);
connected_peers.push(p.info.addr) connected_peers.push(p.info.addr)
} else { } else {
@ -193,7 +199,10 @@ fn monitor_peers(
config.peer_max_count() as usize, config.peer_max_count() as usize,
); );
for p in new_peers.iter().filter(|p| !peers.is_known(&p.addr)) { for p in new_peers.iter().filter(|p| !peers.is_known(&p.addr)) {
debug!(LOGGER, "monitor_peers: queue to soon try {}", p.addr); debug!(
LOGGER,
"monitor_peers: on {}:{}, queue to soon try {}", config.host, config.port, p.addr,
);
tx.send(p.addr).unwrap(); tx.send(p.addr).unwrap();
} }
} }
@ -266,19 +275,63 @@ fn listen_for_addrs(
let _ = thread::Builder::new() let _ = thread::Builder::new()
.name("peer_connect".to_string()) .name("peer_connect".to_string())
.spawn(move || { .spawn(move || {
let mut connect_retry_count = 0;
loop {
let connect_peer = p2p_c.connect(&addr); let connect_peer = p2p_c.connect(&addr);
match connect_peer { match connect_peer {
Ok(p) => { Ok(p) => {
trace!(LOGGER, "connect_and_req: ok. attempting send_peer_request"); trace!(
LOGGER,
"connect_and_req: on {}:{}. connect to {} ok. attempting send_peer_request",
p2p_c.config.host,
p2p_c.config.port,
addr
);
if let Ok(p) = p.try_read() { if let Ok(p) = p.try_read() {
let _ = p.send_peer_request(capab); let _ = p.send_peer_request(capab);
} }
let _ = peers_c.update_state(addr, p2p::State::Healthy);
break;
} }
Err(e) => { Err(e) => {
debug!(LOGGER, "connect_and_req: {} is Defunct; {:?}", addr, e); debug!(
LOGGER,
"connect_and_req: on {}:{}. connect to {} is Defunct. {:?}",
p2p_c.config.host,
p2p_c.config.port,
addr,
e,
);
let _ = peers_c.update_state(addr, p2p::State::Defunct); let _ = peers_c.update_state(addr, p2p::State::Defunct);
// don't retry if connection refused or PeerWithSelf
match e {
p2p::Error::Connection(io_err) => {
if io::ErrorKind::ConnectionRefused == io_err.kind() {
break;
} }
} }
p2p::Error::PeerWithSelf => break,
_ => continue,
}
}
}
// retry for 3 times
thread::sleep(time::Duration::from_secs(1));
connect_retry_count += 1;
if connect_retry_count >= 3 {
break;
}
debug!(
LOGGER,
"connect_and_req: on {}:{}. connect to {} retrying {}",
p2p_c.config.host,
p2p_c.config.port,
addr,
connect_retry_count,
);
}
}); });
} }
} }

View file

@ -225,7 +225,7 @@ impl LocalServerContainer {
for p in &mut self.peer_list { for p in &mut self.peer_list {
println!("{} connecting to peer: {}", self.config.p2p_server_port, p); println!("{} connecting to peer: {}", self.config.p2p_server_port, p);
s.connect_peer(p.parse().unwrap()).unwrap(); let _ = s.connect_peer(p.parse().unwrap());
} }
if self.wallet_is_running { if self.wallet_is_running {

View file

@ -223,6 +223,8 @@ fn simulate_block_propagation() {
// monitor for a change of head on a different server and check whether // monitor for a change of head on a different server and check whether
// chain height has changed // chain height has changed
let mut success = false;
let mut time_spent = 0;
loop { loop {
let mut count = 0; let mut count = 0;
for n in 0..5 { for n in 0..5 {
@ -231,13 +233,22 @@ fn simulate_block_propagation() {
} }
} }
if count == 5 { if count == 5 {
success = true;
break; break;
} }
thread::sleep(time::Duration::from_millis(1_000)); thread::sleep(time::Duration::from_millis(1_000));
time_spent += 1;
if time_spent >= 60 {
break;
}
} }
for n in 0..5 { for n in 0..5 {
servers[n].stop(); servers[n].stop();
} }
assert_eq!(true, success);
// wait servers fully stop before start next automated test
thread::sleep(time::Duration::from_millis(1_000));
} }
/// Creates 2 different disconnected servers, mine a few blocks on one, connect /// Creates 2 different disconnected servers, mine a few blocks on one, connect
@ -263,8 +274,14 @@ fn simulate_full_sync() {
let s1_header = s1.chain.head_header().unwrap(); let s1_header = s1.chain.head_header().unwrap();
// Wait for s2 to sync up to and including the header from s1. // Wait for s2 to sync up to and including the header from s1.
let mut time_spent = 0;
while s2.head().height < s1_header.height { while s2.head().height < s1_header.height {
thread::sleep(time::Duration::from_millis(1_000)); thread::sleep(time::Duration::from_millis(1_000));
time_spent += 1;
if time_spent >= 60 {
println!("sync fail. s2.head().height: {}, s1_header.height: {}", s2.head().height, s1_header.height);
break;
}
} }
// Confirm both s1 and s2 see a consistent header at that height. // Confirm both s1 and s2 see a consistent header at that height.
@ -274,6 +291,9 @@ fn simulate_full_sync() {
// Stop our servers cleanly. // Stop our servers cleanly.
s1.stop(); s1.stop();
s2.stop(); s2.stop();
// wait servers fully stop before start next automated test
thread::sleep(time::Duration::from_millis(1_000));
} }
/// Creates 2 different disconnected servers, mine a few blocks on one, connect /// Creates 2 different disconnected servers, mine a few blocks on one, connect
@ -322,6 +342,9 @@ fn simulate_fast_sync() {
// Stop our servers cleanly. // Stop our servers cleanly.
s1.stop(); s1.stop();
s2.stop(); s2.stop();
// wait servers fully stop before start next automated test
thread::sleep(time::Duration::from_millis(1_000));
} }
// #[test] // #[test]
@ -361,6 +384,9 @@ fn simulate_fast_sync_double() {
} }
s1.stop(); s1.stop();
s2.stop(); s2.stop();
// wait servers fully stop before start next automated test
thread::sleep(time::Duration::from_millis(1_000));
} }
pub fn create_wallet( pub fn create_wallet(