fix: one peer occupy multiple tcp connections (#2262)

* shutdown the tcpstream when peer connect or accept fail

* remove the unnecessary 3 times retry on peer connecting

* connect/accept are actually handshakings instead of tcpstream connect/accept
This commit is contained in:
Gary Yu 2018-12-31 19:24:30 +08:00 committed by GitHub
parent ea7eea3f84
commit 738d49d560
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 49 additions and 40 deletions

View file

@ -14,7 +14,7 @@
use crate::util::RwLock;
use std::collections::VecDeque;
use std::net::{Shutdown, SocketAddr, TcpStream};
use std::net::{SocketAddr, TcpStream};
use std::sync::Arc;
use chrono::prelude::*;
@ -28,7 +28,13 @@ use crate::msg::{
use crate::peer::Peer;
use crate::types::{Capabilities, Direction, Error, P2PConfig, PeerInfo, PeerLiveInfo};
/// Local generated nonce for peer connecting.
/// Used for self-connecting detection (on receiver side),
/// nonce(s) in recent 100 connecting requests are saved
const NONCES_CAP: usize = 100;
/// Socket addresses of self, extracted from stream when a self-connecting is detected.
/// Used in connecting request to avoid self-connecting request,
/// 10 should be enough since most of servers don't have more than 10 IP addresses.
const ADDRS_CAP: usize = 10;
/// Handles the handshake negotiation when two peers connect and decides on
@ -157,9 +163,6 @@ impl Handshake {
if addrs.len() >= ADDRS_CAP {
addrs.pop_front();
}
if let Err(e) = conn.shutdown(Shutdown::Both) {
debug!("Error shutting down conn: {:?}", e);
}
return Err(Error::PeerWithSelf);
}
}

View file

@ -14,7 +14,7 @@
use crate::util::{Mutex, RwLock};
use std::fs::File;
use std::net::{SocketAddr, TcpStream};
use std::net::{Shutdown, SocketAddr, TcpStream};
use std::sync::Arc;
use crate::conn;
@ -71,8 +71,22 @@ impl Peer {
hs: &Handshake,
adapter: Arc<dyn NetAdapter>,
) -> Result<Peer, Error> {
let info = hs.accept(capab, total_difficulty, conn)?;
Ok(Peer::new(info, adapter))
debug!("accept: handshaking from {:?}", conn.peer_addr());
let info = hs.accept(capab, total_difficulty, conn);
match info {
Ok(peer_info) => Ok(Peer::new(peer_info, adapter)),
Err(e) => {
debug!(
"accept: handshaking from {:?} failed with error: {:?}",
conn.peer_addr(),
e
);
if let Err(e) = conn.shutdown(Shutdown::Both) {
debug!("Error shutting down conn: {:?}", e);
}
Err(e)
}
}
}
pub fn connect(
@ -83,8 +97,22 @@ impl Peer {
hs: &Handshake,
na: Arc<dyn NetAdapter>,
) -> Result<Peer, Error> {
let info = hs.initiate(capab, total_difficulty, self_addr, conn)?;
Ok(Peer::new(info, na))
debug!("connect: handshaking with {:?}", conn.peer_addr().unwrap());
let info = hs.initiate(capab, total_difficulty, self_addr, conn);
match info {
Ok(peer_info) => Ok(Peer::new(peer_info, na)),
Err(e) => {
debug!(
"connect: handshaking with {:?} failed with error: {:?}",
conn.peer_addr().unwrap(),
e
);
if let Err(e) = conn.shutdown(Shutdown::Both) {
debug!("Error shutting down conn: {:?}", e);
}
Err(e)
}
}
}
/// Main peer loop listening for messages and forwarding to the rest of the

View file

@ -113,10 +113,7 @@ impl Server {
let hs = self.handshake.clone();
let addrs = hs.addrs.read();
if addrs.contains(&addr) {
debug!(
"connect: ignore the connecting to PeerWithSelf, addr: {}",
addr
);
debug!("connect: ignore connecting to PeerWithSelf, addr: {}", addr);
return Err(Error::PeerWithSelf);
}
}

View file

@ -22,7 +22,7 @@ use chrono::{Duration, MIN_DATE};
use rand::{thread_rng, Rng};
use std::net::{SocketAddr, ToSocketAddrs};
use std::sync::{mpsc, Arc};
use std::{cmp, io, str, thread, time};
use std::{cmp, str, thread, time};
use crate::core::global;
use crate::p2p;
@ -303,32 +303,13 @@ fn listen_for_addrs(
let p2p_c = p2p.clone();
let _ = thread::Builder::new()
.name("peer_connect".to_string())
.spawn(move || {
// connect and retry on fail, but for 3 times at most
for _ in 0..3 {
match p2p_c.connect(&addr) {
Ok(p) => {
let _ = p.send_peer_request(capab);
let _ = peers_c.update_state(addr, p2p::State::Healthy);
break;
}
Err(e) => {
let _ = peers_c.update_state(addr, p2p::State::Defunct);
// don't retry if connection refused or PeerWithSelf
match e {
p2p::Error::Connection(io_err) => {
if io::ErrorKind::ConnectionRefused == io_err.kind() {
break;
}
}
p2p::Error::PeerWithSelf => break,
_ => (), // allow to retry on any other error
}
}
}
thread::sleep(time::Duration::from_secs(1));
.spawn(move || match p2p_c.connect(&addr) {
Ok(p) => {
let _ = p.send_peer_request(capab);
let _ = peers_c.update_state(addr, p2p::State::Healthy);
}
Err(_) => {
let _ = peers_c.update_state(addr, p2p::State::Defunct);
}
});
}