diff --git a/p2p/src/peer.rs b/p2p/src/peer.rs index 784f5d8a2..b1d17187f 100644 --- a/p2p/src/peer.rs +++ b/p2p/src/peer.rs @@ -15,7 +15,7 @@ use std::fs::File; use std::net::{SocketAddr, TcpStream}; use std::sync::Arc; -use util::RwLock; +use util::{Mutex, RwLock}; use chrono::prelude::{DateTime, Utc}; use conn; @@ -49,7 +49,7 @@ pub struct Peer { state: Arc>, // set of all hashes known to this peer (so no need to send) tracking_adapter: TrackingAdapter, - connection: Option, + connection: Option>, } impl Peer { @@ -92,7 +92,7 @@ impl Peer { let addr = self.info.addr; let adapter = Arc::new(self.tracking_adapter.clone()); let handler = Protocol::new(adapter, addr); - self.connection = Some(conn::listen(conn, handler)); + self.connection = Some(Mutex::new(conn::listen(conn, handler))); } pub fn is_denied(config: &P2PConfig, peer_addr: &SocketAddr) -> bool { @@ -152,7 +152,8 @@ impl Peer { /// Number of bytes sent to the peer pub fn sent_bytes(&self) -> Option { if let Some(ref tracker) = self.connection { - let sent_bytes = tracker.sent_bytes.read(); + let conn = tracker.lock(); + let sent_bytes = conn.sent_bytes.read(); return Some(*sent_bytes); } None @@ -161,7 +162,8 @@ impl Peer { /// Number of bytes received from the peer pub fn received_bytes(&self) -> Option { if let Some(ref tracker) = self.connection { - let received_bytes = tracker.received_bytes.read(); + let conn = tracker.lock(); + let received_bytes = conn.received_bytes.read(); return Some(*received_bytes); } None @@ -182,6 +184,7 @@ impl Peer { self.connection .as_ref() .unwrap() + .lock() .send(ping_msg, msg::Type::Ping) } @@ -192,6 +195,7 @@ impl Peer { .connection .as_ref() .unwrap() + .lock() .send(ban_reason_msg, msg::Type::BanReason) { Ok(_) => debug!("Sent ban reason {:?} to {}", ban_reason, self.info.addr), @@ -210,6 +214,7 @@ impl Peer { self.connection .as_ref() .unwrap() + .lock() .send(b, msg::Type::Block)?; Ok(true) } else { @@ -228,6 +233,7 @@ impl Peer { self.connection .as_ref() .unwrap() + .lock() .send(b, msg::Type::CompactBlock)?; Ok(true) } else { @@ -246,6 +252,7 @@ impl Peer { self.connection .as_ref() .unwrap() + .lock() .send(bh, msg::Type::Header)?; Ok(true) } else { @@ -266,6 +273,7 @@ impl Peer { self.connection .as_ref() .unwrap() + .lock() .send(tx, msg::Type::Transaction)?; Ok(true) } else { @@ -286,6 +294,7 @@ impl Peer { self.connection .as_ref() .unwrap() + .lock() .send(tx, msg::Type::StemTransaction)?; Ok(()) } @@ -295,6 +304,7 @@ impl Peer { self.connection .as_ref() .unwrap() + .lock() .send(&Locator { hashes: locator }, msg::Type::GetHeaders) } @@ -304,6 +314,7 @@ impl Peer { self.connection .as_ref() .unwrap() + .lock() .send(&h, msg::Type::GetBlock) } @@ -313,12 +324,13 @@ impl Peer { self.connection .as_ref() .unwrap() + .lock() .send(&h, msg::Type::GetCompactBlock) } pub fn send_peer_request(&self, capab: Capabilities) -> Result<(), Error> { debug!("Asking {} for more peers.", self.info.addr); - self.connection.as_ref().unwrap().send( + self.connection.as_ref().unwrap().lock().send( &GetPeerAddrs { capabilities: capab, }, @@ -331,7 +343,7 @@ impl Peer { "Asking {} for txhashset archive at {} {}.", self.info.addr, height, hash ); - self.connection.as_ref().unwrap().send( + self.connection.as_ref().unwrap().lock().send( &TxHashSetRequest { hash, height }, msg::Type::TxHashSetRequest, ) @@ -339,11 +351,24 @@ impl Peer { /// Stops the peer, closing its connection pub fn stop(&self) { - let _ = self.connection.as_ref().unwrap().close_channel.send(()); + let _ = self + .connection + .as_ref() + .unwrap() + .lock() + .close_channel + .send(()); } fn check_connection(&self) -> bool { - match self.connection.as_ref().unwrap().error_channel.try_recv() { + match self + .connection + .as_ref() + .unwrap() + .lock() + .error_channel + .try_recv() + { Ok(Error::Serialization(e)) => { let need_stop = { let mut state = self.state.write(); diff --git a/p2p/src/peers.rs b/p2p/src/peers.rs index b47a1bf0b..96f936a15 100644 --- a/p2p/src/peers.rs +++ b/p2p/src/peers.rs @@ -42,9 +42,6 @@ pub struct Peers { config: P2PConfig, } -unsafe impl Send for Peers {} -unsafe impl Sync for Peers {} - impl Peers { pub fn new(store: PeerStore, adapter: Arc, config: P2PConfig) -> Peers { Peers {