grin/p2p/src/peer.rs

// Copyright 2019 The Grin Developers
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::util::{Mutex, RwLock};
use std::fmt;
use std::fs::File;
use std::io::Read;
use std::net::{Shutdown, TcpStream};
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use lru_cache::LruCache;
use crate::chain;
use crate::conn;
use crate::core::core::hash::{Hash, Hashed};
use crate::core::pow::Difficulty;
use crate::core::ser::Writeable;
use crate::core::{core, global};
use crate::handshake::Handshake;
use crate::msg::{
self, BanReason, GetPeerAddrs, KernelDataRequest, Locator, Msg, Ping, TxHashSetRequest, Type,
};
use crate::protocol::Protocol;
use crate::types::{
Capabilities, ChainAdapter, Error, NetAdapter, P2PConfig, PeerAddr, PeerInfo, ReasonForBan,
TxHashSetRead,
};
use chrono::prelude::{DateTime, Utc};
const MAX_TRACK_SIZE: usize = 30;
const MAX_PEER_MSG_PER_MIN: u64 = 500;
/// Note: do not confuse this `State` with the `State` in p2p/src/store.rs,
/// which has three different states: {Healthy, Banned, Defunct}.
/// For example, a peer that is disconnected here could still be 'Healthy' in the
/// store and could reconnect in the next loop.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum State {
Connected,
Banned,
}
pub struct Peer {
pub info: PeerInfo,
state: Arc<RwLock<State>>,
// set of all hashes known to this peer (so no need to send)
tracking_adapter: TrackingAdapter,
tracker: Arc<conn::Tracker>,
send_handle: Mutex<conn::ConnHandle>,
// We need a dedicated lock for the stop operation. We can't reuse the send
// handle mutex because it may be held for other reasons and we would have to
// wait for it. The stop mutex is only taken during shutdown, which happens once.
stop_handle: Mutex<conn::StopHandle>,
// Whether or not we requested a txhashset from this peer
state_sync_requested: Arc<AtomicBool>,
}
impl fmt::Debug for Peer {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "Peer({:?})", &self.info)
}
}
impl Peer {
// Only accept and connect can be externally used to build a peer
fn new(info: PeerInfo, conn: TcpStream, adapter: Arc<dyn NetAdapter>) -> std::io::Result<Peer> {
let state = Arc::new(RwLock::new(State::Connected));
let state_sync_requested = Arc::new(AtomicBool::new(false));
let tracking_adapter = TrackingAdapter::new(adapter);
let handler = Protocol::new(
Arc::new(tracking_adapter.clone()),
info.clone(),
state_sync_requested.clone(),
);
let tracker = Arc::new(conn::Tracker::new());
let (sendh, stoph) = conn::listen(conn, info.version, tracker.clone(), handler)?;
let send_handle = Mutex::new(sendh);
let stop_handle = Mutex::new(stoph);
Ok(Peer {
info,
state,
tracking_adapter,
tracker,
send_handle,
stop_handle,
state_sync_requested,
})
}
pub fn accept(
mut conn: TcpStream,
capab: Capabilities,
total_difficulty: Difficulty,
hs: &Handshake,
adapter: Arc<dyn NetAdapter>,
) -> Result<Peer, Error> {
debug!("accept: handshaking from {:?}", conn.peer_addr());
let info = hs.accept(capab, total_difficulty, &mut conn);
match info {
Ok(info) => Ok(Peer::new(info, conn, adapter)?),
Err(e) => {
debug!(
"accept: handshaking from {:?} failed with error: {:?}",
conn.peer_addr(),
e
);
if let Err(e) = conn.shutdown(Shutdown::Both) {
debug!("Error shutting down conn: {:?}", e);
}
Err(e)
}
}
}
pub fn connect(
mut conn: TcpStream,
capab: Capabilities,
total_difficulty: Difficulty,
self_addr: PeerAddr,
hs: &Handshake,
adapter: Arc<dyn NetAdapter>,
) -> Result<Peer, Error> {
debug!("connect: handshaking with {:?}", conn.peer_addr());
let info = hs.initiate(capab, total_difficulty, self_addr, &mut conn);
match info {
Ok(info) => Ok(Peer::new(info, conn, adapter)?),
Err(e) => {
debug!(
"connect: handshaking with {:?} failed with error: {:?}",
conn.peer_addr(),
e
);
if let Err(e) = conn.shutdown(Shutdown::Both) {
debug!("Error shutting down conn: {:?}", e);
}
Err(e)
}
}
}
pub fn is_denied(config: &P2PConfig, peer_addr: PeerAddr) -> bool {
if let Some(ref denied) = config.peers_deny {
if denied.contains(&peer_addr) {
debug!(
"checking peer allowed/denied: {:?} explicitly denied",
peer_addr
);
return true;
}
}
if let Some(ref allowed) = config.peers_allow {
if allowed.contains(&peer_addr) {
debug!(
"checking peer allowed/denied: {:?} explicitly allowed",
peer_addr
);
return false;
} else {
debug!(
"checking peer allowed/denied: {:?} not explicitly allowed, denying",
peer_addr
);
return true;
}
}
// default to allowing peer connection if we do not explicitly allow or deny
// the peer
false
}
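/*
The precedence used by `is_denied` (deny list first, then allow list, then
allow by default) can be illustrated with a minimal standalone sketch. The
`DenyConfig` struct and `is_denied_sketch` function are hypothetical stand-ins
for `P2PConfig` and `Peer::is_denied`, and plain `SocketAddr` values are
assumed in place of `PeerAddr`.

```rust
use std::net::SocketAddr;

// Hypothetical stand-in for the two relevant fields of P2PConfig.
struct DenyConfig {
    peers_allow: Option<Vec<SocketAddr>>,
    peers_deny: Option<Vec<SocketAddr>>,
}

// Returns true when a connection from `addr` should be refused.
fn is_denied_sketch(config: &DenyConfig, addr: SocketAddr) -> bool {
    // An explicit deny entry always wins.
    if let Some(denied) = &config.peers_deny {
        if denied.contains(&addr) {
            return true;
        }
    }
    // When an allow list is configured, anything not on it is denied.
    if let Some(allowed) = &config.peers_allow {
        return !allowed.contains(&addr);
    }
    // Neither list applies: allow by default.
    false
}
```

Note that an address present on both lists is denied, because the deny list is
checked first.
*/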
/// Whether this peer is currently connected.
pub fn is_connected(&self) -> bool {
State::Connected == *self.state.read()
}
/// Whether this peer has been banned.
pub fn is_banned(&self) -> bool {
State::Banned == *self.state.read()
}
/// Whether this peer is stuck on sync.
pub fn is_stuck(&self) -> (bool, Difficulty) {
let peer_live_info = self.info.live_info.read();
let now = Utc::now().timestamp_millis();
// If the difficulty was last updated more than 2 hours ago, we consider this peer stuck.
let stuck =
now > peer_live_info.stuck_detector.timestamp_millis() + global::STUCK_PEER_KICK_TIME;
(stuck, peer_live_info.total_difficulty)
}
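/*
The stuck check in `is_stuck` is a plain timestamp comparison in milliseconds.
A minimal sketch follows, assuming a two hour threshold as stated in the
comment inside `is_stuck`; the constant `STUCK_PEER_KICK_TIME_MS` and the
function `is_stuck_at` are hypothetical names, not part of this crate.

```rust
// Assumed threshold: two hours expressed in milliseconds.
const STUCK_PEER_KICK_TIME_MS: i64 = 2 * 3600 * 1000;

// Returns true when the last difficulty update is older than the threshold.
// Both arguments are Unix timestamps in milliseconds.
fn is_stuck_at(now_ms: i64, last_update_ms: i64) -> bool {
    now_ms > last_update_ms + STUCK_PEER_KICK_TIME_MS
}
```

The comparison is strict, so a peer whose last update is exactly at the
threshold is not yet reported as stuck.
*/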
/// Whether the peer is considered abusive, mostly for spammy nodes
pub fn is_abusive(&self) -> bool {
let rec = self.tracker.received_bytes.read();
let sent = self.tracker.sent_bytes.read();
rec.count_per_min() > MAX_PEER_MSG_PER_MIN || sent.count_per_min() > MAX_PEER_MSG_PER_MIN
}
/// Number of bytes sent to the peer over the last minute
pub fn last_min_sent_bytes(&self) -> Option<u64> {
let sent_bytes = self.tracker.sent_bytes.read();
Some(sent_bytes.bytes_per_min())
}
/// Number of bytes received from the peer over the last minute
pub fn last_min_received_bytes(&self) -> Option<u64> {
let received_bytes = self.tracker.received_bytes.read();
Some(received_bytes.bytes_per_min())
}
/// Number of messages (sent, received) exchanged with the peer over the last minute
pub fn last_min_message_counts(&self) -> Option<(u64, u64)> {
let received_bytes = self.tracker.received_bytes.read();
let sent_bytes = self.tracker.sent_bytes.read();
Some((sent_bytes.count_per_min(), received_bytes.count_per_min()))
}
/// Set this peer status to banned
pub fn set_banned(&self) {
*self.state.write() = State::Banned;
}
/// Send a msg with given msg_type to our peer via the connection.
fn send<T: Writeable>(&self, msg: T, msg_type: Type) -> Result<(), Error> {
let msg = Msg::new(msg_type, msg, self.info.version)?;
self.send_handle.lock().send(msg)
}
/// Send a ping to the remote peer, providing our local difficulty and
/// height
pub fn send_ping(&self, total_difficulty: Difficulty, height: u64) -> Result<(), Error> {
let ping_msg = Ping {
total_difficulty,
height,
};
self.send(ping_msg, msg::Type::Ping)
}
/// Send the ban reason before banning
pub fn send_ban_reason(&self, ban_reason: ReasonForBan) -> Result<(), Error> {
let ban_reason_msg = BanReason { ban_reason };
self.send(ban_reason_msg, msg::Type::BanReason)
}
/// Sends the provided block to the remote peer. The request may be dropped
/// if the remote peer is known to already have the block.
pub fn send_block(&self, b: &core::Block) -> Result<bool, Error> {
if !self.tracking_adapter.has_recv(b.hash()) {
trace!("Send block {} to {}", b.hash(), self.info.addr);
self.send(b, msg::Type::Block)?;
Ok(true)
} else {
debug!(
"Suppress block send {} to {} (already seen)",
b.hash(),
self.info.addr,
);
Ok(false)
}
}
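/// Sends the provided compact block to the remote peer. The request may be
/// dropped if the remote peer is known to already have the block.
pub fn send_compact_block(&self, b: &core::CompactBlock) -> Result<bool, Error> {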
if !self.tracking_adapter.has_recv(b.hash()) {
trace!("Send compact block {} to {}", b.hash(), self.info.addr);
self.send(b, msg::Type::CompactBlock)?;
Ok(true)
} else {
debug!(
"Suppress compact block send {} to {} (already seen)",
b.hash(),
self.info.addr,
);
Ok(false)
}
}
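/// Sends the provided block header to the remote peer. The request may be
/// dropped if the remote peer is known to already have the header.
pub fn send_header(&self, bh: &core::BlockHeader) -> Result<bool, Error> {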
if !self.tracking_adapter.has_recv(bh.hash()) {
debug!("Send header {} to {}", bh.hash(), self.info.addr);
self.send(bh, msg::Type::Header)?;
Ok(true)
} else {
debug!(
"Suppress header send {} to {} (already seen)",
bh.hash(),
self.info.addr,
);
Ok(false)
}
}
pub fn send_tx_kernel_hash(&self, h: Hash) -> Result<bool, Error> {
if !self.tracking_adapter.has_recv(h) {
debug!("Send tx kernel hash {} to {}", h, self.info.addr);
self.send(h, msg::Type::TransactionKernel)?;
Ok(true)
} else {
debug!(
"Not sending tx kernel hash {} to {} (already seen)",
h, self.info.addr
);
Ok(false)
}
}
/// Sends the provided transaction to the remote peer. The request may be
/// dropped if the remote peer is known to already have the transaction.
/// Peers that support it receive only the lightweight tx kernel hash,
/// so known txs are tracked by kernel hash.
pub fn send_transaction(&self, tx: &core::Transaction) -> Result<bool, Error> {
let kernel = &tx.kernels()[0];
if self
.info
.capabilities
.contains(Capabilities::TX_KERNEL_HASH)
{
return self.send_tx_kernel_hash(kernel.hash());
}
if !self.tracking_adapter.has_recv(kernel.hash()) {
debug!("Send full tx {} to {}", tx.hash(), self.info.addr);
self.send(tx, msg::Type::Transaction)?;
Ok(true)
} else {
debug!(
"Not sending tx {} to {} (already seen)",
tx.hash(),
self.info.addr
);
Ok(false)
}
}
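/*
The decision made in `send_transaction` can be summarised as a small pure
function: nothing is sent for an already seen kernel hash, a peer advertising
the kernel hash capability gets only the hash, and any other peer gets the
full transaction. In this sketch `TX_KERNEL_HASH` is a hypothetical bit value
(the real one is defined by `Capabilities::TX_KERNEL_HASH`), and `TxSend` and
`choose_tx_send` are illustrative names only.

```rust
// Hypothetical capability bit, chosen for illustration only.
const TX_KERNEL_HASH: u32 = 0b0000_1000;

#[derive(Debug, PartialEq)]
enum TxSend {
    // Send only the kernel hash and let the peer request the full tx.
    KernelHash,
    // Send the full transaction.
    FullTx,
    // The peer already knows this kernel hash, send nothing.
    Suppressed,
}

fn choose_tx_send(peer_capabilities: u32, kernel_hash_seen: bool) -> TxSend {
    if kernel_hash_seen {
        return TxSend::Suppressed;
    }
    if (peer_capabilities & TX_KERNEL_HASH) != 0 {
        TxSend::KernelHash
    } else {
        TxSend::FullTx
    }
}
```
*/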
/// Sends the provided stem transaction to the remote peer.
/// Note: tracking adapter is ignored for stem transactions (while under
/// embargo).
pub fn send_stem_transaction(&self, tx: &core::Transaction) -> Result<(), Error> {
debug!("Send (stem) tx {} to {}", tx.hash(), self.info.addr);
self.send(tx, msg::Type::StemTransaction)
}
/// Sends a request for block headers from the provided block locator
pub fn send_header_request(&self, locator: Vec<Hash>) -> Result<(), Error> {
self.send(&Locator { hashes: locator }, msg::Type::GetHeaders)
}
/// Sends a request for a transaction identified by its kernel hash.
pub fn send_tx_request(&self, h: Hash) -> Result<(), Error> {
debug!(
"Requesting tx (kernel hash) {} from peer {}.",
h, self.info.addr
);
self.send(&h, msg::Type::GetTransaction)
}
/// Sends a request for a specific block by hash.
/// Takes opts so we can track if this request was due to our node syncing or otherwise.
pub fn send_block_request(&self, h: Hash, opts: chain::Options) -> Result<(), Error> {
debug!("Requesting block {} from peer {}.", h, self.info.addr);
self.tracking_adapter.push_req(h, opts);
self.send(&h, msg::Type::GetBlock)
}
/// Sends a request for a specific compact block by hash
pub fn send_compact_block_request(&self, h: Hash) -> Result<(), Error> {
debug!("Requesting compact block {} from {}", h, self.info.addr);
self.send(&h, msg::Type::GetCompactBlock)
}
pub fn send_peer_request(&self, capab: Capabilities) -> Result<(), Error> {
trace!("Asking {} for more peers {:?}", self.info.addr, capab);
self.send(
&GetPeerAddrs {
capabilities: capab,
},
msg::Type::GetPeerAddrs,
)
}
pub fn send_txhashset_request(&self, height: u64, hash: Hash) -> Result<(), Error> {
debug!(
"Asking {} for txhashset archive at {} {}.",
self.info.addr, height, hash
);
self.state_sync_requested.store(true, Ordering::Relaxed);
self.send(
&TxHashSetRequest { hash, height },
msg::Type::TxHashSetRequest,
)
}
/// Sends a request for kernel data to the peer.
pub fn send_kernel_data_request(&self) -> Result<(), Error> {
debug!("Asking {} for kernel data.", self.info.addr);
self.send(&KernelDataRequest {}, msg::Type::KernelDataRequest)
}
/// Stops the peer
pub fn stop(&self) {
debug!("Stopping peer {:?}", self.info.addr);
match self.stop_handle.try_lock() {
Some(handle) => handle.stop(),
None => error!("can't get stop lock for peer"),
}
}
/// Waits until the peer's thread exits
pub fn wait(&self) {
debug!("Waiting for peer {:?} to stop", self.info.addr);
match self.stop_handle.try_lock() {
Some(mut handle) => handle.wait(),
None => error!("can't get stop lock for peer"),
}
}
}
/// Adapter implementation that forwards everything to an underlying adapter
/// but keeps track of the block and transaction hashes that were requested or
/// received.
#[derive(Clone)]
struct TrackingAdapter {
adapter: Arc<dyn NetAdapter>,
received: Arc<RwLock<LruCache<Hash, ()>>>,
requested: Arc<RwLock<LruCache<Hash, chain::Options>>>,
}
impl TrackingAdapter {
fn new(adapter: Arc<dyn NetAdapter>) -> TrackingAdapter {
TrackingAdapter {
adapter,
received: Arc::new(RwLock::new(LruCache::new(MAX_TRACK_SIZE))),
requested: Arc::new(RwLock::new(LruCache::new(MAX_TRACK_SIZE))),
}
}
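/// Returns true if this hash has already been recorded as received.
fn has_recv(&self, hash: Hash) -> bool {
self.received.write().contains_key(&hash)
}
/// Record a block or transaction hash as received.
fn push_recv(&self, hash: Hash) {
self.received.write().insert(hash, ());
}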
/// Track a block or transaction hash requested by us.
/// Track the opts alongside the hash so we know if this was due to us syncing or not.
fn push_req(&self, hash: Hash, opts: chain::Options) {
self.requested.write().insert(hash, opts);
}
/// Look up the opts tracked for a hash we requested, if it is still tracked.
fn req_opts(&self, hash: Hash) -> Option<chain::Options> {
self.requested.write().get_mut(&hash).cloned()
}
}
impl ChainAdapter for TrackingAdapter {
fn total_difficulty(&self) -> Result<Difficulty, chain::Error> {
self.adapter.total_difficulty()
}
fn total_height(&self) -> Result<u64, chain::Error> {
self.adapter.total_height()
}
fn get_transaction(&self, kernel_hash: Hash) -> Option<core::Transaction> {
self.adapter.get_transaction(kernel_hash)
}
fn tx_kernel_received(
&self,
kernel_hash: Hash,
peer_info: &PeerInfo,
) -> Result<bool, chain::Error> {
self.push_recv(kernel_hash);
self.adapter.tx_kernel_received(kernel_hash, peer_info)
}
fn transaction_received(
&self,
tx: core::Transaction,
stem: bool,
) -> Result<bool, chain::Error> {
// Do not track the tx hash for stem txs.
// Otherwise we fail to handle the subsequent fluff or embargo expiration
// correctly.
if !stem {
let kernel = &tx.kernels()[0];
self.push_recv(kernel.hash());
}
self.adapter.transaction_received(tx, stem)
}
fn block_received(
&self,
b: core::Block,
peer_info: &PeerInfo,
opts: chain::Options,
) -> Result<bool, chain::Error> {
let bh = b.hash();
self.push_recv(bh);
// If we are currently tracking a request for this block then
// use the opts specified when we made the request.
// If we requested this block as part of sync then we want to
// let our adapter know this when we receive it.
let req_opts = self.req_opts(bh).unwrap_or(opts);
self.adapter.block_received(b, peer_info, req_opts)
}
fn compact_block_received(
&self,
cb: core::CompactBlock,
peer_info: &PeerInfo,
) -> Result<bool, chain::Error> {
self.push_recv(cb.hash());
self.adapter.compact_block_received(cb, peer_info)
}
fn header_received(
&self,
bh: core::BlockHeader,
peer_info: &PeerInfo,
) -> Result<bool, chain::Error> {
self.push_recv(bh.hash());
self.adapter.header_received(bh, peer_info)
}
fn headers_received(
&self,
bh: &[core::BlockHeader],
peer_info: &PeerInfo,
) -> Result<bool, chain::Error> {
self.adapter.headers_received(bh, peer_info)
}
fn locate_headers(&self, locator: &[Hash]) -> Result<Vec<core::BlockHeader>, chain::Error> {
self.adapter.locate_headers(locator)
}
fn get_block(&self, h: Hash) -> Option<core::Block> {
self.adapter.get_block(h)
}
fn kernel_data_read(&self) -> Result<File, chain::Error> {
self.adapter.kernel_data_read()
}
fn kernel_data_write(&self, reader: &mut dyn Read) -> Result<bool, chain::Error> {
self.adapter.kernel_data_write(reader)
}
fn txhashset_read(&self, h: Hash) -> Option<TxHashSetRead> {
self.adapter.txhashset_read(h)
}
fn txhashset_archive_header(&self) -> Result<core::BlockHeader, chain::Error> {
self.adapter.txhashset_archive_header()
}
fn txhashset_receive_ready(&self) -> bool {
self.adapter.txhashset_receive_ready()
}
fn txhashset_write(
&self,
h: Hash,
txhashset_data: File,
peer_info: &PeerInfo,
) -> Result<bool, chain::Error> {
self.adapter.txhashset_write(h, txhashset_data, peer_info)
}
fn txhashset_download_update(
&self,
start_time: DateTime<Utc>,
downloaded_size: u64,
total_size: u64,
) -> bool {
self.adapter
.txhashset_download_update(start_time, downloaded_size, total_size)
}
fn get_tmp_dir(&self) -> PathBuf {
self.adapter.get_tmp_dir()
}
fn get_tmpfile_pathname(&self, tmpfile_name: String) -> PathBuf {
self.adapter.get_tmpfile_pathname(tmpfile_name)
}
}
impl NetAdapter for TrackingAdapter {
fn find_peer_addrs(&self, capab: Capabilities) -> Vec<PeerAddr> {
self.adapter.find_peer_addrs(capab)
}
fn peer_addrs_received(&self, addrs: Vec<PeerAddr>) {
self.adapter.peer_addrs_received(addrs)
}
fn peer_difficulty(&self, addr: PeerAddr, diff: Difficulty, height: u64) {
self.adapter.peer_difficulty(addr, diff, height)
}
fn is_banned(&self, addr: PeerAddr) -> bool {
self.adapter.is_banned(addr)
}
}