diff --git a/p2p/src/peer.rs b/p2p/src/peer.rs index e2c2a626d..d0f2c9619 100644 --- a/p2p/src/peer.rs +++ b/p2p/src/peer.rs @@ -136,9 +136,8 @@ impl Peer { /// Main peer loop listening for messages and forwarding to the rest of the /// system. pub fn start(&mut self, conn: TcpStream) { - let addr = self.info.addr; let adapter = Arc::new(self.tracking_adapter.clone()); - let handler = Protocol::new(adapter, addr); + let handler = Protocol::new(adapter, self.info.clone()); self.connection = Some(Mutex::new(conn::listen(conn, handler))); } @@ -533,9 +532,13 @@ impl ChainAdapter for TrackingAdapter { self.adapter.get_transaction(kernel_hash) } - fn tx_kernel_received(&self, kernel_hash: Hash, addr: PeerAddr) -> Result { + fn tx_kernel_received( + &self, + kernel_hash: Hash, + peer_info: &PeerInfo, + ) -> Result { self.push_recv(kernel_hash); - self.adapter.tx_kernel_received(kernel_hash, addr) + self.adapter.tx_kernel_received(kernel_hash, peer_info) } fn transaction_received( @@ -556,34 +559,38 @@ impl ChainAdapter for TrackingAdapter { fn block_received( &self, b: core::Block, - addr: PeerAddr, + peer_info: &PeerInfo, _was_requested: bool, ) -> Result { let bh = b.hash(); self.push_recv(bh); - self.adapter.block_received(b, addr, self.has_req(bh)) + self.adapter.block_received(b, peer_info, self.has_req(bh)) } fn compact_block_received( &self, cb: core::CompactBlock, - addr: PeerAddr, + peer_info: &PeerInfo, ) -> Result { self.push_recv(cb.hash()); - self.adapter.compact_block_received(cb, addr) + self.adapter.compact_block_received(cb, peer_info) } - fn header_received(&self, bh: core::BlockHeader, addr: PeerAddr) -> Result { + fn header_received( + &self, + bh: core::BlockHeader, + peer_info: &PeerInfo, + ) -> Result { self.push_recv(bh.hash()); - self.adapter.header_received(bh, addr) + self.adapter.header_received(bh, peer_info) } fn headers_received( &self, bh: &[core::BlockHeader], - addr: PeerAddr, + peer_info: &PeerInfo, ) -> Result { - self.adapter.headers_received(bh, addr) + self.adapter.headers_received(bh, peer_info) } fn locate_headers(&self, locator: &[Hash]) -> Result, chain::Error> { @@ -606,9 +613,9 @@ impl ChainAdapter for TrackingAdapter { &self, h: Hash, txhashset_data: File, - peer_addr: PeerAddr, + peer_info: &PeerInfo, ) -> Result { - self.adapter.txhashset_write(h, txhashset_data, peer_addr) + self.adapter.txhashset_write(h, txhashset_data, peer_info) } fn txhashset_download_update( diff --git a/p2p/src/peers.rs b/p2p/src/peers.rs index 1d03436a7..53598fc53 100644 --- a/p2p/src/peers.rs +++ b/p2p/src/peers.rs @@ -30,7 +30,7 @@ use chrono::Duration; use crate::peer::Peer; use crate::store::{PeerData, PeerStore, State}; use crate::types::{ - Capabilities, ChainAdapter, Direction, Error, NetAdapter, P2PConfig, PeerAddr, ReasonForBan, + Capabilities, ChainAdapter, Error, NetAdapter, P2PConfig, PeerAddr, PeerInfo, ReasonForBan, TxHashSetRead, MAX_PEER_ADDRS, }; @@ -104,12 +104,10 @@ impl Peers { } pub fn outgoing_connected_peers(&self) -> Vec> { - let peers = self.connected_peers(); - let res = peers + self.connected_peers() .into_iter() - .filter(|x| x.info.direction == Direction::Outbound) - .collect::>(); - res + .filter(|x| x.info.is_outbound()) + .collect() } /// Get a peer we're connected to by address. @@ -119,20 +117,12 @@ impl Peers { /// Number of peers currently connected to. pub fn peer_count(&self) -> u32 { - self.peers - .read() - .values() - .filter(|x| x.is_connected()) - .count() as u32 + self.connected_peers().len() as u32 } /// Number of outbound peers currently connected to. pub fn peer_outbound_count(&self) -> u32 { - self.peers - .read() - .values() - .filter(|x| x.is_connected() && x.info.is_outbound()) - .count() as u32 + self.outgoing_connected_peers().len() as u32 } // Return vec of connected peers that currently advertise more work @@ -498,8 +488,12 @@ impl ChainAdapter for Peers { self.adapter.get_transaction(kernel_hash) } - fn tx_kernel_received(&self, kernel_hash: Hash, addr: PeerAddr) -> Result { - self.adapter.tx_kernel_received(kernel_hash, addr) + fn tx_kernel_received( + &self, + kernel_hash: Hash, + peer_info: &PeerInfo, + ) -> Result { + self.adapter.tx_kernel_received(kernel_hash, peer_info) } fn transaction_received( @@ -513,18 +507,18 @@ impl ChainAdapter for Peers { fn block_received( &self, b: core::Block, - peer_addr: PeerAddr, + peer_info: &PeerInfo, was_requested: bool, ) -> Result { let hash = b.hash(); - if !self.adapter.block_received(b, peer_addr, was_requested)? { + if !self.adapter.block_received(b, peer_info, was_requested)? { // if the peer sent us a block that's intrinsically bad // they are either mistaken or malevolent, both of which require a ban debug!( "Received a bad block {} from {}, the peer will be banned", - hash, peer_addr + hash, peer_info.addr, ); - self.ban_peer(peer_addr, ReasonForBan::BadBlock); + self.ban_peer(peer_info.addr, ReasonForBan::BadBlock); Ok(false) } else { Ok(true) @@ -534,17 +528,17 @@ impl ChainAdapter for Peers { fn compact_block_received( &self, cb: core::CompactBlock, - peer_addr: PeerAddr, + peer_info: &PeerInfo, ) -> Result { let hash = cb.hash(); - if !self.adapter.compact_block_received(cb, peer_addr)? { + if !self.adapter.compact_block_received(cb, peer_info)? { // if the peer sent us a block that's intrinsically bad // they are either mistaken or malevolent, both of which require a ban debug!( "Received a bad compact block {} from {}, the peer will be banned", - hash, peer_addr + hash, peer_info.addr ); - self.ban_peer(peer_addr, ReasonForBan::BadCompactBlock); + self.ban_peer(peer_info.addr, ReasonForBan::BadCompactBlock); Ok(false) } else { Ok(true) @@ -554,12 +548,12 @@ impl ChainAdapter for Peers { fn header_received( &self, bh: core::BlockHeader, - peer_addr: PeerAddr, + peer_info: &PeerInfo, ) -> Result { - if !self.adapter.header_received(bh, peer_addr)? { + if !self.adapter.header_received(bh, peer_info)? { // if the peer sent us a block header that's intrinsically bad // they are either mistaken or malevolent, both of which require a ban - self.ban_peer(peer_addr, ReasonForBan::BadBlockHeader); + self.ban_peer(peer_info.addr, ReasonForBan::BadBlockHeader); Ok(false) } else { Ok(true) @@ -569,12 +563,12 @@ impl ChainAdapter for Peers { fn headers_received( &self, headers: &[core::BlockHeader], - peer_addr: PeerAddr, + peer_info: &PeerInfo, ) -> Result { - if !self.adapter.headers_received(headers, peer_addr)? { + if !self.adapter.headers_received(headers, peer_info)? { // if the peer sent us a block header that's intrinsically bad // they are either mistaken or malevolent, both of which require a ban - self.ban_peer(peer_addr, ReasonForBan::BadBlockHeader); + self.ban_peer(peer_info.addr, ReasonForBan::BadBlockHeader); Ok(false) } else { Ok(true) @@ -601,14 +595,14 @@ impl ChainAdapter for Peers { &self, h: Hash, txhashset_data: File, - peer_addr: PeerAddr, + peer_info: &PeerInfo, ) -> Result { - if !self.adapter.txhashset_write(h, txhashset_data, peer_addr)? { + if !self.adapter.txhashset_write(h, txhashset_data, peer_info)? { debug!( "Received a bad txhashset data from {}, the peer will be banned", - &peer_addr + peer_info.addr ); - self.ban_peer(peer_addr, ReasonForBan::BadTxHashSet); + self.ban_peer(peer_info.addr, ReasonForBan::BadTxHashSet); Ok(false) } else { Ok(true) diff --git a/p2p/src/protocol.rs b/p2p/src/protocol.rs index b98df4240..413b615fd 100644 --- a/p2p/src/protocol.rs +++ b/p2p/src/protocol.rs @@ -27,16 +27,16 @@ use crate::msg::{ BanReason, GetPeerAddrs, Headers, Locator, PeerAddrs, Ping, Pong, TxHashSetArchive, TxHashSetRequest, Type, }; -use crate::types::{Error, NetAdapter, PeerAddr}; +use crate::types::{Error, NetAdapter, PeerInfo}; pub struct Protocol { adapter: Arc, - addr: PeerAddr, + peer_info: PeerInfo, } impl Protocol { - pub fn new(adapter: Arc, addr: PeerAddr) -> Protocol { - Protocol { adapter, addr } + pub fn new(adapter: Arc, peer_info: PeerInfo) -> Protocol { + Protocol { adapter, peer_info } } } @@ -52,10 +52,10 @@ impl MessageHandler for Protocol { // If we received a msg from a banned peer then log and drop it. // If we are getting a lot of these then maybe we are not cleaning // banned peers up correctly? - if adapter.is_banned(self.addr.clone()) { + if adapter.is_banned(self.peer_info.addr) { debug!( "handler: consume: peer {:?} banned, received: {:?}, dropping.", - self.addr, msg.header.msg_type, + self.peer_info.addr, msg.header.msg_type, ); return Ok(None); } @@ -63,7 +63,7 @@ impl MessageHandler for Protocol { match msg.header.msg_type { Type::Ping => { let ping: Ping = msg.body()?; - adapter.peer_difficulty(self.addr, ping.total_difficulty, ping.height); + adapter.peer_difficulty(self.peer_info.addr, ping.total_difficulty, ping.height); Ok(Some(Response::new( Type::Pong, @@ -77,7 +77,7 @@ impl MessageHandler for Protocol { Type::Pong => { let pong: Pong = msg.body()?; - adapter.peer_difficulty(self.addr, pong.total_difficulty, pong.height); + adapter.peer_difficulty(self.peer_info.addr, pong.total_difficulty, pong.height); Ok(None) } @@ -93,7 +93,7 @@ impl MessageHandler for Protocol { "handle_payload: received tx kernel: {}, msg_len: {}", h, msg.header.msg_len ); - adapter.tx_kernel_received(h, self.addr)?; + adapter.tx_kernel_received(h, &self.peer_info)?; Ok(None) } @@ -155,7 +155,7 @@ impl MessageHandler for Protocol { // we can't know at this level whether we requested the block or not, // the boolean should be properly set in higher level adapter - adapter.block_received(b, self.addr, false)?; + adapter.block_received(b, &self.peer_info, false)?; Ok(None) } @@ -176,7 +176,7 @@ impl MessageHandler for Protocol { ); let b: core::CompactBlock = msg.body()?; - adapter.compact_block_received(b, self.addr)?; + adapter.compact_block_received(b, &self.peer_info)?; Ok(None) } @@ -197,7 +197,7 @@ impl MessageHandler for Protocol { // we can go request it from some of our peers Type::Header => { let header: core::BlockHeader = msg.body()?; - adapter.header_received(header, self.addr)?; + adapter.header_received(header, &self.peer_info)?; Ok(None) } @@ -217,7 +217,7 @@ impl MessageHandler for Protocol { headers.push(header); total_bytes_read += bytes_read; } - adapter.headers_received(&headers, self.addr)?; + adapter.headers_received(&headers, &self.peer_info)?; } // Now check we read the correct total number of bytes off the stream. @@ -335,7 +335,7 @@ impl MessageHandler for Protocol { let tmp_zip = File::open(tmp)?; let res = self .adapter - .txhashset_write(sm_arch.hash, tmp_zip, self.addr)?; + .txhashset_write(sm_arch.hash, tmp_zip, &self.peer_info)?; debug!( "handle_payload: txhashset archive for {} at {}, DONE. Data Ok: {}", diff --git a/p2p/src/serv.rs b/p2p/src/serv.rs index 8b54f8e7c..5192b0d9e 100644 --- a/p2p/src/serv.rs +++ b/p2p/src/serv.rs @@ -28,7 +28,8 @@ use crate::peer::Peer; use crate::peers::Peers; use crate::store::PeerStore; use crate::types::{ - Capabilities, ChainAdapter, Error, NetAdapter, P2PConfig, PeerAddr, ReasonForBan, TxHashSetRead, + Capabilities, ChainAdapter, Error, NetAdapter, P2PConfig, PeerAddr, PeerInfo, ReasonForBan, + TxHashSetRead, }; use crate::util::{Mutex, StopState}; use chrono::prelude::{DateTime, Utc}; @@ -240,7 +241,7 @@ impl ChainAdapter for DummyAdapter { None } - fn tx_kernel_received(&self, _h: Hash, _addr: PeerAddr) -> Result { + fn tx_kernel_received(&self, _h: Hash, _peer_info: &PeerInfo) -> Result { Ok(true) } fn transaction_received( @@ -253,21 +254,25 @@ impl ChainAdapter for DummyAdapter { fn compact_block_received( &self, _cb: core::CompactBlock, - _addr: PeerAddr, + _peer_info: &PeerInfo, ) -> Result { Ok(true) } fn header_received( &self, _bh: core::BlockHeader, - _addr: PeerAddr, + _peer_info: &PeerInfo, ) -> Result { Ok(true) } - fn block_received(&self, _: core::Block, _: PeerAddr, _: bool) -> Result { + fn block_received(&self, _: core::Block, _: &PeerInfo, _: bool) -> Result { Ok(true) } - fn headers_received(&self, _: &[core::BlockHeader], _: PeerAddr) -> Result { + fn headers_received( + &self, + _: &[core::BlockHeader], + _: &PeerInfo, + ) -> Result { Ok(true) } fn locate_headers(&self, _: &[Hash]) -> Result, chain::Error> { @@ -288,7 +293,7 @@ impl ChainAdapter for DummyAdapter { &self, _h: Hash, _txhashset_data: File, - _peer_addr: PeerAddr, + _peer_info: &PeerInfo, ) -> Result { Ok(false) } diff --git a/p2p/src/types.rs b/p2p/src/types.rs index 5bf2b8e4e..87028e9ea 100644 --- a/p2p/src/types.rs +++ b/p2p/src/types.rs @@ -483,7 +483,11 @@ pub trait ChainAdapter: Sync + Send { fn get_transaction(&self, kernel_hash: Hash) -> Option; - fn tx_kernel_received(&self, kernel_hash: Hash, addr: PeerAddr) -> Result; + fn tx_kernel_received( + &self, + kernel_hash: Hash, + peer_info: &PeerInfo, + ) -> Result; /// A block has been received from one of our peers. Returns true if the /// block could be handled properly and is not deemed defective by the @@ -492,17 +496,21 @@ pub trait ChainAdapter: Sync + Send { fn block_received( &self, b: core::Block, - addr: PeerAddr, + peer_info: &PeerInfo, was_requested: bool, ) -> Result; fn compact_block_received( &self, cb: core::CompactBlock, - addr: PeerAddr, + peer_info: &PeerInfo, ) -> Result; - fn header_received(&self, bh: core::BlockHeader, addr: PeerAddr) -> Result; + fn header_received( + &self, + bh: core::BlockHeader, + peer_info: &PeerInfo, + ) -> Result; /// A set of block header has been received, typically in response to a /// block @@ -510,7 +518,7 @@ pub trait ChainAdapter: Sync + Send { fn headers_received( &self, bh: &[core::BlockHeader], - addr: PeerAddr, + peer_info: &PeerInfo, ) -> Result; /// Finds a list of block headers based on the provided locator. Tries to @@ -548,7 +556,7 @@ pub trait ChainAdapter: Sync + Send { &self, h: Hash, txhashset_data: File, - peer_addr: PeerAddr, + peer_peer_info: &PeerInfo, ) -> Result; } diff --git a/servers/src/common/adapters.rs b/servers/src/common/adapters.rs index 088d5db59..104701bdd 100644 --- a/servers/src/common/adapters.rs +++ b/servers/src/common/adapters.rs @@ -33,7 +33,7 @@ use crate::core::core::{BlockHeader, BlockSums, CompactBlock}; use crate::core::pow::Difficulty; use crate::core::{core, global}; use crate::p2p; -use crate::p2p::types::PeerAddr; +use crate::p2p::types::PeerInfo; use crate::pool; use crate::pool::types::DandelionConfig; use crate::util::OneTime; @@ -67,7 +67,11 @@ impl p2p::ChainAdapter for NetToChainAdapter { self.tx_pool.read().retrieve_tx_by_kernel_hash(kernel_hash) } - fn tx_kernel_received(&self, kernel_hash: Hash, addr: PeerAddr) -> Result { + fn tx_kernel_received( + &self, + kernel_hash: Hash, + peer_info: &PeerInfo, + ) -> Result { // nothing much we can do with a new transaction while syncing if self.sync_state.is_syncing() { return Ok(true); @@ -76,7 +80,7 @@ impl p2p::ChainAdapter for NetToChainAdapter { let tx = self.tx_pool.read().retrieve_tx_by_kernel_hash(kernel_hash); if tx.is_none() { - self.request_transaction(kernel_hash, addr); + self.request_transaction(kernel_hash, peer_info); } Ok(true) } @@ -117,32 +121,32 @@ impl p2p::ChainAdapter for NetToChainAdapter { fn block_received( &self, b: core::Block, - addr: PeerAddr, + peer_info: &PeerInfo, was_requested: bool, ) -> Result { debug!( "Received block {} at {} from {} [in/out/kern: {}/{}/{}] going to process.", b.hash(), b.header.height, - addr, + peer_info.addr, b.inputs().len(), b.outputs().len(), b.kernels().len(), ); - self.process_block(b, addr, was_requested) + self.process_block(b, peer_info, was_requested) } fn compact_block_received( &self, cb: core::CompactBlock, - addr: PeerAddr, + peer_info: &PeerInfo, ) -> Result { let bhash = cb.hash(); debug!( "Received compact_block {} at {} from {} [out/kern/kern_ids: {}/{}/{}] going to process.", bhash, cb.header.height, - addr, + peer_info.addr, cb.out_full().len(), cb.kern_full().len(), cb.kern_ids().len(), @@ -155,10 +159,10 @@ impl p2p::ChainAdapter for NetToChainAdapter { Ok(block) => { if !self.sync_state.is_syncing() { for hook in &self.hooks { - hook.on_block_received(&block, &addr); + hook.on_block_received(&block, &peer_info.addr); } } - self.process_block(block, addr, false) + self.process_block(block, peer_info, false) } Err(e) => { debug!("Invalid hydrated block {}: {:?}", cb_hash, e); @@ -196,7 +200,7 @@ impl p2p::ChainAdapter for NetToChainAdapter { Ok(block) => { if !self.sync_state.is_syncing() { for hook in &self.hooks { - hook.on_block_received(&block, &addr); + hook.on_block_received(&block, &peer_info.addr); } } block @@ -213,11 +217,11 @@ impl p2p::ChainAdapter for NetToChainAdapter { .is_ok() { debug!("successfully hydrated block from tx pool!"); - self.process_block(block, addr, false) + self.process_block(block, peer_info, false) } else { if self.sync_state.status() == SyncStatus::NoSync { debug!("adapter: block invalid after hydration, requesting full block"); - self.request_block(&cb.header, addr); + self.request_block(&cb.header, peer_info); Ok(true) } else { debug!("block invalid after hydration, ignoring it, cause still syncing"); @@ -231,11 +235,15 @@ impl p2p::ChainAdapter for NetToChainAdapter { } } - fn header_received(&self, bh: core::BlockHeader, addr: PeerAddr) -> Result { + fn header_received( + &self, + bh: core::BlockHeader, + peer_info: &PeerInfo, + ) -> Result { let bhash = bh.hash(); debug!( "Received block header {} at {} from {}, going to process.", - bhash, bh.height, addr, + bhash, bh.height, peer_info.addr, ); // pushing the new block header through the header chain pipeline @@ -257,7 +265,7 @@ impl p2p::ChainAdapter for NetToChainAdapter { // we have successfully processed a block header // so we can go request the block itself - self.request_compact_block(&bh, addr); + self.request_compact_block(&bh, peer_info); // done receiving the header Ok(true) @@ -266,9 +274,13 @@ impl p2p::ChainAdapter for NetToChainAdapter { fn headers_received( &self, bhs: &[core::BlockHeader], - addr: PeerAddr, + peer_info: &PeerInfo, ) -> Result { - info!("Received {} block headers from {}", bhs.len(), addr,); + info!( + "Received {} block headers from {}", + bhs.len(), + peer_info.addr + ); if bhs.len() == 0 { return Ok(false); @@ -388,7 +400,7 @@ impl p2p::ChainAdapter for NetToChainAdapter { &self, h: Hash, txhashset_data: File, - _peer_addr: PeerAddr, + _peer_info: &PeerInfo, ) -> Result { // check status again after download, in case 2 txhashsets made it somehow if let SyncStatus::TxHashsetDownload { .. } = self.sync_state.status() { @@ -475,7 +487,7 @@ impl NetToChainAdapter { fn process_block( &self, b: core::Block, - addr: PeerAddr, + peer_info: &PeerInfo, was_requested: bool, ) -> Result { // We cannot process blocks earlier than the horizon so check for this here. @@ -514,7 +526,7 @@ impl NetToChainAdapter { && !self.sync_state.is_syncing() { debug!("process_block: received an orphan block, checking the parent: {:}", previous.hash()); - self.request_block_by_hash(previous.hash(), addr) + self.request_block_by_hash(previous.hash(), peer_info) } } Ok(true) @@ -581,39 +593,39 @@ impl NetToChainAdapter { } } - fn request_transaction(&self, h: Hash, addr: PeerAddr) { - self.send_tx_request_to_peer(h, addr, |peer, h| peer.send_tx_request(h)) + fn request_transaction(&self, h: Hash, peer_info: &PeerInfo) { + self.send_tx_request_to_peer(h, peer_info, |peer, h| peer.send_tx_request(h)) } // After receiving a compact block if we cannot successfully hydrate // it into a full block then fallback to requesting the full block // from the same peer that gave us the compact block // consider additional peers for redundancy? - fn request_block(&self, bh: &BlockHeader, addr: PeerAddr) { - self.request_block_by_hash(bh.hash(), addr) + fn request_block(&self, bh: &BlockHeader, peer_info: &PeerInfo) { + self.request_block_by_hash(bh.hash(), peer_info) } - fn request_block_by_hash(&self, h: Hash, addr: PeerAddr) { - self.send_block_request_to_peer(h, addr, |peer, h| peer.send_block_request(h)) + fn request_block_by_hash(&self, h: Hash, peer_info: &PeerInfo) { + self.send_block_request_to_peer(h, peer_info, |peer, h| peer.send_block_request(h)) } // After we have received a block header in "header first" propagation // we need to go request the block (compact representation) from the // same peer that gave us the header (unless we have already accepted the block) - fn request_compact_block(&self, bh: &BlockHeader, addr: PeerAddr) { - self.send_block_request_to_peer(bh.hash(), addr, |peer, h| { + fn request_compact_block(&self, bh: &BlockHeader, peer_info: &PeerInfo) { + self.send_block_request_to_peer(bh.hash(), peer_info, |peer, h| { peer.send_compact_block_request(h) }) } - fn send_tx_request_to_peer(&self, h: Hash, addr: PeerAddr, f: F) + fn send_tx_request_to_peer(&self, h: Hash, peer_info: &PeerInfo, f: F) where F: Fn(&p2p::Peer, Hash) -> Result<(), p2p::Error>, { - match self.peers().get_connected_peer(addr) { + match self.peers().get_connected_peer(peer_info.addr) { None => debug!( "send_tx_request_to_peer: can't send request to peer {:?}, not connected", - addr + peer_info.addr ), Some(peer) => { if let Err(e) = f(&peer, h) { @@ -623,15 +635,15 @@ impl NetToChainAdapter { } } - fn send_block_request_to_peer(&self, h: Hash, addr: PeerAddr, f: F) + fn send_block_request_to_peer(&self, h: Hash, peer_info: &PeerInfo, f: F) where F: Fn(&p2p::Peer, Hash) -> Result<(), p2p::Error>, { match self.chain().block_exists(h) { - Ok(false) => match self.peers().get_connected_peer(addr) { + Ok(false) => match self.peers().get_connected_peer(peer_info.addr) { None => debug!( "send_block_request_to_peer: can't send request to peer {:?}, not connected", - addr + peer_info.addr ), Some(peer) => { if let Err(e) = f(&peer, h) {