add total_diff to ping/pong msgs (#350)

* add total_diff to ping/pong msgs
debug log for total_diff on each ping/pong

* expose peer addr to the handle_payload fn
so we know where it came from

* fix p2p tests for ping

* default to 0 if we cannot read total_difficulty

* updating a connected peer in place

* actually update peer info diff

* fixup p2p tests
This commit is contained in:
AntiochP 2017-11-21 09:24:29 -05:00 committed by GitHub
parent ce82c82b0d
commit 6352dfbac9
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 178 additions and 59 deletions

View file

@ -247,6 +247,7 @@ impl Handler for PeersConnectedHandler {
fn handle(&self, _req: &mut Request) -> IronResult<Response> {
let mut peers = vec![];
for p in &self.p2p_server.all_peers() {
let p = p.read().unwrap();
let peer_info = p.info.clone();
peers.push(peer_info);
}

View file

@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::{Arc, RwLock};
use std::thread;
@ -21,7 +22,7 @@ use core::core::{self, Output};
use core::core::block::BlockHeader;
use core::core::hash::{Hash, Hashed};
use core::core::target::Difficulty;
use p2p::{self, NetAdapter, PeerData, PeerStore, Server, State};
use p2p::{self, NetAdapter, Peer, PeerData, PeerStore, Server, State};
use pool;
use util::secp::pedersen::Commitment;
use util::OneTime;
@ -35,8 +36,8 @@ use util::LOGGER;
pub struct NetToChainAdapter {
chain: Arc<chain::Chain>,
peer_store: Arc<PeerStore>,
connected_peers: Arc<RwLock<HashMap<SocketAddr, Arc<RwLock<Peer>>>>>,
tx_pool: Arc<RwLock<pool::TransactionPool<PoolToChainAdapter>>>,
syncer: OneTime<Arc<sync::Syncer>>,
}
@ -247,6 +248,24 @@ impl NetAdapter for NetToChainAdapter {
error!(LOGGER, "Could not save connected peer: {:?}", e);
}
}
fn peer_difficulty(&self, addr: SocketAddr, diff: Difficulty) {
debug!(
LOGGER,
"peer total_diff (ping/pong): {}, {} vs us {}",
addr,
diff,
self.total_difficulty(),
);
if diff.into_num() > 0 {
let peers = self.connected_peers.write().unwrap();
if let Some(peer) = peers.get(&addr) {
let mut peer = peer.write().unwrap();
peer.info.total_difficulty = diff;
}
}
}
}
impl NetToChainAdapter {
@ -254,10 +273,12 @@ impl NetToChainAdapter {
chain_ref: Arc<chain::Chain>,
tx_pool: Arc<RwLock<pool::TransactionPool<PoolToChainAdapter>>>,
peer_store: Arc<PeerStore>,
connected_peers: Arc<RwLock<HashMap<SocketAddr, Arc<RwLock<Peer>>>>>,
) -> NetToChainAdapter {
NetToChainAdapter {
chain: chain_ref,
peer_store: peer_store,
connected_peers: connected_peers,
tx_pool: tx_pool,
syncer: OneTime::new(),
}

View file

@ -96,6 +96,7 @@ impl Seeder {
// if needed
let disconnected = p2p_server.clean_peers();
for p in disconnected {
let p = p.read().unwrap();
if p.is_banned() {
debug!(LOGGER, "Marking peer {} as banned.", p.info.addr);
let update_result =
@ -282,6 +283,7 @@ fn connect_and_req(
let fut = timeout.then(move |p| {
match p {
Ok(Some(p)) => {
let p = p.read().unwrap();
let peer_result = p.send_peer_request(capab);
match peer_result {
Ok(()) => {}

View file

@ -16,6 +16,7 @@
//! the peer-to-peer server, the blockchain and the transaction pool) and acts
//! as a facade.
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::{Arc, RwLock};
use std::thread;
@ -108,16 +109,21 @@ impl Server {
pool_adapter.set_chain(shared_chain.clone());
// Currently connected peers. Used by both the net_adapter and the p2p_server.
let connected_peers = Arc::new(RwLock::new(HashMap::new()));
let peer_store = Arc::new(p2p::PeerStore::new(config.db_root.clone())?);
let net_adapter = Arc::new(NetToChainAdapter::new(
shared_chain.clone(),
tx_pool.clone(),
peer_store.clone(),
connected_peers.clone(),
));
let p2p_server = Arc::new(p2p::Server::new(
config.capabilities,
config.p2p_config.unwrap(),
connected_peers.clone(),
net_adapter.clone(),
genesis.hash(),
));

View file

@ -107,6 +107,7 @@ impl Syncer {
// TODO do something better (like trying to get more) if we lose peers
let peer = self.p2p.most_work_peer().expect("No peers available for sync.");
let peer = peer.read().unwrap();
debug!(
LOGGER,
"Sync: peer {} vs us {}",
@ -245,6 +246,7 @@ impl Syncer {
let peer = self.p2p.most_work_peer();
let locator = self.get_locator(&tip)?;
if let Some(p) = peer {
let p = p.read().unwrap();
debug!(
LOGGER,
"Asking peer {} for more block headers, locator: {:?}",
@ -313,6 +315,7 @@ impl Syncer {
/// Pick a random peer and ask for a block by hash
fn request_block(&self, h: Hash) {
let peer = self.p2p.random_peer().unwrap();
let peer = peer.read().unwrap();
let send_result = peer.send_block_request(h);
if let Err(e) = send_result {
debug!(LOGGER, "Error requesting block: {:?}", e);

View file

@ -475,18 +475,50 @@ impl Readable for Headers {
}
}
/// Placeholder for messages like Ping and Pong that don't send anything but
/// the header.
pub struct Empty {}
pub struct Ping {
/// total difficulty accumulated by the sender, used to check whether sync
/// may be needed
pub total_difficulty: Difficulty,
}
impl Writeable for Empty {
fn write<W: Writer>(&self, _: &mut W) -> Result<(), ser::Error> {
impl Writeable for Ping {
fn write<W: Writer>(&self, writer: &mut W) -> Result<(), ser::Error> {
self.total_difficulty.write(writer).unwrap();
Ok(())
}
}
impl Readable for Empty {
fn read(_: &mut Reader) -> Result<Empty, ser::Error> {
Ok(Empty {})
impl Readable for Ping {
fn read(reader: &mut Reader) -> Result<Ping, ser::Error> {
// TODO - once everyone is sending total_difficulty we can clean this up
let total_difficulty = match Difficulty::read(reader) {
Ok(diff) => diff,
Err(_) => Difficulty::zero(),
};
Ok(Ping { total_difficulty })
}
}
pub struct Pong {
/// total difficulty accumulated by the sender, used to check whether sync
/// may be needed
pub total_difficulty: Difficulty,
}
impl Writeable for Pong {
fn write<W: Writer>(&self, writer: &mut W) -> Result<(), ser::Error> {
self.total_difficulty.write(writer).unwrap();
Ok(())
}
}
impl Readable for Pong {
fn read(reader: &mut Reader) -> Result<Pong, ser::Error> {
// TODO - once everyone is sending total_difficulty we can clean this up
let total_difficulty = match Difficulty::read(reader) {
Ok(diff) => diff,
Err(_) => Difficulty::zero(),
};
Ok(Pong { total_difficulty })
}
}

View file

@ -94,7 +94,7 @@ impl Peer {
let state = self.state.clone();
let adapter = Arc::new(self.tracking_adapter.clone());
Box::new(self.proto.handle(conn, adapter).then(move |res| {
Box::new(self.proto.handle(conn, adapter, addr).then(move |res| {
// handle disconnection, standard disconnections aren't considered an error
let mut state = state.write().unwrap();
match res {
@ -134,8 +134,8 @@ impl Peer {
self.proto.transmitted_bytes()
}
pub fn send_ping(&self) -> Result<(), Error> {
self.proto.send_ping()
pub fn send_ping(&self, total_difficulty: Difficulty) -> Result<(), Error> {
self.proto.send_ping(total_difficulty)
}
/// Sends the provided block to the remote peer. The request may be dropped
@ -252,4 +252,8 @@ impl NetAdapter for TrackingAdapter {
fn peer_connected(&self, pi: &PeerInfo) {
self.adapter.peer_connected(pi)
}
fn peer_difficulty(&self, addr: SocketAddr, diff: Difficulty) {
self.adapter.peer_difficulty(addr, diff)
}
}

View file

@ -13,6 +13,7 @@
// limitations under the License.
use std::sync::Arc;
use std::net::SocketAddr;
use futures::Future;
use futures::sync::mpsc::UnboundedSender;
@ -20,6 +21,7 @@ use tokio_core::net::TcpStream;
use core::core;
use core::core::hash::Hash;
use core::core::target::Difficulty;
use core::ser;
use conn::TimeoutConnection;
use msg::*;
@ -46,10 +48,11 @@ impl Protocol for ProtocolV1 {
&self,
conn: TcpStream,
adapter: Arc<NetAdapter>,
addr: SocketAddr,
) -> Box<Future<Item = (), Error = Error>> {
let (conn, listener) = TimeoutConnection::listen(conn, move |sender, header, data| {
let adapt = adapter.as_ref();
handle_payload(adapt, sender, header, data)
handle_payload(adapt, sender, header, data, addr)
});
self.conn.init(conn);
@ -64,8 +67,13 @@ impl Protocol for ProtocolV1 {
/// Sends a ping message to the remote peer. Will panic if handle has never
/// been called on this protocol.
fn send_ping(&self) -> Result<(), Error> {
self.send_request(Type::Ping, Type::Pong, &Empty {}, None)
fn send_ping(&self, total_difficulty: Difficulty) -> Result<(), Error> {
self.send_request(
Type::Ping,
Type::Pong,
&Ping { total_difficulty },
None,
)
}
/// Serializes and sends a block to our remote peer
@ -129,14 +137,29 @@ fn handle_payload(
sender: UnboundedSender<Vec<u8>>,
header: MsgHeader,
buf: Vec<u8>,
addr: SocketAddr,
) -> Result<Option<Hash>, ser::Error> {
match header.msg_type {
Type::Ping => {
let data = ser::ser_vec(&MsgHeader::new(Type::Pong, 0))?;
let ping = ser::deserialize::<Ping>(&mut &buf[..])?;
adapter.peer_difficulty(addr, ping.total_difficulty);
let pong = Pong { total_difficulty: adapter.total_difficulty() };
let mut body_data = vec![];
try!(ser::serialize(&mut body_data, &pong));
let mut data = vec![];
try!(ser::serialize(
&mut data,
&MsgHeader::new(Type::Pong, body_data.len() as u64),
));
data.append(&mut body_data);
sender.unbounded_send(data).unwrap();
Ok(None)
}
Type::Pong => Ok(None),
Type::Pong => {
let pong = ser::deserialize::<Pong>(&mut &buf[..])?;
adapter.peer_difficulty(addr, pong.total_difficulty);
Ok(None)
},
Type::Transaction => {
let tx = ser::deserialize::<core::Transaction>(&mut &buf[..])?;
adapter.transaction_received(tx);

View file

@ -18,7 +18,6 @@
use std::cell::RefCell;
use std::collections::HashMap;
use std::net::SocketAddr;
use std::ops::Deref;
use std::sync::{Arc, RwLock};
use std::time::Duration;
@ -58,6 +57,7 @@ impl NetAdapter for DummyAdapter {
}
fn peer_addrs_received(&self, _: Vec<SocketAddr>) {}
fn peer_connected(&self, _: &PeerInfo) {}
fn peer_difficulty(&self, _: SocketAddr, _: Difficulty) {}
}
/// P2P server implementation, handling bootstrapping to find and connect to
@ -65,7 +65,7 @@ impl NetAdapter for DummyAdapter {
pub struct Server {
config: P2PConfig,
capabilities: Capabilities,
peers: Arc<RwLock<HashMap<SocketAddr, Arc<Peer>>>>,
peers: Arc<RwLock<HashMap<SocketAddr, Arc<RwLock<Peer>>>>>,
handshake: Arc<Handshake>,
adapter: Arc<NetAdapter>,
stop: RefCell<Option<futures::sync::oneshot::Sender<()>>>,
@ -80,13 +80,14 @@ impl Server {
pub fn new(
capab: Capabilities,
config: P2PConfig,
peers: Arc<RwLock<HashMap<SocketAddr, Arc<RwLock<Peer>>>>>,
adapter: Arc<NetAdapter>,
genesis: Hash,
) -> Server {
Server {
config: config,
capabilities: capab,
peers: Arc::new(RwLock::new(HashMap::new())),
peers: peers,
handshake: Arc::new(Handshake::new(genesis)),
adapter: adapter,
stop: RefCell::new(None),
@ -108,19 +109,27 @@ impl Server {
// main peer acceptance future handling handshake
let hp = h.clone();
let peers_listen = socket.incoming().map_err(From::from).map(move |(conn, _)| {
let adapter = adapter.clone();
let total_diff = adapter.total_difficulty();
let peers = peers.clone();
let total_diff = adapter.total_difficulty();
// accept the peer and add it to the server map
let accept = Peer::accept(conn, capab, total_diff, &handshake.clone(), adapter.clone());
let added = add_to_peers(peers, adapter, accept);
let accept = Peer::accept(
conn,
capab,
total_diff,
&handshake.clone(),
adapter.clone(),
);
let added = add_to_peers(peers, adapter.clone(), accept);
// wire in a future to timeout the accept after 5 secs
let timed_peer = with_timeout(Box::new(added), &hp);
// run the main peer protocol
timed_peer.and_then(move |(conn, peer)| peer.clone().run(conn))
timed_peer.and_then(move |(conn, peer)| {
let peer = peer.read().unwrap();
peer.run(conn)
})
});
// spawn each peer future to its own task
@ -143,12 +152,15 @@ impl Server {
*stop_mut = Some(stop);
}
// timer to regularly check on our peers by pinging them
let adapter = self.adapter.clone();
let peers_inner = self.peers.clone();
let peers_timer = Timer::default()
.interval(Duration::new(20, 0))
.fold((), move |_, _| {
check_peers(peers_inner.clone());
let total_diff = adapter.total_difficulty();
check_peers(peers_inner.clone(), total_diff);
Ok(())
});
@ -172,7 +184,7 @@ impl Server {
&self,
addr: SocketAddr,
h: reactor::Handle,
) -> Box<Future<Item = Option<Arc<Peer>>, Error = Error>> {
) -> Box<Future<Item = Option<Arc<RwLock<Peer>>>, Error = Error>> {
if let Some(p) = self.get_peer(addr) {
// if we're already connected to the addr, just return the peer
return Box::new(future::ok(Some(p)));
@ -206,11 +218,12 @@ impl Server {
with_timeout(Box::new(added), &h)
})
.and_then(move |(socket, peer)| {
h2.spawn(peer.run(socket).map_err(|e| {
let peer_inner = peer.read().unwrap();
h2.spawn(peer_inner.run(socket).map_err(|e| {
error!(LOGGER, "Peer error: {:?}", e);
()
}));
Ok(Some(peer))
Ok(Some(peer.clone()))
});
Box::new(request)
}
@ -220,32 +233,34 @@ impl Server {
self.get_peer(addr).is_some()
}
pub fn all_peers(&self) -> Vec<Arc<Peer>> {
pub fn all_peers(&self) -> Vec<Arc<RwLock<Peer>>> {
self.peers.read().unwrap().values().map(|p| p.clone()).collect()
}
/// Get a peer we're connected to by address.
pub fn get_peer(&self, addr: SocketAddr) -> Option<Arc<Peer>> {
pub fn get_peer(&self, addr: SocketAddr) -> Option<Arc<RwLock<Peer>>> {
self.peers.read().unwrap().get(&addr).map(|p| p.clone())
}
/// Have the server iterate over its peer list and prune all peers we have
/// lost connection to or have been deemed problematic. The removed peers
/// are returned.
pub fn clean_peers(&self) -> Vec<Arc<Peer>> {
pub fn clean_peers(&self) -> Vec<Arc<RwLock<Peer>>> {
let mut rm = vec![];
// build a list of peers to be cleaned up
for peer in self.all_peers() {
if !peer.is_connected() {
debug!(LOGGER, "cleaning {:?}, not connected", peer.info.addr);
rm.push(peer);
let peer_inner = peer.read().unwrap();
if !peer_inner.is_connected() {
debug!(LOGGER, "cleaning {:?}, not connected", peer_inner.info.addr);
rm.push(peer.clone());
}
}
// now clean up peer map based on the list to remove
let mut peers = self.peers.write().unwrap();
for p in rm.clone() {
let p = p.read().unwrap();
peers.remove(&p.info.addr);
}
@ -254,22 +269,21 @@ impl Server {
/// Returns the peer with the most worked branch, showing the highest total
/// difficulty.
pub fn most_work_peer(&self) -> Option<Arc<Peer>> {
let peers = self.all_peers();
pub fn most_work_peer(&self) -> Option<Arc<RwLock<Peer>>> {
let mut peers = self.all_peers();
if peers.len() == 0 {
return None;
}
let mut res = peers[0].clone();
for p in peers.deref() {
if p.is_connected() && res.info.total_difficulty < p.info.total_difficulty {
res = (*p).clone();
}
}
Some(res)
peers.sort_by_key(|p| {
let p = p.read().unwrap();
p.info.total_difficulty.clone()
});
let peer = peers.last().unwrap();
Some(peer.clone())
}
/// Returns a random peer we're connected to.
pub fn random_peer(&self) -> Option<Arc<Peer>> {
pub fn random_peer(&self) -> Option<Arc<RwLock<Peer>>> {
let peers = self.all_peers();
if peers.len() == 0 {
None
@ -285,7 +299,8 @@ impl Server {
pub fn broadcast_block(&self, b: &core::Block) {
let peers = self.all_peers();
let mut count = 0;
for p in peers.deref() {
for p in peers {
let p = p.read().unwrap();
if p.is_connected() {
if let Err(e) = p.send_block(b) {
debug!(LOGGER, "Error sending block to peer: {:?}", e);
@ -294,7 +309,7 @@ impl Server {
}
}
}
debug!(LOGGER, "Bardcasted block {} to {} peers.", b.header.height, count);
debug!(LOGGER, "Broadcasted block {} to {} peers.", b.header.height, count);
}
/// Broadcasts the provided transaction to all our peers. A peer
@ -302,7 +317,8 @@ impl Server {
/// remote peer already has the transaction.
pub fn broadcast_transaction(&self, tx: &core::Transaction) {
let peers = self.all_peers();
for p in peers.deref() {
for p in peers {
let p = p.read().unwrap();
if p.is_connected() {
if let Err(e) = p.send_transaction(tx) {
debug!(LOGGER, "Error sending block to peer: {:?}", e);
@ -320,7 +336,8 @@ impl Server {
pub fn stop(self) {
info!(LOGGER, "calling stop on server");
let peers = self.all_peers();
for p in peers.deref() {
for p in peers {
let p = p.write().unwrap();
p.stop();
}
self.stop.into_inner().unwrap().send(()).unwrap();
@ -329,17 +346,17 @@ impl Server {
// Adds the peer built by the provided future in the peers map
fn add_to_peers<A>(
peers: Arc<RwLock<HashMap<SocketAddr, Arc<Peer>>>>,
peers: Arc<RwLock<HashMap<SocketAddr, Arc<RwLock<Peer>>>>>,
adapter: Arc<NetAdapter>,
peer_fut: A,
) -> Box<Future<Item = Result<(TcpStream, Arc<Peer>), ()>, Error = Error>>
) -> Box<Future<Item = Result<(TcpStream, Arc<RwLock<Peer>>), ()>, Error = Error>>
where
A: IntoFuture<Item = (TcpStream, Peer), Error = Error> + 'static,
{
let peer_add = peer_fut.into_future().map(move |(conn, peer)| {
adapter.peer_connected(&peer.info);
let addr = peer.info.addr.clone();
let apeer = Arc::new(peer);
let apeer = Arc::new(RwLock::new(peer));
let mut peers = peers.write().unwrap();
peers.insert(addr, apeer.clone());
Ok((conn, apeer))
@ -349,11 +366,15 @@ where
// Ping all our connected peers. Always automatically expects a pong back or
// disconnects. This acts as a liveness test.
fn check_peers(peers: Arc<RwLock<HashMap<SocketAddr, Arc<Peer>>>>) {
fn check_peers(
peers: Arc<RwLock<HashMap<SocketAddr, Arc<RwLock<Peer>>>>>,
total_difficulty: Difficulty,
) {
let peers_map = peers.read().unwrap();
for p in peers_map.values() {
let p = p.read().unwrap();
if p.is_connected() {
let _ = p.send_ping();
let _ = p.send_ping(total_difficulty.clone());
}
}
}

View file

@ -126,11 +126,11 @@ pub trait Protocol {
/// be known already, usually passed during construction. Will typically
/// block so needs to be called withing a coroutine. Should also be called
/// only once.
fn handle(&self, conn: TcpStream, na: Arc<NetAdapter>)
fn handle(&self, conn: TcpStream, na: Arc<NetAdapter>, addr: SocketAddr)
-> Box<Future<Item = (), Error = Error>>;
/// Sends a ping message to the remote peer.
fn send_ping(&self) -> Result<(), Error>;
fn send_ping(&self, total_difficulty: Difficulty) -> Result<(), Error>;
/// Relays a block to the remote peer.
fn send_block(&self, b: &core::Block) -> Result<(), Error>;
@ -189,4 +189,7 @@ pub trait NetAdapter: Sync + Send {
/// Network successfully connected to a peer.
fn peer_connected(&self, &PeerInfo);
/// Heard total_difficulty from a connected peer (via ping/pong).
fn peer_difficulty(&self, SocketAddr, Difficulty);
}

View file

@ -17,8 +17,9 @@ extern crate grin_core as core;
extern crate grin_p2p as p2p;
extern crate tokio_core;
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::Arc;
use std::sync::{Arc, RwLock};
use std::time;
use futures::future::Future;
@ -37,9 +38,11 @@ fn peer_handshake() {
let handle = evtlp.handle();
let p2p_conf = p2p::P2PConfig::default();
let net_adapter = Arc::new(p2p::DummyAdapter {});
let connected_peers = Arc::new(RwLock::new(HashMap::new()));
let server = p2p::Server::new(
p2p::UNKNOWN,
p2p_conf,
connected_peers,
net_adapter.clone(),
Hash::from_vec(vec![]),
);
@ -73,7 +76,7 @@ fn peer_handshake() {
rhandle.spawn(peer.run(socket).map_err(|e| {
panic!("Client run failed: {:?}", e);
}));
peer.send_ping().unwrap();
peer.send_ping(Difficulty::one()).unwrap();
timeout_send.from_err().map(|_| peer)
})
.and_then(|peer| {