mirror of
https://github.com/mimblewimble/grin.git
synced 2025-05-06 01:01:14 +03:00
Request compact blocks (#667)
* first pass at allow/deny lists for hard-coded peers (not just seeds) * commit * add peers_allow and peers_deny examples and comments to grin.toml * always ask for compact block always fail to hydrate always fallback to requesting full block * decide to send full (empty) block over compact (empty) block * add some randomness to the decision around broadcasting an empty block as a block or as a compact block (so we can exercise more code paths easily)
This commit is contained in:
parent
33cb0902bd
commit
3f15f7f2f9
9 changed files with 217 additions and 33 deletions
|
@ -53,11 +53,12 @@ pub fn process_block(b: &Block, mut ctx: BlockContext) -> Result<Option<Tip>, Er
|
|||
|
||||
debug!(
|
||||
LOGGER,
|
||||
"pipe: process_block {} at {} with {} inputs and {} outputs.",
|
||||
"pipe: process_block {} at {} with {} inputs, {} outputs, {} kernels",
|
||||
b.hash(),
|
||||
b.header.height,
|
||||
b.inputs.len(),
|
||||
b.outputs.len()
|
||||
b.outputs.len(),
|
||||
b.kernels.len(),
|
||||
);
|
||||
check_known(b.hash(), &mut ctx)?;
|
||||
|
||||
|
|
|
@ -15,6 +15,8 @@
|
|||
use std::net::SocketAddr;
|
||||
use std::sync::{Arc, RwLock};
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use rand;
|
||||
use rand::Rng;
|
||||
|
||||
use chain::{self, ChainAdapter, Options, MINE};
|
||||
use core::core;
|
||||
|
@ -88,6 +90,26 @@ impl p2p::ChainAdapter for NetToChainAdapter {
|
|||
true
|
||||
}
|
||||
|
||||
fn compact_block_received(&self, bh: core::CompactBlock, addr: SocketAddr) -> bool {
|
||||
let bhash = bh.hash();
|
||||
debug!(
|
||||
LOGGER,
|
||||
"Received compact_block {} at {} from {}, going to process.",
|
||||
bhash,
|
||||
bh.header.height,
|
||||
addr,
|
||||
);
|
||||
|
||||
debug!(
|
||||
LOGGER,
|
||||
"*** cannot hydrate compact block (not yet implemented), falling back to requesting full block",
|
||||
);
|
||||
|
||||
self.request_block(&bh.header, &addr);
|
||||
|
||||
true
|
||||
}
|
||||
|
||||
fn header_received(&self, bh: core::BlockHeader, addr: SocketAddr) -> bool {
|
||||
let bhash = bh.hash();
|
||||
debug!(
|
||||
|
@ -117,7 +139,7 @@ impl p2p::ChainAdapter for NetToChainAdapter {
|
|||
|
||||
// we have successfully processed a block header
|
||||
// so we can go request the block itself
|
||||
self.request_block(&bh, &addr);
|
||||
self.request_compact_block(&bh, &addr);
|
||||
|
||||
// done receiving the header
|
||||
true
|
||||
|
@ -276,28 +298,42 @@ impl NetToChainAdapter {
|
|||
}
|
||||
}
|
||||
|
||||
// After we have received a block header in "header first" propagation
|
||||
// we need to go request the block from the same peer that gave us the header
|
||||
// (unless we have already accepted the block)
|
||||
// After receiving a compact block if we cannot successfully hydrate
|
||||
// it into a full block then fallback to requesting the full block
|
||||
// from the same peer that gave us the compact block
|
||||
//
|
||||
// TODO - currently only request block from a single peer
|
||||
// consider additional peers for redundancy?
|
||||
fn request_block(&self, bh: &BlockHeader, addr: &SocketAddr) {
|
||||
if let None = self.peers.borrow().adapter.get_block(bh.hash()) {
|
||||
if let Some(_) = self.peers.borrow().adapter.get_block(bh.previous) {
|
||||
if let Some(peer) = self.peers.borrow().get_connected_peer(addr) {
|
||||
if let Ok(peer) = peer.read() {
|
||||
let _ = peer.send_block_request(bh.hash());
|
||||
}
|
||||
if let Some(peer) = self.peers.borrow().get_connected_peer(addr) {
|
||||
if let Ok(peer) = peer.read() {
|
||||
let _ = peer.send_block_request(bh.hash());
|
||||
}
|
||||
} else {
|
||||
debug!(LOGGER, "request_block: prev block {} missing, skipping", bh.previous);
|
||||
}
|
||||
} else {
|
||||
debug!(LOGGER, "request_block: block {} already known", bh.hash());
|
||||
}
|
||||
}
|
||||
|
||||
// After we have received a block header in "header first" propagation
|
||||
// we need to go request the block (compact representation) from the
|
||||
// same peer that gave us the header (unless we have already accepted the block)
|
||||
//
|
||||
// TODO - currently only request block from a single peer
|
||||
// consider additional peers for redundancy?
|
||||
fn request_compact_block(&self, bh: &BlockHeader, addr: &SocketAddr) {
|
||||
if let None = self.peers.borrow().adapter.get_block(bh.hash()) {
|
||||
if let Some(peer) = self.peers.borrow().get_connected_peer(addr) {
|
||||
if let Ok(peer) = peer.read() {
|
||||
let _ = peer.send_compact_block_request(bh.hash());
|
||||
}
|
||||
}
|
||||
} else {
|
||||
debug!(LOGGER, "request_compact_block: block {} already known", bh.hash());
|
||||
}
|
||||
}
|
||||
|
||||
/// Prepare options for the chain pipeline
|
||||
fn chain_opts(&self) -> chain::Options {
|
||||
let opts = if self.currently_syncing.load(Ordering::Relaxed) {
|
||||
|
@ -331,10 +367,29 @@ impl ChainAdapter for ChainToPoolAndNetAdapter {
|
|||
}
|
||||
|
||||
// If we mined the block then we want to broadcast the block itself.
|
||||
// But if we received the block from another node then broadcast "header first"
|
||||
// If block is empty then broadcast the block.
|
||||
// If block contains txs then broadcast the compact block.
|
||||
// If we received the block from another node then broadcast "header first"
|
||||
// to minimize network traffic.
|
||||
if opts.contains(MINE) {
|
||||
self.peers.borrow().broadcast_block(b);
|
||||
// propagate compact block out if we mined the block
|
||||
// but broadcast full block if we have no txs
|
||||
let cb = b.as_compact_block();
|
||||
if cb.kern_ids.is_empty() {
|
||||
|
||||
// in the interest of testing all code paths
|
||||
// randomly decide how we send an empty block out
|
||||
// TODO - lock this down once we are comfortable it works...
|
||||
|
||||
let mut rng = rand::thread_rng();
|
||||
if rng.gen() {
|
||||
self.peers.borrow().broadcast_block(&b);
|
||||
} else {
|
||||
self.peers.borrow().broadcast_compact_block(&cb);
|
||||
}
|
||||
} else {
|
||||
self.peers.borrow().broadcast_compact_block(&cb);
|
||||
}
|
||||
} else {
|
||||
// "header first" propagation if we are not the originator of this block
|
||||
self.peers.borrow().broadcast_header(&b.header);
|
||||
|
|
|
@ -158,7 +158,6 @@ pub fn header_sync(peers: Peers, chain: Arc<chain::Chain>) {
|
|||
if let Some(peer) = peers.most_work_peer() {
|
||||
if let Ok(p) = peer.try_read() {
|
||||
let peer_difficulty = p.info.total_difficulty.clone();
|
||||
debug!(LOGGER, "sync: header_sync: {}, {}", difficulty, peer_difficulty);
|
||||
if peer_difficulty > difficulty {
|
||||
let _ = request_headers(
|
||||
peer.clone(),
|
||||
|
|
|
@ -49,22 +49,24 @@ pub enum ErrCodes {
|
|||
|
||||
/// Types of messages
|
||||
enum_from_primitive! {
|
||||
#[derive(Debug, Clone, Copy, PartialEq)]
|
||||
pub enum Type {
|
||||
Error,
|
||||
Hand,
|
||||
Shake,
|
||||
Ping,
|
||||
Pong,
|
||||
GetPeerAddrs,
|
||||
PeerAddrs,
|
||||
GetHeaders,
|
||||
Header,
|
||||
Headers,
|
||||
GetBlock,
|
||||
Block,
|
||||
Transaction,
|
||||
}
|
||||
#[derive(Debug, Clone, Copy, PartialEq)]
|
||||
pub enum Type {
|
||||
Error,
|
||||
Hand,
|
||||
Shake,
|
||||
Ping,
|
||||
Pong,
|
||||
GetPeerAddrs,
|
||||
PeerAddrs,
|
||||
GetHeaders,
|
||||
Header,
|
||||
Headers,
|
||||
GetBlock,
|
||||
Block,
|
||||
GetCompactBlock,
|
||||
CompactBlock,
|
||||
Transaction,
|
||||
}
|
||||
}
|
||||
|
||||
/// Future combinator to read any message where the body is a Readable. Reads
|
||||
|
|
|
@ -184,6 +184,21 @@ impl Peer {
|
|||
}
|
||||
}
|
||||
|
||||
pub fn send_compact_block(&self, b: &core::CompactBlock) -> Result<(), Error> {
|
||||
if !self.tracking_adapter.has(b.hash()) {
|
||||
debug!(LOGGER, "Send compact block {} to {}", b.hash(), self.info.addr);
|
||||
self.proto.send_compact_block(b)
|
||||
} else {
|
||||
debug!(
|
||||
LOGGER,
|
||||
"Suppress compact block send {} to {} (already seen)",
|
||||
b.hash(),
|
||||
self.info.addr,
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
pub fn send_header(&self, bh: &core::BlockHeader) -> Result<(), Error> {
|
||||
if !self.tracking_adapter.has(bh.hash()) {
|
||||
debug!(LOGGER, "Send header {} to {}", bh.hash(), self.info.addr);
|
||||
|
@ -220,6 +235,11 @@ impl Peer {
|
|||
self.proto.send_block_request(h)
|
||||
}
|
||||
|
||||
pub fn send_compact_block_request(&self, h: Hash) -> Result<(), Error> {
|
||||
debug!(LOGGER, "Requesting compact block {} from {}", h, self.info.addr);
|
||||
self.proto.send_compact_block_request(h)
|
||||
}
|
||||
|
||||
pub fn send_peer_request(&self, capab: Capabilities) -> Result<(), Error> {
|
||||
debug!(LOGGER, "Asking {} for more peers.", self.info.addr);
|
||||
self.proto.send_peer_request(capab)
|
||||
|
@ -281,6 +301,11 @@ impl ChainAdapter for TrackingAdapter {
|
|||
self.adapter.block_received(b, addr)
|
||||
}
|
||||
|
||||
fn compact_block_received(&self, cb: core::CompactBlock, addr: SocketAddr) -> bool {
|
||||
self.push(cb.hash());
|
||||
self.adapter.compact_block_received(cb, addr)
|
||||
}
|
||||
|
||||
fn header_received(&self, bh: core::BlockHeader, addr: SocketAddr) -> bool {
|
||||
self.push(bh.hash());
|
||||
self.adapter.header_received(bh, addr)
|
||||
|
|
|
@ -247,6 +247,30 @@ impl Peers {
|
|||
);
|
||||
}
|
||||
|
||||
pub fn broadcast_compact_block(&self, b: &core::CompactBlock) {
|
||||
let peers = self.connected_peers();
|
||||
let preferred_peers = 8;
|
||||
let mut count = 0;
|
||||
for p in peers.iter().take(preferred_peers) {
|
||||
let p = p.read().unwrap();
|
||||
if p.is_connected() {
|
||||
if let Err(e) = p.send_compact_block(b) {
|
||||
debug!(LOGGER, "Error sending compact block to peer: {:?}", e);
|
||||
} else {
|
||||
count += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
debug!(
|
||||
LOGGER,
|
||||
"broadcast_compact_block: {}, {} at {}, to {} peers, done.",
|
||||
b.hash(),
|
||||
b.header.total_difficulty,
|
||||
b.header.height,
|
||||
count,
|
||||
);
|
||||
}
|
||||
|
||||
/// Broadcasts the provided block to PEER_PREFERRED_COUNT of our peers.
|
||||
/// We may be connected to PEER_MAX_COUNT peers so we only
|
||||
/// want to broadcast to a random subset of peers.
|
||||
|
@ -429,6 +453,16 @@ impl ChainAdapter for Peers {
|
|||
true
|
||||
}
|
||||
}
|
||||
fn compact_block_received(&self, cb: core::CompactBlock, peer_addr: SocketAddr) -> bool {
|
||||
if !self.adapter.compact_block_received(cb, peer_addr) {
|
||||
// if the peer sent us a block that's intrinsically bad
|
||||
// they are either mistaken or manevolent, both of which require a ban
|
||||
self.ban_peer(&peer_addr);
|
||||
false
|
||||
} else {
|
||||
true
|
||||
}
|
||||
}
|
||||
fn header_received(&self, bh: core::BlockHeader, peer_addr: SocketAddr) -> bool {
|
||||
if !self.adapter.header_received(bh, peer_addr) {
|
||||
// if the peer sent us a block header that's intrinsically bad
|
||||
|
|
|
@ -18,6 +18,8 @@ use std::net::SocketAddr;
|
|||
use futures::Future;
|
||||
use futures::sync::mpsc::UnboundedSender;
|
||||
use futures_cpupool::CpuPool;
|
||||
use rand;
|
||||
use rand::Rng;
|
||||
use tokio_core::net::TcpStream;
|
||||
|
||||
use core::core;
|
||||
|
@ -83,6 +85,10 @@ impl Protocol for ProtocolV1 {
|
|||
self.send_msg(Type::Block, b)
|
||||
}
|
||||
|
||||
fn send_compact_block(&self, cb: &core::CompactBlock) -> Result<(), Error> {
|
||||
self.send_msg(Type::CompactBlock, cb)
|
||||
}
|
||||
|
||||
/// Serializes and sends a block header to our remote peer ("header first" propagation)
|
||||
fn send_header(&self, bh: &core::BlockHeader) -> Result<(), Error> {
|
||||
self.send_msg(Type::Header, bh)
|
||||
|
@ -106,6 +112,10 @@ impl Protocol for ProtocolV1 {
|
|||
self.send_request(Type::GetBlock, Type::Block, &h, Some(h))
|
||||
}
|
||||
|
||||
fn send_compact_block_request(&self, h: Hash) -> Result<(), Error> {
|
||||
self.send_request(Type::GetCompactBlock, Type::CompactBlock, &h, Some(h))
|
||||
}
|
||||
|
||||
fn send_peer_request(&self, capab: Capabilities) -> Result<(), Error> {
|
||||
self.send_request(
|
||||
Type::GetPeerAddrs,
|
||||
|
@ -211,6 +221,57 @@ fn handle_payload(
|
|||
adapter.block_received(b, addr);
|
||||
Ok(Some(bh))
|
||||
}
|
||||
Type::GetCompactBlock => {
|
||||
let h = ser::deserialize::<Hash>(&mut &buf[..])?;
|
||||
debug!(LOGGER, "handle_payload: GetCompactBlock: {}", h);
|
||||
|
||||
if let Some(b) = adapter.get_block(h) {
|
||||
let cb = b.as_compact_block();
|
||||
|
||||
// serialize and send the block over in compact representation
|
||||
let mut body_data = vec![];
|
||||
let mut data = vec![];
|
||||
|
||||
// if we have txs in the block send a compact block
|
||||
// but if block is empty -
|
||||
// to allow us to test all code paths, randomly choose to send
|
||||
// either the block or the compact block
|
||||
let mut rng = rand::thread_rng();
|
||||
|
||||
if cb.kern_ids.is_empty() && rng.gen() {
|
||||
debug!(
|
||||
LOGGER,
|
||||
"handle_payload: GetCompactBlock: empty block, sending full block",
|
||||
);
|
||||
|
||||
try!(ser::serialize(&mut body_data, &b));
|
||||
try!(ser::serialize(
|
||||
&mut data,
|
||||
&MsgHeader::new(Type::Block, body_data.len() as u64),
|
||||
));
|
||||
} else {
|
||||
try!(ser::serialize(&mut body_data, &cb));
|
||||
try!(ser::serialize(
|
||||
&mut data,
|
||||
&MsgHeader::new(Type::CompactBlock, body_data.len() as u64),
|
||||
));
|
||||
}
|
||||
|
||||
data.append(&mut body_data);
|
||||
if let Err(e) = sender.unbounded_send(data) {
|
||||
debug!(LOGGER, "handle_payload: GetCompactBlock, error sending: {:?}", e);
|
||||
}
|
||||
}
|
||||
Ok(None)
|
||||
}
|
||||
Type::CompactBlock => {
|
||||
let b = ser::deserialize::<core::CompactBlock>(&mut &buf[..])?;
|
||||
let bh = b.hash();
|
||||
debug!(LOGGER, "handle_payload: CompactBlock: {}", bh);
|
||||
|
||||
adapter.compact_block_received(b, addr);
|
||||
Ok(Some(bh))
|
||||
}
|
||||
// A peer is asking us for some headers via a locator
|
||||
Type::GetHeaders => {
|
||||
let loc = ser::deserialize::<Locator>(&mut &buf[..])?;
|
||||
|
|
|
@ -50,6 +50,7 @@ impl ChainAdapter for DummyAdapter {
|
|||
}
|
||||
fn transaction_received(&self, _tx: core::Transaction) {}
|
||||
fn block_received(&self, _b: core::Block, _addr: SocketAddr) -> bool { true }
|
||||
fn compact_block_received(&self, _cb: core::CompactBlock, _addr: SocketAddr) -> bool { true }
|
||||
fn header_received(&self, _bh: core::BlockHeader, _addr: SocketAddr) -> bool { true }
|
||||
fn headers_received(&self, _bh: Vec<core::BlockHeader>, _addr:SocketAddr) {}
|
||||
fn locate_headers(&self, _loc: Vec<Hash>) -> Vec<core::BlockHeader> {
|
||||
|
|
|
@ -147,6 +147,8 @@ pub trait Protocol {
|
|||
/// Relays a block to the remote peer.
|
||||
fn send_block(&self, b: &core::Block) -> Result<(), Error>;
|
||||
|
||||
fn send_compact_block(&self, cb: &core::CompactBlock) -> Result<(), Error>;
|
||||
|
||||
/// Relays a block header to the remote peer ("header first" propagation).
|
||||
fn send_header(&self, bh: &core::BlockHeader) -> Result<(), Error>;
|
||||
|
||||
|
@ -159,6 +161,8 @@ pub trait Protocol {
|
|||
/// Sends a request for a block from its hash.
|
||||
fn send_block_request(&self, h: Hash) -> Result<(), Error>;
|
||||
|
||||
fn send_compact_block_request(&self, h: Hash) -> Result<(), Error>;
|
||||
|
||||
/// Sends a request for some peer addresses.
|
||||
fn send_peer_request(&self, capab: Capabilities) -> Result<(), Error>;
|
||||
|
||||
|
@ -188,7 +192,9 @@ pub trait ChainAdapter: Sync + Send {
|
|||
/// may result in the peer being banned.
|
||||
fn block_received(&self, b: core::Block, addr: SocketAddr) -> bool;
|
||||
|
||||
fn header_received(&self, b: core::BlockHeader, addr: SocketAddr) -> bool;
|
||||
fn compact_block_received(&self, cb: core::CompactBlock, addr: SocketAddr) -> bool;
|
||||
|
||||
fn header_received(&self, bh: core::BlockHeader, addr: SocketAddr) -> bool;
|
||||
|
||||
/// A set of block header has been received, typically in response to a
|
||||
/// block
|
||||
|
|
Loading…
Add table
Reference in a new issue