track opts when requesting blocks (not just if we requested them) (#3089)

This commit is contained in:
Antioch Peverell 2019-10-10 09:38:25 +01:00 committed by GitHub
parent 67b2ff717b
commit 8f4a1cba67
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 59 additions and 73 deletions

1
Cargo.lock generated
View file

@ -818,6 +818,7 @@ dependencies = [
"grin_store 3.0.0-alpha.1", "grin_store 3.0.0-alpha.1",
"grin_util 3.0.0-alpha.1", "grin_util 3.0.0-alpha.1",
"log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)",
"lru-cache 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)",
"net2 0.2.33 (registry+https://github.com/rust-lang/crates.io-index)", "net2 0.2.33 (registry+https://github.com/rust-lang/crates.io-index)",
"num 0.1.42 (registry+https://github.com/rust-lang/crates.io-index)", "num 0.1.42 (registry+https://github.com/rust-lang/crates.io-index)",
"rand 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)", "rand 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)",

View file

@ -13,6 +13,7 @@ edition = "2018"
bitflags = "1" bitflags = "1"
bytes = "0.4" bytes = "0.4"
enum_primitive = "0.1" enum_primitive = "0.1"
lru-cache = "0.1"
net2 = "0.2" net2 = "0.2"
num = "0.1" num = "0.1"
rand = "0.6" rand = "0.6"

View file

@ -21,6 +21,8 @@ use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc; use std::sync::Arc;
use lru_cache::LruCache;
use crate::chain; use crate::chain;
use crate::conn; use crate::conn;
use crate::core::core::hash::{Hash, Hashed}; use crate::core::core::hash::{Hash, Hashed};
@ -364,10 +366,11 @@ impl Peer {
self.send(&h, msg::Type::GetTransaction) self.send(&h, msg::Type::GetTransaction)
} }
/// Sends a request for a specific block by hash /// Sends a request for a specific block by hash.
pub fn send_block_request(&self, h: Hash) -> Result<(), Error> { /// Takes opts so we can track if this request was due to our node syncing or otherwise.
pub fn send_block_request(&self, h: Hash, opts: chain::Options) -> Result<(), Error> {
debug!("Requesting block {} from peer {}.", h, self.info.addr); debug!("Requesting block {} from peer {}.", h, self.info.addr);
self.tracking_adapter.push_req(h); self.tracking_adapter.push_req(h, opts);
self.send(&h, msg::Type::GetBlock) self.send(&h, msg::Type::GetBlock)
} }
@ -429,51 +432,35 @@ impl Peer {
#[derive(Clone)] #[derive(Clone)]
struct TrackingAdapter { struct TrackingAdapter {
adapter: Arc<dyn NetAdapter>, adapter: Arc<dyn NetAdapter>,
known: Arc<RwLock<Vec<Hash>>>, received: Arc<RwLock<LruCache<Hash, ()>>>,
requested: Arc<RwLock<Vec<Hash>>>, requested: Arc<RwLock<LruCache<Hash, chain::Options>>>,
} }
impl TrackingAdapter { impl TrackingAdapter {
fn new(adapter: Arc<dyn NetAdapter>) -> TrackingAdapter { fn new(adapter: Arc<dyn NetAdapter>) -> TrackingAdapter {
TrackingAdapter { TrackingAdapter {
adapter: adapter, adapter: adapter,
known: Arc::new(RwLock::new(Vec::with_capacity(MAX_TRACK_SIZE))), received: Arc::new(RwLock::new(LruCache::new(MAX_TRACK_SIZE))),
requested: Arc::new(RwLock::new(Vec::with_capacity(MAX_TRACK_SIZE))), requested: Arc::new(RwLock::new(LruCache::new(MAX_TRACK_SIZE))),
} }
} }
fn has_recv(&self, hash: Hash) -> bool { fn has_recv(&self, hash: Hash) -> bool {
let known = self.known.read(); self.received.write().contains_key(&hash)
// may become too slow, an ordered set (by timestamp for eviction) may
// end up being a better choice
known.contains(&hash)
} }
fn push_recv(&self, hash: Hash) { fn push_recv(&self, hash: Hash) {
let mut known = self.known.write(); self.received.write().insert(hash, ());
if known.len() > MAX_TRACK_SIZE {
known.truncate(MAX_TRACK_SIZE);
}
if !known.contains(&hash) {
known.insert(0, hash);
}
} }
fn has_req(&self, hash: Hash) -> bool { /// Track a block or transaction hash requested by us.
let requested = self.requested.read(); /// Track the opts alongside the hash so we know if this was due to us syncing or not.
// may become too slow, an ordered set (by timestamp for eviction) may fn push_req(&self, hash: Hash, opts: chain::Options) {
// end up being a better choice self.requested.write().insert(hash, opts);
requested.contains(&hash)
} }
fn push_req(&self, hash: Hash) { fn req_opts(&self, hash: Hash) -> Option<chain::Options> {
let mut requested = self.requested.write(); self.requested.write().get_mut(&hash).cloned()
if requested.len() > MAX_TRACK_SIZE {
requested.truncate(MAX_TRACK_SIZE);
}
if !requested.contains(&hash) {
requested.insert(0, hash);
}
} }
} }
@ -518,11 +505,17 @@ impl ChainAdapter for TrackingAdapter {
&self, &self,
b: core::Block, b: core::Block,
peer_info: &PeerInfo, peer_info: &PeerInfo,
_was_requested: bool, opts: chain::Options,
) -> Result<bool, chain::Error> { ) -> Result<bool, chain::Error> {
let bh = b.hash(); let bh = b.hash();
self.push_recv(bh); self.push_recv(bh);
self.adapter.block_received(b, peer_info, self.has_req(bh))
// If we are currently tracking a request for this block then
// use the opts specified when we made the request.
// If we requested this block as part of sync then we want to
// let our adapter know this when we receive it.
let req_opts = self.req_opts(bh).unwrap_or(opts);
self.adapter.block_received(b, peer_info, req_opts)
} }
fn compact_block_received( fn compact_block_received(

View file

@ -577,10 +577,10 @@ impl ChainAdapter for Peers {
&self, &self,
b: core::Block, b: core::Block,
peer_info: &PeerInfo, peer_info: &PeerInfo,
was_requested: bool, opts: chain::Options,
) -> Result<bool, chain::Error> { ) -> Result<bool, chain::Error> {
let hash = b.hash(); let hash = b.hash();
if !self.adapter.block_received(b, peer_info, was_requested)? { if !self.adapter.block_received(b, peer_info, opts)? {
// if the peer sent us a block that's intrinsically bad // if the peer sent us a block that's intrinsically bad
// they are either mistaken or malevolent, both of which require a ban // they are either mistaken or malevolent, both of which require a ban
debug!( debug!(

View file

@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
use crate::chain;
use crate::conn::{Message, MessageHandler, Tracker}; use crate::conn::{Message, MessageHandler, Tracker};
use crate::core::core::{self, hash::Hash, hash::Hashed, CompactBlock}; use crate::core::core::{self, hash::Hash, hash::Hashed, CompactBlock};
@ -162,9 +163,11 @@ impl MessageHandler for Protocol {
); );
let b: core::Block = msg.body()?; let b: core::Block = msg.body()?;
// we can't know at this level whether we requested the block or not, // We default to NONE opts here as we do not know know yet why this block was
// the boolean should be properly set in higher level adapter // received.
adapter.block_received(b, &self.peer_info, false)?; // If we requested this block from a peer due to our node syncing then
// the peer adapter will override opts to reflect this.
adapter.block_received(b, &self.peer_info, chain::Options::NONE)?;
Ok(None) Ok(None)
} }

View file

@ -296,7 +296,12 @@ impl ChainAdapter for DummyAdapter {
) -> Result<bool, chain::Error> { ) -> Result<bool, chain::Error> {
Ok(true) Ok(true)
} }
fn block_received(&self, _: core::Block, _: &PeerInfo, _: bool) -> Result<bool, chain::Error> { fn block_received(
&self,
_: core::Block,
_: &PeerInfo,
_: chain::Options,
) -> Result<bool, chain::Error> {
Ok(true) Ok(true)
} }
fn headers_received( fn headers_received(

View file

@ -529,7 +529,7 @@ pub trait ChainAdapter: Sync + Send {
&self, &self,
b: core::Block, b: core::Block,
peer_info: &PeerInfo, peer_info: &PeerInfo,
was_requested: bool, opts: chain::Options,
) -> Result<bool, chain::Error>; ) -> Result<bool, chain::Error>;
fn compact_block_received( fn compact_block_received(

View file

@ -118,7 +118,7 @@ impl p2p::ChainAdapter for NetToChainAdapter {
&self, &self,
b: core::Block, b: core::Block,
peer_info: &PeerInfo, peer_info: &PeerInfo,
was_requested: bool, opts: chain::Options,
) -> Result<bool, chain::Error> { ) -> Result<bool, chain::Error> {
if self.chain().block_exists(b.hash())? { if self.chain().block_exists(b.hash())? {
return Ok(true); return Ok(true);
@ -132,7 +132,7 @@ impl p2p::ChainAdapter for NetToChainAdapter {
b.outputs().len(), b.outputs().len(),
b.kernels().len(), b.kernels().len(),
); );
self.process_block(b, peer_info, was_requested) self.process_block(b, peer_info, opts)
} }
fn compact_block_received( fn compact_block_received(
@ -165,7 +165,7 @@ impl p2p::ChainAdapter for NetToChainAdapter {
hook.on_block_received(&block, &peer_info.addr); hook.on_block_received(&block, &peer_info.addr);
} }
} }
self.process_block(block, peer_info, false) self.process_block(block, peer_info, chain::Options::NONE)
} }
Err(e) => { Err(e) => {
debug!("Invalid hydrated block {}: {:?}", cb_hash, e); debug!("Invalid hydrated block {}: {:?}", cb_hash, e);
@ -176,7 +176,7 @@ impl p2p::ChainAdapter for NetToChainAdapter {
// check at least the header is valid before hydrating // check at least the header is valid before hydrating
if let Err(e) = self if let Err(e) = self
.chain() .chain()
.process_block_header(&cb.header, self.chain_opts(false)) .process_block_header(&cb.header, chain::Options::NONE)
{ {
debug!("Invalid compact block header {}: {:?}", cb_hash, e.kind()); debug!("Invalid compact block header {}: {:?}", cb_hash, e.kind());
return Ok(!e.is_bad_data()); return Ok(!e.is_bad_data());
@ -220,11 +220,11 @@ impl p2p::ChainAdapter for NetToChainAdapter {
.is_ok() .is_ok()
{ {
debug!("successfully hydrated block from tx pool!"); debug!("successfully hydrated block from tx pool!");
self.process_block(block, peer_info, false) self.process_block(block, peer_info, chain::Options::NONE)
} else { } else {
if self.sync_state.status() == SyncStatus::NoSync { if self.sync_state.status() == SyncStatus::NoSync {
debug!("adapter: block invalid after hydration, requesting full block"); debug!("adapter: block invalid after hydration, requesting full block");
self.request_block(&cb.header, peer_info); self.request_block(&cb.header, peer_info, chain::Options::NONE);
Ok(true) Ok(true)
} else { } else {
debug!("block invalid after hydration, ignoring it, cause still syncing"); debug!("block invalid after hydration, ignoring it, cause still syncing");
@ -255,9 +255,7 @@ impl p2p::ChainAdapter for NetToChainAdapter {
// pushing the new block header through the header chain pipeline // pushing the new block header through the header chain pipeline
// we will go ask for the block if this is a new header // we will go ask for the block if this is a new header
let res = self let res = self.chain().process_block_header(&bh, chain::Options::NONE);
.chain()
.process_block_header(&bh, self.chain_opts(false));
if let Err(e) = res { if let Err(e) = res {
debug!( debug!(
@ -298,7 +296,7 @@ impl p2p::ChainAdapter for NetToChainAdapter {
} }
// try to add headers to our header chain // try to add headers to our header chain
match self.chain().sync_block_headers(bhs, self.chain_opts(true)) { match self.chain().sync_block_headers(bhs, chain::Options::SYNC) {
Ok(_) => Ok(true), Ok(_) => Ok(true),
Err(e) => { Err(e) => {
debug!("Block headers refused by chain: {:?}", e); debug!("Block headers refused by chain: {:?}", e);
@ -533,7 +531,7 @@ impl NetToChainAdapter {
&self, &self,
b: core::Block, b: core::Block,
peer_info: &PeerInfo, peer_info: &PeerInfo,
was_requested: bool, opts: chain::Options,
) -> Result<bool, chain::Error> { ) -> Result<bool, chain::Error> {
// We cannot process blocks earlier than the horizon so check for this here. // We cannot process blocks earlier than the horizon so check for this here.
{ {
@ -549,10 +547,7 @@ impl NetToChainAdapter {
let bhash = b.hash(); let bhash = b.hash();
let previous = self.chain().get_previous_header(&b.header); let previous = self.chain().get_previous_header(&b.header);
match self match self.chain().process_block(b, opts) {
.chain()
.process_block(b, self.chain_opts(was_requested))
{
Ok(_) => { Ok(_) => {
self.validate_chain(bhash); self.validate_chain(bhash);
self.check_compact(); self.check_compact();
@ -571,7 +566,7 @@ impl NetToChainAdapter {
&& !self.sync_state.is_syncing() && !self.sync_state.is_syncing()
{ {
debug!("process_block: received an orphan block, checking the parent: {:}", previous.hash()); debug!("process_block: received an orphan block, checking the parent: {:}", previous.hash());
self.request_block_by_hash(previous.hash(), peer_info) self.request_block(&previous, peer_info, chain::Options::NONE)
} }
} }
Ok(true) Ok(true)
@ -646,12 +641,10 @@ impl NetToChainAdapter {
// it into a full block then fallback to requesting the full block // it into a full block then fallback to requesting the full block
// from the same peer that gave us the compact block // from the same peer that gave us the compact block
// consider additional peers for redundancy? // consider additional peers for redundancy?
fn request_block(&self, bh: &BlockHeader, peer_info: &PeerInfo) { fn request_block(&self, bh: &BlockHeader, peer_info: &PeerInfo, opts: Options) {
self.request_block_by_hash(bh.hash(), peer_info) self.send_block_request_to_peer(bh.hash(), peer_info, |peer, h| {
} peer.send_block_request(h, opts)
})
fn request_block_by_hash(&self, h: Hash, peer_info: &PeerInfo) {
self.send_block_request_to_peer(h, peer_info, |peer, h| peer.send_block_request(h))
} }
// After we have received a block header in "header first" propagation // After we have received a block header in "header first" propagation
@ -703,16 +696,6 @@ impl NetToChainAdapter {
), ),
} }
} }
/// Prepare options for the chain pipeline
fn chain_opts(&self, was_requested: bool) -> chain::Options {
let opts = if was_requested {
chain::Options::SYNC
} else {
chain::Options::NONE
};
opts
}
} }
/// Implementation of the ChainAdapter for the network. Gets notified when the /// Implementation of the ChainAdapter for the network. Gets notified when the

View file

@ -138,7 +138,7 @@ impl BodySync {
let mut peers_iter = peers.iter().cycle(); let mut peers_iter = peers.iter().cycle();
for hash in hashes_to_get.clone() { for hash in hashes_to_get.clone() {
if let Some(peer) = peers_iter.next() { if let Some(peer) = peers_iter.next() {
if let Err(e) = peer.send_block_request(*hash) { if let Err(e) = peer.send_block_request(*hash, chain::Options::SYNC) {
debug!("Skipped request to {}: {:?}", peer.info.addr, e); debug!("Skipped request to {}: {:?}", peer.info.addr, e);
peer.stop(); peer.stop();
} else { } else {