[DNM] broadcast tx kernel hash (if supported by peer) (#1929)

broadcast tx kernel hash (if supported by peer)
This commit is contained in:
Antioch Peverell 2018-11-07 09:28:17 +00:00 committed by GitHub
parent 8b546632fe
commit d97a6c2189
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 169 additions and 19 deletions

View file

@ -212,7 +212,7 @@ fn comments() -> HashMap<String, String> {
#until we get to at least this number
#peer_min_preferred_count = 8
# 7 = Bit flags for FULL_NODE
# 15 = Bit flags for FULL_NODE
#This structure needs to be changed internally, to make it more configurable
".to_string(),
);

View file

@ -68,8 +68,8 @@ enum_from_primitive! {
TxHashSetRequest = 16,
TxHashSetArchive = 17,
BanReason = 18,
// GetTransaction = 19,
// CompactTransaction = 20,
GetTransaction = 19,
TransactionKernel = 20,
}
}
@ -95,6 +95,8 @@ fn max_msg_size(msg_type: Type) -> u64 {
Type::TxHashSetRequest => 40,
Type::TxHashSetArchive => 64,
Type::BanReason => 64,
Type::GetTransaction => 32,
Type::TransactionKernel => 32,
}
}
@ -444,9 +446,7 @@ impl Readable for GetPeerAddrs {
fn read(reader: &mut Reader) -> Result<GetPeerAddrs, ser::Error> {
let capab = reader.read_u32()?;
let capabilities = Capabilities::from_bits(capab).ok_or(ser::Error::CorruptedData)?;
Ok(GetPeerAddrs {
capabilities: capabilities,
})
Ok(GetPeerAddrs { capabilities })
}
}

View file

