Remove unsafe sync/send impl for Peers (#1938)

This is really unsafe. Fixes #1937

* Switch to better mutex
This commit is contained in:
hashmap 2018-11-06 17:59:46 +01:00 committed by Ignotus Peverell
parent 0af1f13bf9
commit 8742c334dd
2 changed files with 34 additions and 12 deletions

View file

@ -15,7 +15,7 @@
use std::fs::File;
use std::net::{SocketAddr, TcpStream};
use std::sync::Arc;
use util::RwLock;
use util::{Mutex, RwLock};
use chrono::prelude::{DateTime, Utc};
use conn;
@ -49,7 +49,7 @@ pub struct Peer {
state: Arc<RwLock<State>>,
// set of all hashes known to this peer (so no need to send)
tracking_adapter: TrackingAdapter,
connection: Option<conn::Tracker>,
connection: Option<Mutex<conn::Tracker>>,
}
impl Peer {
@ -92,7 +92,7 @@ impl Peer {
let addr = self.info.addr;
let adapter = Arc::new(self.tracking_adapter.clone());
let handler = Protocol::new(adapter, addr);
self.connection = Some(conn::listen(conn, handler));
self.connection = Some(Mutex::new(conn::listen(conn, handler)));
}
pub fn is_denied(config: &P2PConfig, peer_addr: &SocketAddr) -> bool {
@ -152,7 +152,8 @@ impl Peer {
/// Number of bytes sent to the peer
pub fn sent_bytes(&self) -> Option<u64> {
if let Some(ref tracker) = self.connection {
let sent_bytes = tracker.sent_bytes.read();
let conn = tracker.lock();
let sent_bytes = conn.sent_bytes.read();
return Some(*sent_bytes);
}
None
@ -161,7 +162,8 @@ impl Peer {
/// Number of bytes received from the peer
pub fn received_bytes(&self) -> Option<u64> {
if let Some(ref tracker) = self.connection {
let received_bytes = tracker.received_bytes.read();
let conn = tracker.lock();
let received_bytes = conn.received_bytes.read();
return Some(*received_bytes);
}
None
@ -182,6 +184,7 @@ impl Peer {
self.connection
.as_ref()
.unwrap()
.lock()
.send(ping_msg, msg::Type::Ping)
}
@ -192,6 +195,7 @@ impl Peer {
.connection
.as_ref()
.unwrap()
.lock()
.send(ban_reason_msg, msg::Type::BanReason)
{
Ok(_) => debug!("Sent ban reason {:?} to {}", ban_reason, self.info.addr),
@ -210,6 +214,7 @@ impl Peer {
self.connection
.as_ref()
.unwrap()
.lock()
.send(b, msg::Type::Block)?;
Ok(true)
} else {
@ -228,6 +233,7 @@ impl Peer {
self.connection
.as_ref()
.unwrap()
.lock()
.send(b, msg::Type::CompactBlock)?;
Ok(true)
} else {
@ -246,6 +252,7 @@ impl Peer {
self.connection
.as_ref()
.unwrap()
.lock()
.send(bh, msg::Type::Header)?;
Ok(true)
} else {
@ -266,6 +273,7 @@ impl Peer {
self.connection
.as_ref()
.unwrap()
.lock()
.send(tx, msg::Type::Transaction)?;
Ok(true)
} else {
@ -286,6 +294,7 @@ impl Peer {
self.connection
.as_ref()
.unwrap()
.lock()
.send(tx, msg::Type::StemTransaction)?;
Ok(())
}
@ -295,6 +304,7 @@ impl Peer {
self.connection
.as_ref()
.unwrap()
.lock()
.send(&Locator { hashes: locator }, msg::Type::GetHeaders)
}
@ -304,6 +314,7 @@ impl Peer {
self.connection
.as_ref()
.unwrap()
.lock()
.send(&h, msg::Type::GetBlock)
}
@ -313,12 +324,13 @@ impl Peer {
self.connection
.as_ref()
.unwrap()
.lock()
.send(&h, msg::Type::GetCompactBlock)
}
pub fn send_peer_request(&self, capab: Capabilities) -> Result<(), Error> {
debug!("Asking {} for more peers.", self.info.addr);
self.connection.as_ref().unwrap().send(
self.connection.as_ref().unwrap().lock().send(
&GetPeerAddrs {
capabilities: capab,
},
@ -331,7 +343,7 @@ impl Peer {
"Asking {} for txhashset archive at {} {}.",
self.info.addr, height, hash
);
self.connection.as_ref().unwrap().send(
self.connection.as_ref().unwrap().lock().send(
&TxHashSetRequest { hash, height },
msg::Type::TxHashSetRequest,
)
@ -339,11 +351,24 @@ impl Peer {
/// Stops the peer, closing its connection
pub fn stop(&self) {
let _ = self.connection.as_ref().unwrap().close_channel.send(());
let _ = self
.connection
.as_ref()
.unwrap()
.lock()
.close_channel
.send(());
}
fn check_connection(&self) -> bool {
match self.connection.as_ref().unwrap().error_channel.try_recv() {
match self
.connection
.as_ref()
.unwrap()
.lock()
.error_channel
.try_recv()
{
Ok(Error::Serialization(e)) => {
let need_stop = {
let mut state = self.state.write();

View file

@ -42,9 +42,6 @@ pub struct Peers {
config: P2PConfig,
}
unsafe impl Send for Peers {}
unsafe impl Sync for Peers {}
impl Peers {
pub fn new(store: PeerStore, adapter: Arc<ChainAdapter>, config: P2PConfig) -> Peers {
Peers {