mirror of https://github.com/mimblewimble/grin.git
synced 2025-02-01 17:01:09 +03:00
Externalized all peers selection and handling (#468)
Moved handling of the peer map out of the p2p server and into its own struct. This allowed factoring code out of the net adapter and simplifying some interactions. Also removes the need for the adapter to reference the p2p server or peers. Fixes #430, #453 and #456.
parent 4a03b90190
commit e50703d79e
12 changed files with 573 additions and 500 deletions
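The diff below introduces a standalone p2p::Peers type that owns the connected-peer map and the peer store, and the p2p server exposes it as a public field. A minimal sketch of that shape, with anything not visible in the diff reduced to placeholders (this is illustrative, not the actual implementation):

// Sketch only: Peer stands in for p2p::Peer, and the real Peers also wraps a
// PeerStore and a ChainAdapter (see p2p/src/peers.rs further down).
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::{Arc, RwLock};

struct Peer; // placeholder for the real connected-peer type

#[derive(Clone)]
pub struct Peers {
	// connected peers keyed by address; clones of Peers share the same map
	peers: Arc<RwLock<HashMap<SocketAddr, Arc<RwLock<Peer>>>>>,
}

impl Peers {
	/// Number of peers we're currently connected to.
	pub fn peer_count(&self) -> u32 {
		self.peers.read().unwrap().len() as u32
	}
}

// The p2p server now exposes the peer map directly, so the seeder, the sync
// loop and the API handlers hold a Peers clone instead of an Arc<Server>.
pub struct Server {
	pub peers: Peers,
	// config, capabilities, handshake, stop channel, ...
}

fn main() {
	let peers = Peers { peers: Arc::new(RwLock::new(HashMap::new())) };
	let server = Server { peers: peers.clone() };
	assert_eq!(server.peers.peer_count(), 0);
}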
@@ -486,13 +486,13 @@ let _ = thread::Builder::new()
 		tx_pool: tx_pool.clone(),
 	};
 	let peers_all_handler = PeersAllHandler {
-		p2p_server: p2p_server.clone(),
+		peers: peers.clone(),
 	};
 	let peers_connected_handler = PeersConnectedHandler {
-		p2p_server: p2p_server.clone(),
+		peers: peers.clone(),
 	};
 	let peer_handler = PeerHandler {
-		p2p_server: p2p_server.clone(),
+		peers: peers.clone(),
 	};

 	let route_list = vec!(
@@ -1,4 +1,4 @@
-// Copyright 2016 The Grin Developers
+// Copyright 2017 The Grin Developers
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -21,7 +21,7 @@ use core::core::{self, Output};
 use core::core::block::BlockHeader;
 use core::core::hash::{Hash, Hashed};
 use core::core::target::Difficulty;
-use p2p::{self, NetAdapter, PeerData, State};
+use p2p;
 use pool;
 use util::secp::pedersen::Commitment;
 use util::OneTime;
@@ -32,13 +32,12 @@ use util::LOGGER;
 /// blocks and transactions are received and forwards to the chain and pool
 /// implementations.
 pub struct NetToChainAdapter {
+	currently_syncing: Arc<AtomicBool>,
 	chain: Arc<chain::Chain>,
-	p2p_server: OneTime<Arc<p2p::Server>>,
 	tx_pool: Arc<RwLock<pool::TransactionPool<PoolToChainAdapter>>>,
-	syncing: AtomicBool,
 }

-impl NetAdapter for NetToChainAdapter {
+impl p2p::ChainAdapter for NetToChainAdapter {
 	fn total_difficulty(&self) -> Difficulty {
 		self.chain.total_difficulty()
 	}
@@ -65,7 +64,7 @@ impl NetAdapter for NetToChainAdapter {
 		}
 	}

-	fn block_received(&self, b: core::Block, addr: SocketAddr) {
+	fn block_received(&self, b: core::Block, _: SocketAddr) -> bool {
 		let bhash = b.hash();
 		debug!(
 			LOGGER,
@@ -79,19 +78,16 @@ impl NetAdapter for NetToChainAdapter {

 		if let &Err(ref e) = &res {
 			debug!(LOGGER, "Block {} refused by chain: {:?}", bhash, e);
-
-			// if the peer sent us a block that's intrinsically bad, they're either
-			// mistaken or manevolent, both of which require a ban
 			if e.is_bad_block() {
-				self.p2p_server.borrow().ban_peer(&addr);
-				//
-				// // header chain should be consistent with the sync head here
-				// // we just banned the peer that sent a bad block so
-				// // sync head should resolve itself if/when we find an alternative peer
-				// // with more work than ourselves
-				// // we should not need to reset the header head here
+				// header chain should be consistent with the sync head here
+				// we just banned the peer that sent a bad block so
+				// sync head should resolve itself if/when we find an alternative peer
+				// with more work than ourselves
+				// we should not need to reset the header head here
+				return false;
 			}
 		}
+		true
 	}

 	fn headers_received(&self, bhs: Vec<core::BlockHeader>, addr: SocketAddr) {
@ -193,135 +189,21 @@ impl NetAdapter for NetToChainAdapter {
|
|||
}
|
||||
}
|
||||
|
||||
/// Find good peers we know with the provided capability and return their
|
||||
/// addresses.
|
||||
fn find_peer_addrs(&self, capab: p2p::Capabilities) -> Vec<SocketAddr> {
|
||||
let peers = self.p2p_server.borrow()
|
||||
.find_peers(State::Healthy, capab, p2p::MAX_PEER_ADDRS as usize);
|
||||
debug!(LOGGER, "Got {} peer addrs to send.", peers.len());
|
||||
map_vec!(peers, |p| p.addr)
|
||||
}
|
||||
|
||||
/// A list of peers has been received from one of our peers.
|
||||
fn peer_addrs_received(&self, peer_addrs: Vec<SocketAddr>) {
|
||||
debug!(LOGGER, "Received {} peer addrs, saving.", peer_addrs.len());
|
||||
for pa in peer_addrs {
|
||||
if let Ok(e) = self.p2p_server.borrow().exists_peer(pa) {
|
||||
if e {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
let peer = PeerData {
|
||||
addr: pa,
|
||||
capabilities: p2p::UNKNOWN,
|
||||
user_agent: "".to_string(),
|
||||
flags: State::Healthy,
|
||||
};
|
||||
if let Err(e) = self.p2p_server.borrow().save_peer(&peer) {
|
||||
error!(LOGGER, "Could not save received peer address: {:?}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Network successfully connected to a peer.
|
||||
fn peer_connected(&self, pi: &p2p::PeerInfo) {
|
||||
debug!(LOGGER, "Saving newly connected peer {}.", pi.addr);
|
||||
let peer = PeerData {
|
||||
addr: pi.addr,
|
||||
capabilities: pi.capabilities,
|
||||
user_agent: pi.user_agent.clone(),
|
||||
flags: State::Healthy,
|
||||
};
|
||||
if let Err(e) = self.p2p_server.borrow().save_peer(&peer) {
|
||||
error!(LOGGER, "Could not save connected peer: {:?}", e);
|
||||
}
|
||||
}
|
||||
|
||||
fn peer_difficulty(&self, addr: SocketAddr, diff: Difficulty, height: u64) {
|
||||
debug!(
|
||||
LOGGER,
|
||||
"peer total_diff @ height (ping/pong): {}: {} @ {} \
|
||||
vs us: {} @ {}",
|
||||
addr,
|
||||
diff,
|
||||
height,
|
||||
self.total_difficulty(),
|
||||
self.total_height()
|
||||
);
|
||||
|
||||
if diff.into_num() > 0 && self.p2p_server.is_initialized() {
|
||||
if let Some(peer) = self.p2p_server.borrow().get_peer(&addr) {
|
||||
let mut peer = peer.write().unwrap();
|
||||
peer.info.total_difficulty = diff;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl NetToChainAdapter {
|
||||
pub fn new(
|
||||
currently_syncing: Arc<AtomicBool>,
|
||||
chain_ref: Arc<chain::Chain>,
|
||||
tx_pool: Arc<RwLock<pool::TransactionPool<PoolToChainAdapter>>>,
|
||||
) -> NetToChainAdapter {
|
||||
NetToChainAdapter {
|
||||
currently_syncing: currently_syncing,
|
||||
chain: chain_ref,
|
||||
p2p_server: OneTime::new(),
|
||||
tx_pool: tx_pool,
|
||||
syncing: AtomicBool::new(true),
|
||||
}
|
||||
}
|
||||
|
||||
/// Setup the p2p server on the adapter
|
||||
pub fn init(&self, p2p: Arc<p2p::Server>) {
|
||||
self.p2p_server.init(p2p);
|
||||
}
|
||||
|
||||
/// Whether we're currently syncing the chain or we're fully caught up and
|
||||
/// just receiving blocks through gossip.
|
||||
pub fn is_syncing(&self) -> bool {
|
||||
let local_diff = self.total_difficulty();
|
||||
let peer = self.p2p_server.borrow().most_work_peer();
|
||||
|
||||
// if we're already syncing, we're caught up if no peer has a higher
|
||||
// difficulty than us
|
||||
if self.syncing.load(Ordering::Relaxed) {
|
||||
if let Some(peer) = peer {
|
||||
if let Ok(peer) = peer.try_read() {
|
||||
if peer.info.total_difficulty <= local_diff {
|
||||
info!(LOGGER, "sync: caught up on most worked chain, disabling sync");
|
||||
self.syncing.store(false, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
info!(LOGGER, "sync: no peers available, disabling sync");
|
||||
self.syncing.store(false, Ordering::Relaxed);
|
||||
}
|
||||
} else {
|
||||
if let Some(peer) = peer {
|
||||
if let Ok(peer) = peer.try_read() {
|
||||
// sum the last 5 difficulties to give us the threshold
|
||||
let threshold = self.chain
|
||||
.difficulty_iter()
|
||||
.filter_map(|x| x.map(|(_, x)| x).ok())
|
||||
.take(5)
|
||||
.fold(Difficulty::zero(), |sum, val| sum + val);
|
||||
|
||||
if peer.info.total_difficulty > local_diff.clone() + threshold.clone() {
|
||||
info!(
|
||||
LOGGER,
|
||||
"sync: total_difficulty {}, peer_difficulty {}, threshold {} (last 5 blocks), enabling sync",
|
||||
local_diff,
|
||||
peer.info.total_difficulty,
|
||||
threshold,
|
||||
);
|
||||
self.syncing.store(true, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
self.syncing.load(Ordering::Relaxed)
|
||||
}
|
||||
|
||||
// recursively go back through the locator vector and stop when we find
|
||||
// a header that we recognize this will be a header shared in common
|
||||
// between us and the peer
|
||||
|
@ -358,7 +240,7 @@ impl NetToChainAdapter {
|
|||
|
||||
/// Prepare options for the chain pipeline
|
||||
fn chain_opts(&self) -> chain::Options {
|
||||
let opts = if self.is_syncing() {
|
||||
let opts = if self.currently_syncing.load(Ordering::Relaxed) {
|
||||
chain::SYNC
|
||||
} else {
|
||||
chain::NONE
|
||||
|
@ -372,7 +254,7 @@ impl NetToChainAdapter {
|
|||
/// the network to broadcast the block
|
||||
pub struct ChainToPoolAndNetAdapter {
|
||||
tx_pool: Arc<RwLock<pool::TransactionPool<PoolToChainAdapter>>>,
|
||||
p2p: OneTime<Arc<p2p::Server>>,
|
||||
peers: OneTime<p2p::Peers>,
|
||||
}
|
||||
|
||||
impl ChainAdapter for ChainToPoolAndNetAdapter {
|
||||
|
@ -387,7 +269,7 @@ impl ChainAdapter for ChainToPoolAndNetAdapter {
|
|||
);
|
||||
}
|
||||
}
|
||||
self.p2p.borrow().broadcast_block(b);
|
||||
self.peers.borrow().broadcast_block(b);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -397,23 +279,23 @@ impl ChainToPoolAndNetAdapter {
|
|||
) -> ChainToPoolAndNetAdapter {
|
||||
ChainToPoolAndNetAdapter {
|
||||
tx_pool: tx_pool,
|
||||
p2p: OneTime::new(),
|
||||
peers: OneTime::new(),
|
||||
}
|
||||
}
|
||||
pub fn init(&self, p2p: Arc<p2p::Server>) {
|
||||
self.p2p.init(p2p);
|
||||
pub fn init(&self, peers: p2p::Peers) {
|
||||
self.peers.init(peers);
|
||||
}
|
||||
}
|
||||
|
||||
/// Adapter between the transaction pool and the network, to relay
|
||||
/// transactions that have been accepted.
|
||||
pub struct PoolToNetAdapter {
|
||||
p2p: OneTime<Arc<p2p::Server>>,
|
||||
peers: OneTime<p2p::Peers>,
|
||||
}
|
||||
|
||||
impl pool::PoolAdapter for PoolToNetAdapter {
|
||||
fn tx_accepted(&self, tx: &core::Transaction) {
|
||||
self.p2p.borrow().broadcast_transaction(tx);
|
||||
self.peers.borrow().broadcast_transaction(tx);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -421,13 +303,13 @@ impl PoolToNetAdapter {
|
|||
/// Create a new pool to net adapter
|
||||
pub fn new() -> PoolToNetAdapter {
|
||||
PoolToNetAdapter {
|
||||
p2p: OneTime::new(),
|
||||
peers: OneTime::new(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Setup the p2p server on the adapter
|
||||
pub fn init(&self, p2p: Arc<p2p::Server>) {
|
||||
self.p2p.init(p2p);
|
||||
pub fn init(&self, peers: p2p::Peers) {
|
||||
self.peers.init(peers);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -36,18 +36,20 @@ const PEER_PREFERRED_COUNT: u32 = 8;
|
|||
const SEEDS_URL: &'static str = "http://grin-tech.org/seeds.txt";
|
||||
|
||||
pub struct Seeder {
|
||||
p2p: Arc<p2p::Server>,
|
||||
|
||||
peers: p2p::Peers,
|
||||
p2p_server: Arc<p2p::Server>,
|
||||
capabilities: p2p::Capabilities,
|
||||
}
|
||||
|
||||
impl Seeder {
|
||||
pub fn new(
|
||||
capabilities: p2p::Capabilities,
|
||||
p2p: Arc<p2p::Server>,
|
||||
p2p_server: Arc<p2p::Server>,
|
||||
peers: p2p::Peers,
|
||||
) -> Seeder {
|
||||
Seeder {
|
||||
p2p: p2p,
|
||||
peers: peers,
|
||||
p2p_server: p2p_server,
|
||||
capabilities: capabilities,
|
||||
}
|
||||
}
|
||||
|
@ -76,7 +78,7 @@ impl Seeder {
|
|||
&self,
|
||||
tx: mpsc::UnboundedSender<SocketAddr>,
|
||||
) -> Box<Future<Item = (), Error = String>> {
|
||||
let p2p_server = self.p2p.clone();
|
||||
let peers = self.peers.clone();
|
||||
let capabilities = self.capabilities.clone();
|
||||
|
||||
// now spawn a new future to regularly check if we need to acquire more peers
|
||||
|
@ -84,19 +86,19 @@ impl Seeder {
|
|||
let mon_loop = Timer::default()
|
||||
.interval(time::Duration::from_secs(30))
|
||||
.for_each(move |_| {
|
||||
let total_count = p2p_server.all_peers().len();
|
||||
let total_count = peers.all_peers().len();
|
||||
debug!(
|
||||
LOGGER,
|
||||
"monitor_peers: {} most_work_peers, {} connected, {} total known",
|
||||
p2p_server.most_work_peers().len(),
|
||||
p2p_server.connected_peers().len(),
|
||||
peers.most_work_peers().len(),
|
||||
peers.connected_peers().len(),
|
||||
total_count,
|
||||
);
|
||||
|
||||
let mut healthy_count = 0;
|
||||
let mut banned_count = 0;
|
||||
let mut defunct_count = 0;
|
||||
for x in p2p_server.all_peers() {
|
||||
for x in peers.all_peers() {
|
||||
if x.flags == p2p::State::Healthy { healthy_count += 1 }
|
||||
else if x.flags == p2p::State::Banned { banned_count += 1 }
|
||||
else if x.flags == p2p::State::Defunct { defunct_count += 1 };
|
||||
|
@ -113,14 +115,14 @@ impl Seeder {
|
|||
|
||||
// maintenance step first, clean up p2p server peers
|
||||
{
|
||||
p2p_server.clean_peers(PEER_PREFERRED_COUNT as usize);
|
||||
peers.clean_peers(PEER_PREFERRED_COUNT as usize);
|
||||
}
|
||||
|
||||
// not enough peers, getting more from db
|
||||
if p2p_server.peer_count() < PEER_PREFERRED_COUNT {
|
||||
if peers.peer_count() < PEER_PREFERRED_COUNT {
|
||||
// loop over connected peers
|
||||
// ask them for their list of peers
|
||||
for p in p2p_server.connected_peers() {
|
||||
for p in peers.connected_peers() {
|
||||
if let Ok(p) = p.try_read() {
|
||||
debug!(
|
||||
LOGGER,
|
||||
|
@ -138,7 +140,7 @@ impl Seeder {
|
|||
|
||||
// find some peers from our db
|
||||
// and queue them up for a connection attempt
|
||||
let peers = p2p_server.find_peers(
|
||||
let peers = peers.find_peers(
|
||||
p2p::State::Healthy,
|
||||
p2p::UNKNOWN,
|
||||
100,
|
||||
|
@ -171,11 +173,11 @@ impl Seeder {
|
|||
// db query
|
||||
let thread_pool = cpupool::Builder::new()
|
||||
.pool_size(1).name_prefix("seed").create();
|
||||
let p2p_server = self.p2p.clone();
|
||||
let peers = self.peers.clone();
|
||||
let seeder = thread_pool
|
||||
.spawn_fn(move || {
|
||||
// check if we have some peers in db
|
||||
let peers = p2p_server.find_peers(
|
||||
let peers = peers.find_peers(
|
||||
p2p::State::Healthy,
|
||||
p2p::FULL_HIST,
|
||||
100,
|
||||
|
@ -215,21 +217,15 @@ impl Seeder {
|
|||
rx: mpsc::UnboundedReceiver<SocketAddr>,
|
||||
) -> Box<Future<Item = (), Error = ()>> {
|
||||
let capab = self.capabilities;
|
||||
let p2p_server = self.p2p.clone();
|
||||
let peers = self.peers.clone();
|
||||
let p2p_server = self.p2p_server.clone();
|
||||
|
||||
let listener = rx.for_each(move |peer_addr| {
|
||||
debug!(LOGGER, "New peer address to connect to: {}.", peer_addr);
|
||||
let inner_h = h.clone();
|
||||
if p2p_server.peer_count() < PEER_MAX_COUNT {
|
||||
h.spawn(
|
||||
connect_and_req(
|
||||
capab,
|
||||
p2p_server.clone(),
|
||||
inner_h,
|
||||
peer_addr,
|
||||
)
|
||||
)
|
||||
};
|
||||
if peers.peer_count() < PEER_MAX_COUNT {
|
||||
h.spawn(connect_and_req(capab, p2p_server.clone(), peers.clone(), inner_h, peer_addr))
|
||||
}
|
||||
Box::new(future::ok(()))
|
||||
});
|
||||
Box::new(listener)
|
||||
|
@ -293,11 +289,12 @@ pub fn predefined_seeds(
|
|||
fn connect_and_req(
|
||||
capab: p2p::Capabilities,
|
||||
p2p: Arc<p2p::Server>,
|
||||
peers: p2p::Peers,
|
||||
h: reactor::Handle,
|
||||
addr: SocketAddr,
|
||||
) -> Box<Future<Item = (), Error = ()>> {
|
||||
let connect_peer = p2p.connect_peer(addr, h);
|
||||
let p2p_server = p2p.clone();
|
||||
let peers = peers.clone();
|
||||
let fut = connect_peer.then(move |p| {
|
||||
match p {
|
||||
Ok(Some(p)) => {
|
||||
|
@ -311,7 +308,7 @@ fn connect_and_req(
|
|||
},
|
||||
Err(e) => {
|
||||
debug!(LOGGER, "connect_and_req: {} is Defunct; {:?}", addr, e);
|
||||
let _ = p2p_server.update_state(addr, p2p::State::Defunct);
|
||||
let _ = peers.update_state(addr, p2p::State::Defunct);
|
||||
},
|
||||
}
|
||||
Ok(())
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::{Arc, RwLock};
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::thread;
|
||||
use std::time;
|
||||
|
||||
|
@ -51,7 +52,7 @@ pub struct Server {
|
|||
chain: Arc<chain::Chain>,
|
||||
/// in-memory transaction pool
|
||||
tx_pool: Arc<RwLock<pool::TransactionPool<PoolToChainAdapter>>>,
|
||||
net_adapter: Arc<NetToChainAdapter>,
|
||||
currently_syncing: Arc<AtomicBool>,
|
||||
}
|
||||
|
||||
impl Server {
|
||||
|
@ -108,7 +109,10 @@ impl Server {
|
|||
|
||||
pool_adapter.set_chain(shared_chain.clone());
|
||||
|
||||
let currently_syncing = Arc::new(AtomicBool::new(true));
|
||||
|
||||
let net_adapter = Arc::new(NetToChainAdapter::new(
|
||||
currently_syncing.clone(),
|
||||
shared_chain.clone(),
|
||||
tx_pool.clone(),
|
||||
));
|
||||
|
@ -120,11 +124,11 @@ impl Server {
|
|||
net_adapter.clone(),
|
||||
genesis.hash(),
|
||||
)?);
|
||||
chain_adapter.init(p2p_server.clone());
|
||||
pool_net_adapter.init(p2p_server.clone());
|
||||
net_adapter.init(p2p_server.clone());
|
||||
chain_adapter.init(p2p_server.peers.clone());
|
||||
pool_net_adapter.init(p2p_server.peers.clone());
|
||||
|
||||
let seed = seed::Seeder::new(config.capabilities, p2p_server.clone());
|
||||
let seed = seed::Seeder::new(
|
||||
config.capabilities, p2p_server.clone(), p2p_server.peers.clone());
|
||||
match config.seeding_type.clone() {
|
||||
Seeding::None => {
|
||||
warn!(
|
||||
|
@ -152,8 +156,8 @@ impl Server {
|
|||
}
|
||||
|
||||
sync::run_sync(
|
||||
net_adapter.clone(),
|
||||
p2p_server.clone(),
|
||||
currently_syncing.clone(),
|
||||
p2p_server.peers.clone(),
|
||||
shared_chain.clone(),
|
||||
);
|
||||
|
||||
|
@ -165,7 +169,7 @@ impl Server {
|
|||
config.api_http_addr.clone(),
|
||||
shared_chain.clone(),
|
||||
tx_pool.clone(),
|
||||
p2p_server.clone(),
|
||||
p2p_server.peers.clone(),
|
||||
);
|
||||
|
||||
warn!(LOGGER, "Grin server started.");
|
||||
|
@ -175,7 +179,7 @@ impl Server {
|
|||
p2p: p2p_server,
|
||||
chain: shared_chain,
|
||||
tx_pool: tx_pool,
|
||||
net_adapter: net_adapter,
|
||||
currently_syncing: currently_syncing,
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -193,7 +197,7 @@ impl Server {
|
|||
|
||||
/// Number of peers
|
||||
pub fn peer_count(&self) -> u32 {
|
||||
self.p2p.peer_count()
|
||||
self.p2p.peers.peer_count()
|
||||
}
|
||||
|
||||
/// Start mining for blocks on a separate thread. Uses toy miner by default,
|
||||
|
@ -201,7 +205,7 @@ impl Server {
|
|||
pub fn start_miner(&self, config: pow::types::MinerConfig) {
|
||||
let cuckoo_size = global::sizeshift();
|
||||
let proof_size = global::proofsize();
|
||||
let net_adapter = self.net_adapter.clone();
|
||||
let currently_syncing = self.currently_syncing.clone();
|
||||
|
||||
let mut miner = miner::Miner::new(config.clone(), self.chain.clone(), self.tx_pool.clone());
|
||||
miner.set_debug_output_id(format!("Port {}", self.config.p2p_config.unwrap().port));
|
||||
|
@ -211,7 +215,7 @@ impl Server {
|
|||
// TODO push this down in the run loop so miner gets paused anytime we
|
||||
// decide to sync again
|
||||
let secs_5 = time::Duration::from_secs(5);
|
||||
while net_adapter.is_syncing() {
|
||||
while currently_syncing.load(Ordering::Relaxed) {
|
||||
thread::sleep(secs_5);
|
||||
}
|
||||
miner.run_loop(config.clone(), cuckoo_size as u32, proof_size);
|
||||
|
|
|
@ -15,24 +15,24 @@
|
|||
use std::thread;
|
||||
use std::time::Duration;
|
||||
use std::sync::{Arc, RwLock};
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use time;
|
||||
|
||||
use adapters::NetToChainAdapter;
|
||||
use chain;
|
||||
use core::core::hash::{Hash, Hashed};
|
||||
use p2p::{self, Peer};
|
||||
use core::core::target::Difficulty;
|
||||
use p2p::{self, Peer, Peers, ChainAdapter};
|
||||
use types::Error;
|
||||
use util::LOGGER;
|
||||
|
||||
/// Starts the syncing loop, just spawns two threads that loop forever
|
||||
pub fn run_sync(
|
||||
adapter: Arc<NetToChainAdapter>,
|
||||
p2p_server: Arc<p2p::Server>,
|
||||
currently_syncing: Arc<AtomicBool>,
|
||||
peers: p2p::Peers,
|
||||
chain: Arc<chain::Chain>,
|
||||
) {
|
||||
let a_inner = adapter.clone();
|
||||
let p2p_inner = p2p_server.clone();
|
||||
let c_inner = chain.clone();
|
||||
|
||||
let chain = chain.clone();
|
||||
let _ = thread::Builder::new()
|
||||
.name("sync".to_string())
|
||||
.spawn(move || {
|
||||
|
@ -43,13 +43,16 @@ pub fn run_sync(
|
|||
thread::sleep(Duration::from_secs(30));
|
||||
|
||||
loop {
|
||||
if a_inner.is_syncing() {
|
||||
let syncing = needs_syncing(
|
||||
currently_syncing.clone(), peers.clone(), chain.clone());
|
||||
if syncing {
|
||||
|
||||
let current_time = time::now_utc();
|
||||
|
||||
// run the header sync every 10s
|
||||
if current_time - prev_header_sync > time::Duration::seconds(10) {
|
||||
header_sync(
|
||||
p2p_server.clone(),
|
||||
peers.clone(),
|
||||
chain.clone(),
|
||||
);
|
||||
prev_header_sync = current_time;
|
||||
|
@ -58,8 +61,8 @@ pub fn run_sync(
|
|||
// run the body_sync every 5s
|
||||
if current_time - prev_body_sync > time::Duration::seconds(5) {
|
||||
body_sync(
|
||||
p2p_inner.clone(),
|
||||
c_inner.clone(),
|
||||
peers.clone(),
|
||||
chain.clone(),
|
||||
);
|
||||
prev_body_sync = current_time;
|
||||
}
|
||||
|
@ -72,10 +75,8 @@ pub fn run_sync(
|
|||
});
|
||||
}
|
||||
|
||||
fn body_sync(
|
||||
p2p_server: Arc<p2p::Server>,
|
||||
chain: Arc<chain::Chain>,
|
||||
) {
|
||||
fn body_sync(peers: Peers, chain: Arc<chain::Chain>) {
|
||||
|
||||
let body_head: chain::Tip = chain.head().unwrap();
|
||||
let header_head: chain::Tip = chain.get_header_head().unwrap();
|
||||
let sync_head: chain::Tip = chain.get_sync_head().unwrap();
|
||||
|
@ -112,7 +113,7 @@ fn body_sync(
|
|||
// if we have 5 most_work_peers then ask for 50 blocks total (peer_count * 10)
|
||||
// max will be 80 if all 8 peers are advertising most_work
|
||||
let peer_count = {
|
||||
p2p_server.most_work_peers().len()
|
||||
peers.most_work_peers().len()
|
||||
};
|
||||
let block_count = peer_count * 10;
|
||||
|
||||
|
@ -134,7 +135,7 @@ fn body_sync(
|
|||
);
|
||||
|
||||
for hash in hashes_to_get.clone() {
|
||||
let peer = p2p_server.most_work_peer();
|
||||
let peer = peers.most_work_peer();
|
||||
if let Some(peer) = peer {
|
||||
if let Ok(peer) = peer.try_read() {
|
||||
let _ = peer.send_block_request(hash);
|
||||
|
@ -144,14 +145,11 @@ fn body_sync(
|
|||
}
|
||||
}
|
||||
|
||||
pub fn header_sync(
|
||||
p2p_server: Arc<p2p::Server>,
|
||||
chain: Arc<chain::Chain>,
|
||||
) {
|
||||
pub fn header_sync(peers: Peers, chain: Arc<chain::Chain>) {
|
||||
if let Ok(header_head) = chain.get_header_head() {
|
||||
let difficulty = header_head.total_difficulty;
|
||||
|
||||
if let Some(peer) = p2p_server.most_work_peer() {
|
||||
if let Some(peer) = peers.most_work_peer() {
|
||||
if let Ok(p) = peer.try_read() {
|
||||
let peer_difficulty = p.info.total_difficulty.clone();
|
||||
if peer_difficulty > difficulty {
|
||||
|
@ -189,6 +187,57 @@ fn request_headers(
|
|||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
/// Whether we're currently syncing the chain or we're fully caught up and
|
||||
/// just receiving blocks through gossip.
|
||||
pub fn needs_syncing(
|
||||
currently_syncing: Arc<AtomicBool>,
|
||||
peers: Peers,
|
||||
chain: Arc<chain::Chain>) -> bool {
|
||||
|
||||
let local_diff = peers.total_difficulty();
|
||||
let peer = peers.most_work_peer();
|
||||
|
||||
// if we're already syncing, we're caught up if no peer has a higher
|
||||
// difficulty than us
|
||||
if currently_syncing.load(Ordering::Relaxed) {
|
||||
if let Some(peer) = peer {
|
||||
if let Ok(peer) = peer.try_read() {
|
||||
if peer.info.total_difficulty <= local_diff {
|
||||
info!(LOGGER, "sync: caught up on most worked chain, disabling sync");
|
||||
currently_syncing.store(false, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
info!(LOGGER, "sync: no peers available, disabling sync");
|
||||
currently_syncing.store(false, Ordering::Relaxed);
|
||||
}
|
||||
} else {
|
||||
if let Some(peer) = peer {
|
||||
if let Ok(peer) = peer.try_read() {
|
||||
// sum the last 5 difficulties to give us the threshold
|
||||
let threshold = chain
|
||||
.difficulty_iter()
|
||||
.filter_map(|x| x.map(|(_, x)| x).ok())
|
||||
.take(5)
|
||||
.fold(Difficulty::zero(), |sum, val| sum + val);
|
||||
|
||||
if peer.info.total_difficulty > local_diff.clone() + threshold.clone() {
|
||||
info!(
|
||||
LOGGER,
|
||||
"sync: total_difficulty {}, peer_difficulty {}, threshold {} (last 5 blocks), enabling sync",
|
||||
local_diff,
|
||||
peer.info.total_difficulty,
|
||||
threshold,
|
||||
);
|
||||
currently_syncing.store(true, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
currently_syncing.load(Ordering::Relaxed)
|
||||
}
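The sync trigger above reduces to a simple numeric rule: take the sum of the last five block difficulties as a hysteresis band, and only switch into sync mode when a peer's total difficulty exceeds ours by more than that band. A toy model of the same arithmetic, with made-up numbers and plain integers standing in for Difficulty (not the real types or API):

// Toy model of the needs_syncing threshold rule; everything here is illustrative.
fn should_enable_sync(local_total: u64, peer_total: u64, last_five_diffs: &[u64]) -> bool {
	// sum the last 5 difficulties to give us the threshold
	let threshold: u64 = last_five_diffs.iter().take(5).sum();
	peer_total > local_total + threshold
}

fn main() {
	// local chain at 1_000 total work, recent block difficulties of 5 each
	assert!(!should_enable_sync(1_000, 1_020, &[5, 5, 5, 5, 5])); // within the band, stay idle
	assert!(should_enable_sync(1_000, 1_030, &[5, 5, 5, 5, 5]));  // beyond it, start syncing
}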
|
||||
|
||||
/// We build a locator based on sync_head.
|
||||
/// Even if sync_head is significantly out of date we will "reset" it once we start getting
|
||||
/// headers back from a peer.
|
||||
|
|
|
@ -47,13 +47,15 @@ pub mod handshake;
|
|||
mod rate_limit;
|
||||
mod msg;
|
||||
mod peer;
|
||||
mod peers;
|
||||
mod protocol;
|
||||
mod server;
|
||||
mod store;
|
||||
mod types;
|
||||
|
||||
pub use server::{DummyAdapter, Server};
|
||||
pub use peers::Peers;
|
||||
pub use peer::Peer;
|
||||
pub use types::{Capabilities, Error, NetAdapter, P2PConfig, PeerInfo, FULL_HIST, FULL_NODE,
|
||||
MAX_BLOCK_HEADERS, MAX_PEER_ADDRS, UNKNOWN};
|
||||
pub use types::{Capabilities, Error, ChainAdapter, P2PConfig, PeerInfo, FULL_HIST, FULL_NODE,
|
||||
MAX_BLOCK_HEADERS, MAX_PEER_ADDRS, UNKNOWN};
|
||||
pub use store::{PeerData, State};
|
||||
|
|
|
@ -221,7 +221,7 @@ impl TrackingAdapter {
|
|||
}
|
||||
}
|
||||
|
||||
impl NetAdapter for TrackingAdapter {
|
||||
impl ChainAdapter for TrackingAdapter {
|
||||
fn total_difficulty(&self) -> Difficulty {
|
||||
self.adapter.total_difficulty()
|
||||
}
|
||||
|
@ -235,7 +235,7 @@ impl NetAdapter for TrackingAdapter {
|
|||
self.adapter.transaction_received(tx)
|
||||
}
|
||||
|
||||
fn block_received(&self, b: core::Block, addr: SocketAddr) {
|
||||
fn block_received(&self, b: core::Block, addr: SocketAddr) -> bool {
|
||||
self.push(b.hash());
|
||||
self.adapter.block_received(b, addr)
|
||||
}
|
||||
|
@ -251,7 +251,9 @@ impl NetAdapter for TrackingAdapter {
|
|||
fn get_block(&self, h: Hash) -> Option<core::Block> {
|
||||
self.adapter.get_block(h)
|
||||
}
|
||||
}
|
||||
|
||||
impl NetAdapter for TrackingAdapter {
|
||||
fn find_peer_addrs(&self, capab: Capabilities) -> Vec<SocketAddr> {
|
||||
self.adapter.find_peer_addrs(capab)
|
||||
}
|
||||
|
@ -260,10 +262,6 @@ impl NetAdapter for TrackingAdapter {
|
|||
self.adapter.peer_addrs_received(addrs)
|
||||
}
|
||||
|
||||
fn peer_connected(&self, pi: &PeerInfo) {
|
||||
self.adapter.peer_connected(pi)
|
||||
}
|
||||
|
||||
fn peer_difficulty(&self, addr: SocketAddr, diff: Difficulty, height:u64) {
|
||||
self.adapter.peer_difficulty(addr, diff, height)
|
||||
}
|
||||
|
|
p2p/src/peers.rs (new file, 381 lines)
@@ -0,0 +1,381 @@
// Copyright 2016 The Grin Developers
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::{Arc, RwLock};
|
||||
|
||||
use rand::{thread_rng, Rng};
|
||||
|
||||
use core::core;
|
||||
use core::core::hash::Hash;
|
||||
use core::core::target::Difficulty;
|
||||
use util::LOGGER;
|
||||
|
||||
use peer::Peer;
|
||||
use store::{PeerStore, PeerData, State};
|
||||
use types::*;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct Peers {
|
||||
pub adapter: Arc<ChainAdapter>,
|
||||
store: Arc<PeerStore>,
|
||||
peers: Arc<RwLock<HashMap<SocketAddr, Arc<RwLock<Peer>>>>>,
|
||||
}
|
||||
|
||||
unsafe impl Send for Peers {}
|
||||
unsafe impl Sync for Peers {}
|
||||
|
||||
impl Peers {
|
||||
pub fn new(store: PeerStore, adapter: Arc<ChainAdapter>) -> Peers {
|
||||
Peers {
|
||||
adapter: adapter,
|
||||
store: Arc::new(store),
|
||||
peers: Arc::new(RwLock::new(HashMap::new())),
|
||||
}
|
||||
}
|
||||
|
||||
/// Adds the peer to our internal peer mapping. Note that the peer is still
|
||||
/// returned so the server can run it.
|
||||
pub fn add_connected(&self, p: Peer) -> Arc<RwLock<Peer>> {
|
||||
debug!(LOGGER, "Saving newly connected peer {}.", p.info.addr);
|
||||
let peer_data = PeerData {
|
||||
addr: p.info.addr,
|
||||
capabilities: p.info.capabilities,
|
||||
user_agent: p.info.user_agent.clone(),
|
||||
flags: State::Healthy,
|
||||
};
|
||||
if let Err(e) = self.save_peer(&peer_data) {
|
||||
error!(LOGGER, "Could not save connected peer: {:?}", e);
|
||||
}
|
||||
|
||||
let addr = p.info.addr.clone();
|
||||
let apeer = Arc::new(RwLock::new(p));
|
||||
{
|
||||
let mut peers = self.peers.write().unwrap();
|
||||
peers.insert(addr, apeer.clone());
|
||||
}
|
||||
apeer.clone()
|
||||
}
|
||||
|
||||
pub fn is_known(&self, addr: &SocketAddr) -> bool {
|
||||
self.get_peer(addr).is_some()
|
||||
}
|
||||
|
||||
pub fn connected_peers(&self) -> Vec<Arc<RwLock<Peer>>> {
|
||||
self.peers.read().unwrap().values().map(|p| p.clone()).collect()
|
||||
}
|
||||
|
||||
/// Get a peer we're connected to by address.
|
||||
pub fn get_peer(&self, addr: &SocketAddr) -> Option<Arc<RwLock<Peer>>> {
|
||||
self.peers.read().unwrap().get(addr).map(|p| p.clone())
|
||||
}
|
||||
|
||||
/// Number of peers we're currently connected to.
|
||||
pub fn peer_count(&self) -> u32 {
|
||||
self.connected_peers().len() as u32
|
||||
}
|
||||
|
||||
/// Return vec of all peers that currently have the most worked branch,
|
||||
/// showing the highest total difficulty.
|
||||
pub fn most_work_peers(&self) -> Vec<Arc<RwLock<Peer>>> {
|
||||
let peers = self.connected_peers();
|
||||
if peers.len() == 0 {
|
||||
return vec![];
|
||||
}
|
||||
|
||||
let max_total_difficulty = peers
|
||||
.iter()
|
||||
.map(|x| {
|
||||
match x.try_read() {
|
||||
Ok(peer) => peer.info.total_difficulty.clone(),
|
||||
Err(_) => Difficulty::zero(),
|
||||
}
|
||||
})
|
||||
.max()
|
||||
.unwrap();
|
||||
|
||||
let mut max_peers = peers
|
||||
.iter()
|
||||
.filter(|x| {
|
||||
match x.try_read() {
|
||||
Ok(peer) => {
|
||||
peer.info.total_difficulty == max_total_difficulty
|
||||
},
|
||||
Err(_) => false,
|
||||
}
|
||||
})
|
||||
.cloned()
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
thread_rng().shuffle(&mut max_peers);
|
||||
max_peers
|
||||
}
|
||||
|
||||
/// Returns single random peer with the most worked branch, showing the highest total
|
||||
/// difficulty.
|
||||
pub fn most_work_peer(&self) -> Option<Arc<RwLock<Peer>>> {
|
||||
match self.most_work_peers().first() {
|
||||
Some(x) => Some(x.clone()),
|
||||
None => None
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns a random connected peer.
|
||||
pub fn random_peer(&self) -> Option<Arc<RwLock<Peer>>> {
|
||||
let peers = self.connected_peers();
|
||||
Some(thread_rng().choose(&peers).unwrap().clone())
|
||||
}
|
||||
|
||||
pub fn is_banned(&self, peer_addr: SocketAddr) -> bool {
|
||||
if let Ok(peer_data) = self.store.get_peer(peer_addr) {
|
||||
if peer_data.flags == State::Banned {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
false
|
||||
}
|
||||
|
||||
/// Bans a peer, disconnecting it if we're currently connected
|
||||
pub fn ban_peer(&self, peer_addr: &SocketAddr) {
|
||||
if let Err(e) = self.update_state(peer_addr.clone(), State::Banned) {
|
||||
error!(LOGGER, "Couldn't ban {}: {:?}", peer_addr, e);
|
||||
}
|
||||
|
||||
if let Some(peer) = self.get_peer(peer_addr) {
|
||||
debug!(LOGGER, "Banning peer {}", peer_addr);
|
||||
// setting peer status will get it removed at the next clean_peer
|
||||
let peer = peer.write().unwrap();
|
||||
peer.set_banned();
|
||||
peer.stop();
|
||||
}
|
||||
}
|
||||
|
||||
/// Broadcasts the provided block to all our peers. A peer implementation
|
||||
/// may drop the broadcast request if it knows the remote peer already has
|
||||
/// the block.
|
||||
pub fn broadcast_block(&self, b: &core::Block) {
|
||||
let peers = self.connected_peers();
|
||||
let mut count = 0;
|
||||
for p in peers {
|
||||
let p = p.read().unwrap();
|
||||
if p.is_connected() {
|
||||
if let Err(e) = p.send_block(b) {
|
||||
debug!(LOGGER, "Error sending block to peer: {:?}", e);
|
||||
} else {
|
||||
count += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
debug!(LOGGER, "Broadcasted block {} to {} peers.", b.header.height, count);
|
||||
}
|
||||
|
||||
/// Broadcasts the provided transaction to all our peers. A peer
|
||||
/// implementation may drop the broadcast request if it knows the
|
||||
/// remote peer already has the transaction.
|
||||
pub fn broadcast_transaction(&self, tx: &core::Transaction) {
|
||||
let peers = self.connected_peers();
|
||||
for p in peers {
|
||||
let p = p.read().unwrap();
|
||||
if p.is_connected() {
|
||||
if let Err(e) = p.send_transaction(tx) {
|
||||
debug!(LOGGER, "Error sending block to peer: {:?}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Ping all our connected peers. Always automatically expects a pong back or
|
||||
/// disconnects. This acts as a liveness test.
|
||||
pub fn check_all(&self, total_difficulty: Difficulty, height: u64) {
|
||||
let peers_map = self.peers.read().unwrap();
|
||||
for p in peers_map.values() {
|
||||
let p = p.read().unwrap();
|
||||
if p.is_connected() {
|
||||
let _ = p.send_ping(total_difficulty.clone(), height);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// All peer information we have in storage
|
||||
pub fn all_peers(&self) -> Vec<PeerData> {
|
||||
self.store.all_peers()
|
||||
}
|
||||
|
||||
/// Find peers in store (not necessarily connected) and return their data
|
||||
pub fn find_peers(&self, state: State, cap: Capabilities, count: usize) -> Vec<PeerData> {
|
||||
self.store.find_peers(state, cap, count)
|
||||
}
|
||||
|
||||
/// Whether we've already seen a peer with the provided address
|
||||
pub fn exists_peer(&self, peer_addr: SocketAddr) -> Result<bool, Error> {
|
||||
self.store.exists_peer(peer_addr).map_err(From::from)
|
||||
}
|
||||
|
||||
/// Saves updated information about a peer
|
||||
pub fn save_peer(&self, p: &PeerData) -> Result<(), Error> {
|
||||
self.store.save_peer(p).map_err(From::from)
|
||||
}
|
||||
|
||||
/// Updates the state of a peer in store
|
||||
pub fn update_state(&self, peer_addr: SocketAddr, new_state: State) -> Result<(), Error> {
|
||||
self.store.update_state(peer_addr, new_state).map_err(From::from)
|
||||
}
|
||||
|
||||
/// Iterate over the peer list and prune all peers we have
|
||||
/// lost connection to or have been deemed problematic.
|
||||
/// Also avoid connected peer count getting too high.
|
||||
pub fn clean_peers(&self, desired_count: usize) {
|
||||
let mut rm = vec![];
|
||||
|
||||
// build a list of peers to be cleaned up
|
||||
for peer in self.connected_peers() {
|
||||
let peer_inner = peer.read().unwrap();
|
||||
if peer_inner.is_banned() {
|
||||
debug!(LOGGER, "cleaning {:?}, peer banned", peer_inner.info.addr);
|
||||
rm.push(peer.clone());
|
||||
} else if !peer_inner.is_connected() {
|
||||
debug!(LOGGER, "cleaning {:?}, not connected", peer_inner.info.addr);
|
||||
rm.push(peer.clone());
|
||||
}
|
||||
}
|
||||
|
||||
// now clean up peer map based on the list to remove
|
||||
{
|
||||
let mut peers = self.peers.write().unwrap();
|
||||
for p in rm.clone() {
|
||||
let p = p.read().unwrap();
|
||||
peers.remove(&p.info.addr);
|
||||
}
|
||||
}
|
||||
|
||||
// ensure we do not have too many connected peers
|
||||
// really fighting with the double layer of rwlocks here...
|
||||
let excess_count = {
|
||||
let peer_count = self.peer_count().clone() as usize;
|
||||
if peer_count > desired_count {
|
||||
peer_count - desired_count
|
||||
} else {
|
||||
0
|
||||
}
|
||||
};
|
||||
|
||||
// map peers to addrs in a block to bound how long we keep the read lock for
|
||||
let addrs = {
|
||||
self.connected_peers().iter().map(|x| {
|
||||
let p = x.read().unwrap();
|
||||
p.info.addr.clone()
|
||||
}).collect::<Vec<_>>()
|
||||
};
|
||||
|
||||
// now remove them taking a short-lived write lock each time
|
||||
// maybe better to take write lock once and remove them all?
|
||||
for x in addrs
|
||||
.iter()
|
||||
.take(excess_count) {
|
||||
let mut peers = self.peers.write().unwrap();
|
||||
peers.remove(x);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn stop(self) {
|
||||
let peers = self.connected_peers();
|
||||
for peer in peers {
|
||||
let peer = peer.read().unwrap();
|
||||
peer.stop();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl ChainAdapter for Peers {
|
||||
fn total_difficulty(&self) -> Difficulty {
|
||||
self.adapter.total_difficulty()
|
||||
}
|
||||
fn total_height(&self) -> u64 {
|
||||
self.adapter.total_height()
|
||||
}
|
||||
fn transaction_received(&self, tx: core::Transaction) {
|
||||
self.adapter.transaction_received(tx)
|
||||
}
|
||||
fn block_received(&self, b: core::Block, peer_addr: SocketAddr) -> bool {
|
||||
if !self.adapter.block_received(b, peer_addr) {
|
||||
// if the peer sent us a block that's intrinsically bad, they're either
|
||||
// mistaken or manevolent, both of which require a ban
|
||||
self.ban_peer(&peer_addr);
|
||||
false
|
||||
} else {
|
||||
true
|
||||
}
|
||||
}
|
||||
fn headers_received(&self, headers: Vec<core::BlockHeader>, peer_addr:SocketAddr) {
|
||||
self.adapter.headers_received(headers, peer_addr)
|
||||
}
|
||||
fn locate_headers(&self, hs: Vec<Hash>) -> Vec<core::BlockHeader> {
|
||||
self.adapter.locate_headers(hs)
|
||||
}
|
||||
fn get_block(&self, h: Hash) -> Option<core::Block> {
|
||||
self.adapter.get_block(h)
|
||||
}
|
||||
}
|
||||
|
||||
impl NetAdapter for Peers {
|
||||
/// Find good peers we know with the provided capability and return their
|
||||
/// addresses.
|
||||
fn find_peer_addrs(&self, capab: Capabilities) -> Vec<SocketAddr> {
|
||||
let peers = self.find_peers(State::Healthy, capab, MAX_PEER_ADDRS as usize);
|
||||
debug!(LOGGER, "Got {} peer addrs to send.", peers.len());
|
||||
map_vec!(peers, |p| p.addr)
|
||||
}
|
||||
|
||||
/// A list of peers has been received from one of our peers.
|
||||
fn peer_addrs_received(&self, peer_addrs: Vec<SocketAddr>) {
|
||||
debug!(LOGGER, "Received {} peer addrs, saving.", peer_addrs.len());
|
||||
for pa in peer_addrs {
|
||||
if let Ok(e) = self.exists_peer(pa) {
|
||||
if e {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
let peer = PeerData {
|
||||
addr: pa,
|
||||
capabilities: UNKNOWN,
|
||||
user_agent: "".to_string(),
|
||||
flags: State::Healthy,
|
||||
};
|
||||
if let Err(e) = self.save_peer(&peer) {
|
||||
error!(LOGGER, "Could not save received peer address: {:?}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn peer_difficulty(&self, addr: SocketAddr, diff: Difficulty, height: u64) {
|
||||
debug!(
|
||||
LOGGER,
|
||||
"peer total_diff @ height (ping/pong): {}: {} @ {} \
|
||||
vs us: {} @ {}",
|
||||
addr,
|
||||
diff,
|
||||
height,
|
||||
self.total_difficulty(),
|
||||
self.total_height()
|
||||
);
|
||||
|
||||
if diff.into_num() > 0 {
|
||||
if let Some(peer) = self.get_peer(&addr) {
|
||||
let mut peer = peer.write().unwrap();
|
||||
peer.info.total_difficulty = diff;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
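With the peer map externalized, the rest of the node talks to Peers directly. A hypothetical helper showing the intended call pattern; the function itself is illustrative, while broadcast_block and peer_count are the methods defined in the file above:

// Illustrative only: `peers` would be a clone of p2p_server.peers.
fn relay_block_if_caught_up(peers: &Peers, b: &core::Block, currently_syncing: bool) {
	// No Arc<p2p::Server> involved; the peer map handles its own broadcasting.
	if !currently_syncing && peers.peer_count() > 0 {
		peers.broadcast_block(b);
	}
}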
|
|
@ -16,7 +16,6 @@
|
|||
//! other peers in the network.
|
||||
|
||||
use std::cell::RefCell;
|
||||
use std::collections::HashMap;
|
||||
use std::net::{SocketAddr, Shutdown};
|
||||
use std::sync::{Arc, RwLock};
|
||||
use std::time::Duration;
|
||||
|
@ -24,7 +23,6 @@ use std::time::Duration;
|
|||
use futures;
|
||||
use futures::{Future, Stream};
|
||||
use futures::future::{self, IntoFuture};
|
||||
use rand::{thread_rng, Rng};
|
||||
use tokio_core::net::{TcpListener, TcpStream};
|
||||
use tokio_core::reactor;
|
||||
use tokio_timer::Timer;
|
||||
|
@ -34,13 +32,14 @@ use core::core::hash::Hash;
|
|||
use core::core::target::Difficulty;
|
||||
use handshake::Handshake;
|
||||
use peer::Peer;
|
||||
use store::{PeerStore, PeerData, State};
|
||||
use peers::Peers;
|
||||
use store::PeerStore;
|
||||
use types::*;
|
||||
use util::LOGGER;
|
||||
|
||||
/// A no-op network adapter used for testing.
|
||||
pub struct DummyAdapter {}
|
||||
impl NetAdapter for DummyAdapter {
|
||||
impl ChainAdapter for DummyAdapter {
|
||||
fn total_difficulty(&self) -> Difficulty {
|
||||
Difficulty::one()
|
||||
}
|
||||
|
@ -48,7 +47,7 @@ impl NetAdapter for DummyAdapter {
|
|||
0
|
||||
}
|
||||
fn transaction_received(&self, _: core::Transaction) {}
|
||||
fn block_received(&self, _: core::Block, _: SocketAddr) {}
|
||||
fn block_received(&self, _: core::Block, _: SocketAddr) -> bool { true }
|
||||
fn headers_received(&self, _: Vec<core::BlockHeader>, _:SocketAddr) {}
|
||||
fn locate_headers(&self, _: Vec<Hash>) -> Vec<core::BlockHeader> {
|
||||
vec![]
|
||||
|
@ -56,11 +55,12 @@ impl NetAdapter for DummyAdapter {
|
|||
fn get_block(&self, _: Hash) -> Option<core::Block> {
|
||||
None
|
||||
}
|
||||
}
|
||||
impl NetAdapter for DummyAdapter {
|
||||
fn find_peer_addrs(&self, _: Capabilities) -> Vec<SocketAddr> {
|
||||
vec![]
|
||||
}
|
||||
fn peer_addrs_received(&self, _: Vec<SocketAddr>) {}
|
||||
fn peer_connected(&self, _: &PeerInfo) {}
|
||||
fn peer_difficulty(&self, _: SocketAddr, _: Difficulty, _:u64) {}
|
||||
}
|
||||
|
||||
|
@ -69,10 +69,8 @@ impl NetAdapter for DummyAdapter {
|
|||
pub struct Server {
|
||||
config: P2PConfig,
|
||||
capabilities: Capabilities,
|
||||
store: Arc<PeerStore>,
|
||||
peers: Arc<RwLock<HashMap<SocketAddr, Arc<RwLock<Peer>>>>>,
|
||||
handshake: Arc<Handshake>,
|
||||
adapter: Arc<NetAdapter>,
|
||||
pub peers: Peers,
|
||||
stop: RefCell<Option<futures::sync::oneshot::Sender<()>>>,
|
||||
}
|
||||
|
||||
|
@ -86,16 +84,13 @@ impl Server {
|
|||
db_root: String,
|
||||
capab: Capabilities,
|
||||
config: P2PConfig,
|
||||
adapter: Arc<NetAdapter>,
|
||||
genesis: Hash,
|
||||
adapter: Arc<ChainAdapter>,
|
||||
) -> Result<Server, Error> {
|
||||
Ok(Server {
|
||||
config: config,
|
||||
capabilities: capab,
|
||||
store: Arc::new(PeerStore::new(db_root)?),
|
||||
peers: Arc::new(RwLock::new(HashMap::new())),
|
||||
handshake: Arc::new(Handshake::new(genesis)),
|
||||
adapter: adapter,
|
||||
handshake: Arc::new(Handshake::new()),
|
||||
peers: Peers::new(PeerStore::new(db_root)?, adapter),
|
||||
stop: RefCell::new(None),
|
||||
})
|
||||
}
|
||||
|
@ -109,39 +104,33 @@ impl Server {
|
|||
|
||||
let handshake = self.handshake.clone();
|
||||
let peers = self.peers.clone();
|
||||
let adapter = self.adapter.clone();
|
||||
let capab = self.capabilities.clone();
|
||||
let store = self.store.clone();
|
||||
|
||||
// main peer acceptance future handling handshake
|
||||
let hp = h.clone();
|
||||
let peers_listen = socket.incoming().map_err(From::from).map(move |(conn, _)| {
|
||||
|
||||
// aaaand.. reclone for the internal closures
|
||||
let adapter = adapter.clone();
|
||||
let store = store.clone();
|
||||
let peers = peers.clone();
|
||||
let peers2 = peers.clone();
|
||||
let handshake = handshake.clone();
|
||||
let hp = hp.clone();
|
||||
|
||||
future::ok(conn).and_then(move |conn| {
|
||||
// Refuse banned peers connection
|
||||
if let Ok(peer_addr) = conn.peer_addr() {
|
||||
if let Ok(peer_data) = store.get_peer(peer_addr) {
|
||||
if peer_data.flags == State::Banned {
|
||||
debug!(LOGGER, "Peer {} banned, refusing connection.", peer_addr);
|
||||
if let Err(e) = conn.shutdown(Shutdown::Both) {
|
||||
debug!(LOGGER, "Error shutting down conn: {:?}", e);
|
||||
}
|
||||
return Err(Error::Banned)
|
||||
if peers.is_banned(peer_addr) {
|
||||
debug!(LOGGER, "Peer {} banned, refusing connection.", peer_addr);
|
||||
if let Err(e) = conn.shutdown(Shutdown::Both) {
|
||||
debug!(LOGGER, "Error shutting down conn: {:?}", e);
|
||||
}
|
||||
return Err(Error::Banned)
|
||||
}
|
||||
}
|
||||
Ok(conn)
|
||||
}).and_then(move |conn| {
|
||||
|
||||
let peers = peers.clone();
|
||||
let total_diff = adapter.total_difficulty();
|
||||
let total_diff = peers2.total_difficulty();
|
||||
|
||||
// accept the peer and add it to the server map
|
||||
let accept = Peer::accept(
|
||||
|
@ -149,9 +138,9 @@ impl Server {
|
|||
capab,
|
||||
total_diff,
|
||||
&handshake.clone(),
|
||||
adapter.clone(),
|
||||
Arc::new(peers2.clone()),
|
||||
);
|
||||
let added = add_to_peers(peers, adapter.clone(), accept);
|
||||
let added = add_to_peers(peers2, accept);
|
||||
|
||||
// wire in a future to timeout the accept after 5 secs
|
||||
let timed_peer = with_timeout(Box::new(added), &hp);
|
||||
|
@ -186,14 +175,13 @@ impl Server {
|
|||
|
||||
|
||||
// timer to regularly check on our peers by pinging them
|
||||
let adapter = self.adapter.clone();
|
||||
let peers_inner = self.peers.clone();
|
||||
let peers_timer = Timer::default()
|
||||
.interval(Duration::new(20, 0))
|
||||
.fold((), move |_, _| {
|
||||
let total_diff = adapter.total_difficulty();
|
||||
let total_height = adapter.total_height();
|
||||
check_peers(peers_inner.clone(), total_diff, total_height);
|
||||
let total_diff = peers_inner.total_difficulty();
|
||||
let total_height = peers_inner.total_height();
|
||||
peers_inner.check_all(total_diff, total_height);
|
||||
Ok(())
|
||||
});
|
||||
|
||||
|
@ -218,7 +206,8 @@ impl Server {
|
|||
addr: SocketAddr,
|
||||
h: reactor::Handle,
|
||||
) -> Box<Future<Item = Option<Arc<RwLock<Peer>>>, Error = Error>> {
|
||||
if let Some(p) = self.get_peer(&addr) {
|
||||
|
||||
if let Some(p) = self.peers.get_peer(&addr) {
|
||||
// if we're already connected to the addr, just return the peer
|
||||
debug!(LOGGER, "connect_peer: already connected {}", addr);
|
||||
return Box::new(future::ok(Some(p)));
|
||||
|
@ -229,7 +218,6 @@ impl Server {
|
|||
// cloneapalooza
|
||||
let peers = self.peers.clone();
|
||||
let handshake = self.handshake.clone();
|
||||
let adapter = self.adapter.clone();
|
||||
let capab = self.capabilities.clone();
|
||||
let self_addr = SocketAddr::new(self.config.host, self.config.port);
|
||||
|
||||
|
@ -245,8 +233,7 @@ impl Server {
|
|||
let h2 = h.clone();
|
||||
let request = socket_connect
|
||||
.and_then(move |socket| {
|
||||
let peers = peers.clone();
|
||||
let total_diff = adapter.clone().total_difficulty();
|
||||
let total_diff = peers.total_difficulty();
|
||||
|
||||
// connect to the peer and add it to the server map, wiring it a timeout for
|
||||
// the handshake
|
||||
|
@ -256,9 +243,9 @@ impl Server {
|
|||
total_diff,
|
||||
self_addr,
|
||||
handshake.clone(),
|
||||
adapter.clone(),
|
||||
Arc::new(peers.clone()),
|
||||
);
|
||||
let added = add_to_peers(peers, adapter, connect);
|
||||
let added = add_to_peers(peers, connect);
|
||||
with_timeout(Box::new(added), &h)
|
||||
})
|
||||
.and_then(move |(socket, peer)| {
|
||||
|
@ -272,256 +259,26 @@ impl Server {
|
|||
Box::new(request)
|
||||
}
|
||||
|
||||
/// Check if the server already knows this peer (is already connected).
|
||||
pub fn is_known(&self, addr: &SocketAddr) -> bool {
|
||||
self.get_peer(addr).is_some()
|
||||
}
|
||||
|
||||
pub fn connected_peers(&self) -> Vec<Arc<RwLock<Peer>>> {
|
||||
self.peers.read().unwrap().values().map(|p| p.clone()).collect()
|
||||
}
|
||||
|
||||
/// Get a peer we're connected to by address.
|
||||
pub fn get_peer(&self, addr: &SocketAddr) -> Option<Arc<RwLock<Peer>>> {
|
||||
self.peers.read().unwrap().get(addr).map(|p| p.clone())
|
||||
}
|
||||
|
||||
/// Have the server iterate over its peer list and prune all peers we have
|
||||
/// lost connection to or have been deemed problematic.
|
||||
/// Also avoid connected peer count getting too high.
|
||||
pub fn clean_peers(&self, desired_count: usize) {
|
||||
let mut rm = vec![];
|
||||
|
||||
// build a list of peers to be cleaned up
|
||||
for peer in self.connected_peers() {
|
||||
let peer_inner = peer.read().unwrap();
|
||||
if peer_inner.is_banned() {
|
||||
debug!(LOGGER, "cleaning {:?}, peer banned", peer_inner.info.addr);
|
||||
rm.push(peer.clone());
|
||||
} else if !peer_inner.is_connected() {
|
||||
debug!(LOGGER, "cleaning {:?}, not connected", peer_inner.info.addr);
|
||||
rm.push(peer.clone());
|
||||
}
|
||||
}
|
||||
|
||||
// now clean up peer map based on the list to remove
|
||||
{
|
||||
let mut peers = self.peers.write().unwrap();
|
||||
for p in rm.clone() {
|
||||
let p = p.read().unwrap();
|
||||
peers.remove(&p.info.addr);
|
||||
}
|
||||
}
|
||||
|
||||
// ensure we do not have too many connected peers
|
||||
// really fighting with the double layer of rwlocks here...
|
||||
let excess_count = {
|
||||
let peer_count = self.peer_count().clone() as usize;
|
||||
if peer_count > desired_count {
|
||||
peer_count - desired_count
|
||||
} else {
|
||||
0
|
||||
}
|
||||
};
|
||||
|
||||
// map peers to addrs in a block to bound how long we keep the read lock for
|
||||
let addrs = {
|
||||
self.connected_peers().iter().map(|x| {
|
||||
let p = x.read().unwrap();
|
||||
p.info.addr.clone()
|
||||
}).collect::<Vec<_>>()
|
||||
};
|
||||
|
||||
// now remove them taking a short-lived write lock each time
|
||||
// maybe better to take write lock once and remove them all?
|
||||
for x in addrs
|
||||
.iter()
|
||||
.take(excess_count) {
|
||||
let mut peers = self.peers.write().unwrap();
|
||||
peers.remove(x);
|
||||
}
|
||||
}
|
||||
|
||||
/// Return vec of all peers that currently have the most worked branch,
|
||||
/// showing the highest total difficulty.
|
||||
pub fn most_work_peers(&self) -> Vec<Arc<RwLock<Peer>>> {
|
||||
let peers = self.connected_peers();
|
||||
if peers.len() == 0 {
|
||||
return vec![];
|
||||
}
|
||||
|
||||
let max_total_difficulty = peers
|
||||
.iter()
|
||||
.map(|x| {
|
||||
match x.try_read() {
|
||||
Ok(peer) => peer.info.total_difficulty.clone(),
|
||||
Err(_) => Difficulty::zero(),
|
||||
}
|
||||
})
|
||||
.max()
|
||||
.unwrap();
|
||||
|
||||
let mut max_peers = peers
|
||||
.iter()
|
||||
.filter(|x| {
|
||||
match x.try_read() {
|
||||
Ok(peer) => {
|
||||
peer.info.total_difficulty == max_total_difficulty
|
||||
},
|
||||
Err(_) => false,
|
||||
}
|
||||
})
|
||||
.cloned()
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
thread_rng().shuffle(&mut max_peers);
|
||||
max_peers
|
||||
}
|
||||
|
||||
/// Returns single random peer with the most worked branch, showing the highest total
|
||||
/// difficulty.
|
||||
pub fn most_work_peer(&self) -> Option<Arc<RwLock<Peer>>> {
|
||||
match self.most_work_peers().first() {
|
||||
Some(x) => Some(x.clone()),
|
||||
None => None
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns a random connected peer.
|
||||
pub fn random_peer(&self) -> Option<Arc<RwLock<Peer>>> {
|
||||
let peers = self.connected_peers();
|
||||
Some(thread_rng().choose(&peers).unwrap().clone())
|
||||
}
|
||||
|
||||
/// Broadcasts the provided block to all our peers. A peer implementation
|
||||
/// may drop the broadcast request if it knows the remote peer already has
|
||||
/// the block.
|
||||
pub fn broadcast_block(&self, b: &core::Block) {
|
||||
let peers = self.connected_peers();
|
||||
let mut count = 0;
|
||||
for p in peers {
|
||||
let p = p.read().unwrap();
|
||||
if p.is_connected() {
|
||||
if let Err(e) = p.send_block(b) {
|
||||
debug!(LOGGER, "Error sending block to peer: {:?}", e);
|
||||
} else {
|
||||
count += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
debug!(LOGGER, "Broadcasted block {} to {} peers.", b.header.height, count);
|
||||
}
|
||||
|
||||
/// Broadcasts the provided transaction to all our peers. A peer
|
||||
/// implementation may drop the broadcast request if it knows the
|
||||
/// remote peer already has the transaction.
|
||||
pub fn broadcast_transaction(&self, tx: &core::Transaction) {
|
||||
let peers = self.connected_peers();
|
||||
for p in peers {
|
||||
let p = p.read().unwrap();
|
||||
if p.is_connected() {
|
||||
if let Err(e) = p.send_transaction(tx) {
|
||||
debug!(LOGGER, "Error sending block to peer: {:?}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Number of peers we're currently connected to.
|
||||
pub fn peer_count(&self) -> u32 {
|
||||
self.connected_peers().len() as u32
|
||||
}
|
||||
|
||||
/// Bans a peer, disconnecting it if we're currently connected
|
||||
pub fn ban_peer(&self, peer_addr: &SocketAddr) {
|
||||
if let Err(e) = self.update_state(peer_addr.clone(), State::Banned) {
|
||||
error!(LOGGER, "Couldn't ban {}: {:?}", peer_addr, e);
|
||||
}
|
||||
|
||||
if let Some(peer) = self.get_peer(peer_addr) {
|
||||
debug!(LOGGER, "Banning peer {}", peer_addr);
|
||||
// setting peer status will get it removed at the next clean_peer
|
||||
let peer = peer.write().unwrap();
|
||||
peer.set_banned();
|
||||
peer.stop();
|
||||
}
|
||||
}
|
||||
|
||||
/// Stops the server. Disconnect from all peers at the same time.
|
||||
pub fn stop(self) {
|
||||
info!(LOGGER, "calling stop on server");
|
||||
let peers = self.connected_peers();
|
||||
for peer in peers {
|
||||
let peer = peer.read().unwrap();
|
||||
peer.stop();
|
||||
}
|
||||
self.peers.stop();
|
||||
self.stop.into_inner().unwrap().send(()).unwrap();
|
||||
}
|
||||
|
||||
/// All peer information we have in storage
|
||||
pub fn all_peers(&self) -> Vec<PeerData> {
|
||||
self.store.all_peers()
|
||||
}
|
||||
|
||||
/// Find peers in store (not necessarily connected) and return their data
|
||||
pub fn find_peers(&self, state: State, cap: Capabilities, count: usize) -> Vec<PeerData> {
|
||||
self.store.find_peers(state, cap, count)
|
||||
}
|
||||
|
||||
/// Whether we've already seen a peer with the provided address
|
||||
pub fn exists_peer(&self, peer_addr: SocketAddr) -> Result<bool, Error> {
|
||||
self.store.exists_peer(peer_addr).map_err(From::from)
|
||||
}
|
||||
|
||||
/// Saves updated information about a peer
|
||||
pub fn save_peer(&self, p: &PeerData) -> Result<(), Error> {
|
||||
self.store.save_peer(p).map_err(From::from)
|
||||
}
|
||||
|
||||
/// Updates the state of a peer in store
|
||||
pub fn update_state(&self, peer_addr: SocketAddr, new_state: State) -> Result<(), Error> {
|
||||
self.store.update_state(peer_addr, new_state).map_err(From::from)
|
||||
}
|
||||
}
|
||||
|
||||
// Adds the peer built by the provided future in the peers map
|
||||
fn add_to_peers<A>(
|
||||
peers: Arc<RwLock<HashMap<SocketAddr, Arc<RwLock<Peer>>>>>,
|
||||
adapter: Arc<NetAdapter>,
|
||||
peer_fut: A,
|
||||
) -> Box<Future<Item = Result<(TcpStream, Arc<RwLock<Peer>>), ()>, Error = Error>>
|
||||
where
|
||||
A: IntoFuture<Item = (TcpStream, Peer), Error = Error> + 'static,
|
||||
{
|
||||
fn add_to_peers<A>(peers: Peers, peer_fut: A)
|
||||
-> Box<Future<Item = Result<(TcpStream, Arc<RwLock<Peer>>), ()>, Error = Error>>
|
||||
where A: IntoFuture<Item = (TcpStream, Peer), Error = Error> + 'static {
|
||||
|
||||
let peer_add = peer_fut.into_future().map(move |(conn, peer)| {
|
||||
adapter.peer_connected(&peer.info);
|
||||
let addr = peer.info.addr.clone();
|
||||
let apeer = Arc::new(RwLock::new(peer));
|
||||
{
|
||||
let mut peers = peers.write().unwrap();
|
||||
peers.insert(addr, apeer.clone());
|
||||
}
|
||||
let apeer = peers.add_connected(peer);
|
||||
Ok((conn, apeer))
|
||||
});
|
||||
Box::new(peer_add)
|
||||
}
|
||||
|
||||
// Ping all our connected peers. Always automatically expects a pong back or
|
||||
// disconnects. This acts as a liveness test.
|
||||
fn check_peers(
|
||||
peers: Arc<RwLock<HashMap<SocketAddr, Arc<RwLock<Peer>>>>>,
|
||||
total_difficulty: Difficulty,
|
||||
height: u64
|
||||
) {
|
||||
let peers_map = peers.read().unwrap();
|
||||
for p in peers_map.values() {
|
||||
let p = p.read().unwrap();
|
||||
if p.is_connected() {
|
||||
let _ = p.send_ping(total_difficulty.clone(), height);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Adds a timeout to a future
|
||||
fn with_timeout<T: 'static>(
|
||||
fut: Box<Future<Item = Result<T, ()>, Error = Error>>,
|
||||
|
|
|
@@ -162,7 +162,7 @@ pub trait Protocol {
 /// Bridge between the networking layer and the rest of the system. Handles the
 /// forwarding or querying of blocks and transactions from the network among
 /// other things.
-pub trait NetAdapter: Sync + Send {
+pub trait ChainAdapter: Sync + Send {
 	/// Current total difficulty on our chain
 	fn total_difficulty(&self) -> Difficulty;

@@ -172,8 +172,11 @@ pub trait NetAdapter: Sync + Send {
 	/// A valid transaction has been received from one of our peers
 	fn transaction_received(&self, tx: core::Transaction);

-	/// A block has been received from one of our peers
-	fn block_received(&self, b: core::Block, addr: SocketAddr);
+	/// A block has been received from one of our peers. Returns true if the
+	/// block could be handled properly and is not deemed defective by the
+	/// chain. Returning false means the block will nenver be valid and
+	/// may result in the peer being banned.
+	fn block_received(&self, b: core::Block, addr: SocketAddr) -> bool;

 	/// A set of block header has been received, typically in response to a
 	/// block
@@ -187,7 +190,11 @@ pub trait NetAdapter: Sync + Send {

 	/// Gets a full block by its hash.
 	fn get_block(&self, h: Hash) -> Option<core::Block>;
+}

+/// Additional methods required by the protocol that don't need to be
+/// externally implemented.
+pub trait NetAdapter: ChainAdapter {
 	/// Find good peers we know with the provided capability and return their
 	/// addresses.
 	fn find_peer_addrs(&self, capab: Capabilities) -> Vec<SocketAddr>;
@@ -195,9 +202,6 @@ pub trait NetAdapter: Sync + Send {
 	/// A list of peers has been received from one of our peers.
 	fn peer_addrs_received(&self, Vec<SocketAddr>);

-	/// Network successfully connected to a peer.
-	fn peer_connected(&self, &PeerInfo);
-
 	/// Heard total_difficulty from a connected peer (via ping/pong).
 	fn peer_difficulty(&self, SocketAddr, Difficulty, u64);
 }
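The hunk above splits the old NetAdapter trait in two: ChainAdapter carries the chain-facing callbacks, and NetAdapter: ChainAdapter layers the peer-tracking methods on top, so types like Peers, TrackingAdapter and DummyAdapter can implement both while plain chain consumers only need ChainAdapter. A stripped-down, self-contained illustration of the same pattern, using toy method sets rather than the real API:

// Toy version of the trait split; the method names are placeholders.
trait ChainAdapter: Sync + Send {
	fn total_height(&self) -> u64;
}

// The network-facing trait extends the chain-facing one with peer bookkeeping.
trait NetAdapter: ChainAdapter {
	fn peer_count(&self) -> u32;
}

struct Node { height: u64, peers: u32 }

impl ChainAdapter for Node {
	fn total_height(&self) -> u64 { self.height }
}

impl NetAdapter for Node {
	fn peer_count(&self) -> u32 { self.peers }
}

// Code that only cares about the chain can accept any ChainAdapter...
fn report_height(a: &dyn ChainAdapter) -> u64 { a.total_height() }

fn main() {
	let n = Node { height: 10, peers: 3 };
	assert_eq!(report_height(&n), 10);
	assert_eq!(n.peer_count(), 3); // ...while peer-aware code asks for NetAdapter.
}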
|
||||
|
|
|
@@ -84,7 +84,7 @@ fn peer_handshake() {
 			Ok(())
 		})
 		.and_then(|_| {
-			assert!(server.peer_count() > 0);
+			assert!(server.peers.peer_count() > 0);
 			server.stop();
 			Ok(())
 		})
@@ -40,7 +40,6 @@ use clap::{App, Arg, ArgMatches, SubCommand};
 use daemonize::Daemonize;

 use config::GlobalConfig;
 use wallet::WalletConfig;
 use core::global;
 use core::core::amount_to_hr_string;
 use util::{init_logger, LoggingConfig, LOGGER};