diff --git a/api/src/handlers.rs b/api/src/handlers.rs index 0d4c0653f..3cd1ef6e6 100644 --- a/api/src/handlers.rs +++ b/api/src/handlers.rs @@ -27,7 +27,7 @@ use core::core::hash::{Hash, Hashed}; use core::core::{OutputFeatures, OutputIdentifier, Transaction}; use core::ser; use p2p; -use p2p::types::ReasonForBan; +use p2p::types::{PeerInfoDisplay, ReasonForBan}; use pool; use regex::Regex; use rest::*; @@ -405,11 +405,10 @@ pub struct PeersConnectedHandler { impl Handler for PeersConnectedHandler { fn get(&self, _req: Request) -> ResponseFuture { - let mut peers = vec![]; + let mut peers: Vec = vec![]; for p in &w(&self.peers).connected_peers() { - let p = p.read().unwrap(); let peer_info = p.info.clone(); - peers.push(peer_info); + peers.push(peer_info.into()); } json_response(&peers) } diff --git a/p2p/src/handshake.rs b/p2p/src/handshake.rs index ce076e65e..3fa9f879b 100644 --- a/p2p/src/handshake.rs +++ b/p2p/src/handshake.rs @@ -23,7 +23,7 @@ use core::core::hash::Hash; use core::pow::Difficulty; use msg::{read_message, write_message, Hand, Shake, SockAddr, Type, PROTOCOL_VERSION, USER_AGENT}; use peer::Peer; -use types::{Capabilities, Direction, Error, P2PConfig, PeerInfo}; +use types::{Capabilities, Direction, Error, P2PConfig, PeerInfo, PeerLiveInfo}; use util::LOGGER; const NONCES_CAP: usize = 100; @@ -98,10 +98,12 @@ impl Handshake { user_agent: shake.user_agent, addr: peer_addr, version: shake.version, - total_difficulty: shake.total_difficulty, - height: 0, + live_info: Arc::new(RwLock::new(PeerLiveInfo { + total_difficulty: shake.total_difficulty, + height: 0, + last_seen: Utc::now(), + })), direction: Direction::Outbound, - last_seen: Utc::now(), }; // If denied then we want to close the connection @@ -113,7 +115,7 @@ impl Handshake { debug!( LOGGER, "Connected! Cumulative {} offered from {:?} {:?} {:?}", - peer_info.total_difficulty.to_num(), + shake.total_difficulty.to_num(), peer_info.addr, peer_info.user_agent, peer_info.capabilities @@ -155,10 +157,12 @@ impl Handshake { user_agent: hand.user_agent, addr: extract_ip(&hand.sender_addr.0, &conn), version: hand.version, - total_difficulty: hand.total_difficulty, - height: 0, + live_info: Arc::new(RwLock::new(PeerLiveInfo { + total_difficulty: hand.total_difficulty, + height: 0, + last_seen: Utc::now(), + })), direction: Direction::Inbound, - last_seen: Utc::now(), }; // At this point we know the published ip and port of the peer diff --git a/p2p/src/peer.rs b/p2p/src/peer.rs index 6c8502270..2446c6c77 100644 --- a/p2p/src/peer.rs +++ b/p2p/src/peer.rs @@ -56,11 +56,11 @@ unsafe impl Send for Peer {} impl Peer { // Only accept and connect can be externally used to build a peer - fn new(info: PeerInfo, na: Arc) -> Peer { + fn new(info: PeerInfo, adapter: Arc) -> Peer { Peer { - info: info, + info, state: Arc::new(RwLock::new(State::Connected)), - tracking_adapter: TrackingAdapter::new(na), + tracking_adapter: TrackingAdapter::new(adapter), connection: None, } } @@ -70,10 +70,10 @@ impl Peer { capab: Capabilities, total_difficulty: Difficulty, hs: &Handshake, - na: Arc, + adapter: Arc, ) -> Result { let info = hs.accept(capab, total_difficulty, conn)?; - Ok(Peer::new(info, na)) + Ok(Peer::new(info, adapter)) } pub fn connect( @@ -179,10 +179,14 @@ impl Peer { /// Sends the provided block to the remote peer. The request may be dropped /// if the remote peer is known to already have the block. - pub fn send_block(&self, b: &core::Block) -> Result<(), Error> { + pub fn send_block(&self, b: &core::Block) -> Result { if !self.tracking_adapter.has(b.hash()) { trace!(LOGGER, "Send block {} to {}", b.hash(), self.info.addr); - self.connection.as_ref().unwrap().send(b, msg::Type::Block) + self.connection + .as_ref() + .unwrap() + .send(b, msg::Type::Block)?; + Ok(true) } else { debug!( LOGGER, @@ -190,11 +194,11 @@ impl Peer { b.hash(), self.info.addr, ); - Ok(()) + Ok(false) } } - pub fn send_compact_block(&self, b: &core::CompactBlock) -> Result<(), Error> { + pub fn send_compact_block(&self, b: &core::CompactBlock) -> Result { if !self.tracking_adapter.has(b.hash()) { trace!( LOGGER, @@ -205,7 +209,8 @@ impl Peer { self.connection .as_ref() .unwrap() - .send(b, msg::Type::CompactBlock) + .send(b, msg::Type::CompactBlock)?; + Ok(true) } else { debug!( LOGGER, @@ -213,37 +218,39 @@ impl Peer { b.hash(), self.info.addr, ); - Ok(()) + Ok(false) } } - pub fn send_header(&self, bh: &core::BlockHeader) -> Result<(), Error> { + pub fn send_header(&self, bh: &core::BlockHeader) -> Result { if !self.tracking_adapter.has(bh.hash()) { debug!(LOGGER, "Send header {} to {}", bh.hash(), self.info.addr); self.connection .as_ref() .unwrap() - .send(bh, msg::Type::Header) + .send(bh, msg::Type::Header)?; + Ok(true) } else { - trace!( + debug!( LOGGER, "Suppress header send {} to {} (already seen)", bh.hash(), self.info.addr, ); - Ok(()) + Ok(false) } } /// Sends the provided transaction to the remote peer. The request may be /// dropped if the remote peer is known to already have the transaction. - pub fn send_transaction(&self, tx: &core::Transaction) -> Result<(), Error> { + pub fn send_transaction(&self, tx: &core::Transaction) -> Result { if !self.tracking_adapter.has(tx.hash()) { debug!(LOGGER, "Send tx {} to {}", tx.hash(), self.info.addr); self.connection .as_ref() .unwrap() - .send(tx, msg::Type::Transaction) + .send(tx, msg::Type::Transaction)?; + Ok(true) } else { debug!( LOGGER, @@ -251,7 +258,7 @@ impl Peer { tx.hash(), self.info.addr ); - Ok(()) + Ok(false) } } diff --git a/p2p/src/peers.rs b/p2p/src/peers.rs index b56976325..8a77db5fe 100644 --- a/p2p/src/peers.rs +++ b/p2p/src/peers.rs @@ -35,8 +35,8 @@ use types::{ pub struct Peers { pub adapter: Arc, store: PeerStore, - peers: RwLock>>>, - dandelion_relay: RwLock>>>, + peers: RwLock>>, + dandelion_relay: RwLock>>, config: P2PConfig, } @@ -56,20 +56,19 @@ impl Peers { /// Adds the peer to our internal peer mapping. Note that the peer is still /// returned so the server can run it. - pub fn add_connected(&self, peer: Arc>) -> Result<(), Error> { + pub fn add_connected(&self, peer: Arc) -> Result<(), Error> { let peer_data: PeerData; let addr: SocketAddr; { - let p = peer.read().unwrap(); peer_data = PeerData { - addr: p.info.addr, - capabilities: p.info.capabilities, - user_agent: p.info.user_agent.clone(), + addr: peer.info.addr, + capabilities: peer.info.capabilities, + user_agent: peer.info.user_agent.clone(), flags: State::Healthy, last_banned: 0, ban_reason: ReasonForBan::None, }; - addr = p.info.addr.clone(); + addr = peer.info.addr.clone(); } debug!(LOGGER, "Saving newly connected peer {}.", addr); self.save_peer(&peer_data)?; @@ -96,8 +95,7 @@ impl Peers { .insert(Utc::now().timestamp(), peer.clone()); debug!( LOGGER, - "Successfully updated Dandelion relay to: {}", - peer.read().unwrap().info.addr + "Successfully updated Dandelion relay to: {}", peer.info.addr ); } None => debug!(LOGGER, "Could not update dandelion relay"), @@ -105,7 +103,7 @@ impl Peers { } // Get the dandelion relay - pub fn get_dandelion_relay(&self) -> HashMap>> { + pub fn get_dandelion_relay(&self) -> HashMap> { self.dandelion_relay.read().unwrap().clone() } @@ -114,32 +112,30 @@ impl Peers { } /// Get vec of peers we are currently connected to. - pub fn connected_peers(&self) -> Vec>> { + pub fn connected_peers(&self) -> Vec> { let mut res = self .peers .read() .unwrap() .values() - .filter(|p| p.read().unwrap().is_connected()) + .filter(|p| p.is_connected()) .cloned() .collect::>(); thread_rng().shuffle(&mut res); res } - pub fn outgoing_connected_peers(&self) -> Vec>> { + pub fn outgoing_connected_peers(&self) -> Vec> { let peers = self.connected_peers(); let res = peers .into_iter() - .filter(|x| match x.try_read() { - Ok(peer) => peer.info.direction == Direction::Outbound, - Err(_) => false, - }).collect::>(); + .filter(|x| x.info.direction == Direction::Outbound) + .collect::>(); res } /// Get a peer we're connected to by address. - pub fn get_connected_peer(&self, addr: &SocketAddr) -> Option>> { + pub fn get_connected_peer(&self, addr: &SocketAddr) -> Option> { self.peers.read().unwrap().get(addr).map(|p| p.clone()) } @@ -149,13 +145,13 @@ impl Peers { .read() .unwrap() .values() - .filter(|p| p.read().unwrap().is_connected()) + .filter(|x| x.is_connected()) .count() as u32 } // Return vec of connected peers that currently advertise more work // (total_difficulty) than we do. - pub fn more_work_peers(&self) -> Vec>> { + pub fn more_work_peers(&self) -> Vec> { let peers = self.connected_peers(); if peers.len() == 0 { return vec![]; @@ -165,10 +161,8 @@ impl Peers { let mut max_peers = peers .into_iter() - .filter(|x| match x.try_read() { - Ok(peer) => peer.info.total_difficulty > total_difficulty, - Err(_) => false, - }).collect::>(); + .filter(|x| x.info.total_difficulty() > total_difficulty) + .collect::>(); thread_rng().shuffle(&mut max_peers); max_peers @@ -176,7 +170,7 @@ impl Peers { // Return vec of connected peers that currently advertise more work // (total_difficulty) than we do and are also full archival nodes. - pub fn more_work_archival_peers(&self) -> Vec>> { + pub fn more_work_archival_peers(&self) -> Vec> { let peers = self.connected_peers(); if peers.len() == 0 { return vec![]; @@ -186,12 +180,9 @@ impl Peers { let mut max_peers = peers .into_iter() - .filter(|x| match x.try_read() { - Ok(peer) => { - peer.info.total_difficulty > total_difficulty - && peer.info.capabilities.contains(Capabilities::FULL_HIST) - } - Err(_) => false, + .filter(|x| { + x.info.total_difficulty() > total_difficulty + && x.info.capabilities.contains(Capabilities::FULL_HIST) }).collect::>(); thread_rng().shuffle(&mut max_peers); @@ -199,18 +190,18 @@ impl Peers { } /// Returns single random peer with more work than us. - pub fn more_work_peer(&self) -> Option>> { + pub fn more_work_peer(&self) -> Option> { self.more_work_peers().pop() } /// Returns single random archival peer with more work than us. - pub fn more_work_archival_peer(&self) -> Option>> { + pub fn more_work_archival_peer(&self) -> Option> { self.more_work_archival_peers().pop() } /// Return vec of connected peers that currently have the most worked /// branch, showing the highest total difficulty. - pub fn most_work_peers(&self) -> Vec>> { + pub fn most_work_peers(&self) -> Vec> { let peers = self.connected_peers(); if peers.len() == 0 { return vec![]; @@ -218,18 +209,14 @@ impl Peers { let max_total_difficulty = peers .iter() - .map(|x| match x.try_read() { - Ok(peer) => peer.info.total_difficulty.clone(), - Err(_) => Difficulty::zero(), - }).max() + .map(|x| x.info.total_difficulty()) + .max() .unwrap(); let mut max_peers = peers .into_iter() - .filter(|x| match x.try_read() { - Ok(peer) => peer.info.total_difficulty == max_total_difficulty, - Err(_) => false, - }).collect::>(); + .filter(|x| x.info.total_difficulty() == max_total_difficulty) + .collect::>(); thread_rng().shuffle(&mut max_peers); max_peers @@ -237,7 +224,7 @@ impl Peers { /// Returns single random peer with the most worked branch, showing the /// highest total difficulty. - pub fn most_work_peer(&self) -> Option>> { + pub fn most_work_peer(&self) -> Option> { self.most_work_peers().pop() } @@ -259,7 +246,6 @@ impl Peers { if let Some(peer) = self.get_connected_peer(peer_addr) { debug!(LOGGER, "Banning peer {}", peer_addr); // setting peer status will get it removed at the next clean_peer - let peer = peer.write().unwrap(); peer.send_ban_reason(ban_reason); peer.set_banned(); peer.stop(); @@ -282,27 +268,19 @@ impl Peers { }; } - fn broadcast(&self, obj_name: &str, num_peers: u32, f: F) -> u32 + fn broadcast(&self, obj_name: &str, num_peers: u32, inner: F) -> u32 where - F: Fn(&Peer) -> Result<(), Error>, + F: Fn(&Peer) -> Result, { - let peers = self.connected_peers(); let mut count = 0; // Iterate over our connected peers. // Try our best to send to at most num_peers peers. - for p in peers.iter() { - match p.try_read() { - Ok(p) => { - if p.is_connected() { - if let Err(e) = f(&p) { - debug!(LOGGER, "Error sending {} to peer: {:?}", obj_name, e); - } else { - count += 1; - } - } - } - Err(_) => (), + for p in self.connected_peers().iter() { + match inner(&p) { + Ok(true) => count += 1, + Ok(false) => (), + Err(e) => debug!(LOGGER, "Error sending {} to peer: {:?}", obj_name, e), } if count >= num_peers { @@ -361,7 +339,6 @@ impl Peers { return Err(Error::NoDandelionRelay); } for relay in dandelion_relay.values() { - let relay = relay.read().unwrap(); if relay.is_connected() { if let Err(e) = relay.send_stem_transaction(tx) { debug!( @@ -395,7 +372,6 @@ impl Peers { pub fn check_all(&self, total_difficulty: Difficulty, height: u64) { let peers_map = self.peers.read().unwrap(); for p in peers_map.values() { - let p = p.read().unwrap(); if p.is_connected() { let _ = p.send_ping(total_difficulty, height); } @@ -442,18 +418,11 @@ impl Peers { // build a list of peers to be cleaned up for peer in self.peers.read().unwrap().values() { - let peer_inner = peer.read().unwrap(); - if peer_inner.is_banned() { - debug!( - LOGGER, - "clean_peers {:?}, peer banned", peer_inner.info.addr - ); + if peer.is_banned() { + debug!(LOGGER, "clean_peers {:?}, peer banned", peer.info.addr); rm.push(peer.clone()); - } else if !peer_inner.is_connected() { - debug!( - LOGGER, - "clean_peers {:?}, not connected", peer_inner.info.addr - ); + } else if !peer.is_connected() { + debug!(LOGGER, "clean_peers {:?}, not connected", peer.info.addr); rm.push(peer.clone()); } } @@ -462,7 +431,6 @@ impl Peers { { let mut peers = self.peers.write().unwrap(); for p in rm { - let p = p.read().unwrap(); peers.remove(&p.info.addr); } } @@ -478,14 +446,11 @@ impl Peers { }; // map peers to addrs in a block to bound how long we keep the read lock for - let addrs = { - self.connected_peers() - .iter() - .map(|x| { - let p = x.read().unwrap(); - p.info.addr.clone() - }).collect::>() - }; + let addrs = self + .connected_peers() + .iter() + .map(|x| x.info.addr.clone()) + .collect::>(); // now remove them taking a short-lived write lock each time // maybe better to take write lock once and remove them all? @@ -498,7 +463,6 @@ impl Peers { pub fn stop(&self) { let mut peers = self.peers.write().unwrap(); for (_, peer) in peers.drain() { - let peer = peer.read().unwrap(); peer.stop(); } } @@ -645,16 +609,12 @@ impl NetAdapter for Peers { fn peer_difficulty(&self, addr: SocketAddr, diff: Difficulty, height: u64) { if let Some(peer) = self.get_connected_peer(&addr) { - let mut peer = peer.write().unwrap(); - peer.info.total_difficulty = diff; - peer.info.height = height; - peer.info.last_seen = Utc::now(); + peer.info.update(height, diff); } } fn is_banned(&self, addr: SocketAddr) -> bool { if let Some(peer) = self.get_connected_peer(&addr) { - let peer = peer.read().unwrap(); peer.is_banned() } else { false diff --git a/p2p/src/serv.rs b/p2p/src/serv.rs index 1f9a62ffb..b65e418a5 100644 --- a/p2p/src/serv.rs +++ b/p2p/src/serv.rs @@ -15,7 +15,7 @@ use std::fs::File; use std::net::{Shutdown, SocketAddr, TcpListener, TcpStream}; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::{Arc, RwLock}; +use std::sync::Arc; use std::time::Duration; use std::{io, thread}; @@ -127,7 +127,7 @@ impl Server { /// Asks the server to connect to a new peer. Directly returns the peer if /// we're already connected to the provided address. - pub fn connect(&self, addr: &SocketAddr) -> Result>, Error> { + pub fn connect(&self, addr: &SocketAddr) -> Result, Error> { if Peer::is_denied(&self.config, &addr) { debug!( LOGGER, @@ -163,18 +163,16 @@ impl Server { let addr = SocketAddr::new(self.config.host, self.config.port); let total_diff = self.peers.total_difficulty(); - let peer = Arc::new(RwLock::new(Peer::connect( + let mut peer = Peer::connect( &mut stream, self.capabilities, total_diff, addr, &self.handshake, self.peers.clone(), - )?)); - { - let mut peer = peer.write().unwrap(); - peer.start(stream); - } + )?; + peer.start(stream); + let peer = Arc::new(peer); self.peers.add_connected(peer.clone())?; Ok(peer) } @@ -196,18 +194,15 @@ impl Server { let total_diff = self.peers.total_difficulty(); // accept the peer and add it to the server map - let peer = Arc::new(RwLock::new(Peer::accept( + let mut peer = Peer::accept( &mut stream, self.capabilities, total_diff, &self.handshake, self.peers.clone(), - )?)); - { - let mut peer = peer.write().unwrap(); - peer.start(stream); - } - self.peers.add_connected(peer)?; + )?; + peer.start(stream); + self.peers.add_connected(Arc::new(peer))?; Ok(()) } diff --git a/p2p/src/types.rs b/p2p/src/types.rs index 0dffd5404..c04b59330 100644 --- a/p2p/src/types.rs +++ b/p2p/src/types.rs @@ -17,6 +17,7 @@ use std::fs::File; use std::io; use std::net::{IpAddr, SocketAddr}; use std::sync::mpsc; +use std::sync::{Arc, RwLock}; use chrono::prelude::*; @@ -235,17 +236,75 @@ enum_from_primitive! { } } +#[derive(Clone, Debug)] +pub struct PeerLiveInfo { + pub total_difficulty: Difficulty, + pub height: u64, + pub last_seen: DateTime, +} + /// General information about a connected peer that's useful to other modules. -#[derive(Clone, Debug, Serialize, Deserialize)] +#[derive(Clone, Debug)] pub struct PeerInfo { pub capabilities: Capabilities, pub user_agent: String, pub version: u32, pub addr: SocketAddr, + pub direction: Direction, + pub live_info: Arc>, +} + +impl PeerInfo { + /// The current total_difficulty of the peer. + pub fn total_difficulty(&self) -> Difficulty { + self.live_info.read().unwrap().total_difficulty + } + + /// The current height of the peer. + pub fn height(&self) -> u64 { + self.live_info.read().unwrap().height + } + + /// Time of last_seen for this peer (via ping/pong). + pub fn last_seen(&self) -> DateTime { + self.live_info.read().unwrap().last_seen + } + + /// Update the total_difficulty, height and last_seen of the peer. + /// Takes a write lock on the live_info. + pub fn update(&self, height: u64, total_difficulty: Difficulty) { + let mut live_info = self.live_info.write().unwrap(); + live_info.height = height; + live_info.total_difficulty = total_difficulty; + live_info.last_seen = Utc::now() + } +} + +/// Flatten out a PeerInfo and nested PeerLiveInfo (taking a read lock on it) +/// so we can serialize/deserialize the data for the API and the TUI. +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct PeerInfoDisplay { + pub capabilities: Capabilities, + pub user_agent: String, + pub version: u32, + pub addr: SocketAddr, + pub direction: Direction, pub total_difficulty: Difficulty, pub height: u64, - pub direction: Direction, - pub last_seen: DateTime, +} + +impl From for PeerInfoDisplay { + fn from(info: PeerInfo) -> PeerInfoDisplay { + PeerInfoDisplay { + capabilities: info.capabilities.clone(), + user_agent: info.user_agent.clone(), + version: info.version.clone(), + addr: info.addr.clone(), + direction: info.direction.clone(), + total_difficulty: info.total_difficulty(), + height: info.height(), + } + } } /// The full txhashset data along with indexes required for a consumer to diff --git a/p2p/tests/peer_handshake.rs b/p2p/tests/peer_handshake.rs index 666dda1d7..44ca7c76c 100644 --- a/p2p/tests/peer_handshake.rs +++ b/p2p/tests/peer_handshake.rs @@ -90,7 +90,6 @@ fn peer_handshake() { thread::sleep(time::Duration::from_secs(1)); let server_peer = server.peers.get_connected_peer(&my_addr).unwrap(); - let server_peer = server_peer.read().unwrap(); - assert_eq!(server_peer.info.total_difficulty, Difficulty::one()); + assert_eq!(server_peer.info.total_difficulty(), Difficulty::one()); assert!(server.peers.peer_count() > 0); } diff --git a/servers/src/common/adapters.rs b/servers/src/common/adapters.rs index 83476501a..2be2142c1 100644 --- a/servers/src/common/adapters.rs +++ b/servers/src/common/adapters.rs @@ -571,23 +571,26 @@ impl NetToChainAdapter { F: Fn(&p2p::Peer, Hash) -> Result<(), p2p::Error>, { match w(&self.chain).block_exists(h) { - Ok(false) => { - match wo(&self.peers).get_connected_peer(addr) { - None => debug!(LOGGER, "send_block_request_to_peer: can't send request to peer {:?}, not connected", addr), - Some(peer) => { - match peer.read() { - Err(e) => debug!(LOGGER, "send_block_request_to_peer: can't send request to peer {:?}, read fails: {:?}", addr, e), - Ok(p) => { - if let Err(e) = f(&p, h) { - error!(LOGGER, "send_block_request_to_peer: failed: {:?}", e) - } - } - } + Ok(false) => match wo(&self.peers).get_connected_peer(addr) { + None => debug!( + LOGGER, + "send_block_request_to_peer: can't send request to peer {:?}, not connected", + addr + ), + Some(peer) => { + if let Err(e) = f(&peer, h) { + error!(LOGGER, "send_block_request_to_peer: failed: {:?}", e) } } - } - Ok(true) => debug!(LOGGER, "send_block_request_to_peer: block {} already known", h), - Err(e) => error!(LOGGER, "send_block_request_to_peer: failed to check block exists: {:?}", e) + }, + Ok(true) => debug!( + LOGGER, + "send_block_request_to_peer: block {} already known", h + ), + Err(e) => error!( + LOGGER, + "send_block_request_to_peer: failed to check block exists: {:?}", e + ), } } diff --git a/servers/src/common/stats.rs b/servers/src/common/stats.rs index 498491ea1..c1d75b4e0 100644 --- a/servers/src/common/stats.rs +++ b/servers/src/common/stats.rs @@ -177,10 +177,10 @@ impl PeerStats { state: state.to_string(), addr: addr, version: peer.info.version, - total_difficulty: peer.info.total_difficulty.to_num(), - height: peer.info.height, + total_difficulty: peer.info.total_difficulty().to_num(), + height: peer.info.height(), direction: direction.to_string(), - last_seen: peer.info.last_seen, + last_seen: peer.info.last_seen(), } } } diff --git a/servers/src/grin/seed.rs b/servers/src/grin/seed.rs index 18f8b5fbc..fa03b3f43 100644 --- a/servers/src/grin/seed.rs +++ b/servers/src/grin/seed.rs @@ -159,16 +159,12 @@ fn monitor_peers( // ask them for their list of peers let mut connected_peers: Vec = vec![]; for p in peers.connected_peers() { - if let Ok(p) = p.try_read() { - debug!( - LOGGER, - "monitor_peers: {}:{} ask {} for more peers", config.host, config.port, p.info.addr, - ); - let _ = p.send_peer_request(capabilities); - connected_peers.push(p.info.addr) - } else { - warn!(LOGGER, "monitor_peers: failed to get read lock on peer"); - } + debug!( + LOGGER, + "monitor_peers: {}:{} ask {} for more peers", config.host, config.port, p.info.addr, + ); + let _ = p.send_peer_request(capabilities); + connected_peers.push(p.info.addr) } // Attempt to connect to preferred peers if there is some @@ -286,9 +282,7 @@ fn listen_for_addrs( for _ in 0..3 { match p2p_c.connect(&addr) { Ok(p) => { - if let Ok(p) = p.try_read() { - let _ = p.send_peer_request(capab); - } + let _ = p.send_peer_request(capab); let _ = peers_c.update_state(addr, p2p::State::Healthy); break; } @@ -326,8 +320,7 @@ pub fn dns_seeds() -> Box Vec + Send> { .map(|mut addr| { addr.set_port(13414); addr - }) - .filter(|addr| !temp_addresses.contains(addr)) + }).filter(|addr| !temp_addresses.contains(addr)) .collect()), ), Err(e) => debug!( diff --git a/servers/src/grin/server.rs b/servers/src/grin/server.rs index a5ef398a6..b25923f27 100644 --- a/servers/src/grin/server.rs +++ b/servers/src/grin/server.rs @@ -442,10 +442,8 @@ impl Server { .peers .connected_peers() .into_iter() - .map(|p| { - let p = p.read().unwrap(); - PeerStats::from_peer(&p) - }).collect(); + .map(|p| PeerStats::from_peer(&p)) + .collect(); Ok(ServerStats { peer_count: self.peer_count(), head: self.head(), diff --git a/servers/src/grin/sync/body_sync.rs b/servers/src/grin/sync/body_sync.rs index 15e449451..fc12f4cf2 100644 --- a/servers/src/grin/sync/body_sync.rs +++ b/servers/src/grin/sync/body_sync.rs @@ -159,12 +159,10 @@ impl BodySync { self.peers.more_work_peer() }; if let Some(peer) = peer { - if let Ok(peer) = peer.try_read() { - if let Err(e) = peer.send_block_request(*hash) { - debug!(LOGGER, "Skipped request to {}: {:?}", peer.info.addr, e); - } else { - self.body_sync_hashes.push(hash.clone()); - } + if let Err(e) = peer.send_block_request(*hash) { + debug!(LOGGER, "Skipped request to {}: {:?}", peer.info.addr, e); + } else { + self.body_sync_hashes.push(hash.clone()); } } } diff --git a/servers/src/grin/sync/header_sync.rs b/servers/src/grin/sync/header_sync.rs index cdfaede9f..fbe542f54 100644 --- a/servers/src/grin/sync/header_sync.rs +++ b/servers/src/grin/sync/header_sync.rs @@ -124,11 +124,8 @@ impl HeaderSync { let difficulty = header_head.total_difficulty; if let Some(peer) = self.peers.most_work_peer() { - if let Ok(p) = peer.try_read() { - let peer_difficulty = p.info.total_difficulty.clone(); - if peer_difficulty > difficulty { - self.request_headers(&p); - } + if peer.info.total_difficulty() > difficulty { + self.request_headers(&peer); } } } diff --git a/servers/src/grin/sync/state_sync.rs b/servers/src/grin/sync/state_sync.rs index d40faec2c..c834f0e27 100644 --- a/servers/src/grin/sync/state_sync.rs +++ b/servers/src/grin/sync/state_sync.rs @@ -14,7 +14,7 @@ use chrono::prelude::{DateTime, Utc}; use chrono::Duration; -use std::sync::{Arc, RwLock}; +use std::sync::Arc; use chain; use common::types::{Error, SyncState, SyncStatus}; @@ -36,7 +36,7 @@ pub struct StateSync { archive_mode: bool, prev_fast_sync: Option>, - fast_sync_peer: Option>>, + fast_sync_peer: Option>, } impl StateSync { @@ -88,14 +88,12 @@ impl StateSync { // check peer connection status of this sync if let Some(ref peer) = self.fast_sync_peer { - if let Ok(p) = peer.try_read() { - if !p.is_connected() && SyncStatus::TxHashsetDownload == self.sync_state.status() { - sync_need_restart = true; - info!( - LOGGER, - "fast_sync: peer connection lost: {:?}. restart", p.info.addr, - ); - } + if !peer.is_connected() && SyncStatus::TxHashsetDownload == self.sync_state.status() { + sync_need_restart = true; + info!( + LOGGER, + "fast_sync: peer connection lost: {:?}. restart", peer.info.addr, + ); } } @@ -131,38 +129,36 @@ impl StateSync { true } - fn request_state(&self, header_head: &chain::Tip) -> Result>, p2p::Error> { + fn request_state(&self, header_head: &chain::Tip) -> Result, p2p::Error> { let horizon = global::cut_through_horizon() as u64; if let Some(peer) = self.peers.most_work_peer() { - if let Ok(p) = peer.try_read() { - // ask for txhashset at 90% of horizon, this still leaves time for download - // and validation to happen and stay within horizon - let mut txhashset_head = self + // ask for txhashset at 90% of horizon, this still leaves time for download + // and validation to happen and stay within horizon + let mut txhashset_head = self + .chain + .get_block_header(&header_head.prev_block_h) + .unwrap(); + for _ in 0..(horizon - horizon / 10) { + txhashset_head = self .chain - .get_block_header(&header_head.prev_block_h) + .get_block_header(&txhashset_head.previous) .unwrap(); - for _ in 0..(horizon - horizon / 10) { - txhashset_head = self - .chain - .get_block_header(&txhashset_head.previous) - .unwrap(); - } - let bhash = txhashset_head.hash(); - debug!( - LOGGER, - "fast_sync: before txhashset request, header head: {} / {}, txhashset_head: {} / {}", - header_head.height, - header_head.last_block_h, - txhashset_head.height, - bhash - ); - if let Err(e) = p.send_txhashset_request(txhashset_head.height, bhash) { - error!(LOGGER, "fast_sync: send_txhashset_request err! {:?}", e); - return Err(e); - } - return Ok(peer.clone()); } + let bhash = txhashset_head.hash(); + debug!( + LOGGER, + "fast_sync: before txhashset request, header head: {} / {}, txhashset_head: {} / {}", + header_head.height, + header_head.last_block_h, + txhashset_head.height, + bhash + ); + if let Err(e) = peer.send_txhashset_request(txhashset_head.height, bhash) { + error!(LOGGER, "fast_sync: send_txhashset_request err! {:?}", e); + return Err(e); + } + return Ok(peer.clone()); } Err(p2p::Error::PeerException) } diff --git a/servers/src/grin/sync/syncer.rs b/servers/src/grin/sync/syncer.rs index 0ef220b08..024afdf11 100644 --- a/servers/src/grin/sync/syncer.rs +++ b/servers/src/grin/sync/syncer.rs @@ -159,22 +159,19 @@ fn needs_syncing( // difficulty than us if is_syncing { if let Some(peer) = peer { - if let Ok(peer) = peer.try_read() { - most_work_height = peer.info.height; + most_work_height = peer.info.height(); + if peer.info.total_difficulty() <= local_diff { + let ch = chain.head().unwrap(); + info!( + LOGGER, + "synchronized at {} @ {} [{}]", + local_diff.to_num(), + ch.height, + ch.last_block_h + ); - if peer.info.total_difficulty <= local_diff { - let ch = chain.head().unwrap(); - info!( - LOGGER, - "synchronized at {} @ {} [{}]", - local_diff.to_num(), - ch.height, - ch.last_block_h - ); - - let _ = chain.reset_head(); - return (false, most_work_height); - } + let _ = chain.reset_head(); + return (false, most_work_height); } } else { warn!(LOGGER, "sync: no peers available, disabling sync"); @@ -182,26 +179,25 @@ fn needs_syncing( } } else { if let Some(peer) = peer { - if let Ok(peer) = peer.try_read() { - most_work_height = peer.info.height; + most_work_height = peer.info.height(); - // sum the last 5 difficulties to give us the threshold - let threshold = chain - .difficulty_iter() - .filter_map(|x| x.map(|(_, x)| x).ok()) - .take(5) - .fold(Difficulty::zero(), |sum, val| sum + val); + // sum the last 5 difficulties to give us the threshold + let threshold = chain + .difficulty_iter() + .filter_map(|x| x.map(|(_, x)| x).ok()) + .take(5) + .fold(Difficulty::zero(), |sum, val| sum + val); - if peer.info.total_difficulty > local_diff.clone() + threshold.clone() { - info!( - LOGGER, - "sync: total_difficulty {}, peer_difficulty {}, threshold {} (last 5 blocks), enabling sync", - local_diff, - peer.info.total_difficulty, - threshold, - ); - return (true, most_work_height); - } + let peer_diff = peer.info.total_difficulty(); + if peer_diff > local_diff.clone() + threshold.clone() { + info!( + LOGGER, + "sync: total_difficulty {}, peer_difficulty {}, threshold {} (last 5 blocks), enabling sync", + local_diff, + peer_diff, + threshold, + ); + return (true, most_work_height); } } } diff --git a/servers/tests/api.rs b/servers/tests/api.rs index e09fc2218..d31825b28 100644 --- a/servers/tests/api.rs +++ b/servers/tests/api.rs @@ -460,12 +460,13 @@ pub fn get_peer( pub fn get_connected_peers( base_addr: &String, api_server_port: u16, -) -> Result, Error> { +) -> Result, Error> { let url = format!( "http://{}:{}/v1/peers/connected", base_addr, api_server_port ); - api::client::get::>(url.as_str(), None).map_err(|e| Error::API(e)) + api::client::get::>(url.as_str(), None) + .map_err(|e| Error::API(e)) } pub fn get_all_peers( diff --git a/servers/tests/simulnet.rs b/servers/tests/simulnet.rs index a800c81df..3f40d568b 100644 --- a/servers/tests/simulnet.rs +++ b/servers/tests/simulnet.rs @@ -127,7 +127,7 @@ fn simulate_seeding() { "http://{}:{}/v1/peers/connected", &server_config.base_addr, 30020 ); - let peers_all = api::client::get::>(url.as_str(), None); + let peers_all = api::client::get::>(url.as_str(), None); assert!(peers_all.is_ok()); assert_eq!(peers_all.unwrap().len(), 4); diff --git a/src/bin/cmd/client.rs b/src/bin/cmd/client.rs index 94c16c13b..e1579bf62 100644 --- a/src/bin/cmd/client.rs +++ b/src/bin/cmd/client.rs @@ -122,8 +122,9 @@ pub fn unban_peer(config: &ServerConfig, peer_addr: &SocketAddr, api_secret: Opt pub fn list_connected_peers(config: &ServerConfig, api_secret: Option) { let mut e = term::stdout().unwrap(); let url = format!("http://{}/v1/peers/connected", config.api_http_addr); - let peers_info: Result, api::Error>; - peers_info = api::client::get::>(url.as_str(), api_secret); + // let peers_info: Result, api::Error>; + + let peers_info = api::client::get::>(url.as_str(), api_secret); match peers_info.map_err(|e| Error::API(e)) { Ok(connected_peers) => { @@ -134,6 +135,7 @@ pub fn list_connected_peers(config: &ServerConfig, api_secret: Option) { writeln!(e, "User agent: {}", connected_peer.user_agent).unwrap(); writeln!(e, "Version: {}", connected_peer.version).unwrap(); writeln!(e, "Peer address: {}", connected_peer.addr).unwrap(); + writeln!(e, "Height: {}", connected_peer.height).unwrap(); writeln!(e, "Total difficulty: {}", connected_peer.total_difficulty).unwrap(); writeln!(e, "Direction: {:?}", connected_peer.direction).unwrap(); println!(); @@ -142,6 +144,7 @@ pub fn list_connected_peers(config: &ServerConfig, api_secret: Option) { } Err(_) => writeln!(e, "Failed to get connected peers").unwrap(), }; + e.reset().unwrap(); }