grin/p2p/src/serv.rs

277 lines
7.5 KiB
Rust
Raw Normal View History

// Copyright 2018 The Grin Developers
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
[WIP] Abridged sync (#440) * Util to zip and unzip directories * First pass at sumtree request/response. Add message types, implement the exchange in the protocol, zip up the sumtree directory and stream the file over, with necessary adapter hooks. * Implement the sumtree archive receive logicGets the sumtree archive data stream from the network and write it to a file. Unzip the file, place it at the right spot and reconstruct the sumtree data structure, rewinding where to the right spot. * Sumtree hash structure validation * Simplify sumtree backend buffering logic. The backend for a sumtree has to implement some in-memory buffering logic to provide a commit/rollback interface. The backend itself is an aggregate of 3 underlying storages (an append only file, a remove log and a skip list). The buffering was previously implemented both by the backend and some of the underlying storages. Now pushing back all buffering logic to the storages to keep the backend simpler. * Add kernel append only store file to sumtrees. The chain sumtrees structure now also saves all kernels to a dedicated file. As that storage is implemented by the append only file wrapper, it's also rewind-aware. * Full state validation. Checks that: - MMRs are sane (hash and sum each node) - Tree roots match the corresponding header - Kernel signatures are valid - Sum of all kernel excesses equals the sum of UTXO commitments minus the supply * Fast sync handoff to body sync. Once the fast-sync state is fully setup, get bacj in body sync mode to get the full bodies of the last blocks we're missing. * First fully working fast sync * Facility in p2p conn to deal with attachments (raw binary after message). * Re-introduced sumtree send and receive message handling using the above. * Fixed test and finished updating all required db state after sumtree validation. * Massaged a little bit the pipeline orphan check to still work after the new sumtrees have been setup. * Various cleanup. Consolidated fast sync and full sync into a single function as they're very similar. Proper conditions to trigger a sumtree request and some checks on receiving it.
2018-02-10 01:32:16 +03:00
use std::fs::File;
2018-03-04 03:19:54 +03:00
use std::net::{Shutdown, SocketAddr, TcpListener, TcpStream};
use std::sync::atomic::{AtomicBool, Ordering};
Minimal Transaction Pool (#1067) * verify a tx like we verify a block (experimental) * first minimal_pool test up and running but not testing what we need to * rework tx_pool validation to use txhashset extension * minimal tx pool wired up but rough * works locally (rough statew though) delete "legacy" pool and graph code * rework the new pool into TransactionPool and Pool impls * rework pool to store pool entries with associated timer and source etc. * all_transactions * extra_txs so we can validate stempool against existing txpool * rework reconcile_block * txhashset apply_raw_tx can now rewind to a checkpoint (prev raw tx) * wip - txhashset tx tests * more flexible rewind on MMRs * add tests to cover apply_raw_txs on txhashset extension * add_to_stempool and add_to_txpool * deaggregate multi kernel tx when adding to txpoool * handle freshness in stempool handle propagation of stempool txs via dandelion monitor * patience timer and fluff if we cannot propagate to next relay * aggregate and fluff stempool is we have no relay * refactor coinbase maturity * rewrote basic tx pool tests to use a real txhashset via chain adapter * rework dandelion monitor to reflect recent discussion works locally but needs a cleanup * refactor dandelion_monitor - split out phases * more pool test coverage * remove old test code from pool (still wip) * block_building and block_reconciliation tests * tracked down chain test failure... * fix test_coinbase_maturity * dandelion_monitor now runs... * refactor dandelion config, shared across p2p and pool components * fix pool tests with new config * fix p2p tests * rework tx pool to deal with duplicate commitments (testnet2 limitation) * cleanup and address some PR feedback * add big comment about pre_tx...
2018-05-30 23:57:13 +03:00
use std::sync::{Arc, RwLock};
use std::time::Duration;
use std::{io, thread};
Wallet LMDB backend (#1151) * Migrate main node store to LMDB In preparation to using LMDB as a wallet database, migrate the node db. There's no point in having 2 key-value stores. In addition LMDB provides a few advantages as a node db, namely a much faster build (compared to RocksDb), lesser dependencies and transactions. * Migrated p2p store to lmdb, stuff compiles * More fixes, chain tests starting to pass * Fixed txhashset rollback messing with block save and general batch delimitation. Chain tests passing. * rustfmt * LMDB max map size of 10MB isn't really workable. Half TB seems reasonable. * Fix wallet tests * Rather crucial commit was missing * rustfmt * Fixing new merged tests following lmdb changes * rustfmt * * Make txhashset validation read-only on fast sync to avoid having a really long open transaction. * Fix deadlock in new block processing, batch should always be created within a txhashset lock (when they interact). * Comment about batch and txhashset interlacing * Fix store tests to use batch * Externalize wallet config and seed * Converted direct read access to file outputs map to an iterator * Cleaned up and simplified wallet Backend trait: * No more direct mutable access to internal structures (HashMap) * Batch interface for all writes * Remove unneeded read wrapper (read_wallet) * rustfmt * First (incomplete) pass at wallet LMDB backend * Progressing on lmdb backent iml * Added batch impl for LMDB wallet backend. Pretty much done with it, but not sure how to deal with commit (owned). * rustfmt * Wrapping LMDB batch around a refcell to work around borrow rules * Compilation up to grin chain
2018-06-22 11:08:06 +03:00
use lmdb;
use core::core;
use core::core::hash::Hash;
use core::core::target::Difficulty;
use handshake::Handshake;
use peer::Peer;
use peers::Peers;
use store::PeerStore;
use types::{Capabilities, ChainAdapter, Error, NetAdapter, P2PConfig, TxHashSetRead};
use util::LOGGER;
/// P2P server implementation, handling bootstrapping to find and connect to
/// peers, receiving connections from other peers and keep track of all of them.
pub struct Server {
pub config: P2PConfig,
capabilities: Capabilities,
handshake: Arc<Handshake>,
pub peers: Arc<Peers>,
stop: Arc<AtomicBool>,
}
unsafe impl Sync for Server {}
unsafe impl Send for Server {}
// TODO TLS
impl Server {
/// Creates a new idle p2p server with no peers
pub fn new(
Wallet LMDB backend (#1151) * Migrate main node store to LMDB In preparation to using LMDB as a wallet database, migrate the node db. There's no point in having 2 key-value stores. In addition LMDB provides a few advantages as a node db, namely a much faster build (compared to RocksDb), lesser dependencies and transactions. * Migrated p2p store to lmdb, stuff compiles * More fixes, chain tests starting to pass * Fixed txhashset rollback messing with block save and general batch delimitation. Chain tests passing. * rustfmt * LMDB max map size of 10MB isn't really workable. Half TB seems reasonable. * Fix wallet tests * Rather crucial commit was missing * rustfmt * Fixing new merged tests following lmdb changes * rustfmt * * Make txhashset validation read-only on fast sync to avoid having a really long open transaction. * Fix deadlock in new block processing, batch should always be created within a txhashset lock (when they interact). * Comment about batch and txhashset interlacing * Fix store tests to use batch * Externalize wallet config and seed * Converted direct read access to file outputs map to an iterator * Cleaned up and simplified wallet Backend trait: * No more direct mutable access to internal structures (HashMap) * Batch interface for all writes * Remove unneeded read wrapper (read_wallet) * rustfmt * First (incomplete) pass at wallet LMDB backend * Progressing on lmdb backent iml * Added batch impl for LMDB wallet backend. Pretty much done with it, but not sure how to deal with commit (owned). * rustfmt * Wrapping LMDB batch around a refcell to work around borrow rules * Compilation up to grin chain
2018-06-22 11:08:06 +03:00
db_env: Arc<lmdb::Environment>,
mut capab: Capabilities,
config: P2PConfig,
adapter: Arc<ChainAdapter>,
genesis: Hash,
stop: Arc<AtomicBool>,
archive_mode: bool,
block_1_hash: Option<Hash>,
) -> Result<Server, Error> {
// In the case of an archive node, check that we do have the first block.
// In case of first sync we do not perform this check.
if archive_mode && adapter.total_height() > 0 {
// Check that we have block 1
match block_1_hash {
Some(hash) => match adapter.get_block(hash) {
Some(_) => debug!(LOGGER, "Full block 1 found, archive capabilities confirmed"),
None => {
debug!(
LOGGER,
"Full block 1 not found, archive capabilities disabled"
);
capab.remove(Capabilities::FULL_HIST);
}
},
None => {
debug!(LOGGER, "Block 1 not found, archive capabilities disabled");
capab.remove(Capabilities::FULL_HIST);
}
}
}
Ok(Server {
config: config.clone(),
capabilities: capab,
handshake: Arc::new(Handshake::new(genesis, config.clone())),
Wallet LMDB backend (#1151) * Migrate main node store to LMDB In preparation to using LMDB as a wallet database, migrate the node db. There's no point in having 2 key-value stores. In addition LMDB provides a few advantages as a node db, namely a much faster build (compared to RocksDb), lesser dependencies and transactions. * Migrated p2p store to lmdb, stuff compiles * More fixes, chain tests starting to pass * Fixed txhashset rollback messing with block save and general batch delimitation. Chain tests passing. * rustfmt * LMDB max map size of 10MB isn't really workable. Half TB seems reasonable. * Fix wallet tests * Rather crucial commit was missing * rustfmt * Fixing new merged tests following lmdb changes * rustfmt * * Make txhashset validation read-only on fast sync to avoid having a really long open transaction. * Fix deadlock in new block processing, batch should always be created within a txhashset lock (when they interact). * Comment about batch and txhashset interlacing * Fix store tests to use batch * Externalize wallet config and seed * Converted direct read access to file outputs map to an iterator * Cleaned up and simplified wallet Backend trait: * No more direct mutable access to internal structures (HashMap) * Batch interface for all writes * Remove unneeded read wrapper (read_wallet) * rustfmt * First (incomplete) pass at wallet LMDB backend * Progressing on lmdb backent iml * Added batch impl for LMDB wallet backend. Pretty much done with it, but not sure how to deal with commit (owned). * rustfmt * Wrapping LMDB batch around a refcell to work around borrow rules * Compilation up to grin chain
2018-06-22 11:08:06 +03:00
peers: Arc::new(Peers::new(PeerStore::new(db_env)?, adapter, config)),
stop: stop,
})
}
/// Starts a new TCP server and listen to incoming connections. This is a
/// blocking call until the TCP server stops.
pub fn listen(&self) -> Result<(), Error> {
// start peer monitoring thread
let peers_inner = self.peers.clone();
let stop = self.stop.clone();
2018-03-04 03:19:54 +03:00
let _ = thread::Builder::new()
.name("p2p-monitor".to_string())
.spawn(move || loop {
let total_diff = peers_inner.total_difficulty();
let total_height = peers_inner.total_height();
peers_inner.check_all(total_diff, total_height);
thread::sleep(Duration::from_secs(10));
if stop.load(Ordering::Relaxed) {
break;
}
2018-03-04 03:19:54 +03:00
});
// start TCP listener and handle incoming connections
let addr = SocketAddr::new(self.config.host, self.config.port);
let listener = TcpListener::bind(addr)?;
listener.set_nonblocking(true)?;
let sleep_time = Duration::from_millis(1);
loop {
match listener.accept() {
Ok((stream, peer_addr)) => {
if !self.check_banned(&stream) {
if let Err(e) = self.handle_new_peer(stream) {
warn!(
LOGGER,
"Error accepting peer {}: {:?}",
peer_addr.to_string(),
2018-03-04 03:19:54 +03:00
e
);
}
}
}
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
// nothing to do, will retry in next iteration
}
Err(e) => {
warn!(LOGGER, "Couldn't establish new client connection: {:?}", e);
}
}
if self.stop.load(Ordering::Relaxed) {
break;
}
thread::sleep(sleep_time);
}
Ok(())
}
/// Asks the server to connect to a new peer. Directly returns the peer if
/// we're already connected to the provided address.
pub fn connect(&self, addr: &SocketAddr) -> Result<Arc<RwLock<Peer>>, Error> {
if Peer::is_denied(&self.config, &addr) {
debug!(LOGGER, "Peer {} denied, not connecting.", addr);
return Err(Error::ConnectionClose);
}
if let Some(p) = self.peers.get_connected_peer(addr) {
// if we're already connected to the addr, just return the peer
trace!(LOGGER, "connect_peer: already connected {}", addr);
return Ok(p);
}
trace!(LOGGER, "connect_peer: connecting to {}", addr);
match TcpStream::connect_timeout(addr, Duration::from_secs(10)) {
Ok(mut stream) => {
let addr = SocketAddr::new(self.config.host, self.config.port);
let total_diff = self.peers.total_difficulty();
let peer = Peer::connect(
&mut stream,
self.capabilities,
total_diff,
addr,
&self.handshake,
self.peers.clone(),
)?;
let added = self.peers.add_connected(peer);
{
let mut peer = added.write().unwrap();
peer.start(stream);
}
Ok(added)
}
Err(e) => {
debug!(LOGGER, "Could not connect to {}: {:?}", addr, e);
Err(Error::Connection(e))
}
}
}
fn handle_new_peer(&self, mut stream: TcpStream) -> Result<(), Error> {
let total_diff = self.peers.total_difficulty();
// accept the peer and add it to the server map
let peer = Peer::accept(
&mut stream,
self.capabilities,
total_diff,
&self.handshake,
self.peers.clone(),
)?;
let added = self.peers.add_connected(peer);
let mut peer = added.write().unwrap();
peer.start(stream);
Ok(())
}
fn check_banned(&self, stream: &TcpStream) -> bool {
// peer has been banned, go away!
if let Ok(peer_addr) = stream.peer_addr() {
if self.peers.is_banned(peer_addr) {
debug!(LOGGER, "Peer {} banned, refusing connection.", peer_addr);
if let Err(e) = stream.shutdown(Shutdown::Both) {
debug!(LOGGER, "Error shutting down conn: {:?}", e);
}
return true;
}
}
false
}
pub fn stop(&self) {
self.stop.store(true, Ordering::Relaxed);
self.peers.stop();
}
}
/// A no-op network adapter used for testing.
pub struct DummyAdapter {}
impl ChainAdapter for DummyAdapter {
fn total_difficulty(&self) -> Difficulty {
Difficulty::one()
}
fn total_height(&self) -> u64 {
0
}
fn transaction_received(&self, _: core::Transaction, _stem: bool) {}
2018-03-04 03:19:54 +03:00
fn compact_block_received(&self, _cb: core::CompactBlock, _addr: SocketAddr) -> bool {
true
}
fn header_received(&self, _bh: core::BlockHeader, _addr: SocketAddr) -> bool {
true
}
fn block_received(&self, _: core::Block, _: SocketAddr) -> bool {
true
}
fn headers_received(&self, _: Vec<core::BlockHeader>, _: SocketAddr) {}
fn locate_headers(&self, _: Vec<Hash>) -> Vec<core::BlockHeader> {
vec![]
}
fn get_block(&self, _: Hash) -> Option<core::Block> {
None
}
fn txhashset_read(&self, _h: Hash) -> Option<TxHashSetRead> {
[WIP] Abridged sync (#440) * Util to zip and unzip directories * First pass at sumtree request/response. Add message types, implement the exchange in the protocol, zip up the sumtree directory and stream the file over, with necessary adapter hooks. * Implement the sumtree archive receive logicGets the sumtree archive data stream from the network and write it to a file. Unzip the file, place it at the right spot and reconstruct the sumtree data structure, rewinding where to the right spot. * Sumtree hash structure validation * Simplify sumtree backend buffering logic. The backend for a sumtree has to implement some in-memory buffering logic to provide a commit/rollback interface. The backend itself is an aggregate of 3 underlying storages (an append only file, a remove log and a skip list). The buffering was previously implemented both by the backend and some of the underlying storages. Now pushing back all buffering logic to the storages to keep the backend simpler. * Add kernel append only store file to sumtrees. The chain sumtrees structure now also saves all kernels to a dedicated file. As that storage is implemented by the append only file wrapper, it's also rewind-aware. * Full state validation. Checks that: - MMRs are sane (hash and sum each node) - Tree roots match the corresponding header - Kernel signatures are valid - Sum of all kernel excesses equals the sum of UTXO commitments minus the supply * Fast sync handoff to body sync. Once the fast-sync state is fully setup, get bacj in body sync mode to get the full bodies of the last blocks we're missing. * First fully working fast sync * Facility in p2p conn to deal with attachments (raw binary after message). * Re-introduced sumtree send and receive message handling using the above. * Fixed test and finished updating all required db state after sumtree validation. * Massaged a little bit the pipeline orphan check to still work after the new sumtrees have been setup. * Various cleanup. Consolidated fast sync and full sync into a single function as they're very similar. Proper conditions to trigger a sumtree request and some checks on receiving it.
2018-02-10 01:32:16 +03:00
unimplemented!()
}
fn txhashset_receive_ready(&self) -> bool {
false
}
fn txhashset_write(
2018-03-04 03:19:54 +03:00
&self,
_h: Hash,
_txhashset_data: File,
2018-03-04 03:19:54 +03:00
_peer_addr: SocketAddr,
) -> bool {
[WIP] Abridged sync (#440) * Util to zip and unzip directories * First pass at sumtree request/response. Add message types, implement the exchange in the protocol, zip up the sumtree directory and stream the file over, with necessary adapter hooks. * Implement the sumtree archive receive logicGets the sumtree archive data stream from the network and write it to a file. Unzip the file, place it at the right spot and reconstruct the sumtree data structure, rewinding where to the right spot. * Sumtree hash structure validation * Simplify sumtree backend buffering logic. The backend for a sumtree has to implement some in-memory buffering logic to provide a commit/rollback interface. The backend itself is an aggregate of 3 underlying storages (an append only file, a remove log and a skip list). The buffering was previously implemented both by the backend and some of the underlying storages. Now pushing back all buffering logic to the storages to keep the backend simpler. * Add kernel append only store file to sumtrees. The chain sumtrees structure now also saves all kernels to a dedicated file. As that storage is implemented by the append only file wrapper, it's also rewind-aware. * Full state validation. Checks that: - MMRs are sane (hash and sum each node) - Tree roots match the corresponding header - Kernel signatures are valid - Sum of all kernel excesses equals the sum of UTXO commitments minus the supply * Fast sync handoff to body sync. Once the fast-sync state is fully setup, get bacj in body sync mode to get the full bodies of the last blocks we're missing. * First fully working fast sync * Facility in p2p conn to deal with attachments (raw binary after message). * Re-introduced sumtree send and receive message handling using the above. * Fixed test and finished updating all required db state after sumtree validation. * Massaged a little bit the pipeline orphan check to still work after the new sumtrees have been setup. * Various cleanup. Consolidated fast sync and full sync into a single function as they're very similar. Proper conditions to trigger a sumtree request and some checks on receiving it.
2018-02-10 01:32:16 +03:00
false
}
}
impl NetAdapter for DummyAdapter {
fn find_peer_addrs(&self, _: Capabilities) -> Vec<SocketAddr> {
vec![]
}
fn peer_addrs_received(&self, _: Vec<SocketAddr>) {}
2018-03-04 03:19:54 +03:00
fn peer_difficulty(&self, _: SocketAddr, _: Difficulty, _: u64) {}
fn is_banned(&self, _: SocketAddr) -> bool {
false
}
}