From 738d49d560e89a3eb88565b5a8306bfcf1769573 Mon Sep 17 00:00:00 2001 From: Gary Yu Date: Mon, 31 Dec 2018 19:24:30 +0800 Subject: [PATCH] fix: one peer occupy multiple tcp connections (#2262) * shutdown the tcpstream when peer connect or accept fail * remove the unnecessary 3 times retry on peer connecting * connect/accept are actually handshakings instead of tcpstream connect/accept --- p2p/src/handshake.rs | 11 +++++++---- p2p/src/peer.rs | 38 +++++++++++++++++++++++++++++++++----- p2p/src/serv.rs | 5 +---- servers/src/grin/seed.rs | 35 ++++++++--------------------------- 4 files changed, 49 insertions(+), 40 deletions(-) diff --git a/p2p/src/handshake.rs b/p2p/src/handshake.rs index 041c81267..cd0ea197a 100644 --- a/p2p/src/handshake.rs +++ b/p2p/src/handshake.rs @@ -14,7 +14,7 @@ use crate::util::RwLock; use std::collections::VecDeque; -use std::net::{Shutdown, SocketAddr, TcpStream}; +use std::net::{SocketAddr, TcpStream}; use std::sync::Arc; use chrono::prelude::*; @@ -28,7 +28,13 @@ use crate::msg::{ use crate::peer::Peer; use crate::types::{Capabilities, Direction, Error, P2PConfig, PeerInfo, PeerLiveInfo}; +/// Local generated nonce for peer connecting. +/// Used for self-connecting detection (on receiver side), +/// nonce(s) in recent 100 connecting requests are saved const NONCES_CAP: usize = 100; +/// Socket addresses of self, extracted from stream when a self-connecting is detected. +/// Used in connecting request to avoid self-connecting request, +/// 10 should be enough since most of servers don't have more than 10 IP addresses. const ADDRS_CAP: usize = 10; /// Handles the handshake negotiation when two peers connect and decides on @@ -157,9 +163,6 @@ impl Handshake { if addrs.len() >= ADDRS_CAP { addrs.pop_front(); } - if let Err(e) = conn.shutdown(Shutdown::Both) { - debug!("Error shutting down conn: {:?}", e); - } return Err(Error::PeerWithSelf); } } diff --git a/p2p/src/peer.rs b/p2p/src/peer.rs index 3185a67b6..aa34c539d 100644 --- a/p2p/src/peer.rs +++ b/p2p/src/peer.rs @@ -14,7 +14,7 @@ use crate::util::{Mutex, RwLock}; use std::fs::File; -use std::net::{SocketAddr, TcpStream}; +use std::net::{Shutdown, SocketAddr, TcpStream}; use std::sync::Arc; use crate::conn; @@ -71,8 +71,22 @@ impl Peer { hs: &Handshake, adapter: Arc, ) -> Result { - let info = hs.accept(capab, total_difficulty, conn)?; - Ok(Peer::new(info, adapter)) + debug!("accept: handshaking from {:?}", conn.peer_addr()); + let info = hs.accept(capab, total_difficulty, conn); + match info { + Ok(peer_info) => Ok(Peer::new(peer_info, adapter)), + Err(e) => { + debug!( + "accept: handshaking from {:?} failed with error: {:?}", + conn.peer_addr(), + e + ); + if let Err(e) = conn.shutdown(Shutdown::Both) { + debug!("Error shutting down conn: {:?}", e); + } + Err(e) + } + } } pub fn connect( @@ -83,8 +97,22 @@ impl Peer { hs: &Handshake, na: Arc, ) -> Result { - let info = hs.initiate(capab, total_difficulty, self_addr, conn)?; - Ok(Peer::new(info, na)) + debug!("connect: handshaking with {:?}", conn.peer_addr().unwrap()); + let info = hs.initiate(capab, total_difficulty, self_addr, conn); + match info { + Ok(peer_info) => Ok(Peer::new(peer_info, na)), + Err(e) => { + debug!( + "connect: handshaking with {:?} failed with error: {:?}", + conn.peer_addr().unwrap(), + e + ); + if let Err(e) = conn.shutdown(Shutdown::Both) { + debug!("Error shutting down conn: {:?}", e); + } + Err(e) + } + } } /// Main peer loop listening for messages and forwarding to the rest of the diff --git a/p2p/src/serv.rs b/p2p/src/serv.rs index fb34aaa44..6c1a3795a 100644 --- a/p2p/src/serv.rs +++ b/p2p/src/serv.rs @@ -113,10 +113,7 @@ impl Server { let hs = self.handshake.clone(); let addrs = hs.addrs.read(); if addrs.contains(&addr) { - debug!( - "connect: ignore the connecting to PeerWithSelf, addr: {}", - addr - ); + debug!("connect: ignore connecting to PeerWithSelf, addr: {}", addr); return Err(Error::PeerWithSelf); } } diff --git a/servers/src/grin/seed.rs b/servers/src/grin/seed.rs index 103dafd3b..4fcfbbbb5 100644 --- a/servers/src/grin/seed.rs +++ b/servers/src/grin/seed.rs @@ -22,7 +22,7 @@ use chrono::{Duration, MIN_DATE}; use rand::{thread_rng, Rng}; use std::net::{SocketAddr, ToSocketAddrs}; use std::sync::{mpsc, Arc}; -use std::{cmp, io, str, thread, time}; +use std::{cmp, str, thread, time}; use crate::core::global; use crate::p2p; @@ -303,32 +303,13 @@ fn listen_for_addrs( let p2p_c = p2p.clone(); let _ = thread::Builder::new() .name("peer_connect".to_string()) - .spawn(move || { - // connect and retry on fail, but for 3 times at most - for _ in 0..3 { - match p2p_c.connect(&addr) { - Ok(p) => { - let _ = p.send_peer_request(capab); - let _ = peers_c.update_state(addr, p2p::State::Healthy); - break; - } - Err(e) => { - let _ = peers_c.update_state(addr, p2p::State::Defunct); - - // don't retry if connection refused or PeerWithSelf - match e { - p2p::Error::Connection(io_err) => { - if io::ErrorKind::ConnectionRefused == io_err.kind() { - break; - } - } - p2p::Error::PeerWithSelf => break, - _ => (), // allow to retry on any other error - } - } - } - - thread::sleep(time::Duration::from_secs(1)); + .spawn(move || match p2p_c.connect(&addr) { + Ok(p) => { + let _ = p.send_peer_request(capab); + let _ = peers_c.update_state(addr, p2p::State::Healthy); + } + Err(_) => { + let _ = peers_c.update_state(addr, p2p::State::Defunct); } }); }