Simplify peer connection handling (#2801)

* connection no longer wrapped in an Option in peer

* introduce peer.send()

* remove some Arc indirection

* self.send() cleanup

* extract Peer:new() from connect and accept

* fixup

* cleanup
This commit is contained in:
Antioch Peverell 2019-05-03 15:56:25 +01:00 committed by GitHub
parent 6c54c90101
commit 4ef4212f1f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 66 additions and 92 deletions

View file

@ -188,6 +188,11 @@ impl Tracker {
Ok(()) Ok(())
} }
/// Schedule this connection to safely close via the async close_channel.
pub fn close(&self) {
let _ = self.close_channel.send(());
}
} }
/// Start listening on the provided connection and wraps it. Does not hang /// Start listening on the provided connection and wraps it. Does not hang

View file

@ -23,9 +23,10 @@ use crate::chain;
use crate::conn; use crate::conn;
use crate::core::core::hash::{Hash, Hashed}; use crate::core::core::hash::{Hash, Hashed};
use crate::core::pow::Difficulty; use crate::core::pow::Difficulty;
use crate::core::ser::Writeable;
use crate::core::{core, global}; use crate::core::{core, global};
use crate::handshake::Handshake; use crate::handshake::Handshake;
use crate::msg::{self, BanReason, GetPeerAddrs, Locator, Ping, TxHashSetRequest}; use crate::msg::{self, BanReason, GetPeerAddrs, Locator, Ping, TxHashSetRequest, Type};
use crate::protocol::Protocol; use crate::protocol::Protocol;
use crate::types::{ use crate::types::{
Capabilities, ChainAdapter, Error, NetAdapter, P2PConfig, PeerAddr, PeerInfo, ReasonForBan, Capabilities, ChainAdapter, Error, NetAdapter, P2PConfig, PeerAddr, PeerInfo, ReasonForBan,
@ -50,16 +51,7 @@ pub struct Peer {
state: Arc<RwLock<State>>, state: Arc<RwLock<State>>,
// set of all hashes known to this peer (so no need to send) // set of all hashes known to this peer (so no need to send)
tracking_adapter: TrackingAdapter, tracking_adapter: TrackingAdapter,
connection: Option<Mutex<conn::Tracker>>, connection: Mutex<conn::Tracker>,
}
macro_rules! connection {
($holder:expr) => {
match $holder.connection.as_ref() {
Some(conn) => conn.lock(),
None => return Err(Error::ConnectionClose),
}
};
} }
impl fmt::Debug for Peer { impl fmt::Debug for Peer {
@ -70,26 +62,30 @@ impl fmt::Debug for Peer {
impl Peer { impl Peer {
// Only accept and connect can be externally used to build a peer // Only accept and connect can be externally used to build a peer
fn new(info: PeerInfo, adapter: Arc<dyn NetAdapter>) -> Peer { fn new(info: PeerInfo, conn: TcpStream, adapter: Arc<dyn NetAdapter>) -> Peer {
let state = Arc::new(RwLock::new(State::Connected));
let tracking_adapter = TrackingAdapter::new(adapter);
let handler = Protocol::new(Arc::new(tracking_adapter.clone()), info.clone());
let connection = Mutex::new(conn::listen(conn, handler));
Peer { Peer {
info, info,
state: Arc::new(RwLock::new(State::Connected)), state,
tracking_adapter: TrackingAdapter::new(adapter), tracking_adapter,
connection: None, connection,
} }
} }
pub fn accept( pub fn accept(
conn: &mut TcpStream, mut conn: TcpStream,
capab: Capabilities, capab: Capabilities,
total_difficulty: Difficulty, total_difficulty: Difficulty,
hs: &Handshake, hs: &Handshake,
adapter: Arc<dyn NetAdapter>, adapter: Arc<dyn NetAdapter>,
) -> Result<Peer, Error> { ) -> Result<Peer, Error> {
debug!("accept: handshaking from {:?}", conn.peer_addr()); debug!("accept: handshaking from {:?}", conn.peer_addr());
let info = hs.accept(capab, total_difficulty, conn); let info = hs.accept(capab, total_difficulty, &mut conn);
match info { match info {
Ok(peer_info) => Ok(Peer::new(peer_info, adapter)), Ok(info) => Ok(Peer::new(info, conn, adapter)),
Err(e) => { Err(e) => {
debug!( debug!(
"accept: handshaking from {:?} failed with error: {:?}", "accept: handshaking from {:?} failed with error: {:?}",
@ -105,17 +101,17 @@ impl Peer {
} }
pub fn connect( pub fn connect(
conn: &mut TcpStream, mut conn: TcpStream,
capab: Capabilities, capab: Capabilities,
total_difficulty: Difficulty, total_difficulty: Difficulty,
self_addr: PeerAddr, self_addr: PeerAddr,
hs: &Handshake, hs: &Handshake,
na: Arc<dyn NetAdapter>, adapter: Arc<dyn NetAdapter>,
) -> Result<Peer, Error> { ) -> Result<Peer, Error> {
debug!("connect: handshaking with {:?}", conn.peer_addr()); debug!("connect: handshaking with {:?}", conn.peer_addr());
let info = hs.initiate(capab, total_difficulty, self_addr, conn); let info = hs.initiate(capab, total_difficulty, self_addr, &mut conn);
match info { match info {
Ok(peer_info) => Ok(Peer::new(peer_info, na)), Ok(info) => Ok(Peer::new(info, conn, adapter)),
Err(e) => { Err(e) => {
debug!( debug!(
"connect: handshaking with {:?} failed with error: {:?}", "connect: handshaking with {:?} failed with error: {:?}",
@ -130,14 +126,6 @@ impl Peer {
} }
} }
/// Main peer loop listening for messages and forwarding to the rest of the
/// system.
pub fn start(&mut self, conn: TcpStream) {
let adapter = Arc::new(self.tracking_adapter.clone());
let handler = Protocol::new(adapter, self.info.clone());
self.connection = Some(Mutex::new(conn::listen(conn, handler)));
}
pub fn is_denied(config: &P2PConfig, peer_addr: PeerAddr) -> bool { pub fn is_denied(config: &P2PConfig, peer_addr: PeerAddr) -> bool {
if let Some(ref denied) = config.peers_deny { if let Some(ref denied) = config.peers_deny {
if denied.contains(&peer_addr) { if denied.contains(&peer_addr) {
@ -171,9 +159,6 @@ impl Peer {
/// Whether this peer is currently connected. /// Whether this peer is currently connected.
pub fn is_connected(&self) -> bool { pub fn is_connected(&self) -> bool {
if self.connection.is_none() {
return false;
}
State::Connected == *self.state.read() State::Connected == *self.state.read()
} }
@ -196,45 +181,31 @@ impl Peer {
/// Whether the peer is considered abusive, mostly for spammy nodes /// Whether the peer is considered abusive, mostly for spammy nodes
pub fn is_abusive(&self) -> bool { pub fn is_abusive(&self) -> bool {
if let Some(ref conn) = self.connection { let conn = self.connection.lock();
let conn = conn.lock(); let rec = conn.received_bytes.read();
let rec = conn.received_bytes.read(); let sent = conn.sent_bytes.read();
let sent = conn.sent_bytes.read(); rec.count_per_min() > MAX_PEER_MSG_PER_MIN || sent.count_per_min() > MAX_PEER_MSG_PER_MIN
rec.count_per_min() > MAX_PEER_MSG_PER_MIN
|| sent.count_per_min() > MAX_PEER_MSG_PER_MIN
} else {
false
}
} }
/// Number of bytes sent to the peer /// Number of bytes sent to the peer
pub fn last_min_sent_bytes(&self) -> Option<u64> { pub fn last_min_sent_bytes(&self) -> Option<u64> {
if let Some(ref tracker) = self.connection { let conn = self.connection.lock();
let conn = tracker.lock(); let sent_bytes = conn.sent_bytes.read();
let sent_bytes = conn.sent_bytes.read(); Some(sent_bytes.bytes_per_min())
return Some(sent_bytes.bytes_per_min());
}
None
} }
/// Number of bytes received from the peer /// Number of bytes received from the peer
pub fn last_min_received_bytes(&self) -> Option<u64> { pub fn last_min_received_bytes(&self) -> Option<u64> {
if let Some(ref tracker) = self.connection { let conn = self.connection.lock();
let conn = tracker.lock(); let received_bytes = conn.received_bytes.read();
let received_bytes = conn.received_bytes.read(); Some(received_bytes.bytes_per_min())
return Some(received_bytes.bytes_per_min());
}
None
} }
pub fn last_min_message_counts(&self) -> Option<(u64, u64)> { pub fn last_min_message_counts(&self) -> Option<(u64, u64)> {
if let Some(ref tracker) = self.connection { let conn = self.connection.lock();
let conn = tracker.lock(); let received_bytes = conn.received_bytes.read();
let received_bytes = conn.received_bytes.read(); let sent_bytes = conn.sent_bytes.read();
let sent_bytes = conn.sent_bytes.read(); Some((sent_bytes.count_per_min(), received_bytes.count_per_min()))
return Some((sent_bytes.count_per_min(), received_bytes.count_per_min()));
}
None
} }
/// Set this peer status to banned /// Set this peer status to banned
@ -242,6 +213,11 @@ impl Peer {
*self.state.write() = State::Banned; *self.state.write() = State::Banned;
} }
/// Send a msg with given msg_type to our peer via the connection.
fn send<T: Writeable>(&self, msg: T, msg_type: Type) -> Result<(), Error> {
self.connection.lock().send(msg, msg_type)
}
/// Send a ping to the remote peer, providing our local difficulty and /// Send a ping to the remote peer, providing our local difficulty and
/// height /// height
pub fn send_ping(&self, total_difficulty: Difficulty, height: u64) -> Result<(), Error> { pub fn send_ping(&self, total_difficulty: Difficulty, height: u64) -> Result<(), Error> {
@ -249,15 +225,13 @@ impl Peer {
total_difficulty, total_difficulty,
height, height,
}; };
connection!(self).send(ping_msg, msg::Type::Ping) self.send(ping_msg, msg::Type::Ping)
} }
/// Send the ban reason before banning /// Send the ban reason before banning
pub fn send_ban_reason(&self, ban_reason: ReasonForBan) -> Result<(), Error> { pub fn send_ban_reason(&self, ban_reason: ReasonForBan) -> Result<(), Error> {
let ban_reason_msg = BanReason { ban_reason }; let ban_reason_msg = BanReason { ban_reason };
connection!(self) self.send(ban_reason_msg, msg::Type::BanReason).map(|_| ())
.send(ban_reason_msg, msg::Type::BanReason)
.map(|_| ())
} }
/// Sends the provided block to the remote peer. The request may be dropped /// Sends the provided block to the remote peer. The request may be dropped
@ -265,7 +239,7 @@ impl Peer {
pub fn send_block(&self, b: &core::Block) -> Result<bool, Error> { pub fn send_block(&self, b: &core::Block) -> Result<bool, Error> {
if !self.tracking_adapter.has_recv(b.hash()) { if !self.tracking_adapter.has_recv(b.hash()) {
trace!("Send block {} to {}", b.hash(), self.info.addr); trace!("Send block {} to {}", b.hash(), self.info.addr);
connection!(self).send(b, msg::Type::Block)?; self.send(b, msg::Type::Block)?;
Ok(true) Ok(true)
} else { } else {
debug!( debug!(
@ -280,7 +254,7 @@ impl Peer {
pub fn send_compact_block(&self, b: &core::CompactBlock) -> Result<bool, Error> { pub fn send_compact_block(&self, b: &core::CompactBlock) -> Result<bool, Error> {
if !self.tracking_adapter.has_recv(b.hash()) { if !self.tracking_adapter.has_recv(b.hash()) {
trace!("Send compact block {} to {}", b.hash(), self.info.addr); trace!("Send compact block {} to {}", b.hash(), self.info.addr);
connection!(self).send(b, msg::Type::CompactBlock)?; self.send(b, msg::Type::CompactBlock)?;
Ok(true) Ok(true)
} else { } else {
debug!( debug!(
@ -295,7 +269,7 @@ impl Peer {
pub fn send_header(&self, bh: &core::BlockHeader) -> Result<bool, Error> { pub fn send_header(&self, bh: &core::BlockHeader) -> Result<bool, Error> {
if !self.tracking_adapter.has_recv(bh.hash()) { if !self.tracking_adapter.has_recv(bh.hash()) {
debug!("Send header {} to {}", bh.hash(), self.info.addr); debug!("Send header {} to {}", bh.hash(), self.info.addr);
connection!(self).send(bh, msg::Type::Header)?; self.send(bh, msg::Type::Header)?;
Ok(true) Ok(true)
} else { } else {
debug!( debug!(
@ -310,7 +284,7 @@ impl Peer {
pub fn send_tx_kernel_hash(&self, h: Hash) -> Result<bool, Error> { pub fn send_tx_kernel_hash(&self, h: Hash) -> Result<bool, Error> {
if !self.tracking_adapter.has_recv(h) { if !self.tracking_adapter.has_recv(h) {
debug!("Send tx kernel hash {} to {}", h, self.info.addr); debug!("Send tx kernel hash {} to {}", h, self.info.addr);
connection!(self).send(h, msg::Type::TransactionKernel)?; self.send(h, msg::Type::TransactionKernel)?;
Ok(true) Ok(true)
} else { } else {
debug!( debug!(
@ -338,7 +312,7 @@ impl Peer {
if !self.tracking_adapter.has_recv(kernel.hash()) { if !self.tracking_adapter.has_recv(kernel.hash()) {
debug!("Send full tx {} to {}", tx.hash(), self.info.addr); debug!("Send full tx {} to {}", tx.hash(), self.info.addr);
connection!(self).send(tx, msg::Type::Transaction)?; self.send(tx, msg::Type::Transaction)?;
Ok(true) Ok(true)
} else { } else {
debug!( debug!(
@ -355,12 +329,12 @@ impl Peer {
/// embargo). /// embargo).
pub fn send_stem_transaction(&self, tx: &core::Transaction) -> Result<(), Error> { pub fn send_stem_transaction(&self, tx: &core::Transaction) -> Result<(), Error> {
debug!("Send (stem) tx {} to {}", tx.hash(), self.info.addr); debug!("Send (stem) tx {} to {}", tx.hash(), self.info.addr);
connection!(self).send(tx, msg::Type::StemTransaction) self.send(tx, msg::Type::StemTransaction)
} }
/// Sends a request for block headers from the provided block locator /// Sends a request for block headers from the provided block locator
pub fn send_header_request(&self, locator: Vec<Hash>) -> Result<(), Error> { pub fn send_header_request(&self, locator: Vec<Hash>) -> Result<(), Error> {
connection!(self).send(&Locator { hashes: locator }, msg::Type::GetHeaders) self.send(&Locator { hashes: locator }, msg::Type::GetHeaders)
} }
pub fn send_tx_request(&self, h: Hash) -> Result<(), Error> { pub fn send_tx_request(&self, h: Hash) -> Result<(), Error> {
@ -368,25 +342,25 @@ impl Peer {
"Requesting tx (kernel hash) {} from peer {}.", "Requesting tx (kernel hash) {} from peer {}.",
h, self.info.addr h, self.info.addr
); );
connection!(self).send(&h, msg::Type::GetTransaction) self.send(&h, msg::Type::GetTransaction)
} }
/// Sends a request for a specific block by hash /// Sends a request for a specific block by hash
pub fn send_block_request(&self, h: Hash) -> Result<(), Error> { pub fn send_block_request(&self, h: Hash) -> Result<(), Error> {
debug!("Requesting block {} from peer {}.", h, self.info.addr); debug!("Requesting block {} from peer {}.", h, self.info.addr);
self.tracking_adapter.push_req(h); self.tracking_adapter.push_req(h);
connection!(self).send(&h, msg::Type::GetBlock) self.send(&h, msg::Type::GetBlock)
} }
/// Sends a request for a specific compact block by hash /// Sends a request for a specific compact block by hash
pub fn send_compact_block_request(&self, h: Hash) -> Result<(), Error> { pub fn send_compact_block_request(&self, h: Hash) -> Result<(), Error> {
debug!("Requesting compact block {} from {}", h, self.info.addr); debug!("Requesting compact block {} from {}", h, self.info.addr);
connection!(self).send(&h, msg::Type::GetCompactBlock) self.send(&h, msg::Type::GetCompactBlock)
} }
pub fn send_peer_request(&self, capab: Capabilities) -> Result<(), Error> { pub fn send_peer_request(&self, capab: Capabilities) -> Result<(), Error> {
trace!("Asking {} for more peers {:?}", self.info.addr, capab); trace!("Asking {} for more peers {:?}", self.info.addr, capab);
connection!(self).send( self.send(
&GetPeerAddrs { &GetPeerAddrs {
capabilities: capab, capabilities: capab,
}, },
@ -399,7 +373,7 @@ impl Peer {
"Asking {} for txhashset archive at {} {}.", "Asking {} for txhashset archive at {} {}.",
self.info.addr, height, hash self.info.addr, height, hash
); );
connection!(self).send( self.send(
&TxHashSetRequest { hash, height }, &TxHashSetRequest { hash, height },
msg::Type::TxHashSetRequest, msg::Type::TxHashSetRequest,
) )
@ -407,9 +381,7 @@ impl Peer {
/// Stops the peer, closing its connection /// Stops the peer, closing its connection
pub fn stop(&self) { pub fn stop(&self) {
if let Some(conn) = self.connection.as_ref() { self.connection.lock().close();
let _ = conn.lock().close_channel.send(());
}
} }
} }

View file

@ -138,19 +138,18 @@ impl Server {
addr addr
); );
match TcpStream::connect_timeout(&addr.0, Duration::from_secs(10)) { match TcpStream::connect_timeout(&addr.0, Duration::from_secs(10)) {
Ok(mut stream) => { Ok(stream) => {
let addr = SocketAddr::new(self.config.host, self.config.port); let addr = SocketAddr::new(self.config.host, self.config.port);
let total_diff = self.peers.total_difficulty()?; let total_diff = self.peers.total_difficulty()?;
let mut peer = Peer::connect( let peer = Peer::connect(
&mut stream, stream,
self.capabilities, self.capabilities,
total_diff, total_diff,
PeerAddr(addr), PeerAddr(addr),
&self.handshake, &self.handshake,
self.peers.clone(), self.peers.clone(),
)?; )?;
peer.start(stream);
let peer = Arc::new(peer); let peer = Arc::new(peer);
self.peers.add_connected(peer.clone())?; self.peers.add_connected(peer.clone())?;
Ok(peer) Ok(peer)
@ -168,18 +167,17 @@ impl Server {
} }
} }
fn handle_new_peer(&self, mut stream: TcpStream) -> Result<(), Error> { fn handle_new_peer(&self, stream: TcpStream) -> Result<(), Error> {
let total_diff = self.peers.total_difficulty()?; let total_diff = self.peers.total_difficulty()?;
// accept the peer and add it to the server map // accept the peer and add it to the server map
let mut peer = Peer::accept( let peer = Peer::accept(
&mut stream, stream,
self.capabilities, self.capabilities,
total_diff, total_diff,
&self.handshake, &self.handshake,
self.peers.clone(), self.peers.clone(),
)?; )?;
peer.start(stream);
self.peers.add_connected(Arc::new(peer))?; self.peers.add_connected(Arc::new(peer))?;
Ok(()) Ok(())
} }

View file

@ -67,11 +67,11 @@ fn peer_handshake() {
thread::sleep(time::Duration::from_secs(1)); thread::sleep(time::Duration::from_secs(1));
let addr = SocketAddr::new(p2p_config.host, p2p_config.port); let addr = SocketAddr::new(p2p_config.host, p2p_config.port);
let mut socket = TcpStream::connect_timeout(&addr, time::Duration::from_secs(10)).unwrap(); let socket = TcpStream::connect_timeout(&addr, time::Duration::from_secs(10)).unwrap();
let my_addr = PeerAddr("127.0.0.1:5000".parse().unwrap()); let my_addr = PeerAddr("127.0.0.1:5000".parse().unwrap());
let mut peer = Peer::connect( let peer = Peer::connect(
&mut socket, socket,
p2p::Capabilities::UNKNOWN, p2p::Capabilities::UNKNOWN,
Difficulty::min(), Difficulty::min(),
my_addr, my_addr,
@ -82,7 +82,6 @@ fn peer_handshake() {
assert!(peer.info.user_agent.ends_with(env!("CARGO_PKG_VERSION"))); assert!(peer.info.user_agent.ends_with(env!("CARGO_PKG_VERSION")));
peer.start(socket);
thread::sleep(time::Duration::from_secs(1)); thread::sleep(time::Duration::from_secs(1));
peer.send_ping(Difficulty::min(), 0).unwrap(); peer.send_ping(Difficulty::min(), 0).unwrap();