grin/p2p/src/serv.rs
Antioch Peverell 4ef4212f1f
Simplify peer connection handling (#2801)
* connection no longer wrapped in an Option in peer

* introduce peer.send()

* remove some Arc indirection

* self.send() cleanup

* extract Peer:new() from connect and accept

* fixup

* cleanup
2019-05-03 15:56:25 +01:00

327 lines
8.7 KiB
Rust

// Copyright 2018 The Grin Developers
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fs::File;
use std::net::{Shutdown, SocketAddr, TcpListener, TcpStream};
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use std::{io, thread};
use crate::chain;
use crate::core::core;
use crate::core::core::hash::Hash;
use crate::core::global;
use crate::core::pow::Difficulty;
use crate::handshake::Handshake;
use crate::peer::Peer;
use crate::peers::Peers;
use crate::store::PeerStore;
use crate::types::{
Capabilities, ChainAdapter, Error, NetAdapter, P2PConfig, PeerAddr, PeerInfo, ReasonForBan,
TxHashSetRead,
};
use crate::util::{Mutex, StopState};
use chrono::prelude::{DateTime, Utc};
/// P2P server implementation, handling bootstrapping to find and connect to
/// peers, receiving connections from other peers and keep track of all of them.
pub struct Server {
pub config: P2PConfig,
capabilities: Capabilities,
handshake: Arc<Handshake>,
pub peers: Arc<Peers>,
stop_state: Arc<Mutex<StopState>>,
}
// TODO TLS
impl Server {
/// Creates a new idle p2p server with no peers
pub fn new(
db_root: &str,
capab: Capabilities,
config: P2PConfig,
adapter: Arc<dyn ChainAdapter>,
genesis: Hash,
stop_state: Arc<Mutex<StopState>>,
) -> Result<Server, Error> {
Ok(Server {
config: config.clone(),
capabilities: capab,
handshake: Arc::new(Handshake::new(genesis, config.clone())),
peers: Arc::new(Peers::new(PeerStore::new(db_root)?, adapter, config)),
stop_state,
})
}
/// Starts a new TCP server and listen to incoming connections. This is a
/// blocking call until the TCP server stops.
pub fn listen(&self) -> Result<(), Error> {
// start TCP listener and handle incoming connections
let addr = SocketAddr::new(self.config.host, self.config.port);
let listener = TcpListener::bind(addr)?;
listener.set_nonblocking(true)?;
let sleep_time = Duration::from_millis(5);
loop {
// Pause peer ingress connection request. Only for tests.
if self.stop_state.lock().is_paused() {
thread::sleep(Duration::from_secs(1));
continue;
}
match listener.accept() {
Ok((stream, peer_addr)) => {
let peer_addr = PeerAddr(peer_addr);
if self.check_undesirable(&stream) {
continue;
}
if let Err(e) = self.handle_new_peer(stream) {
debug!("Error accepting peer {}: {:?}", peer_addr.to_string(), e);
let _ = self.peers.add_banned(peer_addr, ReasonForBan::BadHandshake);
}
}
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
// nothing to do, will retry in next iteration
}
Err(e) => {
debug!("Couldn't establish new client connection: {:?}", e);
}
}
if self.stop_state.lock().is_stopped() {
break;
}
thread::sleep(sleep_time);
}
Ok(())
}
/// Asks the server to connect to a new peer. Directly returns the peer if
/// we're already connected to the provided address.
pub fn connect(&self, addr: PeerAddr) -> Result<Arc<Peer>, Error> {
if Peer::is_denied(&self.config, addr) {
debug!("connect_peer: peer {} denied, not connecting.", addr);
return Err(Error::ConnectionClose);
}
if global::is_production_mode() {
let hs = self.handshake.clone();
let addrs = hs.addrs.read();
if addrs.contains(&addr) {
debug!("connect: ignore connecting to PeerWithSelf, addr: {}", addr);
return Err(Error::PeerWithSelf);
}
}
if let Some(p) = self.peers.get_connected_peer(addr) {
// if we're already connected to the addr, just return the peer
trace!("connect_peer: already connected {}", addr);
return Ok(p);
}
trace!(
"connect_peer: on {}:{}. connecting to {}",
self.config.host,
self.config.port,
addr
);
match TcpStream::connect_timeout(&addr.0, Duration::from_secs(10)) {
Ok(stream) => {
let addr = SocketAddr::new(self.config.host, self.config.port);
let total_diff = self.peers.total_difficulty()?;
let peer = Peer::connect(
stream,
self.capabilities,
total_diff,
PeerAddr(addr),
&self.handshake,
self.peers.clone(),
)?;
let peer = Arc::new(peer);
self.peers.add_connected(peer.clone())?;
Ok(peer)
}
Err(e) => {
trace!(
"connect_peer: on {}:{}. Could not connect to {}: {:?}",
self.config.host,
self.config.port,
addr,
e
);
Err(Error::Connection(e))
}
}
}
fn handle_new_peer(&self, stream: TcpStream) -> Result<(), Error> {
let total_diff = self.peers.total_difficulty()?;
// accept the peer and add it to the server map
let peer = Peer::accept(
stream,
self.capabilities,
total_diff,
&self.handshake,
self.peers.clone(),
)?;
self.peers.add_connected(Arc::new(peer))?;
Ok(())
}
/// Checks whether there's any reason we don't want to accept a peer
/// connection. There can be a couple of them:
/// 1. The peer has been previously banned and the ban period hasn't
/// expired yet.
/// 2. We're already connected to a peer at the same IP. While there are
/// many reasons multiple peers can legitimately share identical IP
/// addresses (NAT), network distribution is improved if they choose
/// different sets of peers themselves. In addition, it prevent potential
/// duplicate connections, malicious or not.
fn check_undesirable(&self, stream: &TcpStream) -> bool {
if let Ok(peer_addr) = stream.peer_addr() {
let peer_addr = PeerAddr(peer_addr);
if self.peers.is_banned(peer_addr) {
debug!("Peer {} banned, refusing connection.", peer_addr);
if let Err(e) = stream.shutdown(Shutdown::Both) {
debug!("Error shutting down conn: {:?}", e);
}
return true;
}
if self.peers.is_known(peer_addr) {
debug!("Peer {} already known, refusing connection.", peer_addr);
if let Err(e) = stream.shutdown(Shutdown::Both) {
debug!("Error shutting down conn: {:?}", e);
}
return true;
}
}
false
}
pub fn stop(&self) {
self.stop_state.lock().stop();
self.peers.stop();
}
/// Pause means: stop all the current peers connection, only for tests.
/// Note:
/// 1. must pause the 'seed' thread also, to avoid the new egress peer connection
/// 2. must pause the 'p2p-server' thread also, to avoid the new ingress peer connection.
pub fn pause(&self) {
self.peers.stop();
}
}
/// A no-op network adapter used for testing.
pub struct DummyAdapter {}
impl ChainAdapter for DummyAdapter {
fn total_difficulty(&self) -> Result<Difficulty, chain::Error> {
Ok(Difficulty::min())
}
fn total_height(&self) -> Result<u64, chain::Error> {
Ok(0)
}
fn get_transaction(&self, _h: Hash) -> Option<core::Transaction> {
None
}
fn tx_kernel_received(&self, _h: Hash, _peer_info: &PeerInfo) -> Result<bool, chain::Error> {
Ok(true)
}
fn transaction_received(
&self,
_: core::Transaction,
_stem: bool,
) -> Result<bool, chain::Error> {
Ok(true)
}
fn compact_block_received(
&self,
_cb: core::CompactBlock,
_peer_info: &PeerInfo,
) -> Result<bool, chain::Error> {
Ok(true)
}
fn header_received(
&self,
_bh: core::BlockHeader,
_peer_info: &PeerInfo,
) -> Result<bool, chain::Error> {
Ok(true)
}
fn block_received(&self, _: core::Block, _: &PeerInfo, _: bool) -> Result<bool, chain::Error> {
Ok(true)
}
fn headers_received(
&self,
_: &[core::BlockHeader],
_: &PeerInfo,
) -> Result<bool, chain::Error> {
Ok(true)
}
fn locate_headers(&self, _: &[Hash]) -> Result<Vec<core::BlockHeader>, chain::Error> {
Ok(vec![])
}
fn get_block(&self, _: Hash) -> Option<core::Block> {
None
}
fn txhashset_read(&self, _h: Hash) -> Option<TxHashSetRead> {
unimplemented!()
}
fn txhashset_receive_ready(&self) -> bool {
false
}
fn txhashset_write(
&self,
_h: Hash,
_txhashset_data: File,
_peer_info: &PeerInfo,
) -> Result<bool, chain::Error> {
Ok(false)
}
fn txhashset_download_update(
&self,
_start_time: DateTime<Utc>,
_downloaded_size: u64,
_total_size: u64,
) -> bool {
false
}
fn get_tmp_dir(&self) -> PathBuf {
unimplemented!()
}
fn get_tmpfile_pathname(&self, _tmpfile_name: String) -> PathBuf {
unimplemented!()
}
}
impl NetAdapter for DummyAdapter {
fn find_peer_addrs(&self, _: Capabilities) -> Vec<PeerAddr> {
vec![]
}
fn peer_addrs_received(&self, _: Vec<PeerAddr>) {}
fn peer_difficulty(&self, _: PeerAddr, _: Difficulty, _: u64) {}
fn is_banned(&self, _: PeerAddr) -> bool {
false
}
}