pass peer_info around rather than peer_addr (includes protocol version) (#2761)

This commit is contained in:
Antioch Peverell 2019-04-18 14:11:06 +01:00 committed by GitHub
parent ea9953283b
commit 13c6160e15
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 138 additions and 112 deletions

View file

@ -136,9 +136,8 @@ impl Peer {
/// Main peer loop listening for messages and forwarding to the rest of the /// Main peer loop listening for messages and forwarding to the rest of the
/// system. /// system.
pub fn start(&mut self, conn: TcpStream) { pub fn start(&mut self, conn: TcpStream) {
let addr = self.info.addr;
let adapter = Arc::new(self.tracking_adapter.clone()); let adapter = Arc::new(self.tracking_adapter.clone());
let handler = Protocol::new(adapter, addr); let handler = Protocol::new(adapter, self.info.clone());
self.connection = Some(Mutex::new(conn::listen(conn, handler))); self.connection = Some(Mutex::new(conn::listen(conn, handler)));
} }
@ -533,9 +532,13 @@ impl ChainAdapter for TrackingAdapter {
self.adapter.get_transaction(kernel_hash) self.adapter.get_transaction(kernel_hash)
} }
fn tx_kernel_received(&self, kernel_hash: Hash, addr: PeerAddr) -> Result<bool, chain::Error> { fn tx_kernel_received(
&self,
kernel_hash: Hash,
peer_info: &PeerInfo,
) -> Result<bool, chain::Error> {
self.push_recv(kernel_hash); self.push_recv(kernel_hash);
self.adapter.tx_kernel_received(kernel_hash, addr) self.adapter.tx_kernel_received(kernel_hash, peer_info)
} }
fn transaction_received( fn transaction_received(
@ -556,34 +559,38 @@ impl ChainAdapter for TrackingAdapter {
fn block_received( fn block_received(
&self, &self,
b: core::Block, b: core::Block,
addr: PeerAddr, peer_info: &PeerInfo,
_was_requested: bool, _was_requested: bool,
) -> Result<bool, chain::Error> { ) -> Result<bool, chain::Error> {
let bh = b.hash(); let bh = b.hash();
self.push_recv(bh); self.push_recv(bh);
self.adapter.block_received(b, addr, self.has_req(bh)) self.adapter.block_received(b, peer_info, self.has_req(bh))
} }
fn compact_block_received( fn compact_block_received(
&self, &self,
cb: core::CompactBlock, cb: core::CompactBlock,
addr: PeerAddr, peer_info: &PeerInfo,
) -> Result<bool, chain::Error> { ) -> Result<bool, chain::Error> {
self.push_recv(cb.hash()); self.push_recv(cb.hash());
self.adapter.compact_block_received(cb, addr) self.adapter.compact_block_received(cb, peer_info)
} }
fn header_received(&self, bh: core::BlockHeader, addr: PeerAddr) -> Result<bool, chain::Error> { fn header_received(
&self,
bh: core::BlockHeader,
peer_info: &PeerInfo,
) -> Result<bool, chain::Error> {
self.push_recv(bh.hash()); self.push_recv(bh.hash());
self.adapter.header_received(bh, addr) self.adapter.header_received(bh, peer_info)
} }
fn headers_received( fn headers_received(
&self, &self,
bh: &[core::BlockHeader], bh: &[core::BlockHeader],
addr: PeerAddr, peer_info: &PeerInfo,
) -> Result<bool, chain::Error> { ) -> Result<bool, chain::Error> {
self.adapter.headers_received(bh, addr) self.adapter.headers_received(bh, peer_info)
} }
fn locate_headers(&self, locator: &[Hash]) -> Result<Vec<core::BlockHeader>, chain::Error> { fn locate_headers(&self, locator: &[Hash]) -> Result<Vec<core::BlockHeader>, chain::Error> {
@ -606,9 +613,9 @@ impl ChainAdapter for TrackingAdapter {
&self, &self,
h: Hash, h: Hash,
txhashset_data: File, txhashset_data: File,
peer_addr: PeerAddr, peer_info: &PeerInfo,
) -> Result<bool, chain::Error> { ) -> Result<bool, chain::Error> {
self.adapter.txhashset_write(h, txhashset_data, peer_addr) self.adapter.txhashset_write(h, txhashset_data, peer_info)
} }
fn txhashset_download_update( fn txhashset_download_update(

View file

@ -30,7 +30,7 @@ use chrono::Duration;
use crate::peer::Peer; use crate::peer::Peer;
use crate::store::{PeerData, PeerStore, State}; use crate::store::{PeerData, PeerStore, State};
use crate::types::{ use crate::types::{
Capabilities, ChainAdapter, Direction, Error, NetAdapter, P2PConfig, PeerAddr, ReasonForBan, Capabilities, ChainAdapter, Error, NetAdapter, P2PConfig, PeerAddr, PeerInfo, ReasonForBan,
TxHashSetRead, MAX_PEER_ADDRS, TxHashSetRead, MAX_PEER_ADDRS,
}; };
@ -104,12 +104,10 @@ impl Peers {
} }
pub fn outgoing_connected_peers(&self) -> Vec<Arc<Peer>> { pub fn outgoing_connected_peers(&self) -> Vec<Arc<Peer>> {
let peers = self.connected_peers(); self.connected_peers()
let res = peers
.into_iter() .into_iter()
.filter(|x| x.info.direction == Direction::Outbound) .filter(|x| x.info.is_outbound())
.collect::<Vec<_>>(); .collect()
res
} }
/// Get a peer we're connected to by address. /// Get a peer we're connected to by address.
@ -119,20 +117,12 @@ impl Peers {
/// Number of peers currently connected to. /// Number of peers currently connected to.
pub fn peer_count(&self) -> u32 { pub fn peer_count(&self) -> u32 {
self.peers self.connected_peers().len() as u32
.read()
.values()
.filter(|x| x.is_connected())
.count() as u32
} }
/// Number of outbound peers currently connected to. /// Number of outbound peers currently connected to.
pub fn peer_outbound_count(&self) -> u32 { pub fn peer_outbound_count(&self) -> u32 {
self.peers self.outgoing_connected_peers().len() as u32
.read()
.values()
.filter(|x| x.is_connected() && x.info.is_outbound())
.count() as u32
} }
// Return vec of connected peers that currently advertise more work // Return vec of connected peers that currently advertise more work
@ -498,8 +488,12 @@ impl ChainAdapter for Peers {
self.adapter.get_transaction(kernel_hash) self.adapter.get_transaction(kernel_hash)
} }
fn tx_kernel_received(&self, kernel_hash: Hash, addr: PeerAddr) -> Result<bool, chain::Error> { fn tx_kernel_received(
self.adapter.tx_kernel_received(kernel_hash, addr) &self,
kernel_hash: Hash,
peer_info: &PeerInfo,
) -> Result<bool, chain::Error> {
self.adapter.tx_kernel_received(kernel_hash, peer_info)
} }
fn transaction_received( fn transaction_received(
@ -513,18 +507,18 @@ impl ChainAdapter for Peers {
fn block_received( fn block_received(
&self, &self,
b: core::Block, b: core::Block,
peer_addr: PeerAddr, peer_info: &PeerInfo,
was_requested: bool, was_requested: bool,
) -> Result<bool, chain::Error> { ) -> Result<bool, chain::Error> {
let hash = b.hash(); let hash = b.hash();
if !self.adapter.block_received(b, peer_addr, was_requested)? { if !self.adapter.block_received(b, peer_info, was_requested)? {
// if the peer sent us a block that's intrinsically bad // if the peer sent us a block that's intrinsically bad
// they are either mistaken or malevolent, both of which require a ban // they are either mistaken or malevolent, both of which require a ban
debug!( debug!(
"Received a bad block {} from {}, the peer will be banned", "Received a bad block {} from {}, the peer will be banned",
hash, peer_addr hash, peer_info.addr,
); );
self.ban_peer(peer_addr, ReasonForBan::BadBlock); self.ban_peer(peer_info.addr, ReasonForBan::BadBlock);
Ok(false) Ok(false)
} else { } else {
Ok(true) Ok(true)
@ -534,17 +528,17 @@ impl ChainAdapter for Peers {
fn compact_block_received( fn compact_block_received(
&self, &self,
cb: core::CompactBlock, cb: core::CompactBlock,
peer_addr: PeerAddr, peer_info: &PeerInfo,
) -> Result<bool, chain::Error> { ) -> Result<bool, chain::Error> {
let hash = cb.hash(); let hash = cb.hash();
if !self.adapter.compact_block_received(cb, peer_addr)? { if !self.adapter.compact_block_received(cb, peer_info)? {
// if the peer sent us a block that's intrinsically bad // if the peer sent us a block that's intrinsically bad
// they are either mistaken or malevolent, both of which require a ban // they are either mistaken or malevolent, both of which require a ban
debug!( debug!(
"Received a bad compact block {} from {}, the peer will be banned", "Received a bad compact block {} from {}, the peer will be banned",
hash, peer_addr hash, peer_info.addr
); );
self.ban_peer(peer_addr, ReasonForBan::BadCompactBlock); self.ban_peer(peer_info.addr, ReasonForBan::BadCompactBlock);
Ok(false) Ok(false)
} else { } else {
Ok(true) Ok(true)
@ -554,12 +548,12 @@ impl ChainAdapter for Peers {
fn header_received( fn header_received(
&self, &self,
bh: core::BlockHeader, bh: core::BlockHeader,
peer_addr: PeerAddr, peer_info: &PeerInfo,
) -> Result<bool, chain::Error> { ) -> Result<bool, chain::Error> {
if !self.adapter.header_received(bh, peer_addr)? { if !self.adapter.header_received(bh, peer_info)? {
// if the peer sent us a block header that's intrinsically bad // if the peer sent us a block header that's intrinsically bad
// they are either mistaken or malevolent, both of which require a ban // they are either mistaken or malevolent, both of which require a ban
self.ban_peer(peer_addr, ReasonForBan::BadBlockHeader); self.ban_peer(peer_info.addr, ReasonForBan::BadBlockHeader);
Ok(false) Ok(false)
} else { } else {
Ok(true) Ok(true)
@ -569,12 +563,12 @@ impl ChainAdapter for Peers {
fn headers_received( fn headers_received(
&self, &self,
headers: &[core::BlockHeader], headers: &[core::BlockHeader],
peer_addr: PeerAddr, peer_info: &PeerInfo,
) -> Result<bool, chain::Error> { ) -> Result<bool, chain::Error> {
if !self.adapter.headers_received(headers, peer_addr)? { if !self.adapter.headers_received(headers, peer_info)? {
// if the peer sent us a block header that's intrinsically bad // if the peer sent us a block header that's intrinsically bad
// they are either mistaken or malevolent, both of which require a ban // they are either mistaken or malevolent, both of which require a ban
self.ban_peer(peer_addr, ReasonForBan::BadBlockHeader); self.ban_peer(peer_info.addr, ReasonForBan::BadBlockHeader);
Ok(false) Ok(false)
} else { } else {
Ok(true) Ok(true)
@ -601,14 +595,14 @@ impl ChainAdapter for Peers {
&self, &self,
h: Hash, h: Hash,
txhashset_data: File, txhashset_data: File,
peer_addr: PeerAddr, peer_info: &PeerInfo,
) -> Result<bool, chain::Error> { ) -> Result<bool, chain::Error> {
if !self.adapter.txhashset_write(h, txhashset_data, peer_addr)? { if !self.adapter.txhashset_write(h, txhashset_data, peer_info)? {
debug!( debug!(
"Received a bad txhashset data from {}, the peer will be banned", "Received a bad txhashset data from {}, the peer will be banned",
&peer_addr peer_info.addr
); );
self.ban_peer(peer_addr, ReasonForBan::BadTxHashSet); self.ban_peer(peer_info.addr, ReasonForBan::BadTxHashSet);
Ok(false) Ok(false)
} else { } else {
Ok(true) Ok(true)

View file

@ -27,16 +27,16 @@ use crate::msg::{
BanReason, GetPeerAddrs, Headers, Locator, PeerAddrs, Ping, Pong, TxHashSetArchive, BanReason, GetPeerAddrs, Headers, Locator, PeerAddrs, Ping, Pong, TxHashSetArchive,
TxHashSetRequest, Type, TxHashSetRequest, Type,
}; };
use crate::types::{Error, NetAdapter, PeerAddr}; use crate::types::{Error, NetAdapter, PeerInfo};
pub struct Protocol { pub struct Protocol {
adapter: Arc<dyn NetAdapter>, adapter: Arc<dyn NetAdapter>,
addr: PeerAddr, peer_info: PeerInfo,
} }
impl Protocol { impl Protocol {
pub fn new(adapter: Arc<dyn NetAdapter>, addr: PeerAddr) -> Protocol { pub fn new(adapter: Arc<dyn NetAdapter>, peer_info: PeerInfo) -> Protocol {
Protocol { adapter, addr } Protocol { adapter, peer_info }
} }
} }
@ -52,10 +52,10 @@ impl MessageHandler for Protocol {
// If we received a msg from a banned peer then log and drop it. // If we received a msg from a banned peer then log and drop it.
// If we are getting a lot of these then maybe we are not cleaning // If we are getting a lot of these then maybe we are not cleaning
// banned peers up correctly? // banned peers up correctly?
if adapter.is_banned(self.addr.clone()) { if adapter.is_banned(self.peer_info.addr) {
debug!( debug!(
"handler: consume: peer {:?} banned, received: {:?}, dropping.", "handler: consume: peer {:?} banned, received: {:?}, dropping.",
self.addr, msg.header.msg_type, self.peer_info.addr, msg.header.msg_type,
); );
return Ok(None); return Ok(None);
} }
@ -63,7 +63,7 @@ impl MessageHandler for Protocol {
match msg.header.msg_type { match msg.header.msg_type {
Type::Ping => { Type::Ping => {
let ping: Ping = msg.body()?; let ping: Ping = msg.body()?;
adapter.peer_difficulty(self.addr, ping.total_difficulty, ping.height); adapter.peer_difficulty(self.peer_info.addr, ping.total_difficulty, ping.height);
Ok(Some(Response::new( Ok(Some(Response::new(
Type::Pong, Type::Pong,
@ -77,7 +77,7 @@ impl MessageHandler for Protocol {
Type::Pong => { Type::Pong => {
let pong: Pong = msg.body()?; let pong: Pong = msg.body()?;
adapter.peer_difficulty(self.addr, pong.total_difficulty, pong.height); adapter.peer_difficulty(self.peer_info.addr, pong.total_difficulty, pong.height);
Ok(None) Ok(None)
} }
@ -93,7 +93,7 @@ impl MessageHandler for Protocol {
"handle_payload: received tx kernel: {}, msg_len: {}", "handle_payload: received tx kernel: {}, msg_len: {}",
h, msg.header.msg_len h, msg.header.msg_len
); );
adapter.tx_kernel_received(h, self.addr)?; adapter.tx_kernel_received(h, &self.peer_info)?;
Ok(None) Ok(None)
} }
@ -155,7 +155,7 @@ impl MessageHandler for Protocol {
// we can't know at this level whether we requested the block or not, // we can't know at this level whether we requested the block or not,
// the boolean should be properly set in higher level adapter // the boolean should be properly set in higher level adapter
adapter.block_received(b, self.addr, false)?; adapter.block_received(b, &self.peer_info, false)?;
Ok(None) Ok(None)
} }
@ -176,7 +176,7 @@ impl MessageHandler for Protocol {
); );
let b: core::CompactBlock = msg.body()?; let b: core::CompactBlock = msg.body()?;
adapter.compact_block_received(b, self.addr)?; adapter.compact_block_received(b, &self.peer_info)?;
Ok(None) Ok(None)
} }
@ -197,7 +197,7 @@ impl MessageHandler for Protocol {
// we can go request it from some of our peers // we can go request it from some of our peers
Type::Header => { Type::Header => {
let header: core::BlockHeader = msg.body()?; let header: core::BlockHeader = msg.body()?;
adapter.header_received(header, self.addr)?; adapter.header_received(header, &self.peer_info)?;
Ok(None) Ok(None)
} }
@ -217,7 +217,7 @@ impl MessageHandler for Protocol {
headers.push(header); headers.push(header);
total_bytes_read += bytes_read; total_bytes_read += bytes_read;
} }
adapter.headers_received(&headers, self.addr)?; adapter.headers_received(&headers, &self.peer_info)?;
} }
// Now check we read the correct total number of bytes off the stream. // Now check we read the correct total number of bytes off the stream.
@ -335,7 +335,7 @@ impl MessageHandler for Protocol {
let tmp_zip = File::open(tmp)?; let tmp_zip = File::open(tmp)?;
let res = self let res = self
.adapter .adapter
.txhashset_write(sm_arch.hash, tmp_zip, self.addr)?; .txhashset_write(sm_arch.hash, tmp_zip, &self.peer_info)?;
debug!( debug!(
"handle_payload: txhashset archive for {} at {}, DONE. Data Ok: {}", "handle_payload: txhashset archive for {} at {}, DONE. Data Ok: {}",

View file

@ -28,7 +28,8 @@ use crate::peer::Peer;
use crate::peers::Peers; use crate::peers::Peers;
use crate::store::PeerStore; use crate::store::PeerStore;
use crate::types::{ use crate::types::{
Capabilities, ChainAdapter, Error, NetAdapter, P2PConfig, PeerAddr, ReasonForBan, TxHashSetRead, Capabilities, ChainAdapter, Error, NetAdapter, P2PConfig, PeerAddr, PeerInfo, ReasonForBan,
TxHashSetRead,
}; };
use crate::util::{Mutex, StopState}; use crate::util::{Mutex, StopState};
use chrono::prelude::{DateTime, Utc}; use chrono::prelude::{DateTime, Utc};
@ -240,7 +241,7 @@ impl ChainAdapter for DummyAdapter {
None None
} }
fn tx_kernel_received(&self, _h: Hash, _addr: PeerAddr) -> Result<bool, chain::Error> { fn tx_kernel_received(&self, _h: Hash, _peer_info: &PeerInfo) -> Result<bool, chain::Error> {
Ok(true) Ok(true)
} }
fn transaction_received( fn transaction_received(
@ -253,21 +254,25 @@ impl ChainAdapter for DummyAdapter {
fn compact_block_received( fn compact_block_received(
&self, &self,
_cb: core::CompactBlock, _cb: core::CompactBlock,
_addr: PeerAddr, _peer_info: &PeerInfo,
) -> Result<bool, chain::Error> { ) -> Result<bool, chain::Error> {
Ok(true) Ok(true)
} }
fn header_received( fn header_received(
&self, &self,
_bh: core::BlockHeader, _bh: core::BlockHeader,
_addr: PeerAddr, _peer_info: &PeerInfo,
) -> Result<bool, chain::Error> { ) -> Result<bool, chain::Error> {
Ok(true) Ok(true)
} }
fn block_received(&self, _: core::Block, _: PeerAddr, _: bool) -> Result<bool, chain::Error> { fn block_received(&self, _: core::Block, _: &PeerInfo, _: bool) -> Result<bool, chain::Error> {
Ok(true) Ok(true)
} }
fn headers_received(&self, _: &[core::BlockHeader], _: PeerAddr) -> Result<bool, chain::Error> { fn headers_received(
&self,
_: &[core::BlockHeader],
_: &PeerInfo,
) -> Result<bool, chain::Error> {
Ok(true) Ok(true)
} }
fn locate_headers(&self, _: &[Hash]) -> Result<Vec<core::BlockHeader>, chain::Error> { fn locate_headers(&self, _: &[Hash]) -> Result<Vec<core::BlockHeader>, chain::Error> {
@ -288,7 +293,7 @@ impl ChainAdapter for DummyAdapter {
&self, &self,
_h: Hash, _h: Hash,
_txhashset_data: File, _txhashset_data: File,
_peer_addr: PeerAddr, _peer_info: &PeerInfo,
) -> Result<bool, chain::Error> { ) -> Result<bool, chain::Error> {
Ok(false) Ok(false)
} }

View file

@ -483,7 +483,11 @@ pub trait ChainAdapter: Sync + Send {
fn get_transaction(&self, kernel_hash: Hash) -> Option<core::Transaction>; fn get_transaction(&self, kernel_hash: Hash) -> Option<core::Transaction>;
fn tx_kernel_received(&self, kernel_hash: Hash, addr: PeerAddr) -> Result<bool, chain::Error>; fn tx_kernel_received(
&self,
kernel_hash: Hash,
peer_info: &PeerInfo,
) -> Result<bool, chain::Error>;
/// A block has been received from one of our peers. Returns true if the /// A block has been received from one of our peers. Returns true if the
/// block could be handled properly and is not deemed defective by the /// block could be handled properly and is not deemed defective by the
@ -492,17 +496,21 @@ pub trait ChainAdapter: Sync + Send {
fn block_received( fn block_received(
&self, &self,
b: core::Block, b: core::Block,
addr: PeerAddr, peer_info: &PeerInfo,
was_requested: bool, was_requested: bool,
) -> Result<bool, chain::Error>; ) -> Result<bool, chain::Error>;
fn compact_block_received( fn compact_block_received(
&self, &self,
cb: core::CompactBlock, cb: core::CompactBlock,
addr: PeerAddr, peer_info: &PeerInfo,
) -> Result<bool, chain::Error>; ) -> Result<bool, chain::Error>;
fn header_received(&self, bh: core::BlockHeader, addr: PeerAddr) -> Result<bool, chain::Error>; fn header_received(
&self,
bh: core::BlockHeader,
peer_info: &PeerInfo,
) -> Result<bool, chain::Error>;
/// A set of block header has been received, typically in response to a /// A set of block header has been received, typically in response to a
/// block /// block
@ -510,7 +518,7 @@ pub trait ChainAdapter: Sync + Send {
fn headers_received( fn headers_received(
&self, &self,
bh: &[core::BlockHeader], bh: &[core::BlockHeader],
addr: PeerAddr, peer_info: &PeerInfo,
) -> Result<bool, chain::Error>; ) -> Result<bool, chain::Error>;
/// Finds a list of block headers based on the provided locator. Tries to /// Finds a list of block headers based on the provided locator. Tries to
@ -548,7 +556,7 @@ pub trait ChainAdapter: Sync + Send {
&self, &self,
h: Hash, h: Hash,
txhashset_data: File, txhashset_data: File,
peer_addr: PeerAddr, peer_peer_info: &PeerInfo,
) -> Result<bool, chain::Error>; ) -> Result<bool, chain::Error>;
} }

View file

@ -33,7 +33,7 @@ use crate::core::core::{BlockHeader, BlockSums, CompactBlock};
use crate::core::pow::Difficulty; use crate::core::pow::Difficulty;
use crate::core::{core, global}; use crate::core::{core, global};
use crate::p2p; use crate::p2p;
use crate::p2p::types::PeerAddr; use crate::p2p::types::PeerInfo;
use crate::pool; use crate::pool;
use crate::pool::types::DandelionConfig; use crate::pool::types::DandelionConfig;
use crate::util::OneTime; use crate::util::OneTime;
@ -67,7 +67,11 @@ impl p2p::ChainAdapter for NetToChainAdapter {
self.tx_pool.read().retrieve_tx_by_kernel_hash(kernel_hash) self.tx_pool.read().retrieve_tx_by_kernel_hash(kernel_hash)
} }
fn tx_kernel_received(&self, kernel_hash: Hash, addr: PeerAddr) -> Result<bool, chain::Error> { fn tx_kernel_received(
&self,
kernel_hash: Hash,
peer_info: &PeerInfo,
) -> Result<bool, chain::Error> {
// nothing much we can do with a new transaction while syncing // nothing much we can do with a new transaction while syncing
if self.sync_state.is_syncing() { if self.sync_state.is_syncing() {
return Ok(true); return Ok(true);
@ -76,7 +80,7 @@ impl p2p::ChainAdapter for NetToChainAdapter {
let tx = self.tx_pool.read().retrieve_tx_by_kernel_hash(kernel_hash); let tx = self.tx_pool.read().retrieve_tx_by_kernel_hash(kernel_hash);
if tx.is_none() { if tx.is_none() {
self.request_transaction(kernel_hash, addr); self.request_transaction(kernel_hash, peer_info);
} }
Ok(true) Ok(true)
} }
@ -117,32 +121,32 @@ impl p2p::ChainAdapter for NetToChainAdapter {
fn block_received( fn block_received(
&self, &self,
b: core::Block, b: core::Block,
addr: PeerAddr, peer_info: &PeerInfo,
was_requested: bool, was_requested: bool,
) -> Result<bool, chain::Error> { ) -> Result<bool, chain::Error> {
debug!( debug!(
"Received block {} at {} from {} [in/out/kern: {}/{}/{}] going to process.", "Received block {} at {} from {} [in/out/kern: {}/{}/{}] going to process.",
b.hash(), b.hash(),
b.header.height, b.header.height,
addr, peer_info.addr,
b.inputs().len(), b.inputs().len(),
b.outputs().len(), b.outputs().len(),
b.kernels().len(), b.kernels().len(),
); );
self.process_block(b, addr, was_requested) self.process_block(b, peer_info, was_requested)
} }
fn compact_block_received( fn compact_block_received(
&self, &self,
cb: core::CompactBlock, cb: core::CompactBlock,
addr: PeerAddr, peer_info: &PeerInfo,
) -> Result<bool, chain::Error> { ) -> Result<bool, chain::Error> {
let bhash = cb.hash(); let bhash = cb.hash();
debug!( debug!(
"Received compact_block {} at {} from {} [out/kern/kern_ids: {}/{}/{}] going to process.", "Received compact_block {} at {} from {} [out/kern/kern_ids: {}/{}/{}] going to process.",
bhash, bhash,
cb.header.height, cb.header.height,
addr, peer_info.addr,
cb.out_full().len(), cb.out_full().len(),
cb.kern_full().len(), cb.kern_full().len(),
cb.kern_ids().len(), cb.kern_ids().len(),
@ -155,10 +159,10 @@ impl p2p::ChainAdapter for NetToChainAdapter {
Ok(block) => { Ok(block) => {
if !self.sync_state.is_syncing() { if !self.sync_state.is_syncing() {
for hook in &self.hooks { for hook in &self.hooks {
hook.on_block_received(&block, &addr); hook.on_block_received(&block, &peer_info.addr);
} }
} }
self.process_block(block, addr, false) self.process_block(block, peer_info, false)
} }
Err(e) => { Err(e) => {
debug!("Invalid hydrated block {}: {:?}", cb_hash, e); debug!("Invalid hydrated block {}: {:?}", cb_hash, e);
@ -196,7 +200,7 @@ impl p2p::ChainAdapter for NetToChainAdapter {
Ok(block) => { Ok(block) => {
if !self.sync_state.is_syncing() { if !self.sync_state.is_syncing() {
for hook in &self.hooks { for hook in &self.hooks {
hook.on_block_received(&block, &addr); hook.on_block_received(&block, &peer_info.addr);
} }
} }
block block
@ -213,11 +217,11 @@ impl p2p::ChainAdapter for NetToChainAdapter {
.is_ok() .is_ok()
{ {
debug!("successfully hydrated block from tx pool!"); debug!("successfully hydrated block from tx pool!");
self.process_block(block, addr, false) self.process_block(block, peer_info, false)
} else { } else {
if self.sync_state.status() == SyncStatus::NoSync { if self.sync_state.status() == SyncStatus::NoSync {
debug!("adapter: block invalid after hydration, requesting full block"); debug!("adapter: block invalid after hydration, requesting full block");
self.request_block(&cb.header, addr); self.request_block(&cb.header, peer_info);
Ok(true) Ok(true)
} else { } else {
debug!("block invalid after hydration, ignoring it, cause still syncing"); debug!("block invalid after hydration, ignoring it, cause still syncing");
@ -231,11 +235,15 @@ impl p2p::ChainAdapter for NetToChainAdapter {
} }
} }
fn header_received(&self, bh: core::BlockHeader, addr: PeerAddr) -> Result<bool, chain::Error> { fn header_received(
&self,
bh: core::BlockHeader,
peer_info: &PeerInfo,
) -> Result<bool, chain::Error> {
let bhash = bh.hash(); let bhash = bh.hash();
debug!( debug!(
"Received block header {} at {} from {}, going to process.", "Received block header {} at {} from {}, going to process.",
bhash, bh.height, addr, bhash, bh.height, peer_info.addr,
); );
// pushing the new block header through the header chain pipeline // pushing the new block header through the header chain pipeline
@ -257,7 +265,7 @@ impl p2p::ChainAdapter for NetToChainAdapter {
// we have successfully processed a block header // we have successfully processed a block header
// so we can go request the block itself // so we can go request the block itself
self.request_compact_block(&bh, addr); self.request_compact_block(&bh, peer_info);
// done receiving the header // done receiving the header
Ok(true) Ok(true)
@ -266,9 +274,13 @@ impl p2p::ChainAdapter for NetToChainAdapter {
fn headers_received( fn headers_received(
&self, &self,
bhs: &[core::BlockHeader], bhs: &[core::BlockHeader],
addr: PeerAddr, peer_info: &PeerInfo,
) -> Result<bool, chain::Error> { ) -> Result<bool, chain::Error> {
info!("Received {} block headers from {}", bhs.len(), addr,); info!(
"Received {} block headers from {}",
bhs.len(),
peer_info.addr
);
if bhs.len() == 0 { if bhs.len() == 0 {
return Ok(false); return Ok(false);
@ -388,7 +400,7 @@ impl p2p::ChainAdapter for NetToChainAdapter {
&self, &self,
h: Hash, h: Hash,
txhashset_data: File, txhashset_data: File,
_peer_addr: PeerAddr, _peer_info: &PeerInfo,
) -> Result<bool, chain::Error> { ) -> Result<bool, chain::Error> {
// check status again after download, in case 2 txhashsets made it somehow // check status again after download, in case 2 txhashsets made it somehow
if let SyncStatus::TxHashsetDownload { .. } = self.sync_state.status() { if let SyncStatus::TxHashsetDownload { .. } = self.sync_state.status() {
@ -475,7 +487,7 @@ impl NetToChainAdapter {
fn process_block( fn process_block(
&self, &self,
b: core::Block, b: core::Block,
addr: PeerAddr, peer_info: &PeerInfo,
was_requested: bool, was_requested: bool,
) -> Result<bool, chain::Error> { ) -> Result<bool, chain::Error> {
// We cannot process blocks earlier than the horizon so check for this here. // We cannot process blocks earlier than the horizon so check for this here.
@ -514,7 +526,7 @@ impl NetToChainAdapter {
&& !self.sync_state.is_syncing() && !self.sync_state.is_syncing()
{ {
debug!("process_block: received an orphan block, checking the parent: {:}", previous.hash()); debug!("process_block: received an orphan block, checking the parent: {:}", previous.hash());
self.request_block_by_hash(previous.hash(), addr) self.request_block_by_hash(previous.hash(), peer_info)
} }
} }
Ok(true) Ok(true)
@ -581,39 +593,39 @@ impl NetToChainAdapter {
} }
} }
fn request_transaction(&self, h: Hash, addr: PeerAddr) { fn request_transaction(&self, h: Hash, peer_info: &PeerInfo) {
self.send_tx_request_to_peer(h, addr, |peer, h| peer.send_tx_request(h)) self.send_tx_request_to_peer(h, peer_info, |peer, h| peer.send_tx_request(h))
} }
// After receiving a compact block if we cannot successfully hydrate // After receiving a compact block if we cannot successfully hydrate
// it into a full block then fallback to requesting the full block // it into a full block then fallback to requesting the full block
// from the same peer that gave us the compact block // from the same peer that gave us the compact block
// consider additional peers for redundancy? // consider additional peers for redundancy?
fn request_block(&self, bh: &BlockHeader, addr: PeerAddr) { fn request_block(&self, bh: &BlockHeader, peer_info: &PeerInfo) {
self.request_block_by_hash(bh.hash(), addr) self.request_block_by_hash(bh.hash(), peer_info)
} }
fn request_block_by_hash(&self, h: Hash, addr: PeerAddr) { fn request_block_by_hash(&self, h: Hash, peer_info: &PeerInfo) {
self.send_block_request_to_peer(h, addr, |peer, h| peer.send_block_request(h)) self.send_block_request_to_peer(h, peer_info, |peer, h| peer.send_block_request(h))
} }
// After we have received a block header in "header first" propagation // After we have received a block header in "header first" propagation
// we need to go request the block (compact representation) from the // we need to go request the block (compact representation) from the
// same peer that gave us the header (unless we have already accepted the block) // same peer that gave us the header (unless we have already accepted the block)
fn request_compact_block(&self, bh: &BlockHeader, addr: PeerAddr) { fn request_compact_block(&self, bh: &BlockHeader, peer_info: &PeerInfo) {
self.send_block_request_to_peer(bh.hash(), addr, |peer, h| { self.send_block_request_to_peer(bh.hash(), peer_info, |peer, h| {
peer.send_compact_block_request(h) peer.send_compact_block_request(h)
}) })
} }
fn send_tx_request_to_peer<F>(&self, h: Hash, addr: PeerAddr, f: F) fn send_tx_request_to_peer<F>(&self, h: Hash, peer_info: &PeerInfo, f: F)
where where
F: Fn(&p2p::Peer, Hash) -> Result<(), p2p::Error>, F: Fn(&p2p::Peer, Hash) -> Result<(), p2p::Error>,
{ {
match self.peers().get_connected_peer(addr) { match self.peers().get_connected_peer(peer_info.addr) {
None => debug!( None => debug!(
"send_tx_request_to_peer: can't send request to peer {:?}, not connected", "send_tx_request_to_peer: can't send request to peer {:?}, not connected",
addr peer_info.addr
), ),
Some(peer) => { Some(peer) => {
if let Err(e) = f(&peer, h) { if let Err(e) = f(&peer, h) {
@ -623,15 +635,15 @@ impl NetToChainAdapter {
} }
} }
fn send_block_request_to_peer<F>(&self, h: Hash, addr: PeerAddr, f: F) fn send_block_request_to_peer<F>(&self, h: Hash, peer_info: &PeerInfo, f: F)
where where
F: Fn(&p2p::Peer, Hash) -> Result<(), p2p::Error>, F: Fn(&p2p::Peer, Hash) -> Result<(), p2p::Error>,
{ {
match self.chain().block_exists(h) { match self.chain().block_exists(h) {
Ok(false) => match self.peers().get_connected_peer(addr) { Ok(false) => match self.peers().get_connected_peer(peer_info.addr) {
None => debug!( None => debug!(
"send_block_request_to_peer: can't send request to peer {:?}, not connected", "send_block_request_to_peer: can't send request to peer {:?}, not connected",
addr peer_info.addr
), ),
Some(peer) => { Some(peer) => {
if let Err(e) = f(&peer, h) { if let Err(e) = f(&peer, h) {