From 6352dfbac921888259d5f8008eb6a421111f1361 Mon Sep 17 00:00:00 2001 From: AntiochP <30642645+antiochp@users.noreply.github.com> Date: Tue, 21 Nov 2017 09:24:29 -0500 Subject: [PATCH] add total_diff to ping/pong msgs (#350) * add total_diff to ping/pong msgs debug log for total_diff on each ping/pong * expose peer addr to the handle_payload fn so we know where it came from * fix p2p tests for ping * default to 0 if we cannot read total_difficulty * updating a connected peer in place * actually update peer info diff * fixup p2p tests --- api/src/handlers.rs | 1 + grin/src/adapters.rs | 25 +++++++++- grin/src/seed.rs | 2 + grin/src/server.rs | 6 +++ grin/src/sync.rs | 3 ++ p2p/src/msg.rs | 48 +++++++++++++++---- p2p/src/peer.rs | 10 ++-- p2p/src/protocol.rs | 33 +++++++++++-- p2p/src/server.rs | 95 ++++++++++++++++++++++--------------- p2p/src/types.rs | 7 ++- p2p/tests/peer_handshake.rs | 7 ++- 11 files changed, 178 insertions(+), 59 deletions(-) diff --git a/api/src/handlers.rs b/api/src/handlers.rs index 02aa1571b..083b65024 100644 --- a/api/src/handlers.rs +++ b/api/src/handlers.rs @@ -247,6 +247,7 @@ impl Handler for PeersConnectedHandler { fn handle(&self, _req: &mut Request) -> IronResult { let mut peers = vec![]; for p in &self.p2p_server.all_peers() { + let p = p.read().unwrap(); let peer_info = p.info.clone(); peers.push(peer_info); } diff --git a/grin/src/adapters.rs b/grin/src/adapters.rs index 3dcfa79f1..14e4293b8 100644 --- a/grin/src/adapters.rs +++ b/grin/src/adapters.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashMap; use std::net::SocketAddr; use std::sync::{Arc, RwLock}; use std::thread; @@ -21,7 +22,7 @@ use core::core::{self, Output}; use core::core::block::BlockHeader; use core::core::hash::{Hash, Hashed}; use core::core::target::Difficulty; -use p2p::{self, NetAdapter, PeerData, PeerStore, Server, State}; +use p2p::{self, NetAdapter, Peer, PeerData, PeerStore, Server, State}; use pool; use util::secp::pedersen::Commitment; use util::OneTime; @@ -35,8 +36,8 @@ use util::LOGGER; pub struct NetToChainAdapter { chain: Arc, peer_store: Arc, + connected_peers: Arc>>>>, tx_pool: Arc>>, - syncer: OneTime>, } @@ -247,6 +248,24 @@ impl NetAdapter for NetToChainAdapter { error!(LOGGER, "Could not save connected peer: {:?}", e); } } + + fn peer_difficulty(&self, addr: SocketAddr, diff: Difficulty) { + debug!( + LOGGER, + "peer total_diff (ping/pong): {}, {} vs us {}", + addr, + diff, + self.total_difficulty(), + ); + + if diff.into_num() > 0 { + let peers = self.connected_peers.write().unwrap(); + if let Some(peer) = peers.get(&addr) { + let mut peer = peer.write().unwrap(); + peer.info.total_difficulty = diff; + } + } + } } impl NetToChainAdapter { @@ -254,10 +273,12 @@ impl NetToChainAdapter { chain_ref: Arc, tx_pool: Arc>>, peer_store: Arc, + connected_peers: Arc>>>>, ) -> NetToChainAdapter { NetToChainAdapter { chain: chain_ref, peer_store: peer_store, + connected_peers: connected_peers, tx_pool: tx_pool, syncer: OneTime::new(), } diff --git a/grin/src/seed.rs b/grin/src/seed.rs index 79ce46bb6..c6878d5cc 100644 --- a/grin/src/seed.rs +++ b/grin/src/seed.rs @@ -96,6 +96,7 @@ impl Seeder { // if needed let disconnected = p2p_server.clean_peers(); for p in disconnected { + let p = p.read().unwrap(); if p.is_banned() { debug!(LOGGER, "Marking peer {} as banned.", p.info.addr); let update_result = @@ -282,6 +283,7 @@ fn connect_and_req( let fut = timeout.then(move |p| { match p { Ok(Some(p)) => { + let p = p.read().unwrap(); let peer_result = p.send_peer_request(capab); match peer_result { Ok(()) => {} diff --git a/grin/src/server.rs b/grin/src/server.rs index 1ede3346e..52fb9b449 100644 --- a/grin/src/server.rs +++ b/grin/src/server.rs @@ -16,6 +16,7 @@ //! the peer-to-peer server, the blockchain and the transaction pool) and acts //! as a facade. +use std::collections::HashMap; use std::net::SocketAddr; use std::sync::{Arc, RwLock}; use std::thread; @@ -108,16 +109,21 @@ impl Server { pool_adapter.set_chain(shared_chain.clone()); + // Currently connected peers. Used by both the net_adapter and the p2p_server. + let connected_peers = Arc::new(RwLock::new(HashMap::new())); + let peer_store = Arc::new(p2p::PeerStore::new(config.db_root.clone())?); let net_adapter = Arc::new(NetToChainAdapter::new( shared_chain.clone(), tx_pool.clone(), peer_store.clone(), + connected_peers.clone(), )); let p2p_server = Arc::new(p2p::Server::new( config.capabilities, config.p2p_config.unwrap(), + connected_peers.clone(), net_adapter.clone(), genesis.hash(), )); diff --git a/grin/src/sync.rs b/grin/src/sync.rs index c26c36c84..9acd26b74 100644 --- a/grin/src/sync.rs +++ b/grin/src/sync.rs @@ -107,6 +107,7 @@ impl Syncer { // TODO do something better (like trying to get more) if we lose peers let peer = self.p2p.most_work_peer().expect("No peers available for sync."); + let peer = peer.read().unwrap(); debug!( LOGGER, "Sync: peer {} vs us {}", @@ -245,6 +246,7 @@ impl Syncer { let peer = self.p2p.most_work_peer(); let locator = self.get_locator(&tip)?; if let Some(p) = peer { + let p = p.read().unwrap(); debug!( LOGGER, "Asking peer {} for more block headers, locator: {:?}", @@ -313,6 +315,7 @@ impl Syncer { /// Pick a random peer and ask for a block by hash fn request_block(&self, h: Hash) { let peer = self.p2p.random_peer().unwrap(); + let peer = peer.read().unwrap(); let send_result = peer.send_block_request(h); if let Err(e) = send_result { debug!(LOGGER, "Error requesting block: {:?}", e); diff --git a/p2p/src/msg.rs b/p2p/src/msg.rs index a5961d95f..71760c45c 100644 --- a/p2p/src/msg.rs +++ b/p2p/src/msg.rs @@ -475,18 +475,50 @@ impl Readable for Headers { } } -/// Placeholder for messages like Ping and Pong that don't send anything but -/// the header. -pub struct Empty {} +pub struct Ping { + /// total difficulty accumulated by the sender, used to check whether sync + /// may be needed + pub total_difficulty: Difficulty, +} -impl Writeable for Empty { - fn write(&self, _: &mut W) -> Result<(), ser::Error> { +impl Writeable for Ping { + fn write(&self, writer: &mut W) -> Result<(), ser::Error> { + self.total_difficulty.write(writer).unwrap(); Ok(()) } } -impl Readable for Empty { - fn read(_: &mut Reader) -> Result { - Ok(Empty {}) +impl Readable for Ping { + fn read(reader: &mut Reader) -> Result { + // TODO - once everyone is sending total_difficulty we can clean this up + let total_difficulty = match Difficulty::read(reader) { + Ok(diff) => diff, + Err(_) => Difficulty::zero(), + }; + Ok(Ping { total_difficulty }) + } +} + +pub struct Pong { + /// total difficulty accumulated by the sender, used to check whether sync + /// may be needed + pub total_difficulty: Difficulty, +} + +impl Writeable for Pong { + fn write(&self, writer: &mut W) -> Result<(), ser::Error> { + self.total_difficulty.write(writer).unwrap(); + Ok(()) + } +} + +impl Readable for Pong { + fn read(reader: &mut Reader) -> Result { + // TODO - once everyone is sending total_difficulty we can clean this up + let total_difficulty = match Difficulty::read(reader) { + Ok(diff) => diff, + Err(_) => Difficulty::zero(), + }; + Ok(Pong { total_difficulty }) } } diff --git a/p2p/src/peer.rs b/p2p/src/peer.rs index 3f75b7deb..3e0449ba9 100644 --- a/p2p/src/peer.rs +++ b/p2p/src/peer.rs @@ -94,7 +94,7 @@ impl Peer { let state = self.state.clone(); let adapter = Arc::new(self.tracking_adapter.clone()); - Box::new(self.proto.handle(conn, adapter).then(move |res| { + Box::new(self.proto.handle(conn, adapter, addr).then(move |res| { // handle disconnection, standard disconnections aren't considered an error let mut state = state.write().unwrap(); match res { @@ -134,8 +134,8 @@ impl Peer { self.proto.transmitted_bytes() } - pub fn send_ping(&self) -> Result<(), Error> { - self.proto.send_ping() + pub fn send_ping(&self, total_difficulty: Difficulty) -> Result<(), Error> { + self.proto.send_ping(total_difficulty) } /// Sends the provided block to the remote peer. The request may be dropped @@ -252,4 +252,8 @@ impl NetAdapter for TrackingAdapter { fn peer_connected(&self, pi: &PeerInfo) { self.adapter.peer_connected(pi) } + + fn peer_difficulty(&self, addr: SocketAddr, diff: Difficulty) { + self.adapter.peer_difficulty(addr, diff) + } } diff --git a/p2p/src/protocol.rs b/p2p/src/protocol.rs index da56a7a27..46a9142d3 100644 --- a/p2p/src/protocol.rs +++ b/p2p/src/protocol.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::sync::Arc; +use std::net::SocketAddr; use futures::Future; use futures::sync::mpsc::UnboundedSender; @@ -20,6 +21,7 @@ use tokio_core::net::TcpStream; use core::core; use core::core::hash::Hash; +use core::core::target::Difficulty; use core::ser; use conn::TimeoutConnection; use msg::*; @@ -46,10 +48,11 @@ impl Protocol for ProtocolV1 { &self, conn: TcpStream, adapter: Arc, + addr: SocketAddr, ) -> Box> { let (conn, listener) = TimeoutConnection::listen(conn, move |sender, header, data| { let adapt = adapter.as_ref(); - handle_payload(adapt, sender, header, data) + handle_payload(adapt, sender, header, data, addr) }); self.conn.init(conn); @@ -64,8 +67,13 @@ impl Protocol for ProtocolV1 { /// Sends a ping message to the remote peer. Will panic if handle has never /// been called on this protocol. - fn send_ping(&self) -> Result<(), Error> { - self.send_request(Type::Ping, Type::Pong, &Empty {}, None) + fn send_ping(&self, total_difficulty: Difficulty) -> Result<(), Error> { + self.send_request( + Type::Ping, + Type::Pong, + &Ping { total_difficulty }, + None, + ) } /// Serializes and sends a block to our remote peer @@ -129,14 +137,29 @@ fn handle_payload( sender: UnboundedSender>, header: MsgHeader, buf: Vec, + addr: SocketAddr, ) -> Result, ser::Error> { match header.msg_type { Type::Ping => { - let data = ser::ser_vec(&MsgHeader::new(Type::Pong, 0))?; + let ping = ser::deserialize::(&mut &buf[..])?; + adapter.peer_difficulty(addr, ping.total_difficulty); + let pong = Pong { total_difficulty: adapter.total_difficulty() }; + let mut body_data = vec![]; + try!(ser::serialize(&mut body_data, &pong)); + let mut data = vec![]; + try!(ser::serialize( + &mut data, + &MsgHeader::new(Type::Pong, body_data.len() as u64), + )); + data.append(&mut body_data); sender.unbounded_send(data).unwrap(); Ok(None) } - Type::Pong => Ok(None), + Type::Pong => { + let pong = ser::deserialize::(&mut &buf[..])?; + adapter.peer_difficulty(addr, pong.total_difficulty); + Ok(None) + }, Type::Transaction => { let tx = ser::deserialize::(&mut &buf[..])?; adapter.transaction_received(tx); diff --git a/p2p/src/server.rs b/p2p/src/server.rs index b09d6006b..6a63a58c4 100644 --- a/p2p/src/server.rs +++ b/p2p/src/server.rs @@ -18,7 +18,6 @@ use std::cell::RefCell; use std::collections::HashMap; use std::net::SocketAddr; -use std::ops::Deref; use std::sync::{Arc, RwLock}; use std::time::Duration; @@ -58,6 +57,7 @@ impl NetAdapter for DummyAdapter { } fn peer_addrs_received(&self, _: Vec) {} fn peer_connected(&self, _: &PeerInfo) {} + fn peer_difficulty(&self, _: SocketAddr, _: Difficulty) {} } /// P2P server implementation, handling bootstrapping to find and connect to @@ -65,7 +65,7 @@ impl NetAdapter for DummyAdapter { pub struct Server { config: P2PConfig, capabilities: Capabilities, - peers: Arc>>>, + peers: Arc>>>>, handshake: Arc, adapter: Arc, stop: RefCell>>, @@ -80,13 +80,14 @@ impl Server { pub fn new( capab: Capabilities, config: P2PConfig, + peers: Arc>>>>, adapter: Arc, genesis: Hash, ) -> Server { Server { config: config, capabilities: capab, - peers: Arc::new(RwLock::new(HashMap::new())), + peers: peers, handshake: Arc::new(Handshake::new(genesis)), adapter: adapter, stop: RefCell::new(None), @@ -108,19 +109,27 @@ impl Server { // main peer acceptance future handling handshake let hp = h.clone(); let peers_listen = socket.incoming().map_err(From::from).map(move |(conn, _)| { - let adapter = adapter.clone(); - let total_diff = adapter.total_difficulty(); let peers = peers.clone(); + let total_diff = adapter.total_difficulty(); // accept the peer and add it to the server map - let accept = Peer::accept(conn, capab, total_diff, &handshake.clone(), adapter.clone()); - let added = add_to_peers(peers, adapter, accept); + let accept = Peer::accept( + conn, + capab, + total_diff, + &handshake.clone(), + adapter.clone(), + ); + let added = add_to_peers(peers, adapter.clone(), accept); // wire in a future to timeout the accept after 5 secs let timed_peer = with_timeout(Box::new(added), &hp); // run the main peer protocol - timed_peer.and_then(move |(conn, peer)| peer.clone().run(conn)) + timed_peer.and_then(move |(conn, peer)| { + let peer = peer.read().unwrap(); + peer.run(conn) + }) }); // spawn each peer future to its own task @@ -143,12 +152,15 @@ impl Server { *stop_mut = Some(stop); } + // timer to regularly check on our peers by pinging them + let adapter = self.adapter.clone(); let peers_inner = self.peers.clone(); let peers_timer = Timer::default() .interval(Duration::new(20, 0)) .fold((), move |_, _| { - check_peers(peers_inner.clone()); + let total_diff = adapter.total_difficulty(); + check_peers(peers_inner.clone(), total_diff); Ok(()) }); @@ -172,7 +184,7 @@ impl Server { &self, addr: SocketAddr, h: reactor::Handle, - ) -> Box>, Error = Error>> { + ) -> Box>>, Error = Error>> { if let Some(p) = self.get_peer(addr) { // if we're already connected to the addr, just return the peer return Box::new(future::ok(Some(p))); @@ -206,11 +218,12 @@ impl Server { with_timeout(Box::new(added), &h) }) .and_then(move |(socket, peer)| { - h2.spawn(peer.run(socket).map_err(|e| { + let peer_inner = peer.read().unwrap(); + h2.spawn(peer_inner.run(socket).map_err(|e| { error!(LOGGER, "Peer error: {:?}", e); () })); - Ok(Some(peer)) + Ok(Some(peer.clone())) }); Box::new(request) } @@ -220,32 +233,34 @@ impl Server { self.get_peer(addr).is_some() } - pub fn all_peers(&self) -> Vec> { + pub fn all_peers(&self) -> Vec>> { self.peers.read().unwrap().values().map(|p| p.clone()).collect() } /// Get a peer we're connected to by address. - pub fn get_peer(&self, addr: SocketAddr) -> Option> { + pub fn get_peer(&self, addr: SocketAddr) -> Option>> { self.peers.read().unwrap().get(&addr).map(|p| p.clone()) } /// Have the server iterate over its peer list and prune all peers we have /// lost connection to or have been deemed problematic. The removed peers /// are returned. - pub fn clean_peers(&self) -> Vec> { + pub fn clean_peers(&self) -> Vec>> { let mut rm = vec![]; // build a list of peers to be cleaned up for peer in self.all_peers() { - if !peer.is_connected() { - debug!(LOGGER, "cleaning {:?}, not connected", peer.info.addr); - rm.push(peer); + let peer_inner = peer.read().unwrap(); + if !peer_inner.is_connected() { + debug!(LOGGER, "cleaning {:?}, not connected", peer_inner.info.addr); + rm.push(peer.clone()); } } // now clean up peer map based on the list to remove let mut peers = self.peers.write().unwrap(); for p in rm.clone() { + let p = p.read().unwrap(); peers.remove(&p.info.addr); } @@ -254,22 +269,21 @@ impl Server { /// Returns the peer with the most worked branch, showing the highest total /// difficulty. - pub fn most_work_peer(&self) -> Option> { - let peers = self.all_peers(); + pub fn most_work_peer(&self) -> Option>> { + let mut peers = self.all_peers(); if peers.len() == 0 { return None; } - let mut res = peers[0].clone(); - for p in peers.deref() { - if p.is_connected() && res.info.total_difficulty < p.info.total_difficulty { - res = (*p).clone(); - } - } - Some(res) + peers.sort_by_key(|p| { + let p = p.read().unwrap(); + p.info.total_difficulty.clone() + }); + let peer = peers.last().unwrap(); + Some(peer.clone()) } /// Returns a random peer we're connected to. - pub fn random_peer(&self) -> Option> { + pub fn random_peer(&self) -> Option>> { let peers = self.all_peers(); if peers.len() == 0 { None @@ -285,7 +299,8 @@ impl Server { pub fn broadcast_block(&self, b: &core::Block) { let peers = self.all_peers(); let mut count = 0; - for p in peers.deref() { + for p in peers { + let p = p.read().unwrap(); if p.is_connected() { if let Err(e) = p.send_block(b) { debug!(LOGGER, "Error sending block to peer: {:?}", e); @@ -294,7 +309,7 @@ impl Server { } } } - debug!(LOGGER, "Bardcasted block {} to {} peers.", b.header.height, count); + debug!(LOGGER, "Broadcasted block {} to {} peers.", b.header.height, count); } /// Broadcasts the provided transaction to all our peers. A peer @@ -302,7 +317,8 @@ impl Server { /// remote peer already has the transaction. pub fn broadcast_transaction(&self, tx: &core::Transaction) { let peers = self.all_peers(); - for p in peers.deref() { + for p in peers { + let p = p.read().unwrap(); if p.is_connected() { if let Err(e) = p.send_transaction(tx) { debug!(LOGGER, "Error sending block to peer: {:?}", e); @@ -320,7 +336,8 @@ impl Server { pub fn stop(self) { info!(LOGGER, "calling stop on server"); let peers = self.all_peers(); - for p in peers.deref() { + for p in peers { + let p = p.write().unwrap(); p.stop(); } self.stop.into_inner().unwrap().send(()).unwrap(); @@ -329,17 +346,17 @@ impl Server { // Adds the peer built by the provided future in the peers map fn add_to_peers( - peers: Arc>>>, + peers: Arc>>>>, adapter: Arc, peer_fut: A, -) -> Box), ()>, Error = Error>> +) -> Box>), ()>, Error = Error>> where A: IntoFuture + 'static, { let peer_add = peer_fut.into_future().map(move |(conn, peer)| { adapter.peer_connected(&peer.info); let addr = peer.info.addr.clone(); - let apeer = Arc::new(peer); + let apeer = Arc::new(RwLock::new(peer)); let mut peers = peers.write().unwrap(); peers.insert(addr, apeer.clone()); Ok((conn, apeer)) @@ -349,11 +366,15 @@ where // Ping all our connected peers. Always automatically expects a pong back or // disconnects. This acts as a liveness test. -fn check_peers(peers: Arc>>>) { +fn check_peers( + peers: Arc>>>>, + total_difficulty: Difficulty, +) { let peers_map = peers.read().unwrap(); for p in peers_map.values() { + let p = p.read().unwrap(); if p.is_connected() { - let _ = p.send_ping(); + let _ = p.send_ping(total_difficulty.clone()); } } } diff --git a/p2p/src/types.rs b/p2p/src/types.rs index 53d29fe15..d11628c79 100644 --- a/p2p/src/types.rs +++ b/p2p/src/types.rs @@ -126,11 +126,11 @@ pub trait Protocol { /// be known already, usually passed during construction. Will typically /// block so needs to be called withing a coroutine. Should also be called /// only once. - fn handle(&self, conn: TcpStream, na: Arc) + fn handle(&self, conn: TcpStream, na: Arc, addr: SocketAddr) -> Box>; /// Sends a ping message to the remote peer. - fn send_ping(&self) -> Result<(), Error>; + fn send_ping(&self, total_difficulty: Difficulty) -> Result<(), Error>; /// Relays a block to the remote peer. fn send_block(&self, b: &core::Block) -> Result<(), Error>; @@ -189,4 +189,7 @@ pub trait NetAdapter: Sync + Send { /// Network successfully connected to a peer. fn peer_connected(&self, &PeerInfo); + + /// Heard total_difficulty from a connected peer (via ping/pong). + fn peer_difficulty(&self, SocketAddr, Difficulty); } diff --git a/p2p/tests/peer_handshake.rs b/p2p/tests/peer_handshake.rs index 508745d78..6ffe43fa8 100644 --- a/p2p/tests/peer_handshake.rs +++ b/p2p/tests/peer_handshake.rs @@ -17,8 +17,9 @@ extern crate grin_core as core; extern crate grin_p2p as p2p; extern crate tokio_core; +use std::collections::HashMap; use std::net::SocketAddr; -use std::sync::Arc; +use std::sync::{Arc, RwLock}; use std::time; use futures::future::Future; @@ -37,9 +38,11 @@ fn peer_handshake() { let handle = evtlp.handle(); let p2p_conf = p2p::P2PConfig::default(); let net_adapter = Arc::new(p2p::DummyAdapter {}); + let connected_peers = Arc::new(RwLock::new(HashMap::new())); let server = p2p::Server::new( p2p::UNKNOWN, p2p_conf, + connected_peers, net_adapter.clone(), Hash::from_vec(vec![]), ); @@ -73,7 +76,7 @@ fn peer_handshake() { rhandle.spawn(peer.run(socket).map_err(|e| { panic!("Client run failed: {:?}", e); })); - peer.send_ping().unwrap(); + peer.send_ping(Difficulty::one()).unwrap(); timeout_send.from_err().map(|_| peer) }) .and_then(|peer| {