diff --git a/config/src/comments.rs b/config/src/comments.rs index f25d5ffa6..ada34f345 100644 --- a/config/src/comments.rs +++ b/config/src/comments.rs @@ -212,7 +212,7 @@ fn comments() -> HashMap { #until we get to at least this number #peer_min_preferred_count = 8 -# 7 = Bit flags for FULL_NODE +# 15 = Bit flags for FULL_NODE #This structure needs to be changed internally, to make it more configurable ".to_string(), ); diff --git a/p2p/src/msg.rs b/p2p/src/msg.rs index 2041cede9..a4d0bfcab 100644 --- a/p2p/src/msg.rs +++ b/p2p/src/msg.rs @@ -68,8 +68,8 @@ enum_from_primitive! { TxHashSetRequest = 16, TxHashSetArchive = 17, BanReason = 18, - // GetTransaction = 19, - // CompactTransaction = 20, + GetTransaction = 19, + TransactionKernel = 20, } } @@ -95,6 +95,8 @@ fn max_msg_size(msg_type: Type) -> u64 { Type::TxHashSetRequest => 40, Type::TxHashSetArchive => 64, Type::BanReason => 64, + Type::GetTransaction => 32, + Type::TransactionKernel => 32, } } @@ -444,9 +446,7 @@ impl Readable for GetPeerAddrs { fn read(reader: &mut Reader) -> Result { let capab = reader.read_u32()?; let capabilities = Capabilities::from_bits(capab).ok_or(ser::Error::CorruptedData)?; - Ok(GetPeerAddrs { - capabilities: capabilities, - }) + Ok(GetPeerAddrs { capabilities }) } } diff --git a/p2p/src/peer.rs b/p2p/src/peer.rs index 2aaae3a72..14ec17518 100644 --- a/p2p/src/peer.rs +++ b/p2p/src/peer.rs @@ -279,11 +279,40 @@ impl Peer { } } + pub fn send_tx_kernel_hash(&self, h: Hash) -> Result { + if !self.tracking_adapter.has(h) { + debug!("Send tx kernel hash {} to {}", h, self.info.addr); + self.connection + .as_ref() + .unwrap() + .send(h, msg::Type::TransactionKernel)?; + Ok(true) + } else { + debug!( + "Not sending tx kernel hash {} to {} (already seen)", + h, self.info.addr + ); + Ok(false) + } + } + /// Sends the provided transaction to the remote peer. The request may be /// dropped if the remote peer is known to already have the transaction. + /// We support broadcast of lightweight tx kernel hash + /// so track known txs by kernel hash. pub fn send_transaction(&self, tx: &core::Transaction) -> Result { - if !self.tracking_adapter.has(tx.hash()) { - debug!("Send tx {} to {}", tx.hash(), self.info.addr); + let kernel = &tx.kernels()[0]; + + if self + .info + .capabilities + .contains(Capabilities::TX_KERNEL_HASH) + { + return self.send_tx_kernel_hash(kernel.hash()); + } + + if !self.tracking_adapter.has(kernel.hash()) { + debug!("Send full tx {} to {}", tx.hash(), self.info.addr); self.connection .as_ref() .unwrap() @@ -322,6 +351,17 @@ impl Peer { .send(&Locator { hashes: locator }, msg::Type::GetHeaders) } + pub fn send_tx_request(&self, h: Hash) -> Result<(), Error> { + debug!( + "Requesting tx (kernel hash) {} from peer {}.", + h, self.info.addr + ); + self.connection + .as_ref() + .unwrap() + .send(&h, msg::Type::GetTransaction) + } + /// Sends a request for a specific block by hash pub fn send_block_request(&self, h: Hash) -> Result<(), Error> { debug!("Requesting block {} from peer {}.", h, self.info.addr); @@ -459,12 +499,22 @@ impl ChainAdapter for TrackingAdapter { self.adapter.total_height() } + fn get_transaction(&self, kernel_hash: Hash) -> Option { + self.adapter.get_transaction(kernel_hash) + } + + fn tx_kernel_received(&self, kernel_hash: Hash, addr: SocketAddr) { + self.push(kernel_hash); + self.adapter.tx_kernel_received(kernel_hash, addr) + } + fn transaction_received(&self, tx: core::Transaction, stem: bool) { // Do not track the tx hash for stem txs. // Otherwise we fail to handle the subsequent fluff or embargo expiration // correctly. if !stem { - self.push(tx.hash()); + let kernel = &tx.kernels()[0]; + self.push(kernel.hash()); } self.adapter.transaction_received(tx, stem) } diff --git a/p2p/src/peers.rs b/p2p/src/peers.rs index 49c35dd3a..c90ad7660 100644 --- a/p2p/src/peers.rs +++ b/p2p/src/peers.rs @@ -295,8 +295,8 @@ impl Peers { ); } - /// Broadcasts the provided stem transaction to our peer relay. - pub fn broadcast_stem_transaction(&self, tx: &core::Transaction) -> Result<(), Error> { + /// Relays the provided stem transaction to our single stem peer. + pub fn relay_stem_transaction(&self, tx: &core::Transaction) -> Result<(), Error> { let dandelion_relay = self.get_dandelion_relay(); if dandelion_relay.is_empty() { debug!("No dandelion relay, updating."); @@ -323,10 +323,10 @@ impl Peers { /// A peer implementation may drop the broadcast request /// if it knows the remote peer already has the transaction. pub fn broadcast_transaction(&self, tx: &core::Transaction) { - let num_peers = self.config.peer_min_preferred_count(); + let num_peers = self.config.peer_max_count(); let count = self.broadcast("transaction", num_peers, |p| p.send_transaction(tx)); - trace!( - "broadcast_transaction: {}, to {} peers, done.", + debug!( + "broadcast_transaction: {} to {} peers, done.", tx.hash(), count, ); @@ -472,6 +472,14 @@ impl ChainAdapter for Peers { self.adapter.total_height() } + fn get_transaction(&self, kernel_hash: Hash) -> Option { + self.adapter.get_transaction(kernel_hash) + } + + fn tx_kernel_received(&self, kernel_hash: Hash, addr: SocketAddr) { + self.adapter.tx_kernel_received(kernel_hash, addr) + } + fn transaction_received(&self, tx: core::Transaction, stem: bool) { self.adapter.transaction_received(tx, stem) } diff --git a/p2p/src/protocol.rs b/p2p/src/protocol.rs index 7434e9847..8d3095ce7 100644 --- a/p2p/src/protocol.rs +++ b/p2p/src/protocol.rs @@ -83,6 +83,30 @@ impl MessageHandler for Protocol { Ok(None) } + Type::TransactionKernel => { + let h: Hash = msg.body()?; + debug!( + "handle_payload: received tx kernel: {}, msg_len: {}", + h, msg.header.msg_len + ); + adapter.tx_kernel_received(h, self.addr); + Ok(None) + } + + Type::GetTransaction => { + let h: Hash = msg.body()?; + debug!( + "handle_payload: GetTransaction: {}, msg_len: {}", + h, msg.header.msg_len, + ); + let tx = adapter.get_transaction(h); + if let Some(tx) = tx { + Ok(Some(msg.respond(Type::Transaction, tx))) + } else { + Ok(None) + } + } + Type::Transaction => { debug!( "handle_payload: received tx: msg_len: {}", @@ -106,7 +130,7 @@ impl MessageHandler for Protocol { Type::GetBlock => { let h: Hash = msg.body()?; trace!( - "handle_payload: Getblock: {}, msg_len: {}", + "handle_payload: GetBlock: {}, msg_len: {}", h, msg.header.msg_len, ); diff --git a/p2p/src/serv.rs b/p2p/src/serv.rs index 4993da01b..1b5115844 100644 --- a/p2p/src/serv.rs +++ b/p2p/src/serv.rs @@ -197,6 +197,10 @@ impl ChainAdapter for DummyAdapter { fn total_height(&self) -> u64 { 0 } + fn get_transaction(&self, _h: Hash) -> Option { + None + } + fn tx_kernel_received(&self, _h: Hash, _addr: SocketAddr) {} fn transaction_received(&self, _: core::Transaction, _stem: bool) {} fn compact_block_received(&self, _cb: core::CompactBlock, _addr: SocketAddr) -> bool { true diff --git a/p2p/src/types.rs b/p2p/src/types.rs index 34c8f8f91..e492af2e0 100644 --- a/p2p/src/types.rs +++ b/p2p/src/types.rs @@ -206,13 +206,16 @@ bitflags! { const TXHASHSET_HIST = 0b00000010; /// Can provide a list of healthy peers const PEER_LIST = 0b00000100; + /// Can broadcast and request txs by kernel hash. + const TX_KERNEL_HASH = 0b00001000; /// All nodes right now are "full nodes". /// Some nodes internally may maintain longer block histories (archival_mode) /// but we do not advertise this to other nodes. const FULL_NODE = Capabilities::HEADER_HIST.bits | Capabilities::TXHASHSET_HIST.bits - | Capabilities::PEER_LIST.bits; + | Capabilities::PEER_LIST.bits + | Capabilities::TX_KERNEL_HASH.bits; } } @@ -337,6 +340,10 @@ pub trait ChainAdapter: Sync + Send { /// A valid transaction has been received from one of our peers fn transaction_received(&self, tx: core::Transaction, stem: bool); + fn get_transaction(&self, kernel_hash: Hash) -> Option; + + fn tx_kernel_received(&self, kernel_hash: Hash, addr: SocketAddr); + /// A block has been received from one of our peers. Returns true if the /// block could be handled properly and is not deemed defective by the /// chain. Returning false means the block will never be valid and diff --git a/pool/src/pool.rs b/pool/src/pool.rs index 9bc473ac4..a62ec0dfb 100644 --- a/pool/src/pool.rs +++ b/pool/src/pool.rs @@ -66,6 +66,18 @@ impl Pool { .map(|x| x.tx.clone()) } + /// Query the tx pool for an individual tx matching the given kernel hash. + pub fn retrieve_tx_by_kernel_hash(&self, hash: Hash) -> Option { + for x in &self.entries { + for k in x.tx.kernels() { + if k.hash() == hash { + return Some(x.tx.clone()); + } + } + } + None + } + /// Query the tx pool for all known txs based on kernel short_ids /// from the provided compact_block. /// Note: does not validate that we return the full set of required txs. diff --git a/pool/src/transaction_pool.rs b/pool/src/transaction_pool.rs index 1be724b3f..4da3aeb7d 100644 --- a/pool/src/transaction_pool.rs +++ b/pool/src/transaction_pool.rs @@ -201,6 +201,11 @@ impl TransactionPool { Ok(()) } + /// Retrieve individual transaction for the given kernel hash. + pub fn retrieve_tx_by_kernel_hash(&self, hash: Hash) -> Option { + self.txpool.retrieve_tx_by_kernel_hash(hash) + } + /// Retrieve all transactions matching the provided "compact block" /// based on the kernel set. /// Note: we only look in the txpool for this (stempool is under embargo). diff --git a/servers/src/common/adapters.rs b/servers/src/common/adapters.rs index ce860fbe0..bbff07deb 100644 --- a/servers/src/common/adapters.rs +++ b/servers/src/common/adapters.rs @@ -58,6 +58,23 @@ impl p2p::ChainAdapter for NetToChainAdapter { self.chain().head().unwrap().height } + fn get_transaction(&self, kernel_hash: Hash) -> Option { + self.tx_pool.read().retrieve_tx_by_kernel_hash(kernel_hash) + } + + fn tx_kernel_received(&self, kernel_hash: Hash, addr: SocketAddr) { + // nothing much we can do with a new transaction while syncing + if self.sync_state.is_syncing() { + return; + } + + let tx = self.tx_pool.read().retrieve_tx_by_kernel_hash(kernel_hash); + + if tx.is_none() { + self.request_transaction(kernel_hash, &addr); + } + } + fn transaction_received(&self, tx: core::Transaction, stem: bool) { // nothing much we can do with a new transaction while syncing if self.sync_state.is_syncing() { @@ -136,8 +153,9 @@ impl p2p::ChainAdapter for NetToChainAdapter { } let (txs, missing_short_ids) = { - let tx_pool = self.tx_pool.read(); - tx_pool.retrieve_transactions(cb.hash(), cb.nonce, cb.kern_ids()) + self.tx_pool + .read() + .retrieve_transactions(cb.hash(), cb.nonce, cb.kern_ids()) }; debug!( @@ -539,6 +557,10 @@ impl NetToChainAdapter { } } + fn request_transaction(&self, h: Hash, addr: &SocketAddr) { + self.send_tx_request_to_peer(h, addr, |peer, h| peer.send_tx_request(h)) + } + // After receiving a compact block if we cannot successfully hydrate // it into a full block then fallback to requesting the full block // from the same peer that gave us the compact block @@ -560,6 +582,23 @@ impl NetToChainAdapter { }) } + fn send_tx_request_to_peer(&self, h: Hash, addr: &SocketAddr, f: F) + where + F: Fn(&p2p::Peer, Hash) -> Result<(), p2p::Error>, + { + match self.peers().get_connected_peer(addr) { + None => debug!( + "send_tx_request_to_peer: can't send request to peer {:?}, not connected", + addr + ), + Some(peer) => { + if let Err(e) = f(&peer, h) { + error!("send_tx_request_to_peer: failed: {:?}", e) + } + } + } + } + fn send_block_request_to_peer(&self, h: Hash, addr: &SocketAddr, f: F) where F: Fn(&p2p::Peer, Hash) -> Result<(), p2p::Error>, @@ -670,10 +709,11 @@ pub struct PoolToNetAdapter { impl pool::PoolAdapter for PoolToNetAdapter { fn stem_tx_accepted(&self, tx: &core::Transaction) -> Result<(), pool::PoolError> { self.peers() - .broadcast_stem_transaction(tx) + .relay_stem_transaction(tx) .map_err(|_| pool::PoolError::DandelionError)?; Ok(()) } + fn tx_accepted(&self, tx: &core::Transaction) { self.peers().broadcast_transaction(tx); }