grin/p2p/src/serv.rs


// Copyright 2018 The Grin Developers
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fs::File;
use std::io::{self, Read};
use std::net::{Shutdown, SocketAddr, TcpListener, TcpStream};
use std::path::PathBuf;
use std::sync::Arc;
use std::thread;
use std::time::Duration;
use crate::chain;
use crate::core::core;
use crate::core::core::hash::Hash;
use crate::core::global;
use crate::core::pow::Difficulty;
use crate::handshake::Handshake;
use crate::peer::Peer;
use crate::peers::Peers;
use crate::store::PeerStore;
use crate::types::{
Capabilities, ChainAdapter, Error, NetAdapter, P2PConfig, PeerAddr, PeerInfo, ReasonForBan,
TxHashSetRead,
};
use crate::util::StopState;
use chrono::prelude::{DateTime, Utc};
/// P2P server implementation, handling bootstrapping to find and connect to
/// peers, receiving connections from other peers and keeping track of all of them.
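///
/// # Example (sketch)
///
/// A minimal, hedged sketch of wiring up a server with the `DummyAdapter`
/// defined at the bottom of this file. `P2PConfig::default()`,
/// `Capabilities::UNKNOWN` and `StopState::new()` are assumptions about the
/// surrounding crates, and `genesis_hash` stands in for the hash of the
/// genesis block obtained elsewhere.
///
/// ```ignore
/// let adapter = Arc::new(DummyAdapter {});          // no-op adapter from this file
/// let stop_state = Arc::new(StopState::new());      // assumed constructor
/// let server = Server::new(
///     "/tmp/peer_db",            // db_root: where the peer store lives
///     Capabilities::UNKNOWN,     // assumed bitflag value
///     P2PConfig::default(),      // assumed Default impl
///     adapter,
///     genesis_hash,              // hypothetical: hash of the genesis block
///     stop_state,
/// )?;
/// ```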
pub struct Server {
pub config: P2PConfig,
capabilities: Capabilities,
handshake: Arc<Handshake>,
pub peers: Arc<Peers>,
stop_state: Arc<StopState>,
}
// TODO TLS
impl Server {
/// Creates a new idle p2p server with no peers
pub fn new(
db_root: &str,
capab: Capabilities,
config: P2PConfig,
adapter: Arc<dyn ChainAdapter>,
genesis: Hash,
stop_state: Arc<StopState>,
) -> Result<Server, Error> {
Ok(Server {
config: config.clone(),
capabilities: capab,
handshake: Arc::new(Handshake::new(genesis, config.clone())),
peers: Arc::new(Peers::new(PeerStore::new(db_root)?, adapter, config)),
stop_state,
})
}
/// Starts a new TCP server and listens for incoming connections. This is a
/// blocking call until the TCP server stops.
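///
/// Since the call blocks, callers typically run it on a dedicated thread.
/// A sketch; sharing the server through an `Arc` is an assumption about how
/// callers structure this, not something this file prescribes:
///
/// ```ignore
/// let server = Arc::new(server);
/// let s = server.clone();
/// thread::spawn(move || {
///     // runs until stop_state is stopped or the listener fails
///     if let Err(e) = s.listen() {
///         error!("p2p listener failed: {:?}", e);
///     }
/// });
/// ```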
pub fn listen(&self) -> Result<(), Error> {
// start TCP listener and handle incoming connections
let addr = SocketAddr::new(self.config.host, self.config.port);
let listener = TcpListener::bind(addr)?;
listener.set_nonblocking(true)?;
let sleep_time = Duration::from_millis(5);
loop {
// Pause accepting new inbound peer connections. Only used in tests.
if self.stop_state.is_paused() {
thread::sleep(Duration::from_secs(1));
continue;
}
match listener.accept() {
Ok((stream, peer_addr)) => {
let peer_addr = PeerAddr(peer_addr);
if self.check_undesirable(&stream) {
// Shutdown the incoming TCP connection if it is not desired
if let Err(e) = stream.shutdown(Shutdown::Both) {
debug!("Error shutting down conn: {:?}", e);
}
continue;
}
match self.handle_new_peer(stream) {
Err(Error::ConnectionClose) => debug!("shutting down, ignoring a new peer"),
Err(e) => {
debug!("Error accepting peer {}: {:?}", peer_addr.to_string(), e);
let _ = self.peers.add_banned(peer_addr, ReasonForBan::BadHandshake);
}
Ok(_) => {}
}
}
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
// nothing to do, will retry in next iteration
}
Err(e) => {
debug!("Couldn't establish new client connection: {:?}", e);
}
}
if self.stop_state.is_stopped() {
break;
}
thread::sleep(sleep_time);
}
Ok(())
}
/// Asks the server to connect to a new peer. Directly returns the peer if
/// we're already connected to the provided address.
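///
/// A sketch of a typical call. The address literal is illustrative only;
/// `PeerAddr` wraps a `std::net::SocketAddr`, as seen elsewhere in this file.
///
/// ```ignore
/// let addr = PeerAddr("192.168.1.10:3414".parse().unwrap());
/// match server.connect(addr) {
///     Ok(_peer) => debug!("connected to {}", addr),
///     Err(e) => debug!("could not connect to {}: {:?}", addr, e),
/// }
/// ```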
pub fn connect(&self, addr: PeerAddr) -> Result<Arc<Peer>, Error> {
if self.stop_state.is_stopped() {
return Err(Error::ConnectionClose);
}
if Peer::is_denied(&self.config, addr) {
debug!("connect_peer: peer {} denied, not connecting.", addr);
return Err(Error::ConnectionClose);
}
if global::is_production_mode() {
let hs = self.handshake.clone();
let addrs = hs.addrs.read();
if addrs.contains(&addr) {
debug!("connect: ignore connecting to PeerWithSelf, addr: {}", addr);
return Err(Error::PeerWithSelf);
}
}
if let Some(p) = self.peers.get_connected_peer(addr) {
// if we're already connected to the addr, just return the peer
trace!("connect_peer: already connected {}", addr);
return Ok(p);
}
trace!(
"connect_peer: on {}:{}. connecting to {}",
self.config.host,
self.config.port,
addr
);
match TcpStream::connect_timeout(&addr.0, Duration::from_secs(10)) {
Ok(stream) => {
let addr = SocketAddr::new(self.config.host, self.config.port);
let total_diff = self.peers.total_difficulty()?;
let peer = Peer::connect(
stream,
self.capabilities,
total_diff,
PeerAddr(addr),
&self.handshake,
self.peers.clone(),
)?;
let peer = Arc::new(peer);
self.peers.add_connected(peer.clone())?;
Ok(peer)
}
Err(e) => {
trace!(
"connect_peer: on {}:{}. Could not connect to {}: {:?}",
self.config.host,
self.config.port,
addr,
e
);
Err(Error::Connection(e))
}
}
}
fn handle_new_peer(&self, stream: TcpStream) -> Result<(), Error> {
if self.stop_state.is_stopped() {
return Err(Error::ConnectionClose);
}
let total_diff = self.peers.total_difficulty()?;
// accept the peer and add it to the server map
let peer = Peer::accept(
stream,
self.capabilities,
total_diff,
&self.handshake,
self.peers.clone(),
)?;
self.peers.add_connected(Arc::new(peer))?;
Ok(())
}
/// Checks whether there's any reason we don't want to accept an incoming peer
/// connection. There can be a few of them:
/// 1. Accepting the peer connection would exceed the configured maximum allowed
/// inbound peer count. Note that seed nodes may wish to increase the default
/// value for PEER_LISTENER_BUFFER_COUNT to help with network bootstrapping.
/// A default buffer of 8 peers is allowed to help with network growth.
/// 2. The peer has been previously banned and the ban period hasn't
/// expired yet.
/// 3. We're already connected to a peer at the same IP. While there are
/// many reasons multiple peers can legitimately share identical IP
/// addresses (NAT), network distribution is improved if they choose
/// different sets of peers themselves. In addition, this prevents potential
/// duplicate connections, malicious or not.
fn check_undesirable(&self, stream: &TcpStream) -> bool {
if self.peers.peer_inbound_count()
>= self.config.peer_max_inbound_count() + self.config.peer_listener_buffer_count()
{
debug!("Accepting new connection will exceed peer limit, refusing connection.");
return true;
}
if let Ok(peer_addr) = stream.peer_addr() {
let peer_addr = PeerAddr(peer_addr);
if self.peers.is_banned(peer_addr) {
debug!("Peer {} banned, refusing connection.", peer_addr);
return true;
}
// The call to is_known() can fail due to contention on the peers map.
// If it fails we want to default to refusing the connection.
match self.peers.is_known(peer_addr) {
Ok(true) => {
debug!("Peer {} already known, refusing connection.", peer_addr);
return true;
}
Err(_) => {
error!(
"Peer {} is_known check failed, refusing connection.",
peer_addr
);
return true;
}
_ => (),
}
}
false
}
pub fn stop(&self) {
self.stop_state.stop();
self.peers.stop();
}
/// Pausing stops all current peer connections; only used in tests.
/// Note:
/// 1. the 'seed' thread must also be paused, to avoid new egress peer connections
/// 2. the 'p2p-server' thread must also be paused, to avoid new ingress peer connections.
pub fn pause(&self) {
self.peers.stop();
}
}
/// A no-op network adapter used for testing.
pub struct DummyAdapter {}
impl ChainAdapter for DummyAdapter {
fn total_difficulty(&self) -> Result<Difficulty, chain::Error> {
Ok(Difficulty::min())
}
fn total_height(&self) -> Result<u64, chain::Error> {
Ok(0)
}
fn get_transaction(&self, _h: Hash) -> Option<core::Transaction> {
None
}
fn tx_kernel_received(&self, _h: Hash, _peer_info: &PeerInfo) -> Result<bool, chain::Error> {
Ok(true)
}
fn transaction_received(
&self,
_: core::Transaction,
_stem: bool,
) -> Result<bool, chain::Error> {
Ok(true)
}
fn compact_block_received(
&self,
_cb: core::CompactBlock,
_peer_info: &PeerInfo,
) -> Result<bool, chain::Error> {
Ok(true)
}
fn header_received(
&self,
_bh: core::BlockHeader,
_peer_info: &PeerInfo,
) -> Result<bool, chain::Error> {
Ok(true)
}
fn block_received(&self, _: core::Block, _: &PeerInfo, _: bool) -> Result<bool, chain::Error> {
Ok(true)
}
fn headers_received(
&self,
_: &[core::BlockHeader],
_: &PeerInfo,
) -> Result<bool, chain::Error> {
Ok(true)
}
fn locate_headers(&self, _: &[Hash]) -> Result<Vec<core::BlockHeader>, chain::Error> {
Ok(vec![])
}
fn get_block(&self, _: Hash) -> Option<core::Block> {
None
}
fn kernel_data_read(&self) -> Result<File, chain::Error> {
unimplemented!()
}
fn kernel_data_write(&self, _reader: &mut dyn Read) -> Result<bool, chain::Error> {
unimplemented!()
}
fn txhashset_read(&self, _h: Hash) -> Option<TxHashSetRead> {
unimplemented!()
}
fn txhashset_archive_header(&self) -> Result<core::BlockHeader, chain::Error> {
unimplemented!()
}
fn txhashset_receive_ready(&self) -> bool {
false
}
fn txhashset_write(
&self,
_h: Hash,
_txhashset_data: File,
_peer_info: &PeerInfo,
) -> Result<bool, chain::Error> {
Ok(false)
}
fn txhashset_download_update(
&self,
_start_time: DateTime<Utc>,
_downloaded_size: u64,
_total_size: u64,
) -> bool {
false
}
fn get_tmp_dir(&self) -> PathBuf {
unimplemented!()
}
fn get_tmpfile_pathname(&self, _tmpfile_name: String) -> PathBuf {
unimplemented!()
}
}
impl NetAdapter for DummyAdapter {
fn find_peer_addrs(&self, _: Capabilities) -> Vec<PeerAddr> {
vec![]
}
fn peer_addrs_received(&self, _: Vec<PeerAddr>) {}
fn peer_difficulty(&self, _: PeerAddr, _: Difficulty, _: u64) {}
fn is_banned(&self, _: PeerAddr) -> bool {
false
}
}