From 6647823177920c5e028c59c6e9a44e3394e77105 Mon Sep 17 00:00:00 2001 From: Antioch Peverell <30642645+antiochp@users.noreply.github.com> Date: Tue, 30 Jan 2018 09:42:04 -0500 Subject: [PATCH] header first propagation (#654) * [wip] header first propagation successfully propagating headers (unless we mined the block itself) not yet asking for the block if we receive a header * call request_block after successful processing header * cleanup and skip asking for block if header is an orphan * comments around error handling in receive_header --- chain/src/chain.rs | 19 ++++-- chain/src/lib.rs | 2 +- chain/src/pipe.rs | 57 ++++++++++++++---- chain/src/types.rs | 14 +++-- chain/tests/mine_simple_chain.rs | 2 +- chain/tests/test_coinbase_maturity.rs | 6 +- grin/src/adapters.rs | 83 +++++++++++++++++++++++++-- grin/src/miner.rs | 8 +-- grin/src/server.rs | 1 + grin/src/sync.rs | 2 +- p2p/src/msg.rs | 2 + p2p/src/peer.rs | 37 +++++++++--- p2p/src/peers.rs | 47 +++++++++++++-- p2p/src/protocol.rs | 35 +++++++++-- p2p/src/server.rs | 9 +-- p2p/src/types.rs | 7 ++- 16 files changed, 273 insertions(+), 58 deletions(-) diff --git a/chain/src/chain.rs b/chain/src/chain.rs index 242dd3b9d..3636694dd 100644 --- a/chain/src/chain.rs +++ b/chain/src/chain.rs @@ -237,10 +237,10 @@ pub fn process_block(&self, b: Block, opts: Options) } // notifying other parts of the system of the update - if !opts.intersects(SYNC) { + if !opts.contains(SYNC) { // broadcast the block let adapter = self.adapter.clone(); - adapter.block_accepted(&b); + adapter.block_accepted(&b, opts); } Ok((Some(tip.clone()), Some(b.clone()))) }, @@ -254,10 +254,10 @@ pub fn process_block(&self, b: Block, opts: Options) // or less relevant blocks somehow. // We should also probably consider banning nodes that send us really old blocks. // - if !opts.intersects(SYNC) { + if !opts.contains(SYNC) { // broadcast the block let adapter = self.adapter.clone(); - adapter.block_accepted(&b); + adapter.block_accepted(&b, opts); } Ok((None, Some(b.clone()))) }, @@ -306,6 +306,17 @@ pub fn process_block(&self, b: Block, opts: Options) } } + /// Process a block header received during "header first" propagation. + pub fn process_block_header( + &self, + bh: &BlockHeader, + opts: Options, + ) -> Result, Error> { + let header_head = self.get_header_head()?; + let ctx = self.ctx_from_head(header_head, opts); + pipe::process_block_header(bh, ctx) + } + /// Attempt to add a new header to the header chain. /// This is only ever used during sync and uses sync_head. 
pub fn sync_block_header( diff --git a/chain/src/lib.rs b/chain/src/lib.rs index b3faf228a..557e55c98 100644 --- a/chain/src/lib.rs +++ b/chain/src/lib.rs @@ -43,4 +43,4 @@ pub mod types; // Re-export the base interface pub use chain::Chain; -pub use types::{ChainAdapter, ChainStore, Error, Options, Tip, NONE, SKIP_POW, SYNC}; +pub use types::{ChainAdapter, ChainStore, Error, Options, Tip, NONE, SKIP_POW, SYNC, MINE}; diff --git a/chain/src/pipe.rs b/chain/src/pipe.rs index 1d652a3ea..13cdb844e 100644 --- a/chain/src/pipe.rs +++ b/chain/src/pipe.rs @@ -92,7 +92,7 @@ pub fn process_block(b: &Block, mut ctx: BlockContext) -> Result, Er validate_block(b, &mut ctx, &mut extension)?; debug!( LOGGER, - "pipe: process_block {} at {} is valid, save and append.", + "pipe: process_block: {} at {} is valid, save and append.", b.hash(), b.header.height, ); @@ -113,12 +113,7 @@ pub fn sync_block_header( mut sync_ctx: BlockContext, mut header_ctx: BlockContext, ) -> Result, Error> { - debug!( - LOGGER, - "pipe: sync_block_header {} at {}", - bh.hash(), - bh.height - ); + debug!(LOGGER, "pipe: sync_block_header: {} at {}", bh.hash(), bh.height); validate_header(&bh, &mut sync_ctx)?; add_block_header(bh, &mut sync_ctx)?; @@ -133,6 +128,47 @@ pub fn sync_block_header( update_sync_head(bh, &mut sync_ctx) } +/// Process block header as part of "header first" block propagation. +pub fn process_block_header( + bh: &BlockHeader, + mut ctx: BlockContext, +) -> Result, Error> { + debug!(LOGGER, "pipe: process_block_header: {} at {}", bh.hash(), bh.height); + + check_header_known(bh.hash(), &mut ctx)?; + validate_header(&bh, &mut ctx)?; + + debug!( + LOGGER, + "pipe: process_block_header: {} at {} is valid, saving.", + bh.hash(), + bh.height, + ); + + add_block_header(bh, &mut ctx)?; + + // now update the header_head (if new header with most work) + update_header_head(bh, &mut ctx) +} + +/// Quick in-memory check to fast-reject any block header we've already handled +/// recently. Keeps duplicates from the network in check. +/// ctx here is specific to the header_head (tip of the header chain) +fn check_header_known(bh: Hash, ctx: &mut BlockContext) -> Result<(), Error> { + // TODO ring buffer of the last few blocks that came through here + if bh == ctx.head.last_block_h || bh == ctx.head.prev_block_h { + return Err(Error::Unfit("already known".to_string())); + } + if let Ok(h) = ctx.store.get_block_header(&bh) { + // there is a window where a block header can be saved but the chain head not + // updated yet, we plug that window here by re-accepting the block + if h.total_difficulty <= ctx.head.total_difficulty { + return Err(Error::Unfit("already in store".to_string())); + } + } + Ok(()) +} + /// Quick in-memory check to fast-reject any block we've already handled /// recently. Keeps duplicates from the network in check. 
fn check_known(bh: Hash, ctx: &mut BlockContext) -> Result<(), Error> { @@ -174,7 +210,7 @@ fn validate_header(header: &BlockHeader, ctx: &mut BlockContext) -> Result<(), E return Err(Error::InvalidBlockTime); } - if !ctx.opts.intersects(SKIP_POW) { + if !ctx.opts.contains(SKIP_POW) { let n = global::sizeshift() as u32; if !(ctx.pow_verifier)(header, n) { error!(LOGGER, "pipe: validate_header failed for cuckoo shift size {}", n); @@ -205,7 +241,7 @@ fn validate_header(header: &BlockHeader, ctx: &mut BlockContext) -> Result<(), E return Err(Error::InvalidBlockTime); } - if !ctx.opts.intersects(SKIP_POW) { + if !ctx.opts.contains(SKIP_POW) { // verify the proof of work and related parameters // explicit check to ensure we are not below the minimum difficulty @@ -325,7 +361,7 @@ fn update_head(b: &Block, ctx: &mut BlockContext) -> Result, Error> // in sync mode, only update the "body chain", otherwise update both the // "header chain" and "body chain", updating the header chain in sync resets // all additional "future" headers we've received - if ctx.opts.intersects(SYNC) { + if ctx.opts.contains(SYNC) { ctx.store .save_body_head(&tip) .map_err(|e| Error::StoreErr(e, "pipe save body".to_owned()))?; @@ -365,7 +401,6 @@ fn update_sync_head(bh: &BlockHeader, ctx: &mut BlockContext) -> Result Result, Error> { let tip = Tip::from_block(bh); - debug!(LOGGER, "pipe: update_header_head: {}, {}", tip.total_difficulty, ctx.head.total_difficulty); if tip.total_difficulty > ctx.head.total_difficulty { ctx.store .save_header_head(&tip) diff --git a/chain/src/types.rs b/chain/src/types.rs index 902df6812..6bf27ff4a 100644 --- a/chain/src/types.rs +++ b/chain/src/types.rs @@ -28,12 +28,14 @@ use grin_store; bitflags! { /// Options for block validation pub flags Options: u32 { - /// None flag - const NONE = 0b00000001, + /// No flags + const NONE = 0b00000000, /// Runs without checking the Proof of Work, mostly to make testing easier. - const SKIP_POW = 0b00000010, + const SKIP_POW = 0b00000001, /// Adds block while in syncing mode. - const SYNC = 0b00001000, + const SYNC = 0b00000010, + /// Block validation on a block we mined ourselves + const MINE = 0b00000100, } } @@ -271,11 +273,11 @@ pub trait ChainStore: Send + Sync { pub trait ChainAdapter { /// The blockchain pipeline has accepted this block as valid and added /// it to our chain. 
- fn block_accepted(&self, b: &Block); + fn block_accepted(&self, b: &Block, opts: Options); } /// Dummy adapter used as a placeholder for real implementations pub struct NoopAdapter {} impl ChainAdapter for NoopAdapter { - fn block_accepted(&self, _: &Block) {} + fn block_accepted(&self, _: &Block, _: Options) {} } diff --git a/chain/tests/mine_simple_chain.rs b/chain/tests/mine_simple_chain.rs index feb74bb09..771072c8d 100644 --- a/chain/tests/mine_simple_chain.rs +++ b/chain/tests/mine_simple_chain.rs @@ -96,7 +96,7 @@ fn mine_empty_chain() { ).unwrap(); let bhash = b.hash(); - chain.process_block(b, chain::NONE).unwrap(); + chain.process_block(b, chain::MINE).unwrap(); // checking our new head let head = chain.head().unwrap(); diff --git a/chain/tests/test_coinbase_maturity.rs b/chain/tests/test_coinbase_maturity.rs index b0105c765..51e0565dd 100644 --- a/chain/tests/test_coinbase_maturity.rs +++ b/chain/tests/test_coinbase_maturity.rs @@ -105,7 +105,7 @@ fn test_coinbase_maturity() { // we will need this later when we want to spend the coinbase output let block_hash = block.hash(); - chain.process_block(block, chain::NONE).unwrap(); + chain.process_block(block, chain::MINE).unwrap(); let prev = chain.head_header().unwrap(); @@ -178,7 +178,7 @@ fn test_coinbase_maturity() { global::sizeshift() as u32, ).unwrap(); - chain.process_block(block, chain::NONE).unwrap(); + chain.process_block(block, chain::MINE).unwrap(); } let prev = chain.head_header().unwrap(); @@ -213,7 +213,7 @@ fn test_coinbase_maturity() { global::sizeshift() as u32, ).unwrap(); - let result = chain.process_block(block, chain::NONE); + let result = chain.process_block(block, chain::MINE); match result { Ok(_) => (), Err(Error::ImmatureCoinbase) => panic!("we should not get an ImmatureCoinbase here"), diff --git a/grin/src/adapters.rs b/grin/src/adapters.rs index b5f83ae39..85967f108 100644 --- a/grin/src/adapters.rs +++ b/grin/src/adapters.rs @@ -16,7 +16,7 @@ use std::net::SocketAddr; use std::sync::{Arc, RwLock}; use std::sync::atomic::{AtomicBool, Ordering}; -use chain::{self, ChainAdapter}; +use chain::{self, ChainAdapter, Options, MINE}; use core::core; use core::core::block::BlockHeader; use core::core::hash::{Hash, Hashed}; @@ -35,6 +35,7 @@ pub struct NetToChainAdapter { currently_syncing: Arc, chain: Arc, tx_pool: Arc>>, + peers: OneTime, } impl p2p::ChainAdapter for NetToChainAdapter { @@ -64,13 +65,14 @@ impl p2p::ChainAdapter for NetToChainAdapter { } } - fn block_received(&self, b: core::Block, _: SocketAddr) -> bool { + fn block_received(&self, b: core::Block, addr: SocketAddr) -> bool { let bhash = b.hash(); debug!( LOGGER, - "Received block {} at {} from network, going to process.", + "Received block {} at {} from {}, going to process.", bhash, b.header.height, + addr, ); // pushing the new block through the chain pipeline @@ -86,6 +88,41 @@ impl p2p::ChainAdapter for NetToChainAdapter { true } + fn header_received(&self, bh: core::BlockHeader, addr: SocketAddr) -> bool { + let bhash = bh.hash(); + debug!( + LOGGER, + "Received block header {} at {} from {}, going to process.", + bhash, + bh.height, + addr, + ); + + // pushing the new block header through the header chain pipeline + // we will go ask for the block if this is a new header + let res = self.chain.process_block_header(&bh, self.chain_opts()); + + if let &Err(ref e) = &res { + debug!(LOGGER, "Block header {} refused by chain: {:?}", bhash, e); + if e.is_bad_block() { + debug!(LOGGER, "header_received: {} is a bad header, resetting header 
head", bhash); + let _ = self.chain.reset_head(); + return false; + } else { + // we got an error when trying to process the block header + // but nothing serious enough to need to ban the peer upstream + return true; + } + } + + // we have successfully processed a block header + // so we can go request the block itself + self.request_block(&bh, &addr); + + // done receiving the header + true + } + fn headers_received(&self, bhs: Vec, addr: SocketAddr) { info!( LOGGER, @@ -197,9 +234,14 @@ impl NetToChainAdapter { currently_syncing: currently_syncing, chain: chain_ref, tx_pool: tx_pool, + peers: OneTime::new(), } } + pub fn init(&self, peers: p2p::Peers) { + self.peers.init(peers); + } + // recursively go back through the locator vector and stop when we find // a header that we recognize this will be a header shared in common // between us and the peer @@ -234,6 +276,28 @@ impl NetToChainAdapter { } } + // After we have received a block header in "header first" propagation + // we need to go request the block from the same peer that gave us the header + // (unless we have already accepted the block) + // + // TODO - currently only request block from a single peer + // consider additional peers for redundancy? + fn request_block(&self, bh: &BlockHeader, addr: &SocketAddr) { + if let None = self.peers.borrow().adapter.get_block(bh.hash()) { + if let Some(_) = self.peers.borrow().adapter.get_block(bh.previous) { + if let Some(peer) = self.peers.borrow().get_connected_peer(addr) { + if let Ok(peer) = peer.read() { + let _ = peer.send_block_request(bh.hash()); + } + } + } else { + debug!(LOGGER, "request_block: prev block {} missing, skipping", bh.previous); + } + } else { + debug!(LOGGER, "request_block: block {} already known", bh.hash()); + } + } + /// Prepare options for the chain pipeline fn chain_opts(&self) -> chain::Options { let opts = if self.currently_syncing.load(Ordering::Relaxed) { @@ -254,7 +318,7 @@ pub struct ChainToPoolAndNetAdapter { } impl ChainAdapter for ChainToPoolAndNetAdapter { - fn block_accepted(&self, b: &core::Block) { + fn block_accepted(&self, b: &core::Block, opts: Options) { { if let Err(e) = self.tx_pool.write().unwrap().reconcile_block(b) { error!( @@ -265,7 +329,16 @@ impl ChainAdapter for ChainToPoolAndNetAdapter { ); } } - self.peers.borrow().broadcast_block(b); + + // If we mined the block then we want to broadcast the block itself. + // But if we received the block from another node then broadcast "header first" + // to minimize network traffic. 
+ if opts.contains(MINE) { + self.peers.borrow().broadcast_block(b); + } else { + // "header first" propagation if we are not the originator of this block + self.peers.borrow().broadcast_header(&b.header); + } } } diff --git a/grin/src/miner.rs b/grin/src/miner.rs index 0d5b12383..e746f74a9 100644 --- a/grin/src/miner.rs +++ b/grin/src/miner.rs @@ -198,7 +198,7 @@ impl Miner { if let Some(s) = job_handle.get_solution() { let proof = Proof::new(s.solution_nonces.to_vec()); let proof_diff = proof.clone().to_difficulty(); - trace!(LOGGER, "Found cuckoo solution for nonce {} of difficulty {} (difficulty target {})", + trace!(LOGGER, "Found cuckoo solution for nonce {} of difficulty {} (difficulty target {})", s.get_nonce_as_u64(), proof_diff.into_num(), difficulty.into_num()); @@ -302,7 +302,7 @@ impl Miner { let pow_hash = b.hash(); if let Ok(proof) = plugin_miner.mine(&pow_hash[..]) { let proof_diff = proof.clone().to_difficulty(); - trace!(LOGGER, "Found cuckoo solution for nonce {} of difficulty {} (difficulty target {})", + trace!(LOGGER, "Found cuckoo solution for nonce {} of difficulty {} (difficulty target {})", b.header.nonce, proof_diff.into_num(), b.header.difficulty.into_num()); @@ -528,7 +528,7 @@ impl Miner { ); } - // if we found a solution, push our block out + // we found a solution, push our block through the chain processing pipeline if let Some(proof) = sol { info!( LOGGER, @@ -537,7 +537,7 @@ impl Miner { b.hash() ); b.header.pow = proof; - let res = self.chain.process_block(b, chain::NONE); + let res = self.chain.process_block(b, chain::MINE); if let Err(e) = res { error!( LOGGER, diff --git a/grin/src/server.rs b/grin/src/server.rs index 5b07316bc..3898258b7 100644 --- a/grin/src/server.rs +++ b/grin/src/server.rs @@ -133,6 +133,7 @@ impl Server { )?); chain_adapter.init(p2p_server.peers.clone()); pool_net_adapter.init(p2p_server.peers.clone()); + net_adapter.init(p2p_server.peers.clone()); let seed = seed::Seeder::new( config.capabilities, p2p_server.clone(), p2p_server.peers.clone()); diff --git a/grin/src/sync.rs b/grin/src/sync.rs index 9a300da04..cf9e9c6f2 100644 --- a/grin/src/sync.rs +++ b/grin/src/sync.rs @@ -211,7 +211,7 @@ pub fn needs_syncing( if let Some(peer) = peer { if let Ok(peer) = peer.try_read() { if peer.info.total_difficulty <= local_diff { - info!(LOGGER, "synchronize stopped, at {:?} @ {:?}", local_diff, chain.head().unwrap().height); + info!(LOGGER, "synchronized at {:?} @ {:?}", local_diff, chain.head().unwrap().height); currently_syncing.store(false, Ordering::Relaxed); let _ = chain.reset_head(); } diff --git a/p2p/src/msg.rs b/p2p/src/msg.rs index 6a406fa2b..313080076 100644 --- a/p2p/src/msg.rs +++ b/p2p/src/msg.rs @@ -59,6 +59,7 @@ enum_from_primitive! { GetPeerAddrs, PeerAddrs, GetHeaders, + Header, Headers, GetBlock, Block, @@ -426,6 +427,7 @@ impl Readable for SockAddr { } /// Serializable wrapper for the block locator. 
+#[derive(Debug)] pub struct Locator { pub hashes: Vec, } diff --git a/p2p/src/peer.rs b/p2p/src/peer.rs index cd2951352..44d9ea6a6 100644 --- a/p2p/src/peer.rs +++ b/p2p/src/peer.rs @@ -152,6 +152,27 @@ impl Peer { debug!(LOGGER, "Send block {} to {}", b.hash(), self.info.addr); self.proto.send_block(b) } else { + debug!( + LOGGER, + "Suppress block send {} to {} (already seen)", + b.hash(), + self.info.addr, + ); + Ok(()) + } + } + + pub fn send_header(&self, bh: &core::BlockHeader) -> Result<(), Error> { + if !self.tracking_adapter.has(bh.hash()) { + debug!(LOGGER, "Send header {} to {}", bh.hash(), self.info.addr); + self.proto.send_header(bh) + } else { + debug!( + LOGGER, + "Suppress header send {} to {} (already seen)", + bh.hash(), + self.info.addr, + ); Ok(()) } } @@ -160,8 +181,10 @@ impl Peer { /// dropped if the remote peer is known to already have the transaction. pub fn send_transaction(&self, tx: &core::Transaction) -> Result<(), Error> { if !self.tracking_adapter.has(tx.hash()) { + debug!(LOGGER, "Send tx {} to {}", tx.hash(), self.info.addr); self.proto.send_transaction(tx) } else { + debug!(LOGGER, "Not sending tx {} to {} (already seen)", tx.hash(), self.info.addr); Ok(()) } } @@ -171,12 +194,7 @@ impl Peer { } pub fn send_block_request(&self, h: Hash) -> Result<(), Error> { - debug!( - LOGGER, - "Requesting block {} from peer {}.", - h, - self.info.addr - ); + debug!(LOGGER, "Requesting block {} from {}", h, self.info.addr); self.proto.send_block_request(h) } @@ -209,7 +227,7 @@ impl TrackingAdapter { fn has(&self, hash: Hash) -> bool { let known = self.known.read().unwrap(); // may become too slow, an ordered set (by timestamp for eviction) may - // end up being a better choice + // end up being a better choice known.contains(&hash) } @@ -241,6 +259,11 @@ impl ChainAdapter for TrackingAdapter { self.adapter.block_received(b, addr) } + fn header_received(&self, bh: core::BlockHeader, addr: SocketAddr) -> bool { + self.push(bh.hash()); + self.adapter.header_received(bh, addr) + } + fn headers_received(&self, bh: Vec, addr: SocketAddr) { self.adapter.headers_received(bh, addr) } diff --git a/p2p/src/peers.rs b/p2p/src/peers.rs index 529a4807e..09e4e7859 100644 --- a/p2p/src/peers.rs +++ b/p2p/src/peers.rs @@ -19,7 +19,7 @@ use std::sync::{Arc, RwLock}; use rand::{thread_rng, Rng}; use core::core; -use core::core::hash::Hash; +use core::core::hash::{Hash, Hashed}; use core::core::target::Difficulty; use util::LOGGER; use time; @@ -237,7 +237,7 @@ impl Peers { } debug!( LOGGER, - "broadcast_block: {}, {} at {}, to {} peers", + "broadcast_block: {}, {} at {}, to {} peers, done.", b.hash(), b.header.total_difficulty, b.header.height, @@ -245,6 +245,35 @@ impl Peers { ); } + /// Broadcasts the provided block to PEER_PREFERRED_COUNT of our peers. + /// We may be connected to PEER_MAX_COUNT peers so we only + /// want to broadcast to a random subset of peers. + /// A peer implementation may drop the broadcast request + /// if it knows the remote peer already has the block. 
+ pub fn broadcast_header(&self, bh: &core::BlockHeader) { + let peers = self.connected_peers(); + let preferred_peers = 8; + let mut count = 0; + for p in peers.iter().take(preferred_peers) { + let p = p.read().unwrap(); + if p.is_connected() { + if let Err(e) = p.send_header(bh) { + debug!(LOGGER, "Error sending header to peer: {:?}", e); + } else { + count += 1; + } + } + } + debug!( + LOGGER, + "broadcast_header: {}, {} at {}, to {} peers, done.", + bh.hash(), + bh.total_difficulty, + bh.height, + count, + ); + } + /// Broadcasts the provided transaction to PEER_PREFERRED_COUNT of our peers. /// We may be connected to PEER_MAX_COUNT peers so we only /// want to broadcast to a random subset of peers. @@ -390,8 +419,18 @@ impl ChainAdapter for Peers { } fn block_received(&self, b: core::Block, peer_addr: SocketAddr) -> bool { if !self.adapter.block_received(b, peer_addr) { - // if the peer sent us a block that's intrinsically bad, they're either - // mistaken or manevolent, both of which require a ban + // if the peer sent us a block that's intrinsically bad + // they are either mistaken or manevolent, both of which require a ban + self.ban_peer(&peer_addr); + false + } else { + true + } + } + fn header_received(&self, bh: core::BlockHeader, peer_addr: SocketAddr) -> bool { + if !self.adapter.header_received(bh, peer_addr) { + // if the peer sent us a block header that's intrinsically bad + // they are either mistaken or manevolent, both of which require a ban self.ban_peer(&peer_addr); false } else { diff --git a/p2p/src/protocol.rs b/p2p/src/protocol.rs index 97a69fda8..283f1efb8 100644 --- a/p2p/src/protocol.rs +++ b/p2p/src/protocol.rs @@ -1,4 +1,4 @@ -// Copyright 2016 The Grin Developers +// Copyright 2018 The Grin Developers // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. 
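
For illustration, a minimal self-contained sketch of the propagation rule added in adapters.rs and peers.rs above: a block we mined ourselves (the MINE flag) is relayed in full, a block received from the network is relayed header first, and in both cases we only fan out to a small preferred subset of connected peers and suppress sends to peers already known to have the hash (the TrackingAdapter check). The names below (MockPeer, broadcast, the u64 Hash alias) are illustrative stand-ins, not the real grin p2p API.

use std::collections::HashSet;

type Hash = u64;

struct MockPeer {
    addr: String,
    seen: HashSet<Hash>, // hashes this peer is already known to have (TrackingAdapter analogue)
}

impl MockPeer {
    fn send_header(&self, h: Hash) {
        println!("header {} -> {}", h, self.addr);
    }
    fn send_block(&self, h: Hash) {
        println!("block  {} -> {}", h, self.addr);
    }
}

// mirrors the hard-coded `preferred_peers = 8` in Peers::broadcast_header above
const PEER_PREFERRED_COUNT: usize = 8;

fn broadcast(peers: &[MockPeer], block_hash: Hash, we_mined_it: bool) {
    // only push to a small preferred subset of our connected peers
    for p in peers.iter().take(PEER_PREFERRED_COUNT) {
        if p.seen.contains(&block_hash) {
            continue; // suppress the send, this peer already has it
        }
        if we_mined_it {
            p.send_block(block_hash); // originator (MINE) pushes the full block
        } else {
            p.send_header(block_hash); // relayed blocks go out header first
        }
    }
}

fn main() {
    let mut seen = HashSet::new();
    seen.insert(42);
    let peers = vec![
        MockPeer { addr: "10.0.0.1:13414".into(), seen: HashSet::new() },
        MockPeer { addr: "10.0.0.2:13414".into(), seen },
    ];
    // a block relayed from the network: only the first peer gets the header
    broadcast(&peers, 42, false);
}

As the comment in block_accepted above notes, the originator still broadcasts the full block, while relays send only the header, so peers that already have the block never receive the full body again.
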
@@ -21,7 +21,7 @@ use futures_cpupool::CpuPool; use tokio_core::net::TcpStream; use core::core; -use core::core::hash::Hash; +use core::core::hash::{Hash, Hashed}; use core::core::target::Difficulty; use core::ser; use conn::TimeoutConnection; @@ -83,6 +83,11 @@ impl Protocol for ProtocolV1 { self.send_msg(Type::Block, b) } + /// Serializes and sends a block header to our remote peer ("header first" propagation) + fn send_header(&self, bh: &core::BlockHeader) -> Result<(), Error> { + self.send_msg(Type::Header, bh) + } + /// Serializes and sends a transaction to our remote peer fn send_transaction(&self, tx: &core::Transaction) -> Result<(), Error> { self.send_msg(Type::Transaction, tx) @@ -172,12 +177,14 @@ fn handle_payload( }, Type::Transaction => { let tx = ser::deserialize::(&mut &buf[..])?; + debug!(LOGGER, "handle_payload: Transaction: {}", tx.hash()); + adapter.transaction_received(tx); Ok(None) } Type::GetBlock => { let h = ser::deserialize::(&mut &buf[..])?; - debug!(LOGGER, "handle_payload: GetBlock {}", h); + debug!(LOGGER, "handle_payload: GetBlock: {}", h); let bo = adapter.get_block(h); if let Some(b) = bo { @@ -199,15 +206,16 @@ fn handle_payload( Type::Block => { let b = ser::deserialize::(&mut &buf[..])?; let bh = b.hash(); - - debug!(LOGGER, "handle_payload: Block {}", bh); + debug!(LOGGER, "handle_payload: Block: {}", bh); adapter.block_received(b, addr); Ok(Some(bh)) } + // A peer is asking us for some headers via a locator Type::GetHeaders => { - // load headers from the locator let loc = ser::deserialize::(&mut &buf[..])?; + debug!(LOGGER, "handle_payload: GetHeaders: {:?}", loc); + let headers = adapter.locate_headers(loc.hashes); // serialize and send all the headers over @@ -228,8 +236,23 @@ fn handle_payload( Ok(None) } + // "header first" block propagation - if we have not yet seen this block + // we can go request it from some of our peers + Type::Header => { + let header = ser::deserialize::(&mut &buf[..])?; + debug!(LOGGER, "handle_payload: Header: {}", header.hash()); + + adapter.header_received(header, addr); + + // we do not return a hash here as we never request a single header + // a header will always arrive unsolicited + Ok(None) + } + // receive headers as part of the sync process Type::Headers => { let headers = ser::deserialize::(&mut &buf[..])?; + debug!(LOGGER, "handle_payload: Headers: {}", headers.headers.len()); + adapter.headers_received(headers.headers, addr); Ok(None) } diff --git a/p2p/src/server.rs b/p2p/src/server.rs index d35604104..b5d3c93f1 100644 --- a/p2p/src/server.rs +++ b/p2p/src/server.rs @@ -48,10 +48,11 @@ impl ChainAdapter for DummyAdapter { fn total_height(&self) -> u64 { 0 } - fn transaction_received(&self, _: core::Transaction) {} - fn block_received(&self, _: core::Block, _: SocketAddr) -> bool { true } - fn headers_received(&self, _: Vec, _:SocketAddr) {} - fn locate_headers(&self, _: Vec) -> Vec { + fn transaction_received(&self, _tx: core::Transaction) {} + fn block_received(&self, _b: core::Block, _addr: SocketAddr) -> bool { true } + fn header_received(&self, _bh: core::BlockHeader, _addr: SocketAddr) -> bool { true } + fn headers_received(&self, _bh: Vec, _addr:SocketAddr) {} + fn locate_headers(&self, _loc: Vec) -> Vec { vec![] } fn get_block(&self, _: Hash) -> Option { diff --git a/p2p/src/types.rs b/p2p/src/types.rs index 2ebf92f54..71cda1a7c 100644 --- a/p2p/src/types.rs +++ b/p2p/src/types.rs @@ -141,6 +141,9 @@ pub trait Protocol { /// Relays a block to the remote peer. 
fn send_block(&self, b: &core::Block) -> Result<(), Error>; + /// Relays a block header to the remote peer ("header first" propagation). + fn send_header(&self, bh: &core::BlockHeader) -> Result<(), Error>; + /// Relays a transaction to the remote peer. fn send_transaction(&self, tx: &core::Transaction) -> Result<(), Error>; @@ -175,10 +178,12 @@ pub trait ChainAdapter: Sync + Send { /// A block has been received from one of our peers. Returns true if the /// block could be handled properly and is not deemed defective by the - /// chain. Returning false means the block will nenver be valid and + /// chain. Returning false means the block will never be valid and /// may result in the peer being banned. fn block_received(&self, b: core::Block, addr: SocketAddr) -> bool; + fn header_received(&self, b: core::BlockHeader, addr: SocketAddr) -> bool; + /// A set of block header has been received, typically in response to a /// block /// header request.
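
To round this out, a matching sketch of the receive side added in NetToChainAdapter above: an unsolicited header is pushed through the header pipeline first, and only when it is genuinely new, we do not already have the block, and we do have its parent block, do we turn around and request the full block from the peer that sent the header; otherwise the request is skipped. The types here are simplified stand-ins, not the real chain::process_block_header or Peers API.

use std::collections::{HashMap, HashSet};

type Hash = u64;

#[derive(Clone)]
struct Header {
    hash: Hash,
    prev: Hash,
}

struct Node {
    headers: HashMap<Hash, Header>, // accepted header chain (header_head side)
    blocks: HashSet<Hash>,          // full blocks we already have
}

impl Node {
    // rough analogue of pipe::process_block_header: fast-reject already-known headers
    fn process_header(&mut self, h: &Header) -> Result<(), &'static str> {
        if self.headers.contains_key(&h.hash) {
            return Err("already known");
        }
        self.headers.insert(h.hash, h.clone());
        Ok(())
    }

    // rough analogue of NetToChainAdapter::header_received + request_block
    fn header_received(&mut self, h: Header, peer_addr: &str) -> bool {
        if self.process_header(&h).is_err() {
            // refused, but not necessarily bad: nothing more to do for now
            return true;
        }
        if self.blocks.contains(&h.hash) {
            return true; // we already have the full block, no request needed
        }
        if self.blocks.contains(&h.prev) {
            // new header and we have its parent block: ask the same peer for the block
            println!("requesting block {} from {}", h.hash, peer_addr);
        } else {
            // parent block missing: skip the request for now (request_block's bh.previous check)
            println!("prev block {} missing, skipping request", h.prev);
        }
        true
    }
}

fn main() {
    let mut node = Node { headers: HashMap::new(), blocks: HashSet::new() };
    node.blocks.insert(0); // pretend we already have the parent block
    node.header_received(Header { hash: 1, prev: 0 }, "10.0.0.1:13414");
}

In the real adapter the only path that returns false (and so gets the peer banned by Peers::header_received) is a header the chain flags as intrinsically bad, in which case the header head is also reset; any other processing error is treated as benign and returns true.
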