diff --git a/p2p/src/peers.rs b/p2p/src/peers.rs index b918e26b9..13a12e9f6 100644 --- a/p2p/src/peers.rs +++ b/p2p/src/peers.rs @@ -118,6 +118,16 @@ impl Peers { self.peers.read().contains_key(addr) } + /// Check whether an ip address is in the active peers list, ignore the port + pub fn is_known_ip(&self, addr: &SocketAddr) -> bool { + for socket in self.peers.read().keys() { + if addr.ip() == socket.ip() { + return true; + } + } + return false; + } + /// Get vec of peers we are currently connected to. pub fn connected_peers(&self) -> Vec> { let mut res = self diff --git a/p2p/src/serv.rs b/p2p/src/serv.rs index 6c1a3795a..853bf6ad3 100644 --- a/p2p/src/serv.rs +++ b/p2p/src/serv.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashMap; use std::fs::File; use std::net::{Shutdown, SocketAddr, TcpListener, TcpStream}; use std::sync::Arc; @@ -70,6 +71,8 @@ impl Server { let listener = TcpListener::bind(addr)?; listener.set_nonblocking(true)?; + let mut connected_sockets: HashMap = HashMap::new(); + let sleep_time = Duration::from_millis(1); loop { // Pause peer ingress connection request. Only for tests. @@ -81,10 +84,17 @@ impl Server { match listener.accept() { Ok((stream, peer_addr)) => { if !self.check_banned(&stream) { + let sc = stream.try_clone(); if let Err(e) = self.handle_new_peer(stream) { warn!("Error accepting peer {}: {:?}", peer_addr.to_string(), e); + } else { + if let Ok(s) = sc { + connected_sockets.insert(peer_addr, s); + } } } + // if any active socket not in our peers list, close it + self.clean_lost_sockets(&mut connected_sockets); } Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { // nothing to do, will retry in next iteration @@ -191,6 +201,33 @@ impl Server { false } + /// For all kinds of exception cases, the node could accepted / initiated a peer connection successfully but + /// failed on the Handshake protocol communication, or a connected peer was closed but without a successful + /// clean-up on its socket, that will cause this connected (on TcpStream) peer becomes so-called "invisible" peer! + /// i.e. a peer not included in the 'self.peers.peers' hashmap. This "invisible" peer will cause some security + /// concern because it still can send something to this node, but without enough visibility as other connected peers. + /// Another impact is these connections could never be closed, which make the node fully occupied by all such + /// kind of connections and become un-connectable. + /// This function can help to clean the peer connections which is "invisible" for this node. + fn clean_lost_sockets(&self, sockets: &mut HashMap) { + let mut lost_sockets: Vec = vec![]; + for (socket, stream) in sockets.iter() { + if !self.peers.is_known_ip(&socket) { + if let Ok(_) = stream.shutdown(Shutdown::Both) { + debug!( + "clean_lost_sockets: {} cleaned which's not in peers list", + socket + ); + } + lost_sockets.push(socket.clone()); + } + } + + for socket in lost_sockets { + sockets.remove(&socket); + } + } + pub fn stop(&self) { self.stop_state.lock().stop(); self.peers.stop();