Remove RwLock around individual peer instances (#1673)

* get rid of RwLock around peer instances

* rustfmt

* rename stuff to live_info

* rustfmt

* serialize into PeerInfoDisplay
limit live_info write lock to peer_info.update()
rename to PeerLiveInfo

* simplify broadcast logic a bit more
connected peers are connected by definition

* return true/false on broadcast so we can send to more peers

* rustfmt

* fix p2p and server tests for PeerInfoDisplay

* commit
This commit is contained in:
Antioch Peverell 2018-10-09 08:27:34 +01:00 committed by GitHub
parent 536b905690
commit 8cfe9e64ac
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
18 changed files with 268 additions and 260 deletions

View file

@ -27,7 +27,7 @@ use core::core::hash::{Hash, Hashed};
use core::core::{OutputFeatures, OutputIdentifier, Transaction}; use core::core::{OutputFeatures, OutputIdentifier, Transaction};
use core::ser; use core::ser;
use p2p; use p2p;
use p2p::types::ReasonForBan; use p2p::types::{PeerInfoDisplay, ReasonForBan};
use pool; use pool;
use regex::Regex; use regex::Regex;
use rest::*; use rest::*;
@ -405,11 +405,10 @@ pub struct PeersConnectedHandler {
impl Handler for PeersConnectedHandler { impl Handler for PeersConnectedHandler {
fn get(&self, _req: Request<Body>) -> ResponseFuture { fn get(&self, _req: Request<Body>) -> ResponseFuture {
let mut peers = vec![]; let mut peers: Vec<PeerInfoDisplay> = vec![];
for p in &w(&self.peers).connected_peers() { for p in &w(&self.peers).connected_peers() {
let p = p.read().unwrap();
let peer_info = p.info.clone(); let peer_info = p.info.clone();
peers.push(peer_info); peers.push(peer_info.into());
} }
json_response(&peers) json_response(&peers)
} }

View file

@ -23,7 +23,7 @@ use core::core::hash::Hash;
use core::pow::Difficulty; use core::pow::Difficulty;
use msg::{read_message, write_message, Hand, Shake, SockAddr, Type, PROTOCOL_VERSION, USER_AGENT}; use msg::{read_message, write_message, Hand, Shake, SockAddr, Type, PROTOCOL_VERSION, USER_AGENT};
use peer::Peer; use peer::Peer;
use types::{Capabilities, Direction, Error, P2PConfig, PeerInfo}; use types::{Capabilities, Direction, Error, P2PConfig, PeerInfo, PeerLiveInfo};
use util::LOGGER; use util::LOGGER;
const NONCES_CAP: usize = 100; const NONCES_CAP: usize = 100;
@ -98,10 +98,12 @@ impl Handshake {
user_agent: shake.user_agent, user_agent: shake.user_agent,
addr: peer_addr, addr: peer_addr,
version: shake.version, version: shake.version,
total_difficulty: shake.total_difficulty, live_info: Arc::new(RwLock::new(PeerLiveInfo {
height: 0, total_difficulty: shake.total_difficulty,
height: 0,
last_seen: Utc::now(),
})),
direction: Direction::Outbound, direction: Direction::Outbound,
last_seen: Utc::now(),
}; };
// If denied then we want to close the connection // If denied then we want to close the connection
@ -113,7 +115,7 @@ impl Handshake {
debug!( debug!(
LOGGER, LOGGER,
"Connected! Cumulative {} offered from {:?} {:?} {:?}", "Connected! Cumulative {} offered from {:?} {:?} {:?}",
peer_info.total_difficulty.to_num(), shake.total_difficulty.to_num(),
peer_info.addr, peer_info.addr,
peer_info.user_agent, peer_info.user_agent,
peer_info.capabilities peer_info.capabilities
@ -155,10 +157,12 @@ impl Handshake {
user_agent: hand.user_agent, user_agent: hand.user_agent,
addr: extract_ip(&hand.sender_addr.0, &conn), addr: extract_ip(&hand.sender_addr.0, &conn),
version: hand.version, version: hand.version,
total_difficulty: hand.total_difficulty, live_info: Arc::new(RwLock::new(PeerLiveInfo {
height: 0, total_difficulty: hand.total_difficulty,
height: 0,
last_seen: Utc::now(),
})),
direction: Direction::Inbound, direction: Direction::Inbound,
last_seen: Utc::now(),
}; };
// At this point we know the published ip and port of the peer // At this point we know the published ip and port of the peer

View file

@ -56,11 +56,11 @@ unsafe impl Send for Peer {}
impl Peer { impl Peer {
// Only accept and connect can be externally used to build a peer // Only accept and connect can be externally used to build a peer
fn new(info: PeerInfo, na: Arc<NetAdapter>) -> Peer { fn new(info: PeerInfo, adapter: Arc<NetAdapter>) -> Peer {
Peer { Peer {
info: info, info,
state: Arc::new(RwLock::new(State::Connected)), state: Arc::new(RwLock::new(State::Connected)),
tracking_adapter: TrackingAdapter::new(na), tracking_adapter: TrackingAdapter::new(adapter),
connection: None, connection: None,
} }
} }
@ -70,10 +70,10 @@ impl Peer {
capab: Capabilities, capab: Capabilities,
total_difficulty: Difficulty, total_difficulty: Difficulty,
hs: &Handshake, hs: &Handshake,
na: Arc<NetAdapter>, adapter: Arc<NetAdapter>,
) -> Result<Peer, Error> { ) -> Result<Peer, Error> {
let info = hs.accept(capab, total_difficulty, conn)?; let info = hs.accept(capab, total_difficulty, conn)?;
Ok(Peer::new(info, na)) Ok(Peer::new(info, adapter))
} }
pub fn connect( pub fn connect(
@ -179,10 +179,14 @@ impl Peer {
/// Sends the provided block to the remote peer. The request may be dropped /// Sends the provided block to the remote peer. The request may be dropped
/// if the remote peer is known to already have the block. /// if the remote peer is known to already have the block.
pub fn send_block(&self, b: &core::Block) -> Result<(), Error> { pub fn send_block(&self, b: &core::Block) -> Result<bool, Error> {
if !self.tracking_adapter.has(b.hash()) { if !self.tracking_adapter.has(b.hash()) {
trace!(LOGGER, "Send block {} to {}", b.hash(), self.info.addr); trace!(LOGGER, "Send block {} to {}", b.hash(), self.info.addr);
self.connection.as_ref().unwrap().send(b, msg::Type::Block) self.connection
.as_ref()
.unwrap()
.send(b, msg::Type::Block)?;
Ok(true)
} else { } else {
debug!( debug!(
LOGGER, LOGGER,
@ -190,11 +194,11 @@ impl Peer {
b.hash(), b.hash(),
self.info.addr, self.info.addr,
); );
Ok(()) Ok(false)
} }
} }
pub fn send_compact_block(&self, b: &core::CompactBlock) -> Result<(), Error> { pub fn send_compact_block(&self, b: &core::CompactBlock) -> Result<bool, Error> {
if !self.tracking_adapter.has(b.hash()) { if !self.tracking_adapter.has(b.hash()) {
trace!( trace!(
LOGGER, LOGGER,
@ -205,7 +209,8 @@ impl Peer {
self.connection self.connection
.as_ref() .as_ref()
.unwrap() .unwrap()
.send(b, msg::Type::CompactBlock) .send(b, msg::Type::CompactBlock)?;
Ok(true)
} else { } else {
debug!( debug!(
LOGGER, LOGGER,
@ -213,37 +218,39 @@ impl Peer {
b.hash(), b.hash(),
self.info.addr, self.info.addr,
); );
Ok(()) Ok(false)
} }
} }
pub fn send_header(&self, bh: &core::BlockHeader) -> Result<(), Error> { pub fn send_header(&self, bh: &core::BlockHeader) -> Result<bool, Error> {
if !self.tracking_adapter.has(bh.hash()) { if !self.tracking_adapter.has(bh.hash()) {
debug!(LOGGER, "Send header {} to {}", bh.hash(), self.info.addr); debug!(LOGGER, "Send header {} to {}", bh.hash(), self.info.addr);
self.connection self.connection
.as_ref() .as_ref()
.unwrap() .unwrap()
.send(bh, msg::Type::Header) .send(bh, msg::Type::Header)?;
Ok(true)
} else { } else {
trace!( debug!(
LOGGER, LOGGER,
"Suppress header send {} to {} (already seen)", "Suppress header send {} to {} (already seen)",
bh.hash(), bh.hash(),
self.info.addr, self.info.addr,
); );
Ok(()) Ok(false)
} }
} }
/// Sends the provided transaction to the remote peer. The request may be /// Sends the provided transaction to the remote peer. The request may be
/// dropped if the remote peer is known to already have the transaction. /// dropped if the remote peer is known to already have the transaction.
pub fn send_transaction(&self, tx: &core::Transaction) -> Result<(), Error> { pub fn send_transaction(&self, tx: &core::Transaction) -> Result<bool, Error> {
if !self.tracking_adapter.has(tx.hash()) { if !self.tracking_adapter.has(tx.hash()) {
debug!(LOGGER, "Send tx {} to {}", tx.hash(), self.info.addr); debug!(LOGGER, "Send tx {} to {}", tx.hash(), self.info.addr);
self.connection self.connection
.as_ref() .as_ref()
.unwrap() .unwrap()
.send(tx, msg::Type::Transaction) .send(tx, msg::Type::Transaction)?;
Ok(true)
} else { } else {
debug!( debug!(
LOGGER, LOGGER,
@ -251,7 +258,7 @@ impl Peer {
tx.hash(), tx.hash(),
self.info.addr self.info.addr
); );
Ok(()) Ok(false)
} }
} }

View file

@ -35,8 +35,8 @@ use types::{
pub struct Peers { pub struct Peers {
pub adapter: Arc<ChainAdapter>, pub adapter: Arc<ChainAdapter>,
store: PeerStore, store: PeerStore,
peers: RwLock<HashMap<SocketAddr, Arc<RwLock<Peer>>>>, peers: RwLock<HashMap<SocketAddr, Arc<Peer>>>,
dandelion_relay: RwLock<HashMap<i64, Arc<RwLock<Peer>>>>, dandelion_relay: RwLock<HashMap<i64, Arc<Peer>>>,
config: P2PConfig, config: P2PConfig,
} }
@ -56,20 +56,19 @@ impl Peers {
/// Adds the peer to our internal peer mapping. Note that the peer is still /// Adds the peer to our internal peer mapping. Note that the peer is still
/// returned so the server can run it. /// returned so the server can run it.
pub fn add_connected(&self, peer: Arc<RwLock<Peer>>) -> Result<(), Error> { pub fn add_connected(&self, peer: Arc<Peer>) -> Result<(), Error> {
let peer_data: PeerData; let peer_data: PeerData;
let addr: SocketAddr; let addr: SocketAddr;
{ {
let p = peer.read().unwrap();
peer_data = PeerData { peer_data = PeerData {
addr: p.info.addr, addr: peer.info.addr,
capabilities: p.info.capabilities, capabilities: peer.info.capabilities,
user_agent: p.info.user_agent.clone(), user_agent: peer.info.user_agent.clone(),
flags: State::Healthy, flags: State::Healthy,
last_banned: 0, last_banned: 0,
ban_reason: ReasonForBan::None, ban_reason: ReasonForBan::None,
}; };
addr = p.info.addr.clone(); addr = peer.info.addr.clone();
} }
debug!(LOGGER, "Saving newly connected peer {}.", addr); debug!(LOGGER, "Saving newly connected peer {}.", addr);
self.save_peer(&peer_data)?; self.save_peer(&peer_data)?;
@ -96,8 +95,7 @@ impl Peers {
.insert(Utc::now().timestamp(), peer.clone()); .insert(Utc::now().timestamp(), peer.clone());
debug!( debug!(
LOGGER, LOGGER,
"Successfully updated Dandelion relay to: {}", "Successfully updated Dandelion relay to: {}", peer.info.addr
peer.read().unwrap().info.addr
); );
} }
None => debug!(LOGGER, "Could not update dandelion relay"), None => debug!(LOGGER, "Could not update dandelion relay"),
@ -105,7 +103,7 @@ impl Peers {
} }
// Get the dandelion relay // Get the dandelion relay
pub fn get_dandelion_relay(&self) -> HashMap<i64, Arc<RwLock<Peer>>> { pub fn get_dandelion_relay(&self) -> HashMap<i64, Arc<Peer>> {
self.dandelion_relay.read().unwrap().clone() self.dandelion_relay.read().unwrap().clone()
} }
@ -114,32 +112,30 @@ impl Peers {
} }
/// Get vec of peers we are currently connected to. /// Get vec of peers we are currently connected to.
pub fn connected_peers(&self) -> Vec<Arc<RwLock<Peer>>> { pub fn connected_peers(&self) -> Vec<Arc<Peer>> {
let mut res = self let mut res = self
.peers .peers
.read() .read()
.unwrap() .unwrap()
.values() .values()
.filter(|p| p.read().unwrap().is_connected()) .filter(|p| p.is_connected())
.cloned() .cloned()
.collect::<Vec<_>>(); .collect::<Vec<_>>();
thread_rng().shuffle(&mut res); thread_rng().shuffle(&mut res);
res res
} }
pub fn outgoing_connected_peers(&self) -> Vec<Arc<RwLock<Peer>>> { pub fn outgoing_connected_peers(&self) -> Vec<Arc<Peer>> {
let peers = self.connected_peers(); let peers = self.connected_peers();
let res = peers let res = peers
.into_iter() .into_iter()
.filter(|x| match x.try_read() { .filter(|x| x.info.direction == Direction::Outbound)
Ok(peer) => peer.info.direction == Direction::Outbound, .collect::<Vec<_>>();
Err(_) => false,
}).collect::<Vec<_>>();
res res
} }
/// Get a peer we're connected to by address. /// Get a peer we're connected to by address.
pub fn get_connected_peer(&self, addr: &SocketAddr) -> Option<Arc<RwLock<Peer>>> { pub fn get_connected_peer(&self, addr: &SocketAddr) -> Option<Arc<Peer>> {
self.peers.read().unwrap().get(addr).map(|p| p.clone()) self.peers.read().unwrap().get(addr).map(|p| p.clone())
} }
@ -149,13 +145,13 @@ impl Peers {
.read() .read()
.unwrap() .unwrap()
.values() .values()
.filter(|p| p.read().unwrap().is_connected()) .filter(|x| x.is_connected())
.count() as u32 .count() as u32
} }
// Return vec of connected peers that currently advertise more work // Return vec of connected peers that currently advertise more work
// (total_difficulty) than we do. // (total_difficulty) than we do.
pub fn more_work_peers(&self) -> Vec<Arc<RwLock<Peer>>> { pub fn more_work_peers(&self) -> Vec<Arc<Peer>> {
let peers = self.connected_peers(); let peers = self.connected_peers();
if peers.len() == 0 { if peers.len() == 0 {
return vec![]; return vec![];
@ -165,10 +161,8 @@ impl Peers {
let mut max_peers = peers let mut max_peers = peers
.into_iter() .into_iter()
.filter(|x| match x.try_read() { .filter(|x| x.info.total_difficulty() > total_difficulty)
Ok(peer) => peer.info.total_difficulty > total_difficulty, .collect::<Vec<_>>();
Err(_) => false,
}).collect::<Vec<_>>();
thread_rng().shuffle(&mut max_peers); thread_rng().shuffle(&mut max_peers);
max_peers max_peers
@ -176,7 +170,7 @@ impl Peers {
// Return vec of connected peers that currently advertise more work // Return vec of connected peers that currently advertise more work
// (total_difficulty) than we do and are also full archival nodes. // (total_difficulty) than we do and are also full archival nodes.
pub fn more_work_archival_peers(&self) -> Vec<Arc<RwLock<Peer>>> { pub fn more_work_archival_peers(&self) -> Vec<Arc<Peer>> {
let peers = self.connected_peers(); let peers = self.connected_peers();
if peers.len() == 0 { if peers.len() == 0 {
return vec![]; return vec![];
@ -186,12 +180,9 @@ impl Peers {
let mut max_peers = peers let mut max_peers = peers
.into_iter() .into_iter()
.filter(|x| match x.try_read() { .filter(|x| {
Ok(peer) => { x.info.total_difficulty() > total_difficulty
peer.info.total_difficulty > total_difficulty && x.info.capabilities.contains(Capabilities::FULL_HIST)
&& peer.info.capabilities.contains(Capabilities::FULL_HIST)
}
Err(_) => false,
}).collect::<Vec<_>>(); }).collect::<Vec<_>>();
thread_rng().shuffle(&mut max_peers); thread_rng().shuffle(&mut max_peers);
@ -199,18 +190,18 @@ impl Peers {
} }
/// Returns single random peer with more work than us. /// Returns single random peer with more work than us.
pub fn more_work_peer(&self) -> Option<Arc<RwLock<Peer>>> { pub fn more_work_peer(&self) -> Option<Arc<Peer>> {
self.more_work_peers().pop() self.more_work_peers().pop()
} }
/// Returns single random archival peer with more work than us. /// Returns single random archival peer with more work than us.
pub fn more_work_archival_peer(&self) -> Option<Arc<RwLock<Peer>>> { pub fn more_work_archival_peer(&self) -> Option<Arc<Peer>> {
self.more_work_archival_peers().pop() self.more_work_archival_peers().pop()
} }
/// Return vec of connected peers that currently have the most worked /// Return vec of connected peers that currently have the most worked
/// branch, showing the highest total difficulty. /// branch, showing the highest total difficulty.
pub fn most_work_peers(&self) -> Vec<Arc<RwLock<Peer>>> { pub fn most_work_peers(&self) -> Vec<Arc<Peer>> {
let peers = self.connected_peers(); let peers = self.connected_peers();
if peers.len() == 0 { if peers.len() == 0 {
return vec![]; return vec![];
@ -218,18 +209,14 @@ impl Peers {
let max_total_difficulty = peers let max_total_difficulty = peers
.iter() .iter()
.map(|x| match x.try_read() { .map(|x| x.info.total_difficulty())
Ok(peer) => peer.info.total_difficulty.clone(), .max()
Err(_) => Difficulty::zero(),
}).max()
.unwrap(); .unwrap();
let mut max_peers = peers let mut max_peers = peers
.into_iter() .into_iter()
.filter(|x| match x.try_read() { .filter(|x| x.info.total_difficulty() == max_total_difficulty)
Ok(peer) => peer.info.total_difficulty == max_total_difficulty, .collect::<Vec<_>>();
Err(_) => false,
}).collect::<Vec<_>>();
thread_rng().shuffle(&mut max_peers); thread_rng().shuffle(&mut max_peers);
max_peers max_peers
@ -237,7 +224,7 @@ impl Peers {
/// Returns single random peer with the most worked branch, showing the /// Returns single random peer with the most worked branch, showing the
/// highest total difficulty. /// highest total difficulty.
pub fn most_work_peer(&self) -> Option<Arc<RwLock<Peer>>> { pub fn most_work_peer(&self) -> Option<Arc<Peer>> {
self.most_work_peers().pop() self.most_work_peers().pop()
} }
@ -259,7 +246,6 @@ impl Peers {
if let Some(peer) = self.get_connected_peer(peer_addr) { if let Some(peer) = self.get_connected_peer(peer_addr) {
debug!(LOGGER, "Banning peer {}", peer_addr); debug!(LOGGER, "Banning peer {}", peer_addr);
// setting peer status will get it removed at the next clean_peer // setting peer status will get it removed at the next clean_peer
let peer = peer.write().unwrap();
peer.send_ban_reason(ban_reason); peer.send_ban_reason(ban_reason);
peer.set_banned(); peer.set_banned();
peer.stop(); peer.stop();
@ -282,27 +268,19 @@ impl Peers {
}; };
} }
fn broadcast<F>(&self, obj_name: &str, num_peers: u32, f: F) -> u32 fn broadcast<F>(&self, obj_name: &str, num_peers: u32, inner: F) -> u32
where where
F: Fn(&Peer) -> Result<(), Error>, F: Fn(&Peer) -> Result<bool, Error>,
{ {
let peers = self.connected_peers();
let mut count = 0; let mut count = 0;
// Iterate over our connected peers. // Iterate over our connected peers.
// Try our best to send to at most num_peers peers. // Try our best to send to at most num_peers peers.
for p in peers.iter() { for p in self.connected_peers().iter() {
match p.try_read() { match inner(&p) {
Ok(p) => { Ok(true) => count += 1,
if p.is_connected() { Ok(false) => (),
if let Err(e) = f(&p) { Err(e) => debug!(LOGGER, "Error sending {} to peer: {:?}", obj_name, e),
debug!(LOGGER, "Error sending {} to peer: {:?}", obj_name, e);
} else {
count += 1;
}
}
}
Err(_) => (),
} }
if count >= num_peers { if count >= num_peers {
@ -361,7 +339,6 @@ impl Peers {
return Err(Error::NoDandelionRelay); return Err(Error::NoDandelionRelay);
} }
for relay in dandelion_relay.values() { for relay in dandelion_relay.values() {
let relay = relay.read().unwrap();
if relay.is_connected() { if relay.is_connected() {
if let Err(e) = relay.send_stem_transaction(tx) { if let Err(e) = relay.send_stem_transaction(tx) {
debug!( debug!(
@ -395,7 +372,6 @@ impl Peers {
pub fn check_all(&self, total_difficulty: Difficulty, height: u64) { pub fn check_all(&self, total_difficulty: Difficulty, height: u64) {
let peers_map = self.peers.read().unwrap(); let peers_map = self.peers.read().unwrap();
for p in peers_map.values() { for p in peers_map.values() {
let p = p.read().unwrap();
if p.is_connected() { if p.is_connected() {
let _ = p.send_ping(total_difficulty, height); let _ = p.send_ping(total_difficulty, height);
} }
@ -442,18 +418,11 @@ impl Peers {
// build a list of peers to be cleaned up // build a list of peers to be cleaned up
for peer in self.peers.read().unwrap().values() { for peer in self.peers.read().unwrap().values() {
let peer_inner = peer.read().unwrap(); if peer.is_banned() {
if peer_inner.is_banned() { debug!(LOGGER, "clean_peers {:?}, peer banned", peer.info.addr);
debug!(
LOGGER,
"clean_peers {:?}, peer banned", peer_inner.info.addr
);
rm.push(peer.clone()); rm.push(peer.clone());
} else if !peer_inner.is_connected() { } else if !peer.is_connected() {
debug!( debug!(LOGGER, "clean_peers {:?}, not connected", peer.info.addr);
LOGGER,
"clean_peers {:?}, not connected", peer_inner.info.addr
);
rm.push(peer.clone()); rm.push(peer.clone());
} }
} }
@ -462,7 +431,6 @@ impl Peers {
{ {
let mut peers = self.peers.write().unwrap(); let mut peers = self.peers.write().unwrap();
for p in rm { for p in rm {
let p = p.read().unwrap();
peers.remove(&p.info.addr); peers.remove(&p.info.addr);
} }
} }
@ -478,14 +446,11 @@ impl Peers {
}; };
// map peers to addrs in a block to bound how long we keep the read lock for // map peers to addrs in a block to bound how long we keep the read lock for
let addrs = { let addrs = self
self.connected_peers() .connected_peers()
.iter() .iter()
.map(|x| { .map(|x| x.info.addr.clone())
let p = x.read().unwrap(); .collect::<Vec<_>>();
p.info.addr.clone()
}).collect::<Vec<_>>()
};
// now remove them taking a short-lived write lock each time // now remove them taking a short-lived write lock each time
// maybe better to take write lock once and remove them all? // maybe better to take write lock once and remove them all?
@ -498,7 +463,6 @@ impl Peers {
pub fn stop(&self) { pub fn stop(&self) {
let mut peers = self.peers.write().unwrap(); let mut peers = self.peers.write().unwrap();
for (_, peer) in peers.drain() { for (_, peer) in peers.drain() {
let peer = peer.read().unwrap();
peer.stop(); peer.stop();
} }
} }
@ -645,16 +609,12 @@ impl NetAdapter for Peers {
fn peer_difficulty(&self, addr: SocketAddr, diff: Difficulty, height: u64) { fn peer_difficulty(&self, addr: SocketAddr, diff: Difficulty, height: u64) {
if let Some(peer) = self.get_connected_peer(&addr) { if let Some(peer) = self.get_connected_peer(&addr) {
let mut peer = peer.write().unwrap(); peer.info.update(height, diff);
peer.info.total_difficulty = diff;
peer.info.height = height;
peer.info.last_seen = Utc::now();
} }
} }
fn is_banned(&self, addr: SocketAddr) -> bool { fn is_banned(&self, addr: SocketAddr) -> bool {
if let Some(peer) = self.get_connected_peer(&addr) { if let Some(peer) = self.get_connected_peer(&addr) {
let peer = peer.read().unwrap();
peer.is_banned() peer.is_banned()
} else { } else {
false false

View file

@ -15,7 +15,7 @@
use std::fs::File; use std::fs::File;
use std::net::{Shutdown, SocketAddr, TcpListener, TcpStream}; use std::net::{Shutdown, SocketAddr, TcpListener, TcpStream};
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock}; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use std::{io, thread}; use std::{io, thread};
@ -127,7 +127,7 @@ impl Server {
/// Asks the server to connect to a new peer. Directly returns the peer if /// Asks the server to connect to a new peer. Directly returns the peer if
/// we're already connected to the provided address. /// we're already connected to the provided address.
pub fn connect(&self, addr: &SocketAddr) -> Result<Arc<RwLock<Peer>>, Error> { pub fn connect(&self, addr: &SocketAddr) -> Result<Arc<Peer>, Error> {
if Peer::is_denied(&self.config, &addr) { if Peer::is_denied(&self.config, &addr) {
debug!( debug!(
LOGGER, LOGGER,
@ -163,18 +163,16 @@ impl Server {
let addr = SocketAddr::new(self.config.host, self.config.port); let addr = SocketAddr::new(self.config.host, self.config.port);
let total_diff = self.peers.total_difficulty(); let total_diff = self.peers.total_difficulty();
let peer = Arc::new(RwLock::new(Peer::connect( let mut peer = Peer::connect(
&mut stream, &mut stream,
self.capabilities, self.capabilities,
total_diff, total_diff,
addr, addr,
&self.handshake, &self.handshake,
self.peers.clone(), self.peers.clone(),
)?)); )?;
{ peer.start(stream);
let mut peer = peer.write().unwrap(); let peer = Arc::new(peer);
peer.start(stream);
}
self.peers.add_connected(peer.clone())?; self.peers.add_connected(peer.clone())?;
Ok(peer) Ok(peer)
} }
@ -196,18 +194,15 @@ impl Server {
let total_diff = self.peers.total_difficulty(); let total_diff = self.peers.total_difficulty();
// accept the peer and add it to the server map // accept the peer and add it to the server map
let peer = Arc::new(RwLock::new(Peer::accept( let mut peer = Peer::accept(
&mut stream, &mut stream,
self.capabilities, self.capabilities,
total_diff, total_diff,
&self.handshake, &self.handshake,
self.peers.clone(), self.peers.clone(),
)?)); )?;
{ peer.start(stream);
let mut peer = peer.write().unwrap(); self.peers.add_connected(Arc::new(peer))?;
peer.start(stream);
}
self.peers.add_connected(peer)?;
Ok(()) Ok(())
} }

View file

@ -17,6 +17,7 @@ use std::fs::File;
use std::io; use std::io;
use std::net::{IpAddr, SocketAddr}; use std::net::{IpAddr, SocketAddr};
use std::sync::mpsc; use std::sync::mpsc;
use std::sync::{Arc, RwLock};
use chrono::prelude::*; use chrono::prelude::*;
@ -235,17 +236,75 @@ enum_from_primitive! {
} }
} }
#[derive(Clone, Debug)]
pub struct PeerLiveInfo {
pub total_difficulty: Difficulty,
pub height: u64,
pub last_seen: DateTime<Utc>,
}
/// General information about a connected peer that's useful to other modules. /// General information about a connected peer that's useful to other modules.
#[derive(Clone, Debug, Serialize, Deserialize)] #[derive(Clone, Debug)]
pub struct PeerInfo { pub struct PeerInfo {
pub capabilities: Capabilities, pub capabilities: Capabilities,
pub user_agent: String, pub user_agent: String,
pub version: u32, pub version: u32,
pub addr: SocketAddr, pub addr: SocketAddr,
pub direction: Direction,
pub live_info: Arc<RwLock<PeerLiveInfo>>,
}
impl PeerInfo {
/// The current total_difficulty of the peer.
pub fn total_difficulty(&self) -> Difficulty {
self.live_info.read().unwrap().total_difficulty
}
/// The current height of the peer.
pub fn height(&self) -> u64 {
self.live_info.read().unwrap().height
}
/// Time of last_seen for this peer (via ping/pong).
pub fn last_seen(&self) -> DateTime<Utc> {
self.live_info.read().unwrap().last_seen
}
/// Update the total_difficulty, height and last_seen of the peer.
/// Takes a write lock on the live_info.
pub fn update(&self, height: u64, total_difficulty: Difficulty) {
let mut live_info = self.live_info.write().unwrap();
live_info.height = height;
live_info.total_difficulty = total_difficulty;
live_info.last_seen = Utc::now()
}
}
/// Flatten out a PeerInfo and nested PeerLiveInfo (taking a read lock on it)
/// so we can serialize/deserialize the data for the API and the TUI.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PeerInfoDisplay {
pub capabilities: Capabilities,
pub user_agent: String,
pub version: u32,
pub addr: SocketAddr,
pub direction: Direction,
pub total_difficulty: Difficulty, pub total_difficulty: Difficulty,
pub height: u64, pub height: u64,
pub direction: Direction, }
pub last_seen: DateTime<Utc>,
impl From<PeerInfo> for PeerInfoDisplay {
fn from(info: PeerInfo) -> PeerInfoDisplay {
PeerInfoDisplay {
capabilities: info.capabilities.clone(),
user_agent: info.user_agent.clone(),
version: info.version.clone(),
addr: info.addr.clone(),
direction: info.direction.clone(),
total_difficulty: info.total_difficulty(),
height: info.height(),
}
}
} }
/// The full txhashset data along with indexes required for a consumer to /// The full txhashset data along with indexes required for a consumer to

View file

@ -90,7 +90,6 @@ fn peer_handshake() {
thread::sleep(time::Duration::from_secs(1)); thread::sleep(time::Duration::from_secs(1));
let server_peer = server.peers.get_connected_peer(&my_addr).unwrap(); let server_peer = server.peers.get_connected_peer(&my_addr).unwrap();
let server_peer = server_peer.read().unwrap(); assert_eq!(server_peer.info.total_difficulty(), Difficulty::one());
assert_eq!(server_peer.info.total_difficulty, Difficulty::one());
assert!(server.peers.peer_count() > 0); assert!(server.peers.peer_count() > 0);
} }

View file

@ -571,23 +571,26 @@ impl NetToChainAdapter {
F: Fn(&p2p::Peer, Hash) -> Result<(), p2p::Error>, F: Fn(&p2p::Peer, Hash) -> Result<(), p2p::Error>,
{ {
match w(&self.chain).block_exists(h) { match w(&self.chain).block_exists(h) {
Ok(false) => { Ok(false) => match wo(&self.peers).get_connected_peer(addr) {
match wo(&self.peers).get_connected_peer(addr) { None => debug!(
None => debug!(LOGGER, "send_block_request_to_peer: can't send request to peer {:?}, not connected", addr), LOGGER,
Some(peer) => { "send_block_request_to_peer: can't send request to peer {:?}, not connected",
match peer.read() { addr
Err(e) => debug!(LOGGER, "send_block_request_to_peer: can't send request to peer {:?}, read fails: {:?}", addr, e), ),
Ok(p) => { Some(peer) => {
if let Err(e) = f(&p, h) { if let Err(e) = f(&peer, h) {
error!(LOGGER, "send_block_request_to_peer: failed: {:?}", e) error!(LOGGER, "send_block_request_to_peer: failed: {:?}", e)
}
}
}
} }
} }
} },
Ok(true) => debug!(LOGGER, "send_block_request_to_peer: block {} already known", h), Ok(true) => debug!(
Err(e) => error!(LOGGER, "send_block_request_to_peer: failed to check block exists: {:?}", e) LOGGER,
"send_block_request_to_peer: block {} already known", h
),
Err(e) => error!(
LOGGER,
"send_block_request_to_peer: failed to check block exists: {:?}", e
),
} }
} }

View file

@ -177,10 +177,10 @@ impl PeerStats {
state: state.to_string(), state: state.to_string(),
addr: addr, addr: addr,
version: peer.info.version, version: peer.info.version,
total_difficulty: peer.info.total_difficulty.to_num(), total_difficulty: peer.info.total_difficulty().to_num(),
height: peer.info.height, height: peer.info.height(),
direction: direction.to_string(), direction: direction.to_string(),
last_seen: peer.info.last_seen, last_seen: peer.info.last_seen(),
} }
} }
} }

View file

@ -159,16 +159,12 @@ fn monitor_peers(
// ask them for their list of peers // ask them for their list of peers
let mut connected_peers: Vec<SocketAddr> = vec![]; let mut connected_peers: Vec<SocketAddr> = vec![];
for p in peers.connected_peers() { for p in peers.connected_peers() {
if let Ok(p) = p.try_read() { debug!(
debug!( LOGGER,
LOGGER, "monitor_peers: {}:{} ask {} for more peers", config.host, config.port, p.info.addr,
"monitor_peers: {}:{} ask {} for more peers", config.host, config.port, p.info.addr, );
); let _ = p.send_peer_request(capabilities);
let _ = p.send_peer_request(capabilities); connected_peers.push(p.info.addr)
connected_peers.push(p.info.addr)
} else {
warn!(LOGGER, "monitor_peers: failed to get read lock on peer");
}
} }
// Attempt to connect to preferred peers if there is some // Attempt to connect to preferred peers if there is some
@ -286,9 +282,7 @@ fn listen_for_addrs(
for _ in 0..3 { for _ in 0..3 {
match p2p_c.connect(&addr) { match p2p_c.connect(&addr) {
Ok(p) => { Ok(p) => {
if let Ok(p) = p.try_read() { let _ = p.send_peer_request(capab);
let _ = p.send_peer_request(capab);
}
let _ = peers_c.update_state(addr, p2p::State::Healthy); let _ = peers_c.update_state(addr, p2p::State::Healthy);
break; break;
} }
@ -326,8 +320,7 @@ pub fn dns_seeds() -> Box<Fn() -> Vec<SocketAddr> + Send> {
.map(|mut addr| { .map(|mut addr| {
addr.set_port(13414); addr.set_port(13414);
addr addr
}) }).filter(|addr| !temp_addresses.contains(addr))
.filter(|addr| !temp_addresses.contains(addr))
.collect()), .collect()),
), ),
Err(e) => debug!( Err(e) => debug!(

View file

@ -442,10 +442,8 @@ impl Server {
.peers .peers
.connected_peers() .connected_peers()
.into_iter() .into_iter()
.map(|p| { .map(|p| PeerStats::from_peer(&p))
let p = p.read().unwrap(); .collect();
PeerStats::from_peer(&p)
}).collect();
Ok(ServerStats { Ok(ServerStats {
peer_count: self.peer_count(), peer_count: self.peer_count(),
head: self.head(), head: self.head(),

View file

@ -159,12 +159,10 @@ impl BodySync {
self.peers.more_work_peer() self.peers.more_work_peer()
}; };
if let Some(peer) = peer { if let Some(peer) = peer {
if let Ok(peer) = peer.try_read() { if let Err(e) = peer.send_block_request(*hash) {
if let Err(e) = peer.send_block_request(*hash) { debug!(LOGGER, "Skipped request to {}: {:?}", peer.info.addr, e);
debug!(LOGGER, "Skipped request to {}: {:?}", peer.info.addr, e); } else {
} else { self.body_sync_hashes.push(hash.clone());
self.body_sync_hashes.push(hash.clone());
}
} }
} }
} }

View file

@ -124,11 +124,8 @@ impl HeaderSync {
let difficulty = header_head.total_difficulty; let difficulty = header_head.total_difficulty;
if let Some(peer) = self.peers.most_work_peer() { if let Some(peer) = self.peers.most_work_peer() {
if let Ok(p) = peer.try_read() { if peer.info.total_difficulty() > difficulty {
let peer_difficulty = p.info.total_difficulty.clone(); self.request_headers(&peer);
if peer_difficulty > difficulty {
self.request_headers(&p);
}
} }
} }
} }

View file

@ -14,7 +14,7 @@
use chrono::prelude::{DateTime, Utc}; use chrono::prelude::{DateTime, Utc};
use chrono::Duration; use chrono::Duration;
use std::sync::{Arc, RwLock}; use std::sync::Arc;
use chain; use chain;
use common::types::{Error, SyncState, SyncStatus}; use common::types::{Error, SyncState, SyncStatus};
@ -36,7 +36,7 @@ pub struct StateSync {
archive_mode: bool, archive_mode: bool,
prev_fast_sync: Option<DateTime<Utc>>, prev_fast_sync: Option<DateTime<Utc>>,
fast_sync_peer: Option<Arc<RwLock<Peer>>>, fast_sync_peer: Option<Arc<Peer>>,
} }
impl StateSync { impl StateSync {
@ -88,14 +88,12 @@ impl StateSync {
// check peer connection status of this sync // check peer connection status of this sync
if let Some(ref peer) = self.fast_sync_peer { if let Some(ref peer) = self.fast_sync_peer {
if let Ok(p) = peer.try_read() { if !peer.is_connected() && SyncStatus::TxHashsetDownload == self.sync_state.status() {
if !p.is_connected() && SyncStatus::TxHashsetDownload == self.sync_state.status() { sync_need_restart = true;
sync_need_restart = true; info!(
info!( LOGGER,
LOGGER, "fast_sync: peer connection lost: {:?}. restart", peer.info.addr,
"fast_sync: peer connection lost: {:?}. restart", p.info.addr, );
);
}
} }
} }
@ -131,38 +129,36 @@ impl StateSync {
true true
} }
fn request_state(&self, header_head: &chain::Tip) -> Result<Arc<RwLock<Peer>>, p2p::Error> { fn request_state(&self, header_head: &chain::Tip) -> Result<Arc<Peer>, p2p::Error> {
let horizon = global::cut_through_horizon() as u64; let horizon = global::cut_through_horizon() as u64;
if let Some(peer) = self.peers.most_work_peer() { if let Some(peer) = self.peers.most_work_peer() {
if let Ok(p) = peer.try_read() { // ask for txhashset at 90% of horizon, this still leaves time for download
// ask for txhashset at 90% of horizon, this still leaves time for download // and validation to happen and stay within horizon
// and validation to happen and stay within horizon let mut txhashset_head = self
let mut txhashset_head = self .chain
.get_block_header(&header_head.prev_block_h)
.unwrap();
for _ in 0..(horizon - horizon / 10) {
txhashset_head = self
.chain .chain
.get_block_header(&header_head.prev_block_h) .get_block_header(&txhashset_head.previous)
.unwrap(); .unwrap();
for _ in 0..(horizon - horizon / 10) {
txhashset_head = self
.chain
.get_block_header(&txhashset_head.previous)
.unwrap();
}
let bhash = txhashset_head.hash();
debug!(
LOGGER,
"fast_sync: before txhashset request, header head: {} / {}, txhashset_head: {} / {}",
header_head.height,
header_head.last_block_h,
txhashset_head.height,
bhash
);
if let Err(e) = p.send_txhashset_request(txhashset_head.height, bhash) {
error!(LOGGER, "fast_sync: send_txhashset_request err! {:?}", e);
return Err(e);
}
return Ok(peer.clone());
} }
let bhash = txhashset_head.hash();
debug!(
LOGGER,
"fast_sync: before txhashset request, header head: {} / {}, txhashset_head: {} / {}",
header_head.height,
header_head.last_block_h,
txhashset_head.height,
bhash
);
if let Err(e) = peer.send_txhashset_request(txhashset_head.height, bhash) {
error!(LOGGER, "fast_sync: send_txhashset_request err! {:?}", e);
return Err(e);
}
return Ok(peer.clone());
} }
Err(p2p::Error::PeerException) Err(p2p::Error::PeerException)
} }

View file

@ -159,22 +159,19 @@ fn needs_syncing(
// difficulty than us // difficulty than us
if is_syncing { if is_syncing {
if let Some(peer) = peer { if let Some(peer) = peer {
if let Ok(peer) = peer.try_read() { most_work_height = peer.info.height();
most_work_height = peer.info.height; if peer.info.total_difficulty() <= local_diff {
let ch = chain.head().unwrap();
info!(
LOGGER,
"synchronized at {} @ {} [{}]",
local_diff.to_num(),
ch.height,
ch.last_block_h
);
if peer.info.total_difficulty <= local_diff { let _ = chain.reset_head();
let ch = chain.head().unwrap(); return (false, most_work_height);
info!(
LOGGER,
"synchronized at {} @ {} [{}]",
local_diff.to_num(),
ch.height,
ch.last_block_h
);
let _ = chain.reset_head();
return (false, most_work_height);
}
} }
} else { } else {
warn!(LOGGER, "sync: no peers available, disabling sync"); warn!(LOGGER, "sync: no peers available, disabling sync");
@ -182,26 +179,25 @@ fn needs_syncing(
} }
} else { } else {
if let Some(peer) = peer { if let Some(peer) = peer {
if let Ok(peer) = peer.try_read() { most_work_height = peer.info.height();
most_work_height = peer.info.height;
// sum the last 5 difficulties to give us the threshold // sum the last 5 difficulties to give us the threshold
let threshold = chain let threshold = chain
.difficulty_iter() .difficulty_iter()
.filter_map(|x| x.map(|(_, x)| x).ok()) .filter_map(|x| x.map(|(_, x)| x).ok())
.take(5) .take(5)
.fold(Difficulty::zero(), |sum, val| sum + val); .fold(Difficulty::zero(), |sum, val| sum + val);
if peer.info.total_difficulty > local_diff.clone() + threshold.clone() { let peer_diff = peer.info.total_difficulty();
info!( if peer_diff > local_diff.clone() + threshold.clone() {
LOGGER, info!(
"sync: total_difficulty {}, peer_difficulty {}, threshold {} (last 5 blocks), enabling sync", LOGGER,
local_diff, "sync: total_difficulty {}, peer_difficulty {}, threshold {} (last 5 blocks), enabling sync",
peer.info.total_difficulty, local_diff,
threshold, peer_diff,
); threshold,
return (true, most_work_height); );
} return (true, most_work_height);
} }
} }
} }

View file

@ -460,12 +460,13 @@ pub fn get_peer(
pub fn get_connected_peers( pub fn get_connected_peers(
base_addr: &String, base_addr: &String,
api_server_port: u16, api_server_port: u16,
) -> Result<Vec<p2p::PeerInfo>, Error> { ) -> Result<Vec<p2p::types::PeerInfoDisplay>, Error> {
let url = format!( let url = format!(
"http://{}:{}/v1/peers/connected", "http://{}:{}/v1/peers/connected",
base_addr, api_server_port base_addr, api_server_port
); );
api::client::get::<Vec<p2p::PeerInfo>>(url.as_str(), None).map_err(|e| Error::API(e)) api::client::get::<Vec<p2p::types::PeerInfoDisplay>>(url.as_str(), None)
.map_err(|e| Error::API(e))
} }
pub fn get_all_peers( pub fn get_all_peers(

View file

@ -127,7 +127,7 @@ fn simulate_seeding() {
"http://{}:{}/v1/peers/connected", "http://{}:{}/v1/peers/connected",
&server_config.base_addr, 30020 &server_config.base_addr, 30020
); );
let peers_all = api::client::get::<Vec<p2p::PeerInfo>>(url.as_str(), None); let peers_all = api::client::get::<Vec<p2p::types::PeerInfoDisplay>>(url.as_str(), None);
assert!(peers_all.is_ok()); assert!(peers_all.is_ok());
assert_eq!(peers_all.unwrap().len(), 4); assert_eq!(peers_all.unwrap().len(), 4);

View file

@ -122,8 +122,9 @@ pub fn unban_peer(config: &ServerConfig, peer_addr: &SocketAddr, api_secret: Opt
pub fn list_connected_peers(config: &ServerConfig, api_secret: Option<String>) { pub fn list_connected_peers(config: &ServerConfig, api_secret: Option<String>) {
let mut e = term::stdout().unwrap(); let mut e = term::stdout().unwrap();
let url = format!("http://{}/v1/peers/connected", config.api_http_addr); let url = format!("http://{}/v1/peers/connected", config.api_http_addr);
let peers_info: Result<Vec<p2p::PeerInfo>, api::Error>; // let peers_info: Result<Vec<p2p::PeerInfoDisplay>, api::Error>;
peers_info = api::client::get::<Vec<p2p::PeerInfo>>(url.as_str(), api_secret);
let peers_info = api::client::get::<Vec<p2p::types::PeerInfoDisplay>>(url.as_str(), api_secret);
match peers_info.map_err(|e| Error::API(e)) { match peers_info.map_err(|e| Error::API(e)) {
Ok(connected_peers) => { Ok(connected_peers) => {
@ -134,6 +135,7 @@ pub fn list_connected_peers(config: &ServerConfig, api_secret: Option<String>) {
writeln!(e, "User agent: {}", connected_peer.user_agent).unwrap(); writeln!(e, "User agent: {}", connected_peer.user_agent).unwrap();
writeln!(e, "Version: {}", connected_peer.version).unwrap(); writeln!(e, "Version: {}", connected_peer.version).unwrap();
writeln!(e, "Peer address: {}", connected_peer.addr).unwrap(); writeln!(e, "Peer address: {}", connected_peer.addr).unwrap();
writeln!(e, "Height: {}", connected_peer.height).unwrap();
writeln!(e, "Total difficulty: {}", connected_peer.total_difficulty).unwrap(); writeln!(e, "Total difficulty: {}", connected_peer.total_difficulty).unwrap();
writeln!(e, "Direction: {:?}", connected_peer.direction).unwrap(); writeln!(e, "Direction: {:?}", connected_peer.direction).unwrap();
println!(); println!();
@ -142,6 +144,7 @@ pub fn list_connected_peers(config: &ServerConfig, api_secret: Option<String>) {
} }
Err(_) => writeln!(e, "Failed to get connected peers").unwrap(), Err(_) => writeln!(e, "Failed to get connected peers").unwrap(),
}; };
e.reset().unwrap(); e.reset().unwrap();
} }