Track blocks we requested, always broadcast otherwise (#2349)

This commit is contained in:
Ignotus Peverell 2019-01-12 09:28:03 -08:00 committed by GitHub
parent cf8f9d609a
commit f9a20aef0d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 69 additions and 46 deletions

View file

@ -261,7 +261,7 @@ impl Peer {
/// Sends the provided block to the remote peer. The request may be dropped /// Sends the provided block to the remote peer. The request may be dropped
/// if the remote peer is known to already have the block. /// if the remote peer is known to already have the block.
pub fn send_block(&self, b: &core::Block) -> Result<bool, Error> { pub fn send_block(&self, b: &core::Block) -> Result<bool, Error> {
if !self.tracking_adapter.has(b.hash()) { if !self.tracking_adapter.has_recv(b.hash()) {
trace!("Send block {} to {}", b.hash(), self.info.addr); trace!("Send block {} to {}", b.hash(), self.info.addr);
self.connection self.connection
.as_ref() .as_ref()
@ -280,7 +280,7 @@ impl Peer {
} }
pub fn send_compact_block(&self, b: &core::CompactBlock) -> Result<bool, Error> { pub fn send_compact_block(&self, b: &core::CompactBlock) -> Result<bool, Error> {
if !self.tracking_adapter.has(b.hash()) { if !self.tracking_adapter.has_recv(b.hash()) {
trace!("Send compact block {} to {}", b.hash(), self.info.addr); trace!("Send compact block {} to {}", b.hash(), self.info.addr);
self.connection self.connection
.as_ref() .as_ref()
@ -299,7 +299,7 @@ impl Peer {
} }
pub fn send_header(&self, bh: &core::BlockHeader) -> Result<bool, Error> { pub fn send_header(&self, bh: &core::BlockHeader) -> Result<bool, Error> {
if !self.tracking_adapter.has(bh.hash()) { if !self.tracking_adapter.has_recv(bh.hash()) {
debug!("Send header {} to {}", bh.hash(), self.info.addr); debug!("Send header {} to {}", bh.hash(), self.info.addr);
self.connection self.connection
.as_ref() .as_ref()
@ -318,7 +318,7 @@ impl Peer {
} }
pub fn send_tx_kernel_hash(&self, h: Hash) -> Result<bool, Error> { pub fn send_tx_kernel_hash(&self, h: Hash) -> Result<bool, Error> {
if !self.tracking_adapter.has(h) { if !self.tracking_adapter.has_recv(h) {
debug!("Send tx kernel hash {} to {}", h, self.info.addr); debug!("Send tx kernel hash {} to {}", h, self.info.addr);
self.connection self.connection
.as_ref() .as_ref()
@ -350,7 +350,7 @@ impl Peer {
return self.send_tx_kernel_hash(kernel.hash()); return self.send_tx_kernel_hash(kernel.hash());
} }
if !self.tracking_adapter.has(kernel.hash()) { if !self.tracking_adapter.has_recv(kernel.hash()) {
debug!("Send full tx {} to {}", tx.hash(), self.info.addr); debug!("Send full tx {} to {}", tx.hash(), self.info.addr);
self.connection self.connection
.as_ref() .as_ref()
@ -405,6 +405,7 @@ impl Peer {
/// Sends a request for a specific block by hash /// Sends a request for a specific block by hash
pub fn send_block_request(&self, h: Hash) -> Result<(), Error> { pub fn send_block_request(&self, h: Hash) -> Result<(), Error> {
debug!("Requesting block {} from peer {}.", h, self.info.addr); debug!("Requesting block {} from peer {}.", h, self.info.addr);
self.tracking_adapter.push_req(h);
self.connection self.connection
.as_ref() .as_ref()
.unwrap() .unwrap()
@ -499,29 +500,32 @@ fn stop_with_connection(connection: &conn::Tracker) {
} }
/// Adapter implementation that forwards everything to an underlying adapter /// Adapter implementation that forwards everything to an underlying adapter
/// but keeps track of the block and transaction hashes that were received. /// but keeps track of the block and transaction hashes that were requested or
/// received.
#[derive(Clone)] #[derive(Clone)]
struct TrackingAdapter { struct TrackingAdapter {
adapter: Arc<dyn NetAdapter>, adapter: Arc<dyn NetAdapter>,
known: Arc<RwLock<Vec<Hash>>>, known: Arc<RwLock<Vec<Hash>>>,
requested: Arc<RwLock<Vec<Hash>>>,
} }
impl TrackingAdapter { impl TrackingAdapter {
fn new(adapter: Arc<dyn NetAdapter>) -> TrackingAdapter { fn new(adapter: Arc<dyn NetAdapter>) -> TrackingAdapter {
TrackingAdapter { TrackingAdapter {
adapter: adapter, adapter: adapter,
known: Arc::new(RwLock::new(vec![])), known: Arc::new(RwLock::new(Vec::with_capacity(MAX_TRACK_SIZE))),
requested: Arc::new(RwLock::new(Vec::with_capacity(MAX_TRACK_SIZE))),
} }
} }
fn has(&self, hash: Hash) -> bool { fn has_recv(&self, hash: Hash) -> bool {
let known = self.known.read(); let known = self.known.read();
// may become too slow, an ordered set (by timestamp for eviction) may // may become too slow, an ordered set (by timestamp for eviction) may
// end up being a better choice // end up being a better choice
known.contains(&hash) known.contains(&hash)
} }
fn push(&self, hash: Hash) { fn push_recv(&self, hash: Hash) {
let mut known = self.known.write(); let mut known = self.known.write();
if known.len() > MAX_TRACK_SIZE { if known.len() > MAX_TRACK_SIZE {
known.truncate(MAX_TRACK_SIZE); known.truncate(MAX_TRACK_SIZE);
@ -530,6 +534,23 @@ impl TrackingAdapter {
known.insert(0, hash); known.insert(0, hash);
} }
} }
fn has_req(&self, hash: Hash) -> bool {
let requested = self.requested.read();
// may become too slow, an ordered set (by timestamp for eviction) may
// end up being a better choice
requested.contains(&hash)
}
fn push_req(&self, hash: Hash) {
let mut requested = self.requested.write();
if requested.len() > MAX_TRACK_SIZE {
requested.truncate(MAX_TRACK_SIZE);
}
if !requested.contains(&hash) {
requested.insert(0, hash);
}
}
} }
impl ChainAdapter for TrackingAdapter { impl ChainAdapter for TrackingAdapter {
@ -546,7 +567,7 @@ impl ChainAdapter for TrackingAdapter {
} }
fn tx_kernel_received(&self, kernel_hash: Hash, addr: SocketAddr) { fn tx_kernel_received(&self, kernel_hash: Hash, addr: SocketAddr) {
self.push(kernel_hash); self.push_recv(kernel_hash);
self.adapter.tx_kernel_received(kernel_hash, addr) self.adapter.tx_kernel_received(kernel_hash, addr)
} }
@ -556,23 +577,24 @@ impl ChainAdapter for TrackingAdapter {
// correctly. // correctly.
if !stem { if !stem {
let kernel = &tx.kernels()[0]; let kernel = &tx.kernels()[0];
self.push(kernel.hash()); self.push_recv(kernel.hash());
} }
self.adapter.transaction_received(tx, stem) self.adapter.transaction_received(tx, stem)
} }
fn block_received(&self, b: core::Block, addr: SocketAddr) -> bool { fn block_received(&self, b: core::Block, addr: SocketAddr, _was_requested: bool) -> bool {
self.push(b.hash()); let bh = b.hash();
self.adapter.block_received(b, addr) self.push_recv(bh);
self.adapter.block_received(b, addr, self.has_req(bh))
} }
fn compact_block_received(&self, cb: core::CompactBlock, addr: SocketAddr) -> bool { fn compact_block_received(&self, cb: core::CompactBlock, addr: SocketAddr) -> bool {
self.push(cb.hash()); self.push_recv(cb.hash());
self.adapter.compact_block_received(cb, addr) self.adapter.compact_block_received(cb, addr)
} }
fn header_received(&self, bh: core::BlockHeader, addr: SocketAddr) -> bool { fn header_received(&self, bh: core::BlockHeader, addr: SocketAddr) -> bool {
self.push(bh.hash()); self.push_recv(bh.hash());
self.adapter.header_received(bh, addr) self.adapter.header_received(bh, addr)
} }

View file

@ -554,9 +554,9 @@ impl ChainAdapter for Peers {
self.adapter.transaction_received(tx, stem) self.adapter.transaction_received(tx, stem)
} }
fn block_received(&self, b: core::Block, peer_addr: SocketAddr) -> bool { fn block_received(&self, b: core::Block, peer_addr: SocketAddr, was_requested: bool) -> bool {
let hash = b.hash(); let hash = b.hash();
if !self.adapter.block_received(b, peer_addr) { if !self.adapter.block_received(b, peer_addr, was_requested) {
// if the peer sent us a block that's intrinsically bad // if the peer sent us a block that's intrinsically bad
// they are either mistaken or malevolent, both of which require a ban // they are either mistaken or malevolent, both of which require a ban
debug!( debug!(

View file

@ -154,7 +154,9 @@ impl MessageHandler for Protocol {
); );
let b: core::Block = msg.body()?; let b: core::Block = msg.body()?;
adapter.block_received(b, self.addr); // we can't know at this level whether we requested the block or not,
// the boolean should be properly set in higher level adapter
adapter.block_received(b, self.addr, false);
Ok(None) Ok(None)
} }

View file

@ -242,7 +242,7 @@ impl ChainAdapter for DummyAdapter {
fn header_received(&self, _bh: core::BlockHeader, _addr: SocketAddr) -> bool { fn header_received(&self, _bh: core::BlockHeader, _addr: SocketAddr) -> bool {
true true
} }
fn block_received(&self, _: core::Block, _: SocketAddr) -> bool { fn block_received(&self, _: core::Block, _: SocketAddr, _: bool) -> bool {
true true
} }
fn headers_received(&self, _: &[core::BlockHeader], _: SocketAddr) -> bool { fn headers_received(&self, _: &[core::BlockHeader], _: SocketAddr) -> bool {

View file

@ -355,7 +355,7 @@ pub trait ChainAdapter: Sync + Send {
/// block could be handled properly and is not deemed defective by the /// block could be handled properly and is not deemed defective by the
/// chain. Returning false means the block will never be valid and /// chain. Returning false means the block will never be valid and
/// may result in the peer being banned. /// may result in the peer being banned.
fn block_received(&self, b: core::Block, addr: SocketAddr) -> bool; fn block_received(&self, b: core::Block, addr: SocketAddr, was_requested: bool) -> bool;
fn compact_block_received(&self, cb: core::CompactBlock, addr: SocketAddr) -> bool; fn compact_block_received(&self, cb: core::CompactBlock, addr: SocketAddr) -> bool;

View file

@ -107,7 +107,7 @@ impl p2p::ChainAdapter for NetToChainAdapter {
} }
} }
fn block_received(&self, b: core::Block, addr: SocketAddr) -> bool { fn block_received(&self, b: core::Block, addr: SocketAddr, was_requested: bool) -> bool {
debug!( debug!(
"Received block {} at {} from {} [in/out/kern: {}/{}/{}] going to process.", "Received block {} at {} from {} [in/out/kern: {}/{}/{}] going to process.",
b.hash(), b.hash(),
@ -117,7 +117,7 @@ impl p2p::ChainAdapter for NetToChainAdapter {
b.outputs().len(), b.outputs().len(),
b.kernels().len(), b.kernels().len(),
); );
self.process_block(b, addr) self.process_block(b, addr, was_requested)
} }
fn compact_block_received(&self, cb: core::CompactBlock, addr: SocketAddr) -> bool { fn compact_block_received(&self, cb: core::CompactBlock, addr: SocketAddr) -> bool {
@ -136,7 +136,7 @@ impl p2p::ChainAdapter for NetToChainAdapter {
if cb.kern_ids().is_empty() { if cb.kern_ids().is_empty() {
// push the freshly hydrated block through the chain pipeline // push the freshly hydrated block through the chain pipeline
match core::Block::hydrate_from(cb, vec![]) { match core::Block::hydrate_from(cb, vec![]) {
Ok(block) => self.process_block(block, addr), Ok(block) => self.process_block(block, addr, false),
Err(e) => { Err(e) => {
debug!("Invalid hydrated block {}: {:?}", cb_hash, e); debug!("Invalid hydrated block {}: {:?}", cb_hash, e);
return false; return false;
@ -146,7 +146,7 @@ impl p2p::ChainAdapter for NetToChainAdapter {
// check at least the header is valid before hydrating // check at least the header is valid before hydrating
if let Err(e) = self if let Err(e) = self
.chain() .chain()
.process_block_header(&cb.header, self.chain_opts()) .process_block_header(&cb.header, self.chain_opts(false))
{ {
debug!("Invalid compact block header {}: {:?}", cb_hash, e.kind()); debug!("Invalid compact block header {}: {:?}", cb_hash, e.kind());
return !e.is_bad_data(); return !e.is_bad_data();
@ -183,7 +183,7 @@ impl p2p::ChainAdapter for NetToChainAdapter {
.is_ok() .is_ok()
{ {
debug!("successfully hydrated block from tx pool!"); debug!("successfully hydrated block from tx pool!");
self.process_block(block, addr) self.process_block(block, addr, false)
} else { } else {
if self.sync_state.status() == SyncStatus::NoSync { if self.sync_state.status() == SyncStatus::NoSync {
debug!("adapter: block invalid after hydration, requesting full block"); debug!("adapter: block invalid after hydration, requesting full block");
@ -210,7 +210,7 @@ impl p2p::ChainAdapter for NetToChainAdapter {
// pushing the new block header through the header chain pipeline // pushing the new block header through the header chain pipeline
// we will go ask for the block if this is a new header // we will go ask for the block if this is a new header
let res = self.chain().process_block_header(&bh, self.chain_opts()); let res = self.chain().process_block_header(&bh, self.chain_opts(false));
if let &Err(ref e) = &res { if let &Err(ref e) = &res {
debug!("Block header {} refused by chain: {:?}", bhash, e.kind()); debug!("Block header {} refused by chain: {:?}", bhash, e.kind());
@ -239,7 +239,7 @@ impl p2p::ChainAdapter for NetToChainAdapter {
} }
// try to add headers to our header chain // try to add headers to our header chain
let res = self.chain().sync_block_headers(bhs, self.chain_opts()); let res = self.chain().sync_block_headers(bhs, self.chain_opts(true));
if let &Err(ref e) = &res { if let &Err(ref e) = &res {
debug!("Block headers refused by chain: {:?}", e); debug!("Block headers refused by chain: {:?}", e);
@ -419,7 +419,7 @@ impl NetToChainAdapter {
// pushing the new block through the chain pipeline // pushing the new block through the chain pipeline
// remembering to reset the head if we have a bad block // remembering to reset the head if we have a bad block
fn process_block(&self, b: core::Block, addr: SocketAddr) -> bool { fn process_block(&self, b: core::Block, addr: SocketAddr, was_requested: bool) -> bool {
// We cannot process blocks earlier than the horizon so check for this here. // We cannot process blocks earlier than the horizon so check for this here.
{ {
let head = self.chain().head().unwrap(); let head = self.chain().head().unwrap();
@ -434,7 +434,7 @@ impl NetToChainAdapter {
let bhash = b.hash(); let bhash = b.hash();
let previous = self.chain().get_previous_header(&b.header); let previous = self.chain().get_previous_header(&b.header);
match self.chain().process_block(b, self.chain_opts()) { match self.chain().process_block(b, self.chain_opts(was_requested)) {
Ok(_) => { Ok(_) => {
self.validate_chain(bhash); self.validate_chain(bhash);
self.check_compact(); self.check_compact();
@ -587,8 +587,8 @@ impl NetToChainAdapter {
} }
/// Prepare options for the chain pipeline /// Prepare options for the chain pipeline
fn chain_opts(&self) -> chain::Options { fn chain_opts(&self, was_requested: bool) -> chain::Options {
let opts = if self.sync_state.is_syncing() { let opts = if was_requested {
chain::Options::SYNC chain::Options::SYNC
} else { } else {
chain::Options::NONE chain::Options::NONE
@ -635,10 +635,8 @@ impl ChainAdapter for ChainToPoolAndNetAdapter {
} }
} }
if self.sync_state.is_syncing() { // not broadcasting blocks received through sync
return; if !opts.contains(chain::Options::SYNC) {
}
// If we mined the block then we want to broadcast the compact block. // If we mined the block then we want to broadcast the compact block.
// If we received the block from another node then broadcast "header first" // If we received the block from another node then broadcast "header first"
// to minimize network traffic. // to minimize network traffic.
@ -650,6 +648,7 @@ impl ChainAdapter for ChainToPoolAndNetAdapter {
// "header first" propagation if we are not the originator of this block // "header first" propagation if we are not the originator of this block
self.peers().broadcast_header(&b.header); self.peers().broadcast_header(&b.header);
} }
}
// Reconcile the txpool against the new block *after* we have broadcast it too our peers. // Reconcile the txpool against the new block *after* we have broadcast it too our peers.
// This may be slow and we do not want to delay block propagation. // This may be slow and we do not want to delay block propagation.