diff --git a/p2p/src/peer.rs b/p2p/src/peer.rs index 4cfd7867e..eecae9b25 100644 --- a/p2p/src/peer.rs +++ b/p2p/src/peer.rs @@ -261,7 +261,7 @@ impl Peer { /// Sends the provided block to the remote peer. The request may be dropped /// if the remote peer is known to already have the block. pub fn send_block(&self, b: &core::Block) -> Result { - if !self.tracking_adapter.has(b.hash()) { + if !self.tracking_adapter.has_recv(b.hash()) { trace!("Send block {} to {}", b.hash(), self.info.addr); self.connection .as_ref() @@ -280,7 +280,7 @@ impl Peer { } pub fn send_compact_block(&self, b: &core::CompactBlock) -> Result { - if !self.tracking_adapter.has(b.hash()) { + if !self.tracking_adapter.has_recv(b.hash()) { trace!("Send compact block {} to {}", b.hash(), self.info.addr); self.connection .as_ref() @@ -299,7 +299,7 @@ impl Peer { } pub fn send_header(&self, bh: &core::BlockHeader) -> Result { - if !self.tracking_adapter.has(bh.hash()) { + if !self.tracking_adapter.has_recv(bh.hash()) { debug!("Send header {} to {}", bh.hash(), self.info.addr); self.connection .as_ref() @@ -318,7 +318,7 @@ impl Peer { } pub fn send_tx_kernel_hash(&self, h: Hash) -> Result { - if !self.tracking_adapter.has(h) { + if !self.tracking_adapter.has_recv(h) { debug!("Send tx kernel hash {} to {}", h, self.info.addr); self.connection .as_ref() @@ -350,7 +350,7 @@ impl Peer { return self.send_tx_kernel_hash(kernel.hash()); } - if !self.tracking_adapter.has(kernel.hash()) { + if !self.tracking_adapter.has_recv(kernel.hash()) { debug!("Send full tx {} to {}", tx.hash(), self.info.addr); self.connection .as_ref() @@ -405,6 +405,7 @@ impl Peer { /// Sends a request for a specific block by hash pub fn send_block_request(&self, h: Hash) -> Result<(), Error> { debug!("Requesting block {} from peer {}.", h, self.info.addr); + self.tracking_adapter.push_req(h); self.connection .as_ref() .unwrap() @@ -499,29 +500,32 @@ fn stop_with_connection(connection: &conn::Tracker) { } /// Adapter implementation that forwards everything to an underlying adapter -/// but keeps track of the block and transaction hashes that were received. +/// but keeps track of the block and transaction hashes that were requested or +/// received. #[derive(Clone)] struct TrackingAdapter { adapter: Arc, known: Arc>>, + requested: Arc>>, } impl TrackingAdapter { fn new(adapter: Arc) -> TrackingAdapter { TrackingAdapter { adapter: adapter, - known: Arc::new(RwLock::new(vec![])), + known: Arc::new(RwLock::new(Vec::with_capacity(MAX_TRACK_SIZE))), + requested: Arc::new(RwLock::new(Vec::with_capacity(MAX_TRACK_SIZE))), } } - fn has(&self, hash: Hash) -> bool { + fn has_recv(&self, hash: Hash) -> bool { let known = self.known.read(); // may become too slow, an ordered set (by timestamp for eviction) may // end up being a better choice known.contains(&hash) } - fn push(&self, hash: Hash) { + fn push_recv(&self, hash: Hash) { let mut known = self.known.write(); if known.len() > MAX_TRACK_SIZE { known.truncate(MAX_TRACK_SIZE); @@ -530,6 +534,23 @@ impl TrackingAdapter { known.insert(0, hash); } } + + fn has_req(&self, hash: Hash) -> bool { + let requested = self.requested.read(); + // may become too slow, an ordered set (by timestamp for eviction) may + // end up being a better choice + requested.contains(&hash) + } + + fn push_req(&self, hash: Hash) { + let mut requested = self.requested.write(); + if requested.len() > MAX_TRACK_SIZE { + requested.truncate(MAX_TRACK_SIZE); + } + if !requested.contains(&hash) { + requested.insert(0, hash); + } + } } impl ChainAdapter for TrackingAdapter { @@ -546,7 +567,7 @@ impl ChainAdapter for TrackingAdapter { } fn tx_kernel_received(&self, kernel_hash: Hash, addr: SocketAddr) { - self.push(kernel_hash); + self.push_recv(kernel_hash); self.adapter.tx_kernel_received(kernel_hash, addr) } @@ -556,23 +577,24 @@ impl ChainAdapter for TrackingAdapter { // correctly. if !stem { let kernel = &tx.kernels()[0]; - self.push(kernel.hash()); + self.push_recv(kernel.hash()); } self.adapter.transaction_received(tx, stem) } - fn block_received(&self, b: core::Block, addr: SocketAddr) -> bool { - self.push(b.hash()); - self.adapter.block_received(b, addr) + fn block_received(&self, b: core::Block, addr: SocketAddr, _was_requested: bool) -> bool { + let bh = b.hash(); + self.push_recv(bh); + self.adapter.block_received(b, addr, self.has_req(bh)) } fn compact_block_received(&self, cb: core::CompactBlock, addr: SocketAddr) -> bool { - self.push(cb.hash()); + self.push_recv(cb.hash()); self.adapter.compact_block_received(cb, addr) } fn header_received(&self, bh: core::BlockHeader, addr: SocketAddr) -> bool { - self.push(bh.hash()); + self.push_recv(bh.hash()); self.adapter.header_received(bh, addr) } diff --git a/p2p/src/peers.rs b/p2p/src/peers.rs index a67e8ffd6..0a094a4ef 100644 --- a/p2p/src/peers.rs +++ b/p2p/src/peers.rs @@ -554,9 +554,9 @@ impl ChainAdapter for Peers { self.adapter.transaction_received(tx, stem) } - fn block_received(&self, b: core::Block, peer_addr: SocketAddr) -> bool { + fn block_received(&self, b: core::Block, peer_addr: SocketAddr, was_requested: bool) -> bool { let hash = b.hash(); - if !self.adapter.block_received(b, peer_addr) { + if !self.adapter.block_received(b, peer_addr, was_requested) { // if the peer sent us a block that's intrinsically bad // they are either mistaken or malevolent, both of which require a ban debug!( diff --git a/p2p/src/protocol.rs b/p2p/src/protocol.rs index d931b6739..b4dd4b5f9 100644 --- a/p2p/src/protocol.rs +++ b/p2p/src/protocol.rs @@ -154,7 +154,9 @@ impl MessageHandler for Protocol { ); let b: core::Block = msg.body()?; - adapter.block_received(b, self.addr); + // we can't know at this level whether we requested the block or not, + // the boolean should be properly set in higher level adapter + adapter.block_received(b, self.addr, false); Ok(None) } diff --git a/p2p/src/serv.rs b/p2p/src/serv.rs index af7173149..45fc38354 100644 --- a/p2p/src/serv.rs +++ b/p2p/src/serv.rs @@ -242,7 +242,7 @@ impl ChainAdapter for DummyAdapter { fn header_received(&self, _bh: core::BlockHeader, _addr: SocketAddr) -> bool { true } - fn block_received(&self, _: core::Block, _: SocketAddr) -> bool { + fn block_received(&self, _: core::Block, _: SocketAddr, _: bool) -> bool { true } fn headers_received(&self, _: &[core::BlockHeader], _: SocketAddr) -> bool { diff --git a/p2p/src/types.rs b/p2p/src/types.rs index 09bb3d73b..91361682d 100644 --- a/p2p/src/types.rs +++ b/p2p/src/types.rs @@ -355,7 +355,7 @@ pub trait ChainAdapter: Sync + Send { /// block could be handled properly and is not deemed defective by the /// chain. Returning false means the block will never be valid and /// may result in the peer being banned. - fn block_received(&self, b: core::Block, addr: SocketAddr) -> bool; + fn block_received(&self, b: core::Block, addr: SocketAddr, was_requested: bool) -> bool; fn compact_block_received(&self, cb: core::CompactBlock, addr: SocketAddr) -> bool; diff --git a/servers/src/common/adapters.rs b/servers/src/common/adapters.rs index 3fe0a6bd8..b54cd2d35 100644 --- a/servers/src/common/adapters.rs +++ b/servers/src/common/adapters.rs @@ -107,7 +107,7 @@ impl p2p::ChainAdapter for NetToChainAdapter { } } - fn block_received(&self, b: core::Block, addr: SocketAddr) -> bool { + fn block_received(&self, b: core::Block, addr: SocketAddr, was_requested: bool) -> bool { debug!( "Received block {} at {} from {} [in/out/kern: {}/{}/{}] going to process.", b.hash(), @@ -117,7 +117,7 @@ impl p2p::ChainAdapter for NetToChainAdapter { b.outputs().len(), b.kernels().len(), ); - self.process_block(b, addr) + self.process_block(b, addr, was_requested) } fn compact_block_received(&self, cb: core::CompactBlock, addr: SocketAddr) -> bool { @@ -136,7 +136,7 @@ impl p2p::ChainAdapter for NetToChainAdapter { if cb.kern_ids().is_empty() { // push the freshly hydrated block through the chain pipeline match core::Block::hydrate_from(cb, vec![]) { - Ok(block) => self.process_block(block, addr), + Ok(block) => self.process_block(block, addr, false), Err(e) => { debug!("Invalid hydrated block {}: {:?}", cb_hash, e); return false; @@ -146,7 +146,7 @@ impl p2p::ChainAdapter for NetToChainAdapter { // check at least the header is valid before hydrating if let Err(e) = self .chain() - .process_block_header(&cb.header, self.chain_opts()) + .process_block_header(&cb.header, self.chain_opts(false)) { debug!("Invalid compact block header {}: {:?}", cb_hash, e.kind()); return !e.is_bad_data(); @@ -183,7 +183,7 @@ impl p2p::ChainAdapter for NetToChainAdapter { .is_ok() { debug!("successfully hydrated block from tx pool!"); - self.process_block(block, addr) + self.process_block(block, addr, false) } else { if self.sync_state.status() == SyncStatus::NoSync { debug!("adapter: block invalid after hydration, requesting full block"); @@ -210,7 +210,7 @@ impl p2p::ChainAdapter for NetToChainAdapter { // pushing the new block header through the header chain pipeline // we will go ask for the block if this is a new header - let res = self.chain().process_block_header(&bh, self.chain_opts()); + let res = self.chain().process_block_header(&bh, self.chain_opts(false)); if let &Err(ref e) = &res { debug!("Block header {} refused by chain: {:?}", bhash, e.kind()); @@ -239,7 +239,7 @@ impl p2p::ChainAdapter for NetToChainAdapter { } // try to add headers to our header chain - let res = self.chain().sync_block_headers(bhs, self.chain_opts()); + let res = self.chain().sync_block_headers(bhs, self.chain_opts(true)); if let &Err(ref e) = &res { debug!("Block headers refused by chain: {:?}", e); @@ -419,7 +419,7 @@ impl NetToChainAdapter { // pushing the new block through the chain pipeline // remembering to reset the head if we have a bad block - fn process_block(&self, b: core::Block, addr: SocketAddr) -> bool { + fn process_block(&self, b: core::Block, addr: SocketAddr, was_requested: bool) -> bool { // We cannot process blocks earlier than the horizon so check for this here. { let head = self.chain().head().unwrap(); @@ -434,7 +434,7 @@ impl NetToChainAdapter { let bhash = b.hash(); let previous = self.chain().get_previous_header(&b.header); - match self.chain().process_block(b, self.chain_opts()) { + match self.chain().process_block(b, self.chain_opts(was_requested)) { Ok(_) => { self.validate_chain(bhash); self.check_compact(); @@ -587,8 +587,8 @@ impl NetToChainAdapter { } /// Prepare options for the chain pipeline - fn chain_opts(&self) -> chain::Options { - let opts = if self.sync_state.is_syncing() { + fn chain_opts(&self, was_requested: bool) -> chain::Options { + let opts = if was_requested { chain::Options::SYNC } else { chain::Options::NONE @@ -635,20 +635,19 @@ impl ChainAdapter for ChainToPoolAndNetAdapter { } } - if self.sync_state.is_syncing() { - return; - } - - // If we mined the block then we want to broadcast the compact block. - // If we received the block from another node then broadcast "header first" - // to minimize network traffic. - if opts.contains(Options::MINE) { - // propagate compact block out if we mined the block - let cb: CompactBlock = b.clone().into(); - self.peers().broadcast_compact_block(&cb); - } else { - // "header first" propagation if we are not the originator of this block - self.peers().broadcast_header(&b.header); + // not broadcasting blocks received through sync + if !opts.contains(chain::Options::SYNC) { + // If we mined the block then we want to broadcast the compact block. + // If we received the block from another node then broadcast "header first" + // to minimize network traffic. + if opts.contains(Options::MINE) { + // propagate compact block out if we mined the block + let cb: CompactBlock = b.clone().into(); + self.peers().broadcast_compact_block(&cb); + } else { + // "header first" propagation if we are not the originator of this block + self.peers().broadcast_header(&b.header); + } } // Reconcile the txpool against the new block *after* we have broadcast it too our peers.