clean the unused p2p sockets (i.e. not in peers list) (#2298)

* clean the unused p2p sockets (i.e. not in peers list)

* add function comments
This commit is contained in:
Gary Yu 2019-01-07 14:41:41 +08:00 committed by GitHub
parent 657392b592
commit e79123fd5c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 47 additions and 0 deletions

View file

@ -118,6 +118,16 @@ impl Peers {
self.peers.read().contains_key(addr) self.peers.read().contains_key(addr)
} }
/// Check whether an ip address is in the active peers list, ignore the port
pub fn is_known_ip(&self, addr: &SocketAddr) -> bool {
for socket in self.peers.read().keys() {
if addr.ip() == socket.ip() {
return true;
}
}
return false;
}
/// Get vec of peers we are currently connected to. /// Get vec of peers we are currently connected to.
pub fn connected_peers(&self) -> Vec<Arc<Peer>> { pub fn connected_peers(&self) -> Vec<Arc<Peer>> {
let mut res = self let mut res = self

View file

@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
use std::collections::HashMap;
use std::fs::File; use std::fs::File;
use std::net::{Shutdown, SocketAddr, TcpListener, TcpStream}; use std::net::{Shutdown, SocketAddr, TcpListener, TcpStream};
use std::sync::Arc; use std::sync::Arc;
@ -70,6 +71,8 @@ impl Server {
let listener = TcpListener::bind(addr)?; let listener = TcpListener::bind(addr)?;
listener.set_nonblocking(true)?; listener.set_nonblocking(true)?;
let mut connected_sockets: HashMap<SocketAddr, TcpStream> = HashMap::new();
let sleep_time = Duration::from_millis(1); let sleep_time = Duration::from_millis(1);
loop { loop {
// Pause peer ingress connection request. Only for tests. // Pause peer ingress connection request. Only for tests.
@ -81,10 +84,17 @@ impl Server {
match listener.accept() { match listener.accept() {
Ok((stream, peer_addr)) => { Ok((stream, peer_addr)) => {
if !self.check_banned(&stream) { if !self.check_banned(&stream) {
let sc = stream.try_clone();
if let Err(e) = self.handle_new_peer(stream) { if let Err(e) = self.handle_new_peer(stream) {
warn!("Error accepting peer {}: {:?}", peer_addr.to_string(), e); warn!("Error accepting peer {}: {:?}", peer_addr.to_string(), e);
} else {
if let Ok(s) = sc {
connected_sockets.insert(peer_addr, s);
}
} }
} }
// if any active socket not in our peers list, close it
self.clean_lost_sockets(&mut connected_sockets);
} }
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
// nothing to do, will retry in next iteration // nothing to do, will retry in next iteration
@ -191,6 +201,33 @@ impl Server {
false false
} }
/// For all kinds of exception cases, the node could accepted / initiated a peer connection successfully but
/// failed on the Handshake protocol communication, or a connected peer was closed but without a successful
/// clean-up on its socket, that will cause this connected (on TcpStream) peer becomes so-called "invisible" peer!
/// i.e. a peer not included in the 'self.peers.peers' hashmap. This "invisible" peer will cause some security
/// concern because it still can send something to this node, but without enough visibility as other connected peers.
/// Another impact is these connections could never be closed, which make the node fully occupied by all such
/// kind of connections and become un-connectable.
/// This function can help to clean the peer connections which is "invisible" for this node.
fn clean_lost_sockets(&self, sockets: &mut HashMap<SocketAddr, TcpStream>) {
let mut lost_sockets: Vec<SocketAddr> = vec![];
for (socket, stream) in sockets.iter() {
if !self.peers.is_known_ip(&socket) {
if let Ok(_) = stream.shutdown(Shutdown::Both) {
debug!(
"clean_lost_sockets: {} cleaned which's not in peers list",
socket
);
}
lost_sockets.push(socket.clone());
}
}
for socket in lost_sockets {
sockets.remove(&socket);
}
}
pub fn stop(&self) { pub fn stop(&self) {
self.stop_state.lock().stop(); self.stop_state.lock().stop();
self.peers.stop(); self.peers.stop();