@ -279,11 +279,40 @@ impl Peer {
}
}
pub fn send_tx_kernel_hash(&self, h: Hash) -> Result<bool, Error> {
if !self.tracking_adapter.has(h) {
debug!("Send tx kernel hash {} to {}", h, self.info.addr);
self.connection
.as_ref()
.unwrap()
.send(h, msg::Type::TransactionKernel)?;
Ok(true)
} else {
debug!(
"Not sending tx kernel hash {} to {} (already seen)",
h, self.info.addr
);
Ok(false)
}
}
/// Sends the provided transaction to the remote peer. The request may be
/// dropped if the remote peer is known to already have the transaction.
/// We support broadcast of lightweight tx kernel hash
/// so track known txs by kernel hash.
pub fn send_transaction(&self, tx: &core::Transaction) -> Result<bool, Error> {
if !self.tracking_adapter.has(tx.hash()) {
debug!("Send tx {} to {}", tx.hash(), self.info.addr);
let kernel = &tx.kernels()[0];
if self
.info
.capabilities
.contains(Capabilities::TX_KERNEL_HASH)
{
return self.send_tx_kernel_hash(kernel.hash());
}
if !self.tracking_adapter.has(kernel.hash()) {
debug!("Send full tx {} to {}", tx.hash(), self.info.addr);
self.connection
.as_ref()
.unwrap()
@ -322,6 +351,17 @@ impl Peer {
.send(&Locator { hashes: locator }, msg::Type::GetHeaders)
}
pub fn send_tx_request(&self, h: Hash) -> Result<(), Error> {
debug!(
"Requesting tx (kernel hash) {} from peer {}.",
h, self.info.addr
);
self.connection
.as_ref()
.unwrap()
.send(&h, msg::Type::GetTransaction)
}
/// Sends a request for a specific block by hash
pub fn send_block_request(&self, h: Hash) -> Result<(), Error> {
debug!("Requesting block {} from peer {}.", h, self.info.addr);
@ -459,12 +499,22 @@ impl ChainAdapter for TrackingAdapter {
self.adapter.total_height()
}
fn get_transaction(&self, kernel_hash: Hash) -> Option<core::Transaction> {
self.adapter.get_transaction(kernel_hash)
}
fn tx_kernel_received(&self, kernel_hash: Hash, addr: SocketAddr) {
self.push(kernel_hash);
self.adapter.tx_kernel_received(kernel_hash, addr)
}
fn transaction_received(&self, tx: core::Transaction, stem: bool) {
// Do not track the tx hash for stem txs.
// Otherwise we fail to handle the subsequent fluff or embargo expiration
// correctly.
if !stem {
self.push(tx.hash());
let kernel = &tx.kernels()[0];
self.push(kernel.hash());
}
self.adapter.transaction_received(tx, stem)
}

View file

@ -295,8 +295,8 @@ impl Peers {
);
}
/// Broadcasts the provided stem transaction to our peer relay.
pub fn broadcast_stem_transaction(&self, tx: &core::Transaction) -> Result<(), Error> {
/// Relays the provided stem transaction to our single stem peer.
pub fn relay_stem_transaction(&self, tx: &core::Transaction) -> Result<(), Error> {
let dandelion_relay = self.get_dandelion_relay();
if dandelion_relay.is_empty() {
debug!("No dandelion relay, updating.");
@ -323,10 +323,10 @@ impl Peers {
/// A peer implementation may drop the broadcast request
/// if it knows the remote peer already has the transaction.
pub fn broadcast_transaction(&self, tx: &core::Transaction) {
let num_peers = self.config.peer_min_preferred_count();
let num_peers = self.config.peer_max_count();
let count = self.broadcast("transaction", num_peers, |p| p.send_transaction(tx));
trace!(
"broadcast_transaction: {}, to {} peers, done.",
debug!(
"broadcast_transaction: {} to {} peers, done.",
tx.hash(),
count,
);
@ -472,6 +472,14 @@ impl ChainAdapter for Peers {
self.adapter.total_height()
}
fn get_transaction(&self, kernel_hash: Hash) -> Option<core::Transaction> {
self.adapter.get_transaction(kernel_hash)
}
fn tx_kernel_received(&self, kernel_hash: Hash, addr: SocketAddr) {
self.adapter.tx_kernel_received(kernel_hash, addr)
}
fn transaction_received(&self, tx: core::Transaction, stem: bool) {
self.adapter.transaction_received(tx, stem)
}

View file

@ -83,6 +83,30 @@ impl MessageHandler for Protocol {
Ok(None)
}
Type::TransactionKernel => {
let h: Hash = msg.body()?;
debug!(
"handle_payload: received tx kernel: {}, msg_len: {}",
h, msg.header.msg_len
);
adapter.tx_kernel_received(h, self.addr);
Ok(None)
}
Type::GetTransaction => {
let h: Hash = msg.body()?;
debug!(
"handle_payload: GetTransaction: {}, msg_len: {}",
h, msg.header.msg_len,
);
let tx = adapter.get_transaction(h);
if let Some(tx) = tx {
Ok(Some(msg.respond(Type::Transaction, tx)))
} else {
Ok(None)
}
}
Type::Transaction => {
debug!(
"handle_payload: received tx: msg_len: {}",
@ -106,7 +130,7 @@ impl MessageHandler for Protocol {
Type::GetBlock => {
let h: Hash = msg.body()?;
trace!(
"handle_payload: Getblock: {}, msg_len: {}",
"handle_payload: GetBlock: {}, msg_len: {}",
h,
msg.header.msg_len,
);

View file

@ -197,6 +197,10 @@ impl ChainAdapter for DummyAdapter {
fn total_height(&self) -> u64 {
0
}
fn get_transaction(&self, _h: Hash) -> Option<core::Transaction> {
None
}
fn tx_kernel_received(&self, _h: Hash, _addr: SocketAddr) {}
fn transaction_received(&self, _: core::Transaction, _stem: bool) {}
fn compact_block_received(&self, _cb: core::CompactBlock, _addr: SocketAddr) -> bool {
true

View file

@ -206,13 +206,16 @@ bitflags! {
const TXHASHSET_HIST = 0b00000010;
/// Can provide a list of healthy peers
const PEER_LIST = 0b00000100;
/// Can broadcast and request txs by kernel hash.
const TX_KERNEL_HASH = 0b00001000;
/// All nodes right now are "full nodes".
/// Some nodes internally may maintain longer block histories (archival_mode)
/// but we do not advertise this to other nodes.
const FULL_NODE = Capabilities::HEADER_HIST.bits
| Capabilities::TXHASHSET_HIST.bits
| Capabilities::PEER_LIST.bits;
| Capabilities::PEER_LIST.bits
| Capabilities::TX_KERNEL_HASH.bits;
}
}
@ -337,6 +340,10 @@ pub trait ChainAdapter: Sync + Send {
/// A valid transaction has been received from one of our peers
fn transaction_received(&self, tx: core::Transaction, stem: bool);
fn get_transaction(&self, kernel_hash: Hash) -> Option<core::Transaction>;
fn tx_kernel_received(&self, kernel_hash: Hash, addr: SocketAddr);
/// A block has been received from one of our peers. Returns true if the
/// block could be handled properly and is not deemed defective by the
/// chain. Returning false means the block will never be valid and

View file

@ -66,6 +66,18 @@ impl Pool {
.map(|x| x.tx.clone())
}
/// Query the tx pool for an individual tx matching the given kernel hash.
pub fn retrieve_tx_by_kernel_hash(&self, hash: Hash) -> Option<Transaction> {
for x in &self.entries {
for k in x.tx.kernels() {
if k.hash() == hash {
return Some(x.tx.clone());
}
}
}
None
}
/// Query the tx pool for all known txs based on kernel short_ids
/// from the provided compact_block.
/// Note: does not validate that we return the full set of required txs.

View file

@ -201,6 +201,11 @@ impl TransactionPool {
Ok(())
}
/// Retrieve individual transaction for the given kernel hash.
pub fn retrieve_tx_by_kernel_hash(&self, hash: Hash) -> Option<Transaction> {
self.txpool.retrieve_tx_by_kernel_hash(hash)
}
/// Retrieve all transactions matching the provided "compact block"
/// based on the kernel set.
/// Note: we only look in the txpool for this (stempool is under embargo).

View file

@ -58,6 +58,23 @@ impl p2p::ChainAdapter for NetToChainAdapter {
self.chain().head().unwrap().height
}
fn get_transaction(&self, kernel_hash: Hash) -> Option<core::Transaction> {
self.tx_pool.read().retrieve_tx_by_kernel_hash(kernel_hash)
}
fn tx_kernel_received(&self, kernel_hash: Hash, addr: SocketAddr) {
// nothing much we can do with a new transaction while syncing
if self.sync_state.is_syncing() {
return;
}
let tx = self.tx_pool.read().retrieve_tx_by_kernel_hash(kernel_hash);
if tx.is_none() {
self.request_transaction(kernel_hash, &addr);
}
}
fn transaction_received(&self, tx: core::Transaction, stem: bool) {
// nothing much we can do with a new transaction while syncing
if self.sync_state.is_syncing() {
@ -136,8 +153,9 @@ impl p2p::ChainAdapter for NetToChainAdapter {
}
let (txs, missing_short_ids) = {
let tx_pool = self.tx_pool.read();
tx_pool.retrieve_transactions(cb.hash(), cb.nonce, cb.kern_ids())
self.tx_pool
.read()
.retrieve_transactions(cb.hash(), cb.nonce, cb.kern_ids())
};
debug!(
@ -539,6 +557,10 @@ impl NetToChainAdapter {
}
}
fn request_transaction(&self, h: Hash, addr: &SocketAddr) {
self.send_tx_request_to_peer(h, addr, |peer, h| peer.send_tx_request(h))
}
// After receiving a compact block if we cannot successfully hydrate
// it into a full block then fallback to requesting the full block
// from the same peer that gave us the compact block
@ -560,6 +582,23 @@ impl NetToChainAdapter {
})
}
fn send_tx_request_to_peer<F>(&self, h: Hash, addr: &SocketAddr, f: F)
where
F: Fn(&p2p::Peer, Hash) -> Result<(), p2p::Error>,
{
match self.peers().get_connected_peer(addr) {
None => debug!(
"send_tx_request_to_peer: can't send request to peer {:?}, not connected",
addr
),
Some(peer) => {
if let Err(e) = f(&peer, h) {
error!("send_tx_request_to_peer: failed: {:?}", e)
}
}
}
}
fn send_block_request_to_peer<F>(&self, h: Hash, addr: &SocketAddr, f: F)
where
F: Fn(&p2p::Peer, Hash) -> Result<(), p2p::Error>,
@ -670,10 +709,11 @@ pub struct PoolToNetAdapter {
impl pool::PoolAdapter for PoolToNetAdapter {
fn stem_tx_accepted(&self, tx: &core::Transaction) -> Result<(), pool::PoolError> {
self.peers()
.broadcast_stem_transaction(tx)
.relay_stem_transaction(tx)
.map_err(|_| pool::PoolError::DandelionError)?;
Ok(())
}
fn tx_accepted(&self, tx: &core::Transaction) {
self.peers().broadcast_transaction(tx);
}