// Copyright 2016-2018 The Grin Developers // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. use std::fs::File; use std::net::{TcpListener, TcpStream, SocketAddr, Shutdown}; use std::sync::{Arc, RwLock}; use std::thread; use std::time::Duration; use core::core; use core::core::hash::Hash; use core::core::target::Difficulty; use handshake::Handshake; use peer::Peer; use peers::Peers; use store::PeerStore; use types::*; use util::LOGGER; /// P2P server implementation, handling bootstrapping to find and connect to /// peers, receiving connections from other peers and keep track of all of them. pub struct Server { config: P2PConfig, capabilities: Capabilities, handshake: Arc, pub peers: Peers, } unsafe impl Sync for Server {} unsafe impl Send for Server {} // TODO TLS impl Server { /// Creates a new idle p2p server with no peers pub fn new( db_root: String, capab: Capabilities, config: P2PConfig, adapter: Arc, genesis: Hash, ) -> Result { Ok(Server { config: config.clone(), capabilities: capab, handshake: Arc::new(Handshake::new(genesis, config.clone())), peers: Peers::new(PeerStore::new(db_root)?, adapter, config), }) } /// Starts a new TCP server and listen to incoming connections. This is a /// blocking call until the TCP server stops. pub fn listen(&self) -> Result<(), Error> { // start peer monitoring thread let peers_inner = self.peers.clone(); let _ = thread::Builder::new().name("p2p-monitor".to_string()).spawn(move || { loop { let total_diff = peers_inner.total_difficulty(); let total_height = peers_inner.total_height(); peers_inner.check_all(total_diff, total_height); thread::sleep(Duration::from_secs(20)); } }); // start TCP listener and handle incoming connections let addr = SocketAddr::new(self.config.host, self.config.port); let listener = TcpListener::bind(addr)?; for stream in listener.incoming() { match stream { Ok(stream) => { if !self.check_banned(&stream) { let peer_addr = stream.peer_addr(); if let Err(e) = self.handle_new_peer(stream) { debug!( LOGGER, "Error accepting peer {}: {:?}", peer_addr.map(|a| a.to_string()).unwrap_or("?".to_owned()), e); } } } Err(e) => { warn!(LOGGER, "Couldn't establish new client connection: {:?}", e); } } } Ok(()) } /// Asks the server to connect to a new peer. Directly returns the peer if /// we're already connected to the provided address. pub fn connect(&self, addr: &SocketAddr) -> Result>, Error> { if Peer::is_denied(&self.config, &addr) { debug!(LOGGER, "Peer {} denied, not connecting.", addr); return Err(Error::ConnectionClose); } if let Some(p) = self.peers.get_connected_peer(addr) { // if we're already connected to the addr, just return the peer debug!(LOGGER, "connect_peer: already connected {}", addr); return Ok(p); } debug!(LOGGER, "connect_peer: connecting to {}", addr); match TcpStream::connect_timeout(addr, Duration::from_secs(10)) { Ok(mut stream) => { let addr = SocketAddr::new(self.config.host, self.config.port); let total_diff = self.peers.total_difficulty(); let peer = Peer::connect( &mut stream, self.capabilities, total_diff, addr, &self.handshake, Arc::new(self.peers.clone()), )?; let added = self.peers.add_connected(peer); { let mut peer = added.write().unwrap(); peer.start(stream); } Ok(added) } Err(e) => { debug!(LOGGER, "Couldn not connect to {}: {:?}", addr, e); Err(Error::Connection(e)) } } } fn handle_new_peer(&self, mut stream: TcpStream) -> Result<(), Error> { let total_diff = self.peers.total_difficulty(); // accept the peer and add it to the server map let peer = Peer::accept( &mut stream, self.capabilities, total_diff, &self.handshake, Arc::new(self.peers.clone()), )?; let added = self.peers.add_connected(peer); let mut peer = added.write().unwrap(); peer.start(stream); Ok(()) } fn check_banned(&self, stream: &TcpStream) -> bool { // peer has been banned, go away! if let Ok(peer_addr) = stream.peer_addr() { if self.peers.is_banned(peer_addr) { debug!(LOGGER, "Peer {} banned, refusing connection.", peer_addr); if let Err(e) = stream.shutdown(Shutdown::Both) { debug!(LOGGER, "Error shutting down conn: {:?}", e); } return true; } } false } } /// A no-op network adapter used for testing. pub struct DummyAdapter {} impl ChainAdapter for DummyAdapter { fn total_difficulty(&self) -> Difficulty { Difficulty::one() } fn total_height(&self) -> u64 { 0 } fn transaction_received(&self, _: core::Transaction) {} fn compact_block_received(&self, _cb: core::CompactBlock, _addr: SocketAddr) -> bool { true } fn header_received(&self, _bh: core::BlockHeader, _addr: SocketAddr) -> bool { true } fn block_received(&self, _: core::Block, _: SocketAddr) -> bool { true } fn headers_received(&self, _: Vec, _:SocketAddr) {} fn locate_headers(&self, _: Vec) -> Vec { vec![] } fn get_block(&self, _: Hash) -> Option { None } fn sumtrees_read(&self, _h: Hash) -> Option { unimplemented!() } fn sumtrees_write(&self, _h: Hash, _rewind_to_output: u64, _rewind_to_kernel: u64, _sumtree_data: File, _peer_addr: SocketAddr) -> bool { false } } impl NetAdapter for DummyAdapter { fn find_peer_addrs(&self, _: Capabilities) -> Vec { vec![] } fn peer_addrs_received(&self, _: Vec) {} fn peer_difficulty(&self, _: SocketAddr, _: Difficulty, _:u64) {} }