From 4ef4212f1fd99594f3d88a6c712642def34eab3b Mon Sep 17 00:00:00 2001 From: Antioch Peverell Date: Fri, 3 May 2019 15:56:25 +0100 Subject: [PATCH] Simplify peer connection handling (#2801) * connection no longer wrapped in an Option in peer * introduce peer.send() * remove some Arc indirection * self.send() cleanup * extract Peer:new() from connect and accept * fixup * cleanup --- p2p/src/conn.rs | 5 ++ p2p/src/peer.rs | 132 ++++++++++++++---------------------- p2p/src/serv.rs | 14 ++-- p2p/tests/peer_handshake.rs | 7 +- 4 files changed, 66 insertions(+), 92 deletions(-) diff --git a/p2p/src/conn.rs b/p2p/src/conn.rs index 8a0177cfe..e2e400a83 100644 --- a/p2p/src/conn.rs +++ b/p2p/src/conn.rs @@ -188,6 +188,11 @@ impl Tracker { Ok(()) } + + /// Schedule this connection to safely close via the async close_channel. + pub fn close(&self) { + let _ = self.close_channel.send(()); + } } /// Start listening on the provided connection and wraps it. Does not hang diff --git a/p2p/src/peer.rs b/p2p/src/peer.rs index 2a302db01..1b1df25d1 100644 --- a/p2p/src/peer.rs +++ b/p2p/src/peer.rs @@ -23,9 +23,10 @@ use crate::chain; use crate::conn; use crate::core::core::hash::{Hash, Hashed}; use crate::core::pow::Difficulty; +use crate::core::ser::Writeable; use crate::core::{core, global}; use crate::handshake::Handshake; -use crate::msg::{self, BanReason, GetPeerAddrs, Locator, Ping, TxHashSetRequest}; +use crate::msg::{self, BanReason, GetPeerAddrs, Locator, Ping, TxHashSetRequest, Type}; use crate::protocol::Protocol; use crate::types::{ Capabilities, ChainAdapter, Error, NetAdapter, P2PConfig, PeerAddr, PeerInfo, ReasonForBan, @@ -50,16 +51,7 @@ pub struct Peer { state: Arc>, // set of all hashes known to this peer (so no need to send) tracking_adapter: TrackingAdapter, - connection: Option>, -} - -macro_rules! connection { - ($holder:expr) => { - match $holder.connection.as_ref() { - Some(conn) => conn.lock(), - None => return Err(Error::ConnectionClose), - } - }; + connection: Mutex, } impl fmt::Debug for Peer { @@ -70,26 +62,30 @@ impl fmt::Debug for Peer { impl Peer { // Only accept and connect can be externally used to build a peer - fn new(info: PeerInfo, adapter: Arc) -> Peer { + fn new(info: PeerInfo, conn: TcpStream, adapter: Arc) -> Peer { + let state = Arc::new(RwLock::new(State::Connected)); + let tracking_adapter = TrackingAdapter::new(adapter); + let handler = Protocol::new(Arc::new(tracking_adapter.clone()), info.clone()); + let connection = Mutex::new(conn::listen(conn, handler)); Peer { info, - state: Arc::new(RwLock::new(State::Connected)), - tracking_adapter: TrackingAdapter::new(adapter), - connection: None, + state, + tracking_adapter, + connection, } } pub fn accept( - conn: &mut TcpStream, + mut conn: TcpStream, capab: Capabilities, total_difficulty: Difficulty, hs: &Handshake, adapter: Arc, ) -> Result { debug!("accept: handshaking from {:?}", conn.peer_addr()); - let info = hs.accept(capab, total_difficulty, conn); + let info = hs.accept(capab, total_difficulty, &mut conn); match info { - Ok(peer_info) => Ok(Peer::new(peer_info, adapter)), + Ok(info) => Ok(Peer::new(info, conn, adapter)), Err(e) => { debug!( "accept: handshaking from {:?} failed with error: {:?}", @@ -105,17 +101,17 @@ impl Peer { } pub fn connect( - conn: &mut TcpStream, + mut conn: TcpStream, capab: Capabilities, total_difficulty: Difficulty, self_addr: PeerAddr, hs: &Handshake, - na: Arc, + adapter: Arc, ) -> Result { debug!("connect: handshaking with {:?}", conn.peer_addr()); - let info = hs.initiate(capab, total_difficulty, self_addr, conn); + let info = hs.initiate(capab, total_difficulty, self_addr, &mut conn); match info { - Ok(peer_info) => Ok(Peer::new(peer_info, na)), + Ok(info) => Ok(Peer::new(info, conn, adapter)), Err(e) => { debug!( "connect: handshaking with {:?} failed with error: {:?}", @@ -130,14 +126,6 @@ impl Peer { } } - /// Main peer loop listening for messages and forwarding to the rest of the - /// system. - pub fn start(&mut self, conn: TcpStream) { - let adapter = Arc::new(self.tracking_adapter.clone()); - let handler = Protocol::new(adapter, self.info.clone()); - self.connection = Some(Mutex::new(conn::listen(conn, handler))); - } - pub fn is_denied(config: &P2PConfig, peer_addr: PeerAddr) -> bool { if let Some(ref denied) = config.peers_deny { if denied.contains(&peer_addr) { @@ -171,9 +159,6 @@ impl Peer { /// Whether this peer is currently connected. pub fn is_connected(&self) -> bool { - if self.connection.is_none() { - return false; - } State::Connected == *self.state.read() } @@ -196,45 +181,31 @@ impl Peer { /// Whether the peer is considered abusive, mostly for spammy nodes pub fn is_abusive(&self) -> bool { - if let Some(ref conn) = self.connection { - let conn = conn.lock(); - let rec = conn.received_bytes.read(); - let sent = conn.sent_bytes.read(); - rec.count_per_min() > MAX_PEER_MSG_PER_MIN - || sent.count_per_min() > MAX_PEER_MSG_PER_MIN - } else { - false - } + let conn = self.connection.lock(); + let rec = conn.received_bytes.read(); + let sent = conn.sent_bytes.read(); + rec.count_per_min() > MAX_PEER_MSG_PER_MIN || sent.count_per_min() > MAX_PEER_MSG_PER_MIN } /// Number of bytes sent to the peer pub fn last_min_sent_bytes(&self) -> Option { - if let Some(ref tracker) = self.connection { - let conn = tracker.lock(); - let sent_bytes = conn.sent_bytes.read(); - return Some(sent_bytes.bytes_per_min()); - } - None + let conn = self.connection.lock(); + let sent_bytes = conn.sent_bytes.read(); + Some(sent_bytes.bytes_per_min()) } /// Number of bytes received from the peer pub fn last_min_received_bytes(&self) -> Option { - if let Some(ref tracker) = self.connection { - let conn = tracker.lock(); - let received_bytes = conn.received_bytes.read(); - return Some(received_bytes.bytes_per_min()); - } - None + let conn = self.connection.lock(); + let received_bytes = conn.received_bytes.read(); + Some(received_bytes.bytes_per_min()) } pub fn last_min_message_counts(&self) -> Option<(u64, u64)> { - if let Some(ref tracker) = self.connection { - let conn = tracker.lock(); - let received_bytes = conn.received_bytes.read(); - let sent_bytes = conn.sent_bytes.read(); - return Some((sent_bytes.count_per_min(), received_bytes.count_per_min())); - } - None + let conn = self.connection.lock(); + let received_bytes = conn.received_bytes.read(); + let sent_bytes = conn.sent_bytes.read(); + Some((sent_bytes.count_per_min(), received_bytes.count_per_min())) } /// Set this peer status to banned @@ -242,6 +213,11 @@ impl Peer { *self.state.write() = State::Banned; } + /// Send a msg with given msg_type to our peer via the connection. + fn send(&self, msg: T, msg_type: Type) -> Result<(), Error> { + self.connection.lock().send(msg, msg_type) + } + /// Send a ping to the remote peer, providing our local difficulty and /// height pub fn send_ping(&self, total_difficulty: Difficulty, height: u64) -> Result<(), Error> { @@ -249,15 +225,13 @@ impl Peer { total_difficulty, height, }; - connection!(self).send(ping_msg, msg::Type::Ping) + self.send(ping_msg, msg::Type::Ping) } /// Send the ban reason before banning pub fn send_ban_reason(&self, ban_reason: ReasonForBan) -> Result<(), Error> { let ban_reason_msg = BanReason { ban_reason }; - connection!(self) - .send(ban_reason_msg, msg::Type::BanReason) - .map(|_| ()) + self.send(ban_reason_msg, msg::Type::BanReason).map(|_| ()) } /// Sends the provided block to the remote peer. The request may be dropped @@ -265,7 +239,7 @@ impl Peer { pub fn send_block(&self, b: &core::Block) -> Result { if !self.tracking_adapter.has_recv(b.hash()) { trace!("Send block {} to {}", b.hash(), self.info.addr); - connection!(self).send(b, msg::Type::Block)?; + self.send(b, msg::Type::Block)?; Ok(true) } else { debug!( @@ -280,7 +254,7 @@ impl Peer { pub fn send_compact_block(&self, b: &core::CompactBlock) -> Result { if !self.tracking_adapter.has_recv(b.hash()) { trace!("Send compact block {} to {}", b.hash(), self.info.addr); - connection!(self).send(b, msg::Type::CompactBlock)?; + self.send(b, msg::Type::CompactBlock)?; Ok(true) } else { debug!( @@ -295,7 +269,7 @@ impl Peer { pub fn send_header(&self, bh: &core::BlockHeader) -> Result { if !self.tracking_adapter.has_recv(bh.hash()) { debug!("Send header {} to {}", bh.hash(), self.info.addr); - connection!(self).send(bh, msg::Type::Header)?; + self.send(bh, msg::Type::Header)?; Ok(true) } else { debug!( @@ -310,7 +284,7 @@ impl Peer { pub fn send_tx_kernel_hash(&self, h: Hash) -> Result { if !self.tracking_adapter.has_recv(h) { debug!("Send tx kernel hash {} to {}", h, self.info.addr); - connection!(self).send(h, msg::Type::TransactionKernel)?; + self.send(h, msg::Type::TransactionKernel)?; Ok(true) } else { debug!( @@ -338,7 +312,7 @@ impl Peer { if !self.tracking_adapter.has_recv(kernel.hash()) { debug!("Send full tx {} to {}", tx.hash(), self.info.addr); - connection!(self).send(tx, msg::Type::Transaction)?; + self.send(tx, msg::Type::Transaction)?; Ok(true) } else { debug!( @@ -355,12 +329,12 @@ impl Peer { /// embargo). pub fn send_stem_transaction(&self, tx: &core::Transaction) -> Result<(), Error> { debug!("Send (stem) tx {} to {}", tx.hash(), self.info.addr); - connection!(self).send(tx, msg::Type::StemTransaction) + self.send(tx, msg::Type::StemTransaction) } /// Sends a request for block headers from the provided block locator pub fn send_header_request(&self, locator: Vec) -> Result<(), Error> { - connection!(self).send(&Locator { hashes: locator }, msg::Type::GetHeaders) + self.send(&Locator { hashes: locator }, msg::Type::GetHeaders) } pub fn send_tx_request(&self, h: Hash) -> Result<(), Error> { @@ -368,25 +342,25 @@ impl Peer { "Requesting tx (kernel hash) {} from peer {}.", h, self.info.addr ); - connection!(self).send(&h, msg::Type::GetTransaction) + self.send(&h, msg::Type::GetTransaction) } /// Sends a request for a specific block by hash pub fn send_block_request(&self, h: Hash) -> Result<(), Error> { debug!("Requesting block {} from peer {}.", h, self.info.addr); self.tracking_adapter.push_req(h); - connection!(self).send(&h, msg::Type::GetBlock) + self.send(&h, msg::Type::GetBlock) } /// Sends a request for a specific compact block by hash pub fn send_compact_block_request(&self, h: Hash) -> Result<(), Error> { debug!("Requesting compact block {} from {}", h, self.info.addr); - connection!(self).send(&h, msg::Type::GetCompactBlock) + self.send(&h, msg::Type::GetCompactBlock) } pub fn send_peer_request(&self, capab: Capabilities) -> Result<(), Error> { trace!("Asking {} for more peers {:?}", self.info.addr, capab); - connection!(self).send( + self.send( &GetPeerAddrs { capabilities: capab, }, @@ -399,7 +373,7 @@ impl Peer { "Asking {} for txhashset archive at {} {}.", self.info.addr, height, hash ); - connection!(self).send( + self.send( &TxHashSetRequest { hash, height }, msg::Type::TxHashSetRequest, ) @@ -407,9 +381,7 @@ impl Peer { /// Stops the peer, closing its connection pub fn stop(&self) { - if let Some(conn) = self.connection.as_ref() { - let _ = conn.lock().close_channel.send(()); - } + self.connection.lock().close(); } } diff --git a/p2p/src/serv.rs b/p2p/src/serv.rs index 5cbb1614d..7b7ff05f2 100644 --- a/p2p/src/serv.rs +++ b/p2p/src/serv.rs @@ -138,19 +138,18 @@ impl Server { addr ); match TcpStream::connect_timeout(&addr.0, Duration::from_secs(10)) { - Ok(mut stream) => { + Ok(stream) => { let addr = SocketAddr::new(self.config.host, self.config.port); let total_diff = self.peers.total_difficulty()?; - let mut peer = Peer::connect( - &mut stream, + let peer = Peer::connect( + stream, self.capabilities, total_diff, PeerAddr(addr), &self.handshake, self.peers.clone(), )?; - peer.start(stream); let peer = Arc::new(peer); self.peers.add_connected(peer.clone())?; Ok(peer) @@ -168,18 +167,17 @@ impl Server { } } - fn handle_new_peer(&self, mut stream: TcpStream) -> Result<(), Error> { + fn handle_new_peer(&self, stream: TcpStream) -> Result<(), Error> { let total_diff = self.peers.total_difficulty()?; // accept the peer and add it to the server map - let mut peer = Peer::accept( - &mut stream, + let peer = Peer::accept( + stream, self.capabilities, total_diff, &self.handshake, self.peers.clone(), )?; - peer.start(stream); self.peers.add_connected(Arc::new(peer))?; Ok(()) } diff --git a/p2p/tests/peer_handshake.rs b/p2p/tests/peer_handshake.rs index e1bd035d4..3afe32c80 100644 --- a/p2p/tests/peer_handshake.rs +++ b/p2p/tests/peer_handshake.rs @@ -67,11 +67,11 @@ fn peer_handshake() { thread::sleep(time::Duration::from_secs(1)); let addr = SocketAddr::new(p2p_config.host, p2p_config.port); - let mut socket = TcpStream::connect_timeout(&addr, time::Duration::from_secs(10)).unwrap(); + let socket = TcpStream::connect_timeout(&addr, time::Duration::from_secs(10)).unwrap(); let my_addr = PeerAddr("127.0.0.1:5000".parse().unwrap()); - let mut peer = Peer::connect( - &mut socket, + let peer = Peer::connect( + socket, p2p::Capabilities::UNKNOWN, Difficulty::min(), my_addr, @@ -82,7 +82,6 @@ fn peer_handshake() { assert!(peer.info.user_agent.ends_with(env!("CARGO_PKG_VERSION"))); - peer.start(socket); thread::sleep(time::Duration::from_secs(1)); peer.send_ping(Difficulty::min(), 0).unwrap();