diff --git a/Cargo.lock b/Cargo.lock index 9b998ab80..590d7247f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -818,6 +818,7 @@ dependencies = [ "grin_store 3.0.0-alpha.1", "grin_util 3.0.0-alpha.1", "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", + "lru-cache 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", "net2 0.2.33 (registry+https://github.com/rust-lang/crates.io-index)", "num 0.1.42 (registry+https://github.com/rust-lang/crates.io-index)", "rand 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/p2p/Cargo.toml b/p2p/Cargo.toml index 7777bb501..4dfc80aa5 100644 --- a/p2p/Cargo.toml +++ b/p2p/Cargo.toml @@ -13,6 +13,7 @@ edition = "2018" bitflags = "1" bytes = "0.4" enum_primitive = "0.1" +lru-cache = "0.1" net2 = "0.2" num = "0.1" rand = "0.6" diff --git a/p2p/src/peer.rs b/p2p/src/peer.rs index dded7b114..77b7d7805 100644 --- a/p2p/src/peer.rs +++ b/p2p/src/peer.rs @@ -21,6 +21,8 @@ use std::path::PathBuf; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; +use lru_cache::LruCache; + use crate::chain; use crate::conn; use crate::core::core::hash::{Hash, Hashed}; @@ -364,10 +366,11 @@ impl Peer { self.send(&h, msg::Type::GetTransaction) } - /// Sends a request for a specific block by hash - pub fn send_block_request(&self, h: Hash) -> Result<(), Error> { + /// Sends a request for a specific block by hash. + /// Takes opts so we can track if this request was due to our node syncing or otherwise. + pub fn send_block_request(&self, h: Hash, opts: chain::Options) -> Result<(), Error> { debug!("Requesting block {} from peer {}.", h, self.info.addr); - self.tracking_adapter.push_req(h); + self.tracking_adapter.push_req(h, opts); self.send(&h, msg::Type::GetBlock) } @@ -429,51 +432,35 @@ impl Peer { #[derive(Clone)] struct TrackingAdapter { adapter: Arc, - known: Arc>>, - requested: Arc>>, + received: Arc>>, + requested: Arc>>, } impl TrackingAdapter { fn new(adapter: Arc) -> TrackingAdapter { TrackingAdapter { adapter: adapter, - known: Arc::new(RwLock::new(Vec::with_capacity(MAX_TRACK_SIZE))), - requested: Arc::new(RwLock::new(Vec::with_capacity(MAX_TRACK_SIZE))), + received: Arc::new(RwLock::new(LruCache::new(MAX_TRACK_SIZE))), + requested: Arc::new(RwLock::new(LruCache::new(MAX_TRACK_SIZE))), } } fn has_recv(&self, hash: Hash) -> bool { - let known = self.known.read(); - // may become too slow, an ordered set (by timestamp for eviction) may - // end up being a better choice - known.contains(&hash) + self.received.write().contains_key(&hash) } fn push_recv(&self, hash: Hash) { - let mut known = self.known.write(); - if known.len() > MAX_TRACK_SIZE { - known.truncate(MAX_TRACK_SIZE); - } - if !known.contains(&hash) { - known.insert(0, hash); - } + self.received.write().insert(hash, ()); } - fn has_req(&self, hash: Hash) -> bool { - let requested = self.requested.read(); - // may become too slow, an ordered set (by timestamp for eviction) may - // end up being a better choice - requested.contains(&hash) + /// Track a block or transaction hash requested by us. + /// Track the opts alongside the hash so we know if this was due to us syncing or not. + fn push_req(&self, hash: Hash, opts: chain::Options) { + self.requested.write().insert(hash, opts); } - fn push_req(&self, hash: Hash) { - let mut requested = self.requested.write(); - if requested.len() > MAX_TRACK_SIZE { - requested.truncate(MAX_TRACK_SIZE); - } - if !requested.contains(&hash) { - requested.insert(0, hash); - } + fn req_opts(&self, hash: Hash) -> Option { + self.requested.write().get_mut(&hash).cloned() } } @@ -518,11 +505,17 @@ impl ChainAdapter for TrackingAdapter { &self, b: core::Block, peer_info: &PeerInfo, - _was_requested: bool, + opts: chain::Options, ) -> Result { let bh = b.hash(); self.push_recv(bh); - self.adapter.block_received(b, peer_info, self.has_req(bh)) + + // If we are currently tracking a request for this block then + // use the opts specified when we made the request. + // If we requested this block as part of sync then we want to + // let our adapter know this when we receive it. + let req_opts = self.req_opts(bh).unwrap_or(opts); + self.adapter.block_received(b, peer_info, req_opts) } fn compact_block_received( diff --git a/p2p/src/peers.rs b/p2p/src/peers.rs index 818105128..505008cae 100644 --- a/p2p/src/peers.rs +++ b/p2p/src/peers.rs @@ -577,10 +577,10 @@ impl ChainAdapter for Peers { &self, b: core::Block, peer_info: &PeerInfo, - was_requested: bool, + opts: chain::Options, ) -> Result { let hash = b.hash(); - if !self.adapter.block_received(b, peer_info, was_requested)? { + if !self.adapter.block_received(b, peer_info, opts)? { // if the peer sent us a block that's intrinsically bad // they are either mistaken or malevolent, both of which require a ban debug!( diff --git a/p2p/src/protocol.rs b/p2p/src/protocol.rs index afb57e071..0f2646ad4 100644 --- a/p2p/src/protocol.rs +++ b/p2p/src/protocol.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use crate::chain; use crate::conn::{Message, MessageHandler, Tracker}; use crate::core::core::{self, hash::Hash, hash::Hashed, CompactBlock}; @@ -162,9 +163,11 @@ impl MessageHandler for Protocol { ); let b: core::Block = msg.body()?; - // we can't know at this level whether we requested the block or not, - // the boolean should be properly set in higher level adapter - adapter.block_received(b, &self.peer_info, false)?; + // We default to NONE opts here as we do not know know yet why this block was + // received. + // If we requested this block from a peer due to our node syncing then + // the peer adapter will override opts to reflect this. + adapter.block_received(b, &self.peer_info, chain::Options::NONE)?; Ok(None) } diff --git a/p2p/src/serv.rs b/p2p/src/serv.rs index 6629604b0..bd9476ddb 100644 --- a/p2p/src/serv.rs +++ b/p2p/src/serv.rs @@ -296,7 +296,12 @@ impl ChainAdapter for DummyAdapter { ) -> Result { Ok(true) } - fn block_received(&self, _: core::Block, _: &PeerInfo, _: bool) -> Result { + fn block_received( + &self, + _: core::Block, + _: &PeerInfo, + _: chain::Options, + ) -> Result { Ok(true) } fn headers_received( diff --git a/p2p/src/types.rs b/p2p/src/types.rs index f9a5f40be..a0da98a50 100644 --- a/p2p/src/types.rs +++ b/p2p/src/types.rs @@ -529,7 +529,7 @@ pub trait ChainAdapter: Sync + Send { &self, b: core::Block, peer_info: &PeerInfo, - was_requested: bool, + opts: chain::Options, ) -> Result; fn compact_block_received( diff --git a/servers/src/common/adapters.rs b/servers/src/common/adapters.rs index 1f51c4f05..9d1748dd7 100644 --- a/servers/src/common/adapters.rs +++ b/servers/src/common/adapters.rs @@ -118,7 +118,7 @@ impl p2p::ChainAdapter for NetToChainAdapter { &self, b: core::Block, peer_info: &PeerInfo, - was_requested: bool, + opts: chain::Options, ) -> Result { if self.chain().block_exists(b.hash())? { return Ok(true); @@ -132,7 +132,7 @@ impl p2p::ChainAdapter for NetToChainAdapter { b.outputs().len(), b.kernels().len(), ); - self.process_block(b, peer_info, was_requested) + self.process_block(b, peer_info, opts) } fn compact_block_received( @@ -165,7 +165,7 @@ impl p2p::ChainAdapter for NetToChainAdapter { hook.on_block_received(&block, &peer_info.addr); } } - self.process_block(block, peer_info, false) + self.process_block(block, peer_info, chain::Options::NONE) } Err(e) => { debug!("Invalid hydrated block {}: {:?}", cb_hash, e); @@ -176,7 +176,7 @@ impl p2p::ChainAdapter for NetToChainAdapter { // check at least the header is valid before hydrating if let Err(e) = self .chain() - .process_block_header(&cb.header, self.chain_opts(false)) + .process_block_header(&cb.header, chain::Options::NONE) { debug!("Invalid compact block header {}: {:?}", cb_hash, e.kind()); return Ok(!e.is_bad_data()); @@ -220,11 +220,11 @@ impl p2p::ChainAdapter for NetToChainAdapter { .is_ok() { debug!("successfully hydrated block from tx pool!"); - self.process_block(block, peer_info, false) + self.process_block(block, peer_info, chain::Options::NONE) } else { if self.sync_state.status() == SyncStatus::NoSync { debug!("adapter: block invalid after hydration, requesting full block"); - self.request_block(&cb.header, peer_info); + self.request_block(&cb.header, peer_info, chain::Options::NONE); Ok(true) } else { debug!("block invalid after hydration, ignoring it, cause still syncing"); @@ -255,9 +255,7 @@ impl p2p::ChainAdapter for NetToChainAdapter { // pushing the new block header through the header chain pipeline // we will go ask for the block if this is a new header - let res = self - .chain() - .process_block_header(&bh, self.chain_opts(false)); + let res = self.chain().process_block_header(&bh, chain::Options::NONE); if let Err(e) = res { debug!( @@ -298,7 +296,7 @@ impl p2p::ChainAdapter for NetToChainAdapter { } // try to add headers to our header chain - match self.chain().sync_block_headers(bhs, self.chain_opts(true)) { + match self.chain().sync_block_headers(bhs, chain::Options::SYNC) { Ok(_) => Ok(true), Err(e) => { debug!("Block headers refused by chain: {:?}", e); @@ -533,7 +531,7 @@ impl NetToChainAdapter { &self, b: core::Block, peer_info: &PeerInfo, - was_requested: bool, + opts: chain::Options, ) -> Result { // We cannot process blocks earlier than the horizon so check for this here. { @@ -549,10 +547,7 @@ impl NetToChainAdapter { let bhash = b.hash(); let previous = self.chain().get_previous_header(&b.header); - match self - .chain() - .process_block(b, self.chain_opts(was_requested)) - { + match self.chain().process_block(b, opts) { Ok(_) => { self.validate_chain(bhash); self.check_compact(); @@ -571,7 +566,7 @@ impl NetToChainAdapter { && !self.sync_state.is_syncing() { debug!("process_block: received an orphan block, checking the parent: {:}", previous.hash()); - self.request_block_by_hash(previous.hash(), peer_info) + self.request_block(&previous, peer_info, chain::Options::NONE) } } Ok(true) @@ -646,12 +641,10 @@ impl NetToChainAdapter { // it into a full block then fallback to requesting the full block // from the same peer that gave us the compact block // consider additional peers for redundancy? - fn request_block(&self, bh: &BlockHeader, peer_info: &PeerInfo) { - self.request_block_by_hash(bh.hash(), peer_info) - } - - fn request_block_by_hash(&self, h: Hash, peer_info: &PeerInfo) { - self.send_block_request_to_peer(h, peer_info, |peer, h| peer.send_block_request(h)) + fn request_block(&self, bh: &BlockHeader, peer_info: &PeerInfo, opts: Options) { + self.send_block_request_to_peer(bh.hash(), peer_info, |peer, h| { + peer.send_block_request(h, opts) + }) } // After we have received a block header in "header first" propagation @@ -703,16 +696,6 @@ impl NetToChainAdapter { ), } } - - /// Prepare options for the chain pipeline - fn chain_opts(&self, was_requested: bool) -> chain::Options { - let opts = if was_requested { - chain::Options::SYNC - } else { - chain::Options::NONE - }; - opts - } } /// Implementation of the ChainAdapter for the network. Gets notified when the diff --git a/servers/src/grin/sync/body_sync.rs b/servers/src/grin/sync/body_sync.rs index 87a3f5fca..43e1d19db 100644 --- a/servers/src/grin/sync/body_sync.rs +++ b/servers/src/grin/sync/body_sync.rs @@ -138,7 +138,7 @@ impl BodySync { let mut peers_iter = peers.iter().cycle(); for hash in hashes_to_get.clone() { if let Some(peer) = peers_iter.next() { - if let Err(e) = peer.send_block_request(*hash) { + if let Err(e) = peer.send_block_request(*hash, chain::Options::SYNC) { debug!("Skipped request to {}: {:?}", peer.info.addr, e); peer.stop(); } else {