mirror of
https://github.com/mimblewimble/grin.git
synced 2025-02-01 17:01:09 +03:00
header first propagation (#654)
* [wip] header first propagation successfully propagating headers (unless we mined the block itself) not yet asking for the block if we receive a header * call request_block after successful processing header * cleanup and skip asking for block if header is an orphan * comments around error handling in receive_header
This commit is contained in:
parent
765996a630
commit
6647823177
16 changed files with 273 additions and 58 deletions
|
@ -237,10 +237,10 @@ pub fn process_block(&self, b: Block, opts: Options)
|
||||||
}
|
}
|
||||||
|
|
||||||
// notifying other parts of the system of the update
|
// notifying other parts of the system of the update
|
||||||
if !opts.intersects(SYNC) {
|
if !opts.contains(SYNC) {
|
||||||
// broadcast the block
|
// broadcast the block
|
||||||
let adapter = self.adapter.clone();
|
let adapter = self.adapter.clone();
|
||||||
adapter.block_accepted(&b);
|
adapter.block_accepted(&b, opts);
|
||||||
}
|
}
|
||||||
Ok((Some(tip.clone()), Some(b.clone())))
|
Ok((Some(tip.clone()), Some(b.clone())))
|
||||||
},
|
},
|
||||||
|
@ -254,10 +254,10 @@ pub fn process_block(&self, b: Block, opts: Options)
|
||||||
// or less relevant blocks somehow.
|
// or less relevant blocks somehow.
|
||||||
// We should also probably consider banning nodes that send us really old blocks.
|
// We should also probably consider banning nodes that send us really old blocks.
|
||||||
//
|
//
|
||||||
if !opts.intersects(SYNC) {
|
if !opts.contains(SYNC) {
|
||||||
// broadcast the block
|
// broadcast the block
|
||||||
let adapter = self.adapter.clone();
|
let adapter = self.adapter.clone();
|
||||||
adapter.block_accepted(&b);
|
adapter.block_accepted(&b, opts);
|
||||||
}
|
}
|
||||||
Ok((None, Some(b.clone())))
|
Ok((None, Some(b.clone())))
|
||||||
},
|
},
|
||||||
|
@ -306,6 +306,17 @@ pub fn process_block(&self, b: Block, opts: Options)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Process a block header received during "header first" propagation.
|
||||||
|
pub fn process_block_header(
|
||||||
|
&self,
|
||||||
|
bh: &BlockHeader,
|
||||||
|
opts: Options,
|
||||||
|
) -> Result<Option<Tip>, Error> {
|
||||||
|
let header_head = self.get_header_head()?;
|
||||||
|
let ctx = self.ctx_from_head(header_head, opts);
|
||||||
|
pipe::process_block_header(bh, ctx)
|
||||||
|
}
|
||||||
|
|
||||||
/// Attempt to add a new header to the header chain.
|
/// Attempt to add a new header to the header chain.
|
||||||
/// This is only ever used during sync and uses sync_head.
|
/// This is only ever used during sync and uses sync_head.
|
||||||
pub fn sync_block_header(
|
pub fn sync_block_header(
|
||||||
|
|
|
@ -43,4 +43,4 @@ pub mod types;
|
||||||
// Re-export the base interface
|
// Re-export the base interface
|
||||||
|
|
||||||
pub use chain::Chain;
|
pub use chain::Chain;
|
||||||
pub use types::{ChainAdapter, ChainStore, Error, Options, Tip, NONE, SKIP_POW, SYNC};
|
pub use types::{ChainAdapter, ChainStore, Error, Options, Tip, NONE, SKIP_POW, SYNC, MINE};
|
||||||
|
|
|
@ -92,7 +92,7 @@ pub fn process_block(b: &Block, mut ctx: BlockContext) -> Result<Option<Tip>, Er
|
||||||
validate_block(b, &mut ctx, &mut extension)?;
|
validate_block(b, &mut ctx, &mut extension)?;
|
||||||
debug!(
|
debug!(
|
||||||
LOGGER,
|
LOGGER,
|
||||||
"pipe: process_block {} at {} is valid, save and append.",
|
"pipe: process_block: {} at {} is valid, save and append.",
|
||||||
b.hash(),
|
b.hash(),
|
||||||
b.header.height,
|
b.header.height,
|
||||||
);
|
);
|
||||||
|
@ -113,12 +113,7 @@ pub fn sync_block_header(
|
||||||
mut sync_ctx: BlockContext,
|
mut sync_ctx: BlockContext,
|
||||||
mut header_ctx: BlockContext,
|
mut header_ctx: BlockContext,
|
||||||
) -> Result<Option<Tip>, Error> {
|
) -> Result<Option<Tip>, Error> {
|
||||||
debug!(
|
debug!(LOGGER, "pipe: sync_block_header: {} at {}", bh.hash(), bh.height);
|
||||||
LOGGER,
|
|
||||||
"pipe: sync_block_header {} at {}",
|
|
||||||
bh.hash(),
|
|
||||||
bh.height
|
|
||||||
);
|
|
||||||
|
|
||||||
validate_header(&bh, &mut sync_ctx)?;
|
validate_header(&bh, &mut sync_ctx)?;
|
||||||
add_block_header(bh, &mut sync_ctx)?;
|
add_block_header(bh, &mut sync_ctx)?;
|
||||||
|
@ -133,6 +128,47 @@ pub fn sync_block_header(
|
||||||
update_sync_head(bh, &mut sync_ctx)
|
update_sync_head(bh, &mut sync_ctx)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Process block header as part of "header first" block propagation.
|
||||||
|
pub fn process_block_header(
|
||||||
|
bh: &BlockHeader,
|
||||||
|
mut ctx: BlockContext,
|
||||||
|
) -> Result<Option<Tip>, Error> {
|
||||||
|
debug!(LOGGER, "pipe: process_block_header: {} at {}", bh.hash(), bh.height);
|
||||||
|
|
||||||
|
check_header_known(bh.hash(), &mut ctx)?;
|
||||||
|
validate_header(&bh, &mut ctx)?;
|
||||||
|
|
||||||
|
debug!(
|
||||||
|
LOGGER,
|
||||||
|
"pipe: process_block_header: {} at {} is valid, saving.",
|
||||||
|
bh.hash(),
|
||||||
|
bh.height,
|
||||||
|
);
|
||||||
|
|
||||||
|
add_block_header(bh, &mut ctx)?;
|
||||||
|
|
||||||
|
// now update the header_head (if new header with most work)
|
||||||
|
update_header_head(bh, &mut ctx)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Quick in-memory check to fast-reject any block header we've already handled
|
||||||
|
/// recently. Keeps duplicates from the network in check.
|
||||||
|
/// ctx here is specific to the header_head (tip of the header chain)
|
||||||
|
fn check_header_known(bh: Hash, ctx: &mut BlockContext) -> Result<(), Error> {
|
||||||
|
// TODO ring buffer of the last few blocks that came through here
|
||||||
|
if bh == ctx.head.last_block_h || bh == ctx.head.prev_block_h {
|
||||||
|
return Err(Error::Unfit("already known".to_string()));
|
||||||
|
}
|
||||||
|
if let Ok(h) = ctx.store.get_block_header(&bh) {
|
||||||
|
// there is a window where a block header can be saved but the chain head not
|
||||||
|
// updated yet, we plug that window here by re-accepting the block
|
||||||
|
if h.total_difficulty <= ctx.head.total_difficulty {
|
||||||
|
return Err(Error::Unfit("already in store".to_string()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
/// Quick in-memory check to fast-reject any block we've already handled
|
/// Quick in-memory check to fast-reject any block we've already handled
|
||||||
/// recently. Keeps duplicates from the network in check.
|
/// recently. Keeps duplicates from the network in check.
|
||||||
fn check_known(bh: Hash, ctx: &mut BlockContext) -> Result<(), Error> {
|
fn check_known(bh: Hash, ctx: &mut BlockContext) -> Result<(), Error> {
|
||||||
|
@ -174,7 +210,7 @@ fn validate_header(header: &BlockHeader, ctx: &mut BlockContext) -> Result<(), E
|
||||||
return Err(Error::InvalidBlockTime);
|
return Err(Error::InvalidBlockTime);
|
||||||
}
|
}
|
||||||
|
|
||||||
if !ctx.opts.intersects(SKIP_POW) {
|
if !ctx.opts.contains(SKIP_POW) {
|
||||||
let n = global::sizeshift() as u32;
|
let n = global::sizeshift() as u32;
|
||||||
if !(ctx.pow_verifier)(header, n) {
|
if !(ctx.pow_verifier)(header, n) {
|
||||||
error!(LOGGER, "pipe: validate_header failed for cuckoo shift size {}", n);
|
error!(LOGGER, "pipe: validate_header failed for cuckoo shift size {}", n);
|
||||||
|
@ -205,7 +241,7 @@ fn validate_header(header: &BlockHeader, ctx: &mut BlockContext) -> Result<(), E
|
||||||
return Err(Error::InvalidBlockTime);
|
return Err(Error::InvalidBlockTime);
|
||||||
}
|
}
|
||||||
|
|
||||||
if !ctx.opts.intersects(SKIP_POW) {
|
if !ctx.opts.contains(SKIP_POW) {
|
||||||
// verify the proof of work and related parameters
|
// verify the proof of work and related parameters
|
||||||
|
|
||||||
// explicit check to ensure we are not below the minimum difficulty
|
// explicit check to ensure we are not below the minimum difficulty
|
||||||
|
@ -325,7 +361,7 @@ fn update_head(b: &Block, ctx: &mut BlockContext) -> Result<Option<Tip>, Error>
|
||||||
// in sync mode, only update the "body chain", otherwise update both the
|
// in sync mode, only update the "body chain", otherwise update both the
|
||||||
// "header chain" and "body chain", updating the header chain in sync resets
|
// "header chain" and "body chain", updating the header chain in sync resets
|
||||||
// all additional "future" headers we've received
|
// all additional "future" headers we've received
|
||||||
if ctx.opts.intersects(SYNC) {
|
if ctx.opts.contains(SYNC) {
|
||||||
ctx.store
|
ctx.store
|
||||||
.save_body_head(&tip)
|
.save_body_head(&tip)
|
||||||
.map_err(|e| Error::StoreErr(e, "pipe save body".to_owned()))?;
|
.map_err(|e| Error::StoreErr(e, "pipe save body".to_owned()))?;
|
||||||
|
@ -365,7 +401,6 @@ fn update_sync_head(bh: &BlockHeader, ctx: &mut BlockContext) -> Result<Option<T
|
||||||
|
|
||||||
fn update_header_head(bh: &BlockHeader, ctx: &mut BlockContext) -> Result<Option<Tip>, Error> {
|
fn update_header_head(bh: &BlockHeader, ctx: &mut BlockContext) -> Result<Option<Tip>, Error> {
|
||||||
let tip = Tip::from_block(bh);
|
let tip = Tip::from_block(bh);
|
||||||
debug!(LOGGER, "pipe: update_header_head: {}, {}", tip.total_difficulty, ctx.head.total_difficulty);
|
|
||||||
if tip.total_difficulty > ctx.head.total_difficulty {
|
if tip.total_difficulty > ctx.head.total_difficulty {
|
||||||
ctx.store
|
ctx.store
|
||||||
.save_header_head(&tip)
|
.save_header_head(&tip)
|
||||||
|
|
|
@ -28,12 +28,14 @@ use grin_store;
|
||||||
bitflags! {
|
bitflags! {
|
||||||
/// Options for block validation
|
/// Options for block validation
|
||||||
pub flags Options: u32 {
|
pub flags Options: u32 {
|
||||||
/// None flag
|
/// No flags
|
||||||
const NONE = 0b00000001,
|
const NONE = 0b00000000,
|
||||||
/// Runs without checking the Proof of Work, mostly to make testing easier.
|
/// Runs without checking the Proof of Work, mostly to make testing easier.
|
||||||
const SKIP_POW = 0b00000010,
|
const SKIP_POW = 0b00000001,
|
||||||
/// Adds block while in syncing mode.
|
/// Adds block while in syncing mode.
|
||||||
const SYNC = 0b00001000,
|
const SYNC = 0b00000010,
|
||||||
|
/// Block validation on a block we mined ourselves
|
||||||
|
const MINE = 0b00000100,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -271,11 +273,11 @@ pub trait ChainStore: Send + Sync {
|
||||||
pub trait ChainAdapter {
|
pub trait ChainAdapter {
|
||||||
/// The blockchain pipeline has accepted this block as valid and added
|
/// The blockchain pipeline has accepted this block as valid and added
|
||||||
/// it to our chain.
|
/// it to our chain.
|
||||||
fn block_accepted(&self, b: &Block);
|
fn block_accepted(&self, b: &Block, opts: Options);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Dummy adapter used as a placeholder for real implementations
|
/// Dummy adapter used as a placeholder for real implementations
|
||||||
pub struct NoopAdapter {}
|
pub struct NoopAdapter {}
|
||||||
impl ChainAdapter for NoopAdapter {
|
impl ChainAdapter for NoopAdapter {
|
||||||
fn block_accepted(&self, _: &Block) {}
|
fn block_accepted(&self, _: &Block, _: Options) {}
|
||||||
}
|
}
|
||||||
|
|
|
@ -96,7 +96,7 @@ fn mine_empty_chain() {
|
||||||
).unwrap();
|
).unwrap();
|
||||||
|
|
||||||
let bhash = b.hash();
|
let bhash = b.hash();
|
||||||
chain.process_block(b, chain::NONE).unwrap();
|
chain.process_block(b, chain::MINE).unwrap();
|
||||||
|
|
||||||
// checking our new head
|
// checking our new head
|
||||||
let head = chain.head().unwrap();
|
let head = chain.head().unwrap();
|
||||||
|
|
|
@ -105,7 +105,7 @@ fn test_coinbase_maturity() {
|
||||||
// we will need this later when we want to spend the coinbase output
|
// we will need this later when we want to spend the coinbase output
|
||||||
let block_hash = block.hash();
|
let block_hash = block.hash();
|
||||||
|
|
||||||
chain.process_block(block, chain::NONE).unwrap();
|
chain.process_block(block, chain::MINE).unwrap();
|
||||||
|
|
||||||
let prev = chain.head_header().unwrap();
|
let prev = chain.head_header().unwrap();
|
||||||
|
|
||||||
|
@ -178,7 +178,7 @@ fn test_coinbase_maturity() {
|
||||||
global::sizeshift() as u32,
|
global::sizeshift() as u32,
|
||||||
).unwrap();
|
).unwrap();
|
||||||
|
|
||||||
chain.process_block(block, chain::NONE).unwrap();
|
chain.process_block(block, chain::MINE).unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
let prev = chain.head_header().unwrap();
|
let prev = chain.head_header().unwrap();
|
||||||
|
@ -213,7 +213,7 @@ fn test_coinbase_maturity() {
|
||||||
global::sizeshift() as u32,
|
global::sizeshift() as u32,
|
||||||
).unwrap();
|
).unwrap();
|
||||||
|
|
||||||
let result = chain.process_block(block, chain::NONE);
|
let result = chain.process_block(block, chain::MINE);
|
||||||
match result {
|
match result {
|
||||||
Ok(_) => (),
|
Ok(_) => (),
|
||||||
Err(Error::ImmatureCoinbase) => panic!("we should not get an ImmatureCoinbase here"),
|
Err(Error::ImmatureCoinbase) => panic!("we should not get an ImmatureCoinbase here"),
|
||||||
|
|
|
@ -16,7 +16,7 @@ use std::net::SocketAddr;
|
||||||
use std::sync::{Arc, RwLock};
|
use std::sync::{Arc, RwLock};
|
||||||
use std::sync::atomic::{AtomicBool, Ordering};
|
use std::sync::atomic::{AtomicBool, Ordering};
|
||||||
|
|
||||||
use chain::{self, ChainAdapter};
|
use chain::{self, ChainAdapter, Options, MINE};
|
||||||
use core::core;
|
use core::core;
|
||||||
use core::core::block::BlockHeader;
|
use core::core::block::BlockHeader;
|
||||||
use core::core::hash::{Hash, Hashed};
|
use core::core::hash::{Hash, Hashed};
|
||||||
|
@ -35,6 +35,7 @@ pub struct NetToChainAdapter {
|
||||||
currently_syncing: Arc<AtomicBool>,
|
currently_syncing: Arc<AtomicBool>,
|
||||||
chain: Arc<chain::Chain>,
|
chain: Arc<chain::Chain>,
|
||||||
tx_pool: Arc<RwLock<pool::TransactionPool<PoolToChainAdapter>>>,
|
tx_pool: Arc<RwLock<pool::TransactionPool<PoolToChainAdapter>>>,
|
||||||
|
peers: OneTime<p2p::Peers>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl p2p::ChainAdapter for NetToChainAdapter {
|
impl p2p::ChainAdapter for NetToChainAdapter {
|
||||||
|
@ -64,13 +65,14 @@ impl p2p::ChainAdapter for NetToChainAdapter {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn block_received(&self, b: core::Block, _: SocketAddr) -> bool {
|
fn block_received(&self, b: core::Block, addr: SocketAddr) -> bool {
|
||||||
let bhash = b.hash();
|
let bhash = b.hash();
|
||||||
debug!(
|
debug!(
|
||||||
LOGGER,
|
LOGGER,
|
||||||
"Received block {} at {} from network, going to process.",
|
"Received block {} at {} from {}, going to process.",
|
||||||
bhash,
|
bhash,
|
||||||
b.header.height,
|
b.header.height,
|
||||||
|
addr,
|
||||||
);
|
);
|
||||||
|
|
||||||
// pushing the new block through the chain pipeline
|
// pushing the new block through the chain pipeline
|
||||||
|
@ -86,6 +88,41 @@ impl p2p::ChainAdapter for NetToChainAdapter {
|
||||||
true
|
true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn header_received(&self, bh: core::BlockHeader, addr: SocketAddr) -> bool {
|
||||||
|
let bhash = bh.hash();
|
||||||
|
debug!(
|
||||||
|
LOGGER,
|
||||||
|
"Received block header {} at {} from {}, going to process.",
|
||||||
|
bhash,
|
||||||
|
bh.height,
|
||||||
|
addr,
|
||||||
|
);
|
||||||
|
|
||||||
|
// pushing the new block header through the header chain pipeline
|
||||||
|
// we will go ask for the block if this is a new header
|
||||||
|
let res = self.chain.process_block_header(&bh, self.chain_opts());
|
||||||
|
|
||||||
|
if let &Err(ref e) = &res {
|
||||||
|
debug!(LOGGER, "Block header {} refused by chain: {:?}", bhash, e);
|
||||||
|
if e.is_bad_block() {
|
||||||
|
debug!(LOGGER, "header_received: {} is a bad header, resetting header head", bhash);
|
||||||
|
let _ = self.chain.reset_head();
|
||||||
|
return false;
|
||||||
|
} else {
|
||||||
|
// we got an error when trying to process the block header
|
||||||
|
// but nothing serious enough to need to ban the peer upstream
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// we have successfully processed a block header
|
||||||
|
// so we can go request the block itself
|
||||||
|
self.request_block(&bh, &addr);
|
||||||
|
|
||||||
|
// done receiving the header
|
||||||
|
true
|
||||||
|
}
|
||||||
|
|
||||||
fn headers_received(&self, bhs: Vec<core::BlockHeader>, addr: SocketAddr) {
|
fn headers_received(&self, bhs: Vec<core::BlockHeader>, addr: SocketAddr) {
|
||||||
info!(
|
info!(
|
||||||
LOGGER,
|
LOGGER,
|
||||||
|
@ -197,9 +234,14 @@ impl NetToChainAdapter {
|
||||||
currently_syncing: currently_syncing,
|
currently_syncing: currently_syncing,
|
||||||
chain: chain_ref,
|
chain: chain_ref,
|
||||||
tx_pool: tx_pool,
|
tx_pool: tx_pool,
|
||||||
|
peers: OneTime::new(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn init(&self, peers: p2p::Peers) {
|
||||||
|
self.peers.init(peers);
|
||||||
|
}
|
||||||
|
|
||||||
// recursively go back through the locator vector and stop when we find
|
// recursively go back through the locator vector and stop when we find
|
||||||
// a header that we recognize this will be a header shared in common
|
// a header that we recognize this will be a header shared in common
|
||||||
// between us and the peer
|
// between us and the peer
|
||||||
|
@ -234,6 +276,28 @@ impl NetToChainAdapter {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// After we have received a block header in "header first" propagation
|
||||||
|
// we need to go request the block from the same peer that gave us the header
|
||||||
|
// (unless we have already accepted the block)
|
||||||
|
//
|
||||||
|
// TODO - currently only request block from a single peer
|
||||||
|
// consider additional peers for redundancy?
|
||||||
|
fn request_block(&self, bh: &BlockHeader, addr: &SocketAddr) {
|
||||||
|
if let None = self.peers.borrow().adapter.get_block(bh.hash()) {
|
||||||
|
if let Some(_) = self.peers.borrow().adapter.get_block(bh.previous) {
|
||||||
|
if let Some(peer) = self.peers.borrow().get_connected_peer(addr) {
|
||||||
|
if let Ok(peer) = peer.read() {
|
||||||
|
let _ = peer.send_block_request(bh.hash());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
debug!(LOGGER, "request_block: prev block {} missing, skipping", bh.previous);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
debug!(LOGGER, "request_block: block {} already known", bh.hash());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Prepare options for the chain pipeline
|
/// Prepare options for the chain pipeline
|
||||||
fn chain_opts(&self) -> chain::Options {
|
fn chain_opts(&self) -> chain::Options {
|
||||||
let opts = if self.currently_syncing.load(Ordering::Relaxed) {
|
let opts = if self.currently_syncing.load(Ordering::Relaxed) {
|
||||||
|
@ -254,7 +318,7 @@ pub struct ChainToPoolAndNetAdapter {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ChainAdapter for ChainToPoolAndNetAdapter {
|
impl ChainAdapter for ChainToPoolAndNetAdapter {
|
||||||
fn block_accepted(&self, b: &core::Block) {
|
fn block_accepted(&self, b: &core::Block, opts: Options) {
|
||||||
{
|
{
|
||||||
if let Err(e) = self.tx_pool.write().unwrap().reconcile_block(b) {
|
if let Err(e) = self.tx_pool.write().unwrap().reconcile_block(b) {
|
||||||
error!(
|
error!(
|
||||||
|
@ -265,7 +329,16 @@ impl ChainAdapter for ChainToPoolAndNetAdapter {
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// If we mined the block then we want to broadcast the block itself.
|
||||||
|
// But if we received the block from another node then broadcast "header first"
|
||||||
|
// to minimize network traffic.
|
||||||
|
if opts.contains(MINE) {
|
||||||
self.peers.borrow().broadcast_block(b);
|
self.peers.borrow().broadcast_block(b);
|
||||||
|
} else {
|
||||||
|
// "header first" propagation if we are not the originator of this block
|
||||||
|
self.peers.borrow().broadcast_header(&b.header);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -528,7 +528,7 @@ impl Miner {
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
// if we found a solution, push our block out
|
// we found a solution, push our block through the chain processing pipeline
|
||||||
if let Some(proof) = sol {
|
if let Some(proof) = sol {
|
||||||
info!(
|
info!(
|
||||||
LOGGER,
|
LOGGER,
|
||||||
|
@ -537,7 +537,7 @@ impl Miner {
|
||||||
b.hash()
|
b.hash()
|
||||||
);
|
);
|
||||||
b.header.pow = proof;
|
b.header.pow = proof;
|
||||||
let res = self.chain.process_block(b, chain::NONE);
|
let res = self.chain.process_block(b, chain::MINE);
|
||||||
if let Err(e) = res {
|
if let Err(e) = res {
|
||||||
error!(
|
error!(
|
||||||
LOGGER,
|
LOGGER,
|
||||||
|
|
|
@ -133,6 +133,7 @@ impl Server {
|
||||||
)?);
|
)?);
|
||||||
chain_adapter.init(p2p_server.peers.clone());
|
chain_adapter.init(p2p_server.peers.clone());
|
||||||
pool_net_adapter.init(p2p_server.peers.clone());
|
pool_net_adapter.init(p2p_server.peers.clone());
|
||||||
|
net_adapter.init(p2p_server.peers.clone());
|
||||||
|
|
||||||
let seed = seed::Seeder::new(
|
let seed = seed::Seeder::new(
|
||||||
config.capabilities, p2p_server.clone(), p2p_server.peers.clone());
|
config.capabilities, p2p_server.clone(), p2p_server.peers.clone());
|
||||||
|
|
|
@ -211,7 +211,7 @@ pub fn needs_syncing(
|
||||||
if let Some(peer) = peer {
|
if let Some(peer) = peer {
|
||||||
if let Ok(peer) = peer.try_read() {
|
if let Ok(peer) = peer.try_read() {
|
||||||
if peer.info.total_difficulty <= local_diff {
|
if peer.info.total_difficulty <= local_diff {
|
||||||
info!(LOGGER, "synchronize stopped, at {:?} @ {:?}", local_diff, chain.head().unwrap().height);
|
info!(LOGGER, "synchronized at {:?} @ {:?}", local_diff, chain.head().unwrap().height);
|
||||||
currently_syncing.store(false, Ordering::Relaxed);
|
currently_syncing.store(false, Ordering::Relaxed);
|
||||||
let _ = chain.reset_head();
|
let _ = chain.reset_head();
|
||||||
}
|
}
|
||||||
|
|
|
@ -59,6 +59,7 @@ enum_from_primitive! {
|
||||||
GetPeerAddrs,
|
GetPeerAddrs,
|
||||||
PeerAddrs,
|
PeerAddrs,
|
||||||
GetHeaders,
|
GetHeaders,
|
||||||
|
Header,
|
||||||
Headers,
|
Headers,
|
||||||
GetBlock,
|
GetBlock,
|
||||||
Block,
|
Block,
|
||||||
|
@ -426,6 +427,7 @@ impl Readable for SockAddr {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Serializable wrapper for the block locator.
|
/// Serializable wrapper for the block locator.
|
||||||
|
#[derive(Debug)]
|
||||||
pub struct Locator {
|
pub struct Locator {
|
||||||
pub hashes: Vec<Hash>,
|
pub hashes: Vec<Hash>,
|
||||||
}
|
}
|
||||||
|
|
|
@ -152,6 +152,27 @@ impl Peer {
|
||||||
debug!(LOGGER, "Send block {} to {}", b.hash(), self.info.addr);
|
debug!(LOGGER, "Send block {} to {}", b.hash(), self.info.addr);
|
||||||
self.proto.send_block(b)
|
self.proto.send_block(b)
|
||||||
} else {
|
} else {
|
||||||
|
debug!(
|
||||||
|
LOGGER,
|
||||||
|
"Suppress block send {} to {} (already seen)",
|
||||||
|
b.hash(),
|
||||||
|
self.info.addr,
|
||||||
|
);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn send_header(&self, bh: &core::BlockHeader) -> Result<(), Error> {
|
||||||
|
if !self.tracking_adapter.has(bh.hash()) {
|
||||||
|
debug!(LOGGER, "Send header {} to {}", bh.hash(), self.info.addr);
|
||||||
|
self.proto.send_header(bh)
|
||||||
|
} else {
|
||||||
|
debug!(
|
||||||
|
LOGGER,
|
||||||
|
"Suppress header send {} to {} (already seen)",
|
||||||
|
bh.hash(),
|
||||||
|
self.info.addr,
|
||||||
|
);
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -160,8 +181,10 @@ impl Peer {
|
||||||
/// dropped if the remote peer is known to already have the transaction.
|
/// dropped if the remote peer is known to already have the transaction.
|
||||||
pub fn send_transaction(&self, tx: &core::Transaction) -> Result<(), Error> {
|
pub fn send_transaction(&self, tx: &core::Transaction) -> Result<(), Error> {
|
||||||
if !self.tracking_adapter.has(tx.hash()) {
|
if !self.tracking_adapter.has(tx.hash()) {
|
||||||
|
debug!(LOGGER, "Send tx {} to {}", tx.hash(), self.info.addr);
|
||||||
self.proto.send_transaction(tx)
|
self.proto.send_transaction(tx)
|
||||||
} else {
|
} else {
|
||||||
|
debug!(LOGGER, "Not sending tx {} to {} (already seen)", tx.hash(), self.info.addr);
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -171,12 +194,7 @@ impl Peer {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn send_block_request(&self, h: Hash) -> Result<(), Error> {
|
pub fn send_block_request(&self, h: Hash) -> Result<(), Error> {
|
||||||
debug!(
|
debug!(LOGGER, "Requesting block {} from {}", h, self.info.addr);
|
||||||
LOGGER,
|
|
||||||
"Requesting block {} from peer {}.",
|
|
||||||
h,
|
|
||||||
self.info.addr
|
|
||||||
);
|
|
||||||
self.proto.send_block_request(h)
|
self.proto.send_block_request(h)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -241,6 +259,11 @@ impl ChainAdapter for TrackingAdapter {
|
||||||
self.adapter.block_received(b, addr)
|
self.adapter.block_received(b, addr)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn header_received(&self, bh: core::BlockHeader, addr: SocketAddr) -> bool {
|
||||||
|
self.push(bh.hash());
|
||||||
|
self.adapter.header_received(bh, addr)
|
||||||
|
}
|
||||||
|
|
||||||
fn headers_received(&self, bh: Vec<core::BlockHeader>, addr: SocketAddr) {
|
fn headers_received(&self, bh: Vec<core::BlockHeader>, addr: SocketAddr) {
|
||||||
self.adapter.headers_received(bh, addr)
|
self.adapter.headers_received(bh, addr)
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,7 +19,7 @@ use std::sync::{Arc, RwLock};
|
||||||
use rand::{thread_rng, Rng};
|
use rand::{thread_rng, Rng};
|
||||||
|
|
||||||
use core::core;
|
use core::core;
|
||||||
use core::core::hash::Hash;
|
use core::core::hash::{Hash, Hashed};
|
||||||
use core::core::target::Difficulty;
|
use core::core::target::Difficulty;
|
||||||
use util::LOGGER;
|
use util::LOGGER;
|
||||||
use time;
|
use time;
|
||||||
|
@ -237,7 +237,7 @@ impl Peers {
|
||||||
}
|
}
|
||||||
debug!(
|
debug!(
|
||||||
LOGGER,
|
LOGGER,
|
||||||
"broadcast_block: {}, {} at {}, to {} peers",
|
"broadcast_block: {}, {} at {}, to {} peers, done.",
|
||||||
b.hash(),
|
b.hash(),
|
||||||
b.header.total_difficulty,
|
b.header.total_difficulty,
|
||||||
b.header.height,
|
b.header.height,
|
||||||
|
@ -245,6 +245,35 @@ impl Peers {
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Broadcasts the provided block to PEER_PREFERRED_COUNT of our peers.
|
||||||
|
/// We may be connected to PEER_MAX_COUNT peers so we only
|
||||||
|
/// want to broadcast to a random subset of peers.
|
||||||
|
/// A peer implementation may drop the broadcast request
|
||||||
|
/// if it knows the remote peer already has the block.
|
||||||
|
pub fn broadcast_header(&self, bh: &core::BlockHeader) {
|
||||||
|
let peers = self.connected_peers();
|
||||||
|
let preferred_peers = 8;
|
||||||
|
let mut count = 0;
|
||||||
|
for p in peers.iter().take(preferred_peers) {
|
||||||
|
let p = p.read().unwrap();
|
||||||
|
if p.is_connected() {
|
||||||
|
if let Err(e) = p.send_header(bh) {
|
||||||
|
debug!(LOGGER, "Error sending header to peer: {:?}", e);
|
||||||
|
} else {
|
||||||
|
count += 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
debug!(
|
||||||
|
LOGGER,
|
||||||
|
"broadcast_header: {}, {} at {}, to {} peers, done.",
|
||||||
|
bh.hash(),
|
||||||
|
bh.total_difficulty,
|
||||||
|
bh.height,
|
||||||
|
count,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
/// Broadcasts the provided transaction to PEER_PREFERRED_COUNT of our peers.
|
/// Broadcasts the provided transaction to PEER_PREFERRED_COUNT of our peers.
|
||||||
/// We may be connected to PEER_MAX_COUNT peers so we only
|
/// We may be connected to PEER_MAX_COUNT peers so we only
|
||||||
/// want to broadcast to a random subset of peers.
|
/// want to broadcast to a random subset of peers.
|
||||||
|
@ -390,8 +419,18 @@ impl ChainAdapter for Peers {
|
||||||
}
|
}
|
||||||
fn block_received(&self, b: core::Block, peer_addr: SocketAddr) -> bool {
|
fn block_received(&self, b: core::Block, peer_addr: SocketAddr) -> bool {
|
||||||
if !self.adapter.block_received(b, peer_addr) {
|
if !self.adapter.block_received(b, peer_addr) {
|
||||||
// if the peer sent us a block that's intrinsically bad, they're either
|
// if the peer sent us a block that's intrinsically bad
|
||||||
// mistaken or manevolent, both of which require a ban
|
// they are either mistaken or manevolent, both of which require a ban
|
||||||
|
self.ban_peer(&peer_addr);
|
||||||
|
false
|
||||||
|
} else {
|
||||||
|
true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
fn header_received(&self, bh: core::BlockHeader, peer_addr: SocketAddr) -> bool {
|
||||||
|
if !self.adapter.header_received(bh, peer_addr) {
|
||||||
|
// if the peer sent us a block header that's intrinsically bad
|
||||||
|
// they are either mistaken or manevolent, both of which require a ban
|
||||||
self.ban_peer(&peer_addr);
|
self.ban_peer(&peer_addr);
|
||||||
false
|
false
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
// Copyright 2016 The Grin Developers
|
// Copyright 2018 The Grin Developers
|
||||||
//
|
//
|
||||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
// you may not use this file except in compliance with the License.
|
// you may not use this file except in compliance with the License.
|
||||||
|
@ -21,7 +21,7 @@ use futures_cpupool::CpuPool;
|
||||||
use tokio_core::net::TcpStream;
|
use tokio_core::net::TcpStream;
|
||||||
|
|
||||||
use core::core;
|
use core::core;
|
||||||
use core::core::hash::Hash;
|
use core::core::hash::{Hash, Hashed};
|
||||||
use core::core::target::Difficulty;
|
use core::core::target::Difficulty;
|
||||||
use core::ser;
|
use core::ser;
|
||||||
use conn::TimeoutConnection;
|
use conn::TimeoutConnection;
|
||||||
|
@ -83,6 +83,11 @@ impl Protocol for ProtocolV1 {
|
||||||
self.send_msg(Type::Block, b)
|
self.send_msg(Type::Block, b)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Serializes and sends a block header to our remote peer ("header first" propagation)
|
||||||
|
fn send_header(&self, bh: &core::BlockHeader) -> Result<(), Error> {
|
||||||
|
self.send_msg(Type::Header, bh)
|
||||||
|
}
|
||||||
|
|
||||||
/// Serializes and sends a transaction to our remote peer
|
/// Serializes and sends a transaction to our remote peer
|
||||||
fn send_transaction(&self, tx: &core::Transaction) -> Result<(), Error> {
|
fn send_transaction(&self, tx: &core::Transaction) -> Result<(), Error> {
|
||||||
self.send_msg(Type::Transaction, tx)
|
self.send_msg(Type::Transaction, tx)
|
||||||
|
@ -172,12 +177,14 @@ fn handle_payload(
|
||||||
},
|
},
|
||||||
Type::Transaction => {
|
Type::Transaction => {
|
||||||
let tx = ser::deserialize::<core::Transaction>(&mut &buf[..])?;
|
let tx = ser::deserialize::<core::Transaction>(&mut &buf[..])?;
|
||||||
|
debug!(LOGGER, "handle_payload: Transaction: {}", tx.hash());
|
||||||
|
|
||||||
adapter.transaction_received(tx);
|
adapter.transaction_received(tx);
|
||||||
Ok(None)
|
Ok(None)
|
||||||
}
|
}
|
||||||
Type::GetBlock => {
|
Type::GetBlock => {
|
||||||
let h = ser::deserialize::<Hash>(&mut &buf[..])?;
|
let h = ser::deserialize::<Hash>(&mut &buf[..])?;
|
||||||
debug!(LOGGER, "handle_payload: GetBlock {}", h);
|
debug!(LOGGER, "handle_payload: GetBlock: {}", h);
|
||||||
|
|
||||||
let bo = adapter.get_block(h);
|
let bo = adapter.get_block(h);
|
||||||
if let Some(b) = bo {
|
if let Some(b) = bo {
|
||||||
|
@ -199,15 +206,16 @@ fn handle_payload(
|
||||||
Type::Block => {
|
Type::Block => {
|
||||||
let b = ser::deserialize::<core::Block>(&mut &buf[..])?;
|
let b = ser::deserialize::<core::Block>(&mut &buf[..])?;
|
||||||
let bh = b.hash();
|
let bh = b.hash();
|
||||||
|
debug!(LOGGER, "handle_payload: Block: {}", bh);
|
||||||
debug!(LOGGER, "handle_payload: Block {}", bh);
|
|
||||||
|
|
||||||
adapter.block_received(b, addr);
|
adapter.block_received(b, addr);
|
||||||
Ok(Some(bh))
|
Ok(Some(bh))
|
||||||
}
|
}
|
||||||
|
// A peer is asking us for some headers via a locator
|
||||||
Type::GetHeaders => {
|
Type::GetHeaders => {
|
||||||
// load headers from the locator
|
|
||||||
let loc = ser::deserialize::<Locator>(&mut &buf[..])?;
|
let loc = ser::deserialize::<Locator>(&mut &buf[..])?;
|
||||||
|
debug!(LOGGER, "handle_payload: GetHeaders: {:?}", loc);
|
||||||
|
|
||||||
let headers = adapter.locate_headers(loc.hashes);
|
let headers = adapter.locate_headers(loc.hashes);
|
||||||
|
|
||||||
// serialize and send all the headers over
|
// serialize and send all the headers over
|
||||||
|
@ -228,8 +236,23 @@ fn handle_payload(
|
||||||
|
|
||||||
Ok(None)
|
Ok(None)
|
||||||
}
|
}
|
||||||
|
// "header first" block propagation - if we have not yet seen this block
|
||||||
|
// we can go request it from some of our peers
|
||||||
|
Type::Header => {
|
||||||
|
let header = ser::deserialize::<core::BlockHeader>(&mut &buf[..])?;
|
||||||
|
debug!(LOGGER, "handle_payload: Header: {}", header.hash());
|
||||||
|
|
||||||
|
adapter.header_received(header, addr);
|
||||||
|
|
||||||
|
// we do not return a hash here as we never request a single header
|
||||||
|
// a header will always arrive unsolicited
|
||||||
|
Ok(None)
|
||||||
|
}
|
||||||
|
// receive headers as part of the sync process
|
||||||
Type::Headers => {
|
Type::Headers => {
|
||||||
let headers = ser::deserialize::<Headers>(&mut &buf[..])?;
|
let headers = ser::deserialize::<Headers>(&mut &buf[..])?;
|
||||||
|
debug!(LOGGER, "handle_payload: Headers: {}", headers.headers.len());
|
||||||
|
|
||||||
adapter.headers_received(headers.headers, addr);
|
adapter.headers_received(headers.headers, addr);
|
||||||
Ok(None)
|
Ok(None)
|
||||||
}
|
}
|
||||||
|
|
|
@ -48,10 +48,11 @@ impl ChainAdapter for DummyAdapter {
|
||||||
fn total_height(&self) -> u64 {
|
fn total_height(&self) -> u64 {
|
||||||
0
|
0
|
||||||
}
|
}
|
||||||
fn transaction_received(&self, _: core::Transaction) {}
|
fn transaction_received(&self, _tx: core::Transaction) {}
|
||||||
fn block_received(&self, _: core::Block, _: SocketAddr) -> bool { true }
|
fn block_received(&self, _b: core::Block, _addr: SocketAddr) -> bool { true }
|
||||||
fn headers_received(&self, _: Vec<core::BlockHeader>, _:SocketAddr) {}
|
fn header_received(&self, _bh: core::BlockHeader, _addr: SocketAddr) -> bool { true }
|
||||||
fn locate_headers(&self, _: Vec<Hash>) -> Vec<core::BlockHeader> {
|
fn headers_received(&self, _bh: Vec<core::BlockHeader>, _addr:SocketAddr) {}
|
||||||
|
fn locate_headers(&self, _loc: Vec<Hash>) -> Vec<core::BlockHeader> {
|
||||||
vec![]
|
vec![]
|
||||||
}
|
}
|
||||||
fn get_block(&self, _: Hash) -> Option<core::Block> {
|
fn get_block(&self, _: Hash) -> Option<core::Block> {
|
||||||
|
|
|
@ -141,6 +141,9 @@ pub trait Protocol {
|
||||||
/// Relays a block to the remote peer.
|
/// Relays a block to the remote peer.
|
||||||
fn send_block(&self, b: &core::Block) -> Result<(), Error>;
|
fn send_block(&self, b: &core::Block) -> Result<(), Error>;
|
||||||
|
|
||||||
|
/// Relays a block header to the remote peer ("header first" propagation).
|
||||||
|
fn send_header(&self, bh: &core::BlockHeader) -> Result<(), Error>;
|
||||||
|
|
||||||
/// Relays a transaction to the remote peer.
|
/// Relays a transaction to the remote peer.
|
||||||
fn send_transaction(&self, tx: &core::Transaction) -> Result<(), Error>;
|
fn send_transaction(&self, tx: &core::Transaction) -> Result<(), Error>;
|
||||||
|
|
||||||
|
@ -175,10 +178,12 @@ pub trait ChainAdapter: Sync + Send {
|
||||||
|
|
||||||
/// A block has been received from one of our peers. Returns true if the
|
/// A block has been received from one of our peers. Returns true if the
|
||||||
/// block could be handled properly and is not deemed defective by the
|
/// block could be handled properly and is not deemed defective by the
|
||||||
/// chain. Returning false means the block will nenver be valid and
|
/// chain. Returning false means the block will never be valid and
|
||||||
/// may result in the peer being banned.
|
/// may result in the peer being banned.
|
||||||
fn block_received(&self, b: core::Block, addr: SocketAddr) -> bool;
|
fn block_received(&self, b: core::Block, addr: SocketAddr) -> bool;
|
||||||
|
|
||||||
|
fn header_received(&self, b: core::BlockHeader, addr: SocketAddr) -> bool;
|
||||||
|
|
||||||
/// A set of block header has been received, typically in response to a
|
/// A set of block header has been received, typically in response to a
|
||||||
/// block
|
/// block
|
||||||
/// header request.
|
/// header request.
|
||||||
|
|
Loading…
Reference in a new issue