Merge pull request #481 from sesam/more-cherries

Cherry picks + Ext. peers handling (#468) + compare genesis during handshake (#327)
This commit is contained in:
Ignotus Peverell 2017-12-14 03:29:48 +00:00 committed by GitHub
commit 360e311041
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
17 changed files with 761 additions and 635 deletions

View file

@ -231,24 +231,24 @@ impl Handler for SumTreeHandler {
}
pub struct PeersAllHandler {
pub p2p_server: Arc<p2p::Server>,
pub peers: p2p::Peers,
}
impl Handler for PeersAllHandler {
fn handle(&self, _req: &mut Request) -> IronResult<Response> {
let peers = &self.p2p_server.all_peers();
let peers = &self.peers.all_peers();
json_response_pretty(&peers)
}
}
pub struct PeersConnectedHandler {
pub p2p_server: Arc<p2p::Server>,
pub peers: p2p::Peers,
}
impl Handler for PeersConnectedHandler {
fn handle(&self, _req: &mut Request) -> IronResult<Response> {
let mut peers = vec![];
for p in &self.p2p_server.connected_peers() {
for p in &self.peers.connected_peers() {
let p = p.read().unwrap();
let peer_info = p.info.clone();
peers.push(peer_info);
@ -262,7 +262,7 @@ impl Handler for PeersConnectedHandler {
/// POST /v1/peers/10.12.12.13/ban
/// TODO POST /v1/peers/10.12.12.13/unban
pub struct PeerHandler {
pub p2p_server: Arc<p2p::Server>,
pub peers: p2p::Peers,
}
impl Handler for PeerHandler {
@ -276,7 +276,7 @@ impl Handler for PeerHandler {
"ban" => {
path_elems.pop();
if let Ok(addr) = path_elems.last().unwrap().parse() {
self.p2p_server.ban_peer(&addr);
self.peers.ban_peer(&addr);
Ok(Response::with((status::Ok, "")))
} else {
Ok(Response::with((status::BadRequest, "")))
@ -459,74 +459,76 @@ pub fn start_rest_apis<T>(
addr: String,
chain: Arc<chain::Chain>,
tx_pool: Arc<RwLock<pool::TransactionPool<T>>>,
p2p_server: Arc<p2p::Server>,
peers: p2p::Peers,
) where
T: pool::BlockChain + Send + Sync + 'static,
{
thread::spawn(move || {
// build handlers and register them under the appropriate endpoint
let utxo_handler = UtxoHandler {
chain: chain.clone(),
};
let block_handler = BlockHandler {
chain: chain.clone(),
};
let chain_tip_handler = ChainHandler {
chain: chain.clone(),
};
let sumtree_handler = SumTreeHandler {
chain: chain.clone(),
};
let pool_info_handler = PoolInfoHandler {
tx_pool: tx_pool.clone(),
};
let pool_push_handler = PoolPushHandler {
tx_pool: tx_pool.clone(),
};
let peers_all_handler = PeersAllHandler {
p2p_server: p2p_server.clone(),
};
let peers_connected_handler = PeersConnectedHandler {
p2p_server: p2p_server.clone(),
};
let peer_handler = PeerHandler {
p2p_server: p2p_server.clone(),
};
let _ = thread::Builder::new()
.name("apis".to_string())
.spawn(move || {
// build handlers and register them under the appropriate endpoint
let utxo_handler = UtxoHandler {
chain: chain.clone(),
};
let block_handler = BlockHandler {
chain: chain.clone(),
};
let chain_tip_handler = ChainHandler {
chain: chain.clone(),
};
let sumtree_handler = SumTreeHandler {
chain: chain.clone(),
};
let pool_info_handler = PoolInfoHandler {
tx_pool: tx_pool.clone(),
};
let pool_push_handler = PoolPushHandler {
tx_pool: tx_pool.clone(),
};
let peers_all_handler = PeersAllHandler {
peers: peers.clone(),
};
let peers_connected_handler = PeersConnectedHandler {
peers: peers.clone(),
};
let peer_handler = PeerHandler {
peers: peers.clone(),
};
let route_list = vec!(
"get /".to_string(),
"get /blocks".to_string(),
"get /chain".to_string(),
"get /chain/utxos".to_string(),
"get /sumtrees/roots".to_string(),
"get /sumtrees/lastutxos?n=10".to_string(),
"get /sumtrees/lastrangeproofs".to_string(),
"get /sumtrees/lastkernels".to_string(),
"get /pool".to_string(),
"post /pool/push".to_string(),
"get /peers/all".to_string(),
"get /peers/connected".to_string(),
);
let index_handler = IndexHandler { list: route_list };
let router = router!(
index: get "/" => index_handler,
blocks: get "/blocks/*" => block_handler,
chain_tip: get "/chain" => chain_tip_handler,
chain_utxos: get "/chain/utxos/*" => utxo_handler,
sumtree_roots: get "/sumtrees/*" => sumtree_handler,
pool_info: get "/pool" => pool_info_handler,
pool_push: post "/pool/push" => pool_push_handler,
peers_all: get "/peers/all" => peers_all_handler,
peers_connected: get "/peers/connected" => peers_connected_handler,
peer: post "/peers/*" => peer_handler,
);
let route_list = vec!(
"get /".to_string(),
"get /blocks".to_string(),
"get /chain".to_string(),
"get /chain/utxos".to_string(),
"get /sumtrees/roots".to_string(),
"get /sumtrees/lastutxos?n=10".to_string(),
"get /sumtrees/lastrangeproofs".to_string(),
"get /sumtrees/lastkernels".to_string(),
"get /pool".to_string(),
"post /pool/push".to_string(),
"get /peers/all".to_string(),
"get /peers/connected".to_string(),
);
let index_handler = IndexHandler { list: route_list };
let router = router!(
index: get "/" => index_handler,
blocks: get "/blocks/*" => block_handler,
chain_tip: get "/chain" => chain_tip_handler,
chain_utxos: get "/chain/utxos/*" => utxo_handler,
sumtree_roots: get "/sumtrees/*" => sumtree_handler,
pool_info: get "/pool" => pool_info_handler,
pool_push: post "/pool/push" => pool_push_handler,
peers_all: get "/peers/all" => peers_all_handler,
peers_connected: get "/peers/connected" => peers_connected_handler,
peer: post "/peers/*" => peer_handler,
);
let mut apis = ApiServer::new("/v1".to_string());
apis.register_handler(router);
let mut apis = ApiServer::new("/v1".to_string());
apis.register_handler(router);
info!(LOGGER, "Starting HTTP API server at {}.", addr);
apis.start(&addr[..]).unwrap_or_else(|e| {
error!(LOGGER, "Failed to start API HTTP server: {}.", e);
});
info!(LOGGER, "Starting HTTP API server at {}.", addr);
apis.start(&addr[..]).unwrap_or_else(|e| {
error!(LOGGER, "Failed to start API HTTP server: {}.", e);
});
});
}

View file

@ -239,9 +239,13 @@ impl<'a> Extension<'a> {
pub fn apply_block(&mut self, b: &Block) -> Result<(), Error> {
// doing inputs first guarantees an input can't spend an output in the
// same block, enforcing block cut-through
// same block, enforcing block cut-through
for input in &b.inputs {
let pos_res = self.commit_index.get_output_pos(&input.commitment());
if b.hash().to_string() == "f697a877" {
debug!(LOGGER, "input pos: {:?}, commit: {} {:?}",
pos_res, input.commitment().hash(), input.commitment());
}
if let Ok(pos) = pos_res {
match self.output_pmmr.prune(pos, b.header.height as u32) {
Ok(true) => {
@ -257,29 +261,22 @@ impl<'a> Extension<'a> {
}
}
// checking any position after the MMR size is useless, catches rewind
// edge cases
let output_max_index = self.output_pmmr.unpruned_size();
let kernel_max_index = self.kernel_pmmr.unpruned_size();
for out in &b.outputs {
let commit = out.commitment();
if let Ok(pos) = self.commit_index.get_output_pos(&commit) {
if pos <= output_max_index {
// we need to check whether the commitment is in the current MMR view
// as well as the index doesn't support rewind and is non-authoritative
// (non-historical node will have a much smaller one)
// note that this doesn't show the commitment *never* existed, just
// that this is not an existing unspent commitment right now
if let Some(c) = self.output_pmmr.get(pos) {
let hashsum = HashSum::from_summable(
pos, &SumCommit{commit}, Some(out.switch_commit_hash));
// as we're processing a new fork, we may get a position on the old
// fork that exists but matches a different node, filtering that
// case out
if c.hash == hashsum.hash {
return Err(Error::DuplicateCommitment(out.commitment()));
}
// we need to check whether the commitment is in the current MMR view
// as well as the index doesn't support rewind and is non-authoritative
// (non-historical node will have a much smaller one)
// note that this doesn't show the commitment *never* existed, just
// that this is not an existing unspent commitment right now
if let Some(c) = self.output_pmmr.get(pos) {
let hashsum = HashSum::from_summable(
pos, &SumCommit{commit}, Some(out.switch_commit_hash));
// as we're processing a new fork, we may get a position on the old
// fork that exists but matches a different node, filtering that
// case out
if c.hash == hashsum.hash {
return Err(Error::DuplicateCommitment(out.commitment()));
}
}
}
@ -303,14 +300,12 @@ impl<'a> Extension<'a> {
for kernel in &b.kernels {
if let Ok(pos) = self.commit_index.get_kernel_pos(&kernel.excess) {
if pos <= kernel_max_index {
// same as outputs
if let Some(k) = self.kernel_pmmr.get(pos) {
let hashsum = HashSum::from_summable(
pos, &NoSum(kernel), None::<RangeProof>);
if k.hash == hashsum.hash {
return Err(Error::DuplicateKernel(kernel.excess.clone()));
}
// same as outputs
if let Some(k) = self.kernel_pmmr.get(pos) {
let hashsum = HashSum::from_summable(
pos, &NoSum(kernel), None::<RangeProof>);
if k.hash == hashsum.hash {
return Err(Error::DuplicateKernel(kernel.excess.clone()));
}
}
}

View file

@ -347,7 +347,11 @@ where
/// Helper function to get the HashSum of a node at a given position from
/// the backend.
pub fn get(&self, position: u64) -> Option<HashSum<T>> {
self.backend.get(position)
if position > self.last_pos {
None
} else {
self.backend.get(position)
}
}
/// Helper function to get the last N nodes inserted, i.e. the last

View file

@ -53,6 +53,10 @@ Builds a transaction to send someone some coins. Creates and outputs a transacti
Replaced by `listen` (see above). The `receive` command might later be recycled to actively accept one or several specific transactions.
### grin wallet request
(tbd)
### grin wallet burn
*TESTING ONLY*: Burns the provided amount to a known key. Similar to send but burns an output to allow single-party

View file

@ -1,4 +1,4 @@
// Copyright 2016 The Grin Developers
// Copyright 2017 The Grin Developers
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@ -21,7 +21,7 @@ use core::core::{self, Output};
use core::core::block::BlockHeader;
use core::core::hash::{Hash, Hashed};
use core::core::target::Difficulty;
use p2p::{self, NetAdapter, PeerData, State};
use p2p;
use pool;
use util::secp::pedersen::Commitment;
use util::OneTime;
@ -32,13 +32,12 @@ use util::LOGGER;
/// blocks and transactions are received and forwards to the chain and pool
/// implementations.
pub struct NetToChainAdapter {
currently_syncing: Arc<AtomicBool>,
chain: Arc<chain::Chain>,
p2p_server: OneTime<Arc<p2p::Server>>,
tx_pool: Arc<RwLock<pool::TransactionPool<PoolToChainAdapter>>>,
syncing: AtomicBool,
}
impl NetAdapter for NetToChainAdapter {
impl p2p::ChainAdapter for NetToChainAdapter {
fn total_difficulty(&self) -> Difficulty {
self.chain.total_difficulty()
}
@ -65,7 +64,7 @@ impl NetAdapter for NetToChainAdapter {
}
}
fn block_received(&self, b: core::Block, addr: SocketAddr) {
fn block_received(&self, b: core::Block, _: SocketAddr) -> bool {
let bhash = b.hash();
debug!(
LOGGER,
@ -79,19 +78,16 @@ impl NetAdapter for NetToChainAdapter {
if let &Err(ref e) = &res {
debug!(LOGGER, "Block {} refused by chain: {:?}", bhash, e);
// if the peer sent us a block that's intrinsically bad, they're either
// mistaken or manevolent, both of which require a ban
if e.is_bad_block() {
self.p2p_server.borrow().ban_peer(&addr);
//
// // header chain should be consistent with the sync head here
// // we just banned the peer that sent a bad block so
// // sync head should resolve itself if/when we find an alternative peer
// // with more work than ourselves
// // we should not need to reset the header head here
// header chain should be consistent with the sync head here
// we just banned the peer that sent a bad block so
// sync head should resolve itself if/when we find an alternative peer
// with more work than ourselves
// we should not need to reset the header head here
return false;
}
}
true
}
fn headers_received(&self, bhs: Vec<core::BlockHeader>, addr: SocketAddr) {
@ -149,23 +145,9 @@ impl NetAdapter for NetToChainAdapter {
locator,
);
if locator.len() == 0 {
return vec![];
}
// recursively go back through the locator vector
// and stop when we find a header that we recognize
// this will be a header shared in common between us and the peer
let known = self.chain.get_block_header(&locator[0]);
let header = match known {
Ok(header) => header,
Err(chain::Error::StoreErr(store::Error::NotFoundErr, _)) => {
return self.locate_headers(locator[1..].to_vec());
}
Err(e) => {
error!(LOGGER, "Could not build header locator: {:?}", e);
return vec![];
}
let header = match self.find_common_header(locator) {
Some(header) => header,
None => return vec![],
};
debug!(
@ -207,138 +189,58 @@ impl NetAdapter for NetToChainAdapter {
}
}
/// Find good peers we know with the provided capability and return their
/// addresses.
fn find_peer_addrs(&self, capab: p2p::Capabilities) -> Vec<SocketAddr> {
let peers = self.p2p_server.borrow()
.find_peers(State::Healthy, capab, p2p::MAX_PEER_ADDRS as usize);
debug!(LOGGER, "Got {} peer addrs to send.", peers.len());
map_vec!(peers, |p| p.addr)
}
/// A list of peers has been received from one of our peers.
fn peer_addrs_received(&self, peer_addrs: Vec<SocketAddr>) {
debug!(LOGGER, "Received {} peer addrs, saving.", peer_addrs.len());
for pa in peer_addrs {
if let Ok(e) = self.p2p_server.borrow().exists_peer(pa) {
if e {
continue;
}
}
let peer = PeerData {
addr: pa,
capabilities: p2p::UNKNOWN,
user_agent: "".to_string(),
flags: State::Healthy,
};
if let Err(e) = self.p2p_server.borrow().save_peer(&peer) {
error!(LOGGER, "Could not save received peer address: {:?}", e);
}
}
}
/// Network successfully connected to a peer.
fn peer_connected(&self, pi: &p2p::PeerInfo) {
debug!(LOGGER, "Saving newly connected peer {}.", pi.addr);
let peer = PeerData {
addr: pi.addr,
capabilities: pi.capabilities,
user_agent: pi.user_agent.clone(),
flags: State::Healthy,
};
if let Err(e) = self.p2p_server.borrow().save_peer(&peer) {
error!(LOGGER, "Could not save connected peer: {:?}", e);
}
}
fn peer_difficulty(&self, addr: SocketAddr, diff: Difficulty, height: u64) {
debug!(
LOGGER,
"peer total_diff @ height (ping/pong): {}: {} @ {} \
vs us: {} @ {}",
addr,
diff,
height,
self.total_difficulty(),
self.total_height()
);
if self.p2p_server.is_initialized() {
if let Some(peer) = self.p2p_server.borrow().get_peer(&addr) {
let mut peer = peer.write().unwrap();
peer.info.total_difficulty = diff;
}
}
}
}
impl NetToChainAdapter {
pub fn new(
currently_syncing: Arc<AtomicBool>,
chain_ref: Arc<chain::Chain>,
tx_pool: Arc<RwLock<pool::TransactionPool<PoolToChainAdapter>>>,
) -> NetToChainAdapter {
NetToChainAdapter {
currently_syncing: currently_syncing,
chain: chain_ref,
p2p_server: OneTime::new(),
tx_pool: tx_pool,
syncing: AtomicBool::new(true),
}
}
/// Setup the p2p server on the adapter
pub fn init(&self, p2p: Arc<p2p::Server>) {
self.p2p_server.init(p2p);
}
// recursively go back through the locator vector and stop when we find
// a header that we recognize this will be a header shared in common
// between us and the peer
fn find_common_header(&self, locator: Vec<Hash>) -> Option<BlockHeader> {
if locator.len() == 0 {
return None;
}
/// Whether we're currently syncing the chain or we're fully caught up and
/// just receiving blocks through gossip.
pub fn is_syncing(&self) -> bool {
let local_diff = self.total_difficulty();
let peer = self.p2p_server.borrow().most_work_peer();
let known = self.chain.get_block_header(&locator[0]);
// if we're already syncing, we're caught up if no peer has a higher
// difficulty than us
if self.syncing.load(Ordering::Relaxed) {
if let Some(peer) = peer {
if let Ok(peer) = peer.try_read() {
if peer.info.total_difficulty <= local_diff {
info!(LOGGER, "sync: caught up on most worked chain, disabling sync");
self.syncing.store(false, Ordering::Relaxed);
}
}
} else {
info!(LOGGER, "sync: no peers available, disabling sync");
self.syncing.store(false, Ordering::Relaxed);
}
} else {
if let Some(peer) = peer {
if let Ok(peer) = peer.try_read() {
// sum the last 5 difficulties to give us the threshold
let threshold = self.chain
.difficulty_iter()
.filter_map(|x| x.map(|(_, x)| x).ok())
.take(5)
.fold(Difficulty::zero(), |sum, val| sum + val);
if peer.info.total_difficulty > local_diff.clone() + threshold.clone() {
info!(
LOGGER,
"sync: total_difficulty {}, peer_difficulty {}, threshold {} (last 5 blocks), enabling sync",
local_diff,
peer.info.total_difficulty,
threshold,
);
self.syncing.store(true, Ordering::Relaxed);
match known {
Ok(header) => {
// even if we know the block, it may not be on our winning chain
let known_winning = self.chain.get_header_by_height(header.height);
if let Ok(known_winning) = known_winning {
if known_winning.hash() != header.hash() {
self.find_common_header(locator[1..].to_vec())
} else {
Some(header)
}
} else {
self.find_common_header(locator[1..].to_vec())
}
},
Err(chain::Error::StoreErr(store::Error::NotFoundErr, _)) => {
self.find_common_header(locator[1..].to_vec())
},
Err(e) => {
error!(LOGGER, "Could not build header locator: {:?}", e);
None
}
}
self.syncing.load(Ordering::Relaxed)
}
/// Prepare options for the chain pipeline
fn chain_opts(&self) -> chain::Options {
let opts = if self.is_syncing() {
let opts = if self.currently_syncing.load(Ordering::Relaxed) {
chain::SYNC
} else {
chain::NONE
@ -352,7 +254,7 @@ impl NetToChainAdapter {
/// the network to broadcast the block
pub struct ChainToPoolAndNetAdapter {
tx_pool: Arc<RwLock<pool::TransactionPool<PoolToChainAdapter>>>,
p2p: OneTime<Arc<p2p::Server>>,
peers: OneTime<p2p::Peers>,
}
impl ChainAdapter for ChainToPoolAndNetAdapter {
@ -367,7 +269,7 @@ impl ChainAdapter for ChainToPoolAndNetAdapter {
);
}
}
self.p2p.borrow().broadcast_block(b);
self.peers.borrow().broadcast_block(b);
}
}
@ -377,23 +279,23 @@ impl ChainToPoolAndNetAdapter {
) -> ChainToPoolAndNetAdapter {
ChainToPoolAndNetAdapter {
tx_pool: tx_pool,
p2p: OneTime::new(),
peers: OneTime::new(),
}
}
pub fn init(&self, p2p: Arc<p2p::Server>) {
self.p2p.init(p2p);
pub fn init(&self, peers: p2p::Peers) {
self.peers.init(peers);
}
}
/// Adapter between the transaction pool and the network, to relay
/// transactions that have been accepted.
pub struct PoolToNetAdapter {
p2p: OneTime<Arc<p2p::Server>>,
peers: OneTime<p2p::Peers>,
}
impl pool::PoolAdapter for PoolToNetAdapter {
fn tx_accepted(&self, tx: &core::Transaction) {
self.p2p.borrow().broadcast_transaction(tx);
self.peers.borrow().broadcast_transaction(tx);
}
}
@ -401,13 +303,13 @@ impl PoolToNetAdapter {
/// Create a new pool to net adapter
pub fn new() -> PoolToNetAdapter {
PoolToNetAdapter {
p2p: OneTime::new(),
peers: OneTime::new(),
}
}
/// Setup the p2p server on the adapter
pub fn init(&self, p2p: Arc<p2p::Server>) {
self.p2p.init(p2p);
pub fn init(&self, peers: p2p::Peers) {
self.peers.init(peers);
}
}

View file

@ -17,7 +17,7 @@
use rand::{self, Rng};
use std::sync::{Arc, RwLock};
use std::{str, thread};
use std::thread;
use std;
use time;

View file

@ -36,18 +36,20 @@ const PEER_PREFERRED_COUNT: u32 = 8;
const SEEDS_URL: &'static str = "http://grin-tech.org/seeds.txt";
pub struct Seeder {
p2p: Arc<p2p::Server>,
peers: p2p::Peers,
p2p_server: Arc<p2p::Server>,
capabilities: p2p::Capabilities,
}
impl Seeder {
pub fn new(
capabilities: p2p::Capabilities,
p2p: Arc<p2p::Server>,
p2p_server: Arc<p2p::Server>,
peers: p2p::Peers,
) -> Seeder {
Seeder {
p2p: p2p,
peers: peers,
p2p_server: p2p_server,
capabilities: capabilities,
}
}
@ -76,7 +78,7 @@ impl Seeder {
&self,
tx: mpsc::UnboundedSender<SocketAddr>,
) -> Box<Future<Item = (), Error = String>> {
let p2p_server = self.p2p.clone();
let peers = self.peers.clone();
let capabilities = self.capabilities.clone();
// now spawn a new future to regularly check if we need to acquire more peers
@ -84,19 +86,19 @@ impl Seeder {
let mon_loop = Timer::default()
.interval(time::Duration::from_secs(30))
.for_each(move |_| {
let total_count = p2p_server.all_peers().len();
let total_count = peers.all_peers().len();
debug!(
LOGGER,
"monitor_peers: {} most_work_peers, {} connected, {} total known",
p2p_server.most_work_peers().len(),
p2p_server.connected_peers().len(),
peers.most_work_peers().len(),
peers.connected_peers().len(),
total_count,
);
let mut healthy_count = 0;
let mut banned_count = 0;
let mut defunct_count = 0;
for x in p2p_server.all_peers() {
for x in peers.all_peers() {
if x.flags == p2p::State::Healthy { healthy_count += 1 }
else if x.flags == p2p::State::Banned { banned_count += 1 }
else if x.flags == p2p::State::Defunct { defunct_count += 1 };
@ -113,14 +115,14 @@ impl Seeder {
// maintenance step first, clean up p2p server peers
{
p2p_server.clean_peers(PEER_PREFERRED_COUNT as usize);
peers.clean_peers(PEER_PREFERRED_COUNT as usize);
}
// not enough peers, getting more from db
if p2p_server.peer_count() < PEER_PREFERRED_COUNT {
if peers.peer_count() < PEER_PREFERRED_COUNT {
// loop over connected peers
// ask them for their list of peers
for p in p2p_server.connected_peers() {
for p in peers.connected_peers() {
if let Ok(p) = p.try_read() {
debug!(
LOGGER,
@ -138,7 +140,7 @@ impl Seeder {
// find some peers from our db
// and queue them up for a connection attempt
let peers = p2p_server.find_peers(
let peers = peers.find_peers(
p2p::State::Healthy,
p2p::UNKNOWN,
100,
@ -169,12 +171,13 @@ impl Seeder {
// a thread pool is required so we don't block the event loop with a
// db query
let thread_pool = cpupool::CpuPool::new(1);
let p2p_server = self.p2p.clone();
let thread_pool = cpupool::Builder::new()
.pool_size(1).name_prefix("seed").create();
let peers = self.peers.clone();
let seeder = thread_pool
.spawn_fn(move || {
// check if we have some peers in db
let peers = p2p_server.find_peers(
let peers = peers.find_peers(
p2p::State::Healthy,
p2p::FULL_HIST,
100,
@ -214,21 +217,15 @@ impl Seeder {
rx: mpsc::UnboundedReceiver<SocketAddr>,
) -> Box<Future<Item = (), Error = ()>> {
let capab = self.capabilities;
let p2p_server = self.p2p.clone();
let peers = self.peers.clone();
let p2p_server = self.p2p_server.clone();
let listener = rx.for_each(move |peer_addr| {
debug!(LOGGER, "New peer address to connect to: {}.", peer_addr);
let inner_h = h.clone();
if p2p_server.peer_count() < PEER_MAX_COUNT {
h.spawn(
connect_and_req(
capab,
p2p_server.clone(),
inner_h,
peer_addr,
)
)
};
if peers.peer_count() < PEER_MAX_COUNT {
h.spawn(connect_and_req(capab, p2p_server.clone(), peers.clone(), inner_h, peer_addr))
}
Box::new(future::ok(()))
});
Box::new(listener)
@ -292,11 +289,12 @@ pub fn predefined_seeds(
fn connect_and_req(
capab: p2p::Capabilities,
p2p: Arc<p2p::Server>,
peers: p2p::Peers,
h: reactor::Handle,
addr: SocketAddr,
) -> Box<Future<Item = (), Error = ()>> {
let connect_peer = p2p.connect_peer(addr, h);
let p2p_server = p2p.clone();
let peers = peers.clone();
let fut = connect_peer.then(move |p| {
match p {
Ok(Some(p)) => {
@ -310,7 +308,7 @@ fn connect_and_req(
},
Err(e) => {
debug!(LOGGER, "connect_and_req: {} is Defunct; {:?}", addr, e);
let _ = p2p_server.update_state(addr, p2p::State::Defunct);
let _ = peers.update_state(addr, p2p::State::Defunct);
},
}
Ok(())

View file

@ -18,6 +18,7 @@
use std::net::SocketAddr;
use std::sync::{Arc, RwLock};
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
use std::time;
@ -51,7 +52,7 @@ pub struct Server {
chain: Arc<chain::Chain>,
/// in-memory transaction pool
tx_pool: Arc<RwLock<pool::TransactionPool<PoolToChainAdapter>>>,
net_adapter: Arc<NetToChainAdapter>,
currently_syncing: Arc<AtomicBool>,
}
impl Server {
@ -108,7 +109,10 @@ impl Server {
pool_adapter.set_chain(shared_chain.clone());
let currently_syncing = Arc::new(AtomicBool::new(true));
let net_adapter = Arc::new(NetToChainAdapter::new(
currently_syncing.clone(),
shared_chain.clone(),
tx_pool.clone(),
));
@ -120,11 +124,11 @@ impl Server {
net_adapter.clone(),
genesis.hash(),
)?);
chain_adapter.init(p2p_server.clone());
pool_net_adapter.init(p2p_server.clone());
net_adapter.init(p2p_server.clone());
chain_adapter.init(p2p_server.peers.clone());
pool_net_adapter.init(p2p_server.peers.clone());
let seed = seed::Seeder::new(config.capabilities, p2p_server.clone());
let seed = seed::Seeder::new(
config.capabilities, p2p_server.clone(), p2p_server.peers.clone());
match config.seeding_type.clone() {
Seeding::None => {
warn!(
@ -152,8 +156,8 @@ impl Server {
}
sync::run_sync(
net_adapter.clone(),
p2p_server.clone(),
currently_syncing.clone(),
p2p_server.peers.clone(),
shared_chain.clone(),
);
@ -165,7 +169,7 @@ impl Server {
config.api_http_addr.clone(),
shared_chain.clone(),
tx_pool.clone(),
p2p_server.clone(),
p2p_server.peers.clone(),
);
warn!(LOGGER, "Grin server started.");
@ -175,7 +179,7 @@ impl Server {
p2p: p2p_server,
chain: shared_chain,
tx_pool: tx_pool,
net_adapter: net_adapter,
currently_syncing: currently_syncing,
})
}
@ -193,7 +197,7 @@ impl Server {
/// Number of peers
pub fn peer_count(&self) -> u32 {
self.p2p.peer_count()
self.p2p.peers.peer_count()
}
/// Start mining for blocks on a separate thread. Uses toy miner by default,
@ -201,19 +205,21 @@ impl Server {
pub fn start_miner(&self, config: pow::types::MinerConfig) {
let cuckoo_size = global::sizeshift();
let proof_size = global::proofsize();
let net_adapter = self.net_adapter.clone();
let currently_syncing = self.currently_syncing.clone();
let mut miner = miner::Miner::new(config.clone(), self.chain.clone(), self.tx_pool.clone());
miner.set_debug_output_id(format!("Port {}", self.config.p2p_config.unwrap().port));
thread::spawn(move || {
// TODO push this down in the run loop so miner gets paused anytime we
// decide to sync again
let secs_5 = time::Duration::from_secs(5);
while net_adapter.is_syncing() {
thread::sleep(secs_5);
}
miner.run_loop(config.clone(), cuckoo_size as u32, proof_size);
});
let _ = thread::Builder::new()
.name("miner".to_string())
.spawn(move || {
// TODO push this down in the run loop so miner gets paused anytime we
// decide to sync again
let secs_5 = time::Duration::from_secs(5);
while currently_syncing.load(Ordering::Relaxed) {
thread::sleep(secs_5);
}
miner.run_loop(config.clone(), cuckoo_size as u32, proof_size);
});
}
/// The chain head

View file

@ -15,24 +15,24 @@
use std::thread;
use std::time::Duration;
use std::sync::{Arc, RwLock};
use std::sync::atomic::{AtomicBool, Ordering};
use time;
use adapters::NetToChainAdapter;
use chain;
use core::core::hash::{Hash, Hashed};
use p2p::{self, Peer};
use core::core::target::Difficulty;
use p2p::{self, Peer, Peers, ChainAdapter};
use types::Error;
use util::LOGGER;
/// Starts the syncing loop, just spawns two threads that loop forever
pub fn run_sync(
adapter: Arc<NetToChainAdapter>,
p2p_server: Arc<p2p::Server>,
currently_syncing: Arc<AtomicBool>,
peers: p2p::Peers,
chain: Arc<chain::Chain>,
) {
let a_inner = adapter.clone();
let p2p_inner = p2p_server.clone();
let c_inner = chain.clone();
let chain = chain.clone();
let _ = thread::Builder::new()
.name("sync".to_string())
.spawn(move || {
@ -43,13 +43,16 @@ pub fn run_sync(
thread::sleep(Duration::from_secs(30));
loop {
if a_inner.is_syncing() {
let syncing = needs_syncing(
currently_syncing.clone(), peers.clone(), chain.clone());
if syncing {
let current_time = time::now_utc();
// run the header sync every 10s
if current_time - prev_header_sync > time::Duration::seconds(10) {
header_sync(
p2p_server.clone(),
peers.clone(),
chain.clone(),
);
prev_header_sync = current_time;
@ -58,8 +61,8 @@ pub fn run_sync(
// run the body_sync every 5s
if current_time - prev_body_sync > time::Duration::seconds(5) {
body_sync(
p2p_inner.clone(),
c_inner.clone(),
peers.clone(),
chain.clone(),
);
prev_body_sync = current_time;
}
@ -72,10 +75,8 @@ pub fn run_sync(
});
}
fn body_sync(
p2p_server: Arc<p2p::Server>,
chain: Arc<chain::Chain>,
) {
fn body_sync(peers: Peers, chain: Arc<chain::Chain>) {
let body_head: chain::Tip = chain.head().unwrap();
let header_head: chain::Tip = chain.get_header_head().unwrap();
let sync_head: chain::Tip = chain.get_sync_head().unwrap();
@ -112,7 +113,7 @@ fn body_sync(
// if we have 5 most_work_peers then ask for 50 blocks total (peer_count * 10)
// max will be 80 if all 8 peers are advertising most_work
let peer_count = {
p2p_server.most_work_peers().len()
peers.most_work_peers().len()
};
let block_count = peer_count * 10;
@ -134,7 +135,7 @@ fn body_sync(
);
for hash in hashes_to_get.clone() {
let peer = p2p_server.most_work_peer();
let peer = peers.most_work_peer();
if let Some(peer) = peer {
if let Ok(peer) = peer.try_read() {
let _ = peer.send_block_request(hash);
@ -144,14 +145,11 @@ fn body_sync(
}
}
pub fn header_sync(
p2p_server: Arc<p2p::Server>,
chain: Arc<chain::Chain>,
) {
pub fn header_sync(peers: Peers, chain: Arc<chain::Chain>) {
if let Ok(header_head) = chain.get_header_head() {
let difficulty = header_head.total_difficulty;
if let Some(peer) = p2p_server.most_work_peer() {
if let Some(peer) = peers.most_work_peer() {
if let Ok(p) = peer.try_read() {
let peer_difficulty = p.info.total_difficulty.clone();
if peer_difficulty > difficulty {
@ -189,6 +187,57 @@ fn request_headers(
Ok(())
}
/// Whether we're currently syncing the chain or we're fully caught up and
/// just receiving blocks through gossip.
pub fn needs_syncing(
currently_syncing: Arc<AtomicBool>,
peers: Peers,
chain: Arc<chain::Chain>) -> bool {
let local_diff = peers.total_difficulty();
let peer = peers.most_work_peer();
// if we're already syncing, we're caught up if no peer has a higher
// difficulty than us
if currently_syncing.load(Ordering::Relaxed) {
if let Some(peer) = peer {
if let Ok(peer) = peer.try_read() {
if peer.info.total_difficulty <= local_diff {
info!(LOGGER, "sync: caught up on most worked chain, disabling sync");
currently_syncing.store(false, Ordering::Relaxed);
}
}
} else {
info!(LOGGER, "sync: no peers available, disabling sync");
currently_syncing.store(false, Ordering::Relaxed);
}
} else {
if let Some(peer) = peer {
if let Ok(peer) = peer.try_read() {
// sum the last 5 difficulties to give us the threshold
let threshold = chain
.difficulty_iter()
.filter_map(|x| x.map(|(_, x)| x).ok())
.take(5)
.fold(Difficulty::zero(), |sum, val| sum + val);
if peer.info.total_difficulty > local_diff.clone() + threshold.clone() {
info!(
LOGGER,
"sync: total_difficulty {}, peer_difficulty {}, threshold {} (last 5 blocks), enabling sync",
local_diff,
peer.info.total_difficulty,
threshold,
);
currently_syncing.store(true, Ordering::Relaxed);
}
}
}
}
currently_syncing.load(Ordering::Relaxed)
}
/// We build a locator based on sync_head.
/// Even if sync_head is significantly out of date we will "reset" it once we start getting
/// headers back from a peer.

View file

@ -47,13 +47,15 @@ pub mod handshake;
mod rate_limit;
mod msg;
mod peer;
mod peers;
mod protocol;
mod server;
mod store;
mod types;
pub use server::{DummyAdapter, Server};
pub use peers::Peers;
pub use peer::Peer;
pub use types::{Capabilities, Error, NetAdapter, P2PConfig, PeerInfo, FULL_HIST, FULL_NODE,
MAX_BLOCK_HEADERS, MAX_PEER_ADDRS, UNKNOWN};
pub use types::{Capabilities, Error, ChainAdapter, P2PConfig, PeerInfo, FULL_HIST, FULL_NODE,
MAX_BLOCK_HEADERS, MAX_PEER_ADDRS, UNKNOWN};
pub use store::{PeerData, State};

View file

@ -221,7 +221,7 @@ impl TrackingAdapter {
}
}
impl NetAdapter for TrackingAdapter {
impl ChainAdapter for TrackingAdapter {
fn total_difficulty(&self) -> Difficulty {
self.adapter.total_difficulty()
}
@ -235,7 +235,7 @@ impl NetAdapter for TrackingAdapter {
self.adapter.transaction_received(tx)
}
fn block_received(&self, b: core::Block, addr: SocketAddr) {
fn block_received(&self, b: core::Block, addr: SocketAddr) -> bool {
self.push(b.hash());
self.adapter.block_received(b, addr)
}
@ -251,7 +251,9 @@ impl NetAdapter for TrackingAdapter {
fn get_block(&self, h: Hash) -> Option<core::Block> {
self.adapter.get_block(h)
}
}
impl NetAdapter for TrackingAdapter {
fn find_peer_addrs(&self, capab: Capabilities) -> Vec<SocketAddr> {
self.adapter.find_peer_addrs(capab)
}
@ -260,10 +262,6 @@ impl NetAdapter for TrackingAdapter {
self.adapter.peer_addrs_received(addrs)
}
fn peer_connected(&self, pi: &PeerInfo) {
self.adapter.peer_connected(pi)
}
fn peer_difficulty(&self, addr: SocketAddr, diff: Difficulty, height:u64) {
self.adapter.peer_difficulty(addr, diff, height)
}

381
p2p/src/peers.rs Normal file
View file

@ -0,0 +1,381 @@
// Copyright 2016 The Grin Developers
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::{Arc, RwLock};
use rand::{thread_rng, Rng};
use core::core;
use core::core::hash::Hash;
use core::core::target::Difficulty;
use util::LOGGER;
use peer::Peer;
use store::{PeerStore, PeerData, State};
use types::*;
#[derive(Clone)]
pub struct Peers {
pub adapter: Arc<ChainAdapter>,
store: Arc<PeerStore>,
peers: Arc<RwLock<HashMap<SocketAddr, Arc<RwLock<Peer>>>>>,
}
unsafe impl Send for Peers {}
unsafe impl Sync for Peers {}
impl Peers {
pub fn new(store: PeerStore, adapter: Arc<ChainAdapter>) -> Peers {
Peers {
adapter: adapter,
store: Arc::new(store),
peers: Arc::new(RwLock::new(HashMap::new())),
}
}
/// Adds the peer to our internal peer mapping. Note that the peer is still
/// returned so the server can run it.
pub fn add_connected(&self, p: Peer) -> Arc<RwLock<Peer>> {
debug!(LOGGER, "Saving newly connected peer {}.", p.info.addr);
let peer_data = PeerData {
addr: p.info.addr,
capabilities: p.info.capabilities,
user_agent: p.info.user_agent.clone(),
flags: State::Healthy,
};
if let Err(e) = self.save_peer(&peer_data) {
error!(LOGGER, "Could not save connected peer: {:?}", e);
}
let addr = p.info.addr.clone();
let apeer = Arc::new(RwLock::new(p));
{
let mut peers = self.peers.write().unwrap();
peers.insert(addr, apeer.clone());
}
apeer.clone()
}
pub fn is_known(&self, addr: &SocketAddr) -> bool {
self.get_peer(addr).is_some()
}
pub fn connected_peers(&self) -> Vec<Arc<RwLock<Peer>>> {
self.peers.read().unwrap().values().map(|p| p.clone()).collect()
}
/// Get a peer we're connected to by address.
pub fn get_peer(&self, addr: &SocketAddr) -> Option<Arc<RwLock<Peer>>> {
self.peers.read().unwrap().get(addr).map(|p| p.clone())
}
/// Number of peers we're currently connected to.
pub fn peer_count(&self) -> u32 {
self.connected_peers().len() as u32
}
/// Return vec of all peers that currently have the most worked branch,
/// showing the highest total difficulty.
pub fn most_work_peers(&self) -> Vec<Arc<RwLock<Peer>>> {
let peers = self.connected_peers();
if peers.len() == 0 {
return vec![];
}
let max_total_difficulty = peers
.iter()
.map(|x| {
match x.try_read() {
Ok(peer) => peer.info.total_difficulty.clone(),
Err(_) => Difficulty::zero(),
}
})
.max()
.unwrap();
let mut max_peers = peers
.iter()
.filter(|x| {
match x.try_read() {
Ok(peer) => {
peer.info.total_difficulty == max_total_difficulty
},
Err(_) => false,
}
})
.cloned()
.collect::<Vec<_>>();
thread_rng().shuffle(&mut max_peers);
max_peers
}
/// Returns single random peer with the most worked branch, showing the highest total
/// difficulty.
pub fn most_work_peer(&self) -> Option<Arc<RwLock<Peer>>> {
match self.most_work_peers().first() {
Some(x) => Some(x.clone()),
None => None
}
}
/// Returns a random connected peer.
pub fn random_peer(&self) -> Option<Arc<RwLock<Peer>>> {
let peers = self.connected_peers();
Some(thread_rng().choose(&peers).unwrap().clone())
}
pub fn is_banned(&self, peer_addr: SocketAddr) -> bool {
if let Ok(peer_data) = self.store.get_peer(peer_addr) {
if peer_data.flags == State::Banned {
return true;
}
}
false
}
/// Bans a peer, disconnecting it if we're currently connected
pub fn ban_peer(&self, peer_addr: &SocketAddr) {
if let Err(e) = self.update_state(peer_addr.clone(), State::Banned) {
error!(LOGGER, "Couldn't ban {}: {:?}", peer_addr, e);
}
if let Some(peer) = self.get_peer(peer_addr) {
debug!(LOGGER, "Banning peer {}", peer_addr);
// setting peer status will get it removed at the next clean_peer
let peer = peer.write().unwrap();
peer.set_banned();
peer.stop();
}
}
/// Broadcasts the provided block to all our peers. A peer implementation
/// may drop the broadcast request if it knows the remote peer already has
/// the block.
pub fn broadcast_block(&self, b: &core::Block) {
let peers = self.connected_peers();
let mut count = 0;
for p in peers {
let p = p.read().unwrap();
if p.is_connected() {
if let Err(e) = p.send_block(b) {
debug!(LOGGER, "Error sending block to peer: {:?}", e);
} else {
count += 1;
}
}
}
debug!(LOGGER, "Broadcasted block {} to {} peers.", b.header.height, count);
}
/// Broadcasts the provided transaction to all our peers. A peer
/// implementation may drop the broadcast request if it knows the
/// remote peer already has the transaction.
pub fn broadcast_transaction(&self, tx: &core::Transaction) {
let peers = self.connected_peers();
for p in peers {
let p = p.read().unwrap();
if p.is_connected() {
if let Err(e) = p.send_transaction(tx) {
debug!(LOGGER, "Error sending block to peer: {:?}", e);
}
}
}
}
/// Ping all our connected peers. Always automatically expects a pong back or
/// disconnects. This acts as a liveness test.
pub fn check_all(&self, total_difficulty: Difficulty, height: u64) {
let peers_map = self.peers.read().unwrap();
for p in peers_map.values() {
let p = p.read().unwrap();
if p.is_connected() {
let _ = p.send_ping(total_difficulty.clone(), height);
}
}
}
/// All peer information we have in storage
pub fn all_peers(&self) -> Vec<PeerData> {
self.store.all_peers()
}
/// Find peers in store (not necessarily connected) and return their data
pub fn find_peers(&self, state: State, cap: Capabilities, count: usize) -> Vec<PeerData> {
self.store.find_peers(state, cap, count)
}
/// Whether we've already seen a peer with the provided address
pub fn exists_peer(&self, peer_addr: SocketAddr) -> Result<bool, Error> {
self.store.exists_peer(peer_addr).map_err(From::from)
}
/// Saves updated information about a peer
pub fn save_peer(&self, p: &PeerData) -> Result<(), Error> {
self.store.save_peer(p).map_err(From::from)
}
/// Updates the state of a peer in store
pub fn update_state(&self, peer_addr: SocketAddr, new_state: State) -> Result<(), Error> {
self.store.update_state(peer_addr, new_state).map_err(From::from)
}
/// Iterate over the peer list and prune all peers we have
/// lost connection to or have been deemed problematic.
/// Also avoid connected peer count getting too high.
pub fn clean_peers(&self, desired_count: usize) {
let mut rm = vec![];
// build a list of peers to be cleaned up
for peer in self.connected_peers() {
let peer_inner = peer.read().unwrap();
if peer_inner.is_banned() {
debug!(LOGGER, "cleaning {:?}, peer banned", peer_inner.info.addr);
rm.push(peer.clone());
} else if !peer_inner.is_connected() {
debug!(LOGGER, "cleaning {:?}, not connected", peer_inner.info.addr);
rm.push(peer.clone());
}
}
// now clean up peer map based on the list to remove
{
let mut peers = self.peers.write().unwrap();
for p in rm.clone() {
let p = p.read().unwrap();
peers.remove(&p.info.addr);
}
}
// ensure we do not have too many connected peers
// really fighting with the double layer of rwlocks here...
let excess_count = {
let peer_count = self.peer_count().clone() as usize;
if peer_count > desired_count {
peer_count - desired_count
} else {
0
}
};
// map peers to addrs in a block to bound how long we keep the read lock for
let addrs = {
self.connected_peers().iter().map(|x| {
let p = x.read().unwrap();
p.info.addr.clone()
}).collect::<Vec<_>>()
};
// now remove them taking a short-lived write lock each time
// maybe better to take write lock once and remove them all?
for x in addrs
.iter()
.take(excess_count) {
let mut peers = self.peers.write().unwrap();
peers.remove(x);
}
}
pub fn stop(self) {
let peers = self.connected_peers();
for peer in peers {
let peer = peer.read().unwrap();
peer.stop();
}
}
}
impl ChainAdapter for Peers {
fn total_difficulty(&self) -> Difficulty {
self.adapter.total_difficulty()
}
fn total_height(&self) -> u64 {
self.adapter.total_height()
}
fn transaction_received(&self, tx: core::Transaction) {
self.adapter.transaction_received(tx)
}
fn block_received(&self, b: core::Block, peer_addr: SocketAddr) -> bool {
if !self.adapter.block_received(b, peer_addr) {
// if the peer sent us a block that's intrinsically bad, they're either
// mistaken or manevolent, both of which require a ban
self.ban_peer(&peer_addr);
false
} else {
true
}
}
fn headers_received(&self, headers: Vec<core::BlockHeader>, peer_addr:SocketAddr) {
self.adapter.headers_received(headers, peer_addr)
}
fn locate_headers(&self, hs: Vec<Hash>) -> Vec<core::BlockHeader> {
self.adapter.locate_headers(hs)
}
fn get_block(&self, h: Hash) -> Option<core::Block> {
self.adapter.get_block(h)
}
}
impl NetAdapter for Peers {
/// Find good peers we know with the provided capability and return their
/// addresses.
fn find_peer_addrs(&self, capab: Capabilities) -> Vec<SocketAddr> {
let peers = self.find_peers(State::Healthy, capab, MAX_PEER_ADDRS as usize);
debug!(LOGGER, "Got {} peer addrs to send.", peers.len());
map_vec!(peers, |p| p.addr)
}
/// A list of peers has been received from one of our peers.
fn peer_addrs_received(&self, peer_addrs: Vec<SocketAddr>) {
debug!(LOGGER, "Received {} peer addrs, saving.", peer_addrs.len());
for pa in peer_addrs {
if let Ok(e) = self.exists_peer(pa) {
if e {
continue;
}
}
let peer = PeerData {
addr: pa,
capabilities: UNKNOWN,
user_agent: "".to_string(),
flags: State::Healthy,
};
if let Err(e) = self.save_peer(&peer) {
error!(LOGGER, "Could not save received peer address: {:?}", e);
}
}
}
fn peer_difficulty(&self, addr: SocketAddr, diff: Difficulty, height: u64) {
debug!(
LOGGER,
"peer total_diff @ height (ping/pong): {}: {} @ {} \
vs us: {} @ {}",
addr,
diff,
height,
self.total_difficulty(),
self.total_height()
);
if diff.into_num() > 0 {
if let Some(peer) = self.get_peer(&addr) {
let mut peer = peer.write().unwrap();
peer.info.total_difficulty = diff;
}
}
}
}

View file

@ -16,7 +16,6 @@
//! other peers in the network.
use std::cell::RefCell;
use std::collections::HashMap;
use std::net::{SocketAddr, Shutdown};
use std::sync::{Arc, RwLock};
use std::time::Duration;
@ -24,7 +23,6 @@ use std::time::Duration;
use futures;
use futures::{Future, Stream};
use futures::future::{self, IntoFuture};
use rand::{thread_rng, Rng};
use tokio_core::net::{TcpListener, TcpStream};
use tokio_core::reactor;
use tokio_timer::Timer;
@ -34,13 +32,14 @@ use core::core::hash::Hash;
use core::core::target::Difficulty;
use handshake::Handshake;
use peer::Peer;
use store::{PeerStore, PeerData, State};
use peers::Peers;
use store::PeerStore;
use types::*;
use util::LOGGER;
/// A no-op network adapter used for testing.
pub struct DummyAdapter {}
impl NetAdapter for DummyAdapter {
impl ChainAdapter for DummyAdapter {
fn total_difficulty(&self) -> Difficulty {
Difficulty::one()
}
@ -48,7 +47,7 @@ impl NetAdapter for DummyAdapter {
0
}
fn transaction_received(&self, _: core::Transaction) {}
fn block_received(&self, _: core::Block, _: SocketAddr) {}
fn block_received(&self, _: core::Block, _: SocketAddr) -> bool { true }
fn headers_received(&self, _: Vec<core::BlockHeader>, _:SocketAddr) {}
fn locate_headers(&self, _: Vec<Hash>) -> Vec<core::BlockHeader> {
vec![]
@ -56,11 +55,12 @@ impl NetAdapter for DummyAdapter {
fn get_block(&self, _: Hash) -> Option<core::Block> {
None
}
}
impl NetAdapter for DummyAdapter {
fn find_peer_addrs(&self, _: Capabilities) -> Vec<SocketAddr> {
vec![]
}
fn peer_addrs_received(&self, _: Vec<SocketAddr>) {}
fn peer_connected(&self, _: &PeerInfo) {}
fn peer_difficulty(&self, _: SocketAddr, _: Difficulty, _:u64) {}
}
@ -69,10 +69,8 @@ impl NetAdapter for DummyAdapter {
pub struct Server {
config: P2PConfig,
capabilities: Capabilities,
store: Arc<PeerStore>,
peers: Arc<RwLock<HashMap<SocketAddr, Arc<RwLock<Peer>>>>>,
handshake: Arc<Handshake>,
adapter: Arc<NetAdapter>,
pub peers: Peers,
stop: RefCell<Option<futures::sync::oneshot::Sender<()>>>,
}
@ -86,16 +84,14 @@ impl Server {
db_root: String,
capab: Capabilities,
config: P2PConfig,
adapter: Arc<NetAdapter>,
adapter: Arc<ChainAdapter>,
genesis: Hash,
) -> Result<Server, Error> {
Ok(Server {
config: config,
capabilities: capab,
store: Arc::new(PeerStore::new(db_root)?),
peers: Arc::new(RwLock::new(HashMap::new())),
handshake: Arc::new(Handshake::new(genesis)),
adapter: adapter,
peers: Peers::new(PeerStore::new(db_root)?, adapter),
stop: RefCell::new(None),
})
}
@ -109,39 +105,33 @@ impl Server {
let handshake = self.handshake.clone();
let peers = self.peers.clone();
let adapter = self.adapter.clone();
let capab = self.capabilities.clone();
let store = self.store.clone();
// main peer acceptance future handling handshake
let hp = h.clone();
let peers_listen = socket.incoming().map_err(From::from).map(move |(conn, _)| {
// aaaand.. reclone for the internal closures
let adapter = adapter.clone();
let store = store.clone();
let peers = peers.clone();
let peers2 = peers.clone();
let handshake = handshake.clone();
let hp = hp.clone();
future::ok(conn).and_then(move |conn| {
// Refuse banned peers connection
if let Ok(peer_addr) = conn.peer_addr() {
if let Ok(peer_data) = store.get_peer(peer_addr) {
if peer_data.flags == State::Banned {
debug!(LOGGER, "Peer {} banned, refusing connection.", peer_addr);
if let Err(e) = conn.shutdown(Shutdown::Both) {
debug!(LOGGER, "Error shutting down conn: {:?}", e);
}
return Err(Error::Banned)
if peers.is_banned(peer_addr) {
debug!(LOGGER, "Peer {} banned, refusing connection.", peer_addr);
if let Err(e) = conn.shutdown(Shutdown::Both) {
debug!(LOGGER, "Error shutting down conn: {:?}", e);
}
return Err(Error::Banned)
}
}
Ok(conn)
}).and_then(move |conn| {
let peers = peers.clone();
let total_diff = adapter.total_difficulty();
let total_diff = peers2.total_difficulty();
// accept the peer and add it to the server map
let accept = Peer::accept(
@ -149,9 +139,9 @@ impl Server {
capab,
total_diff,
&handshake.clone(),
adapter.clone(),
Arc::new(peers2.clone()),
);
let added = add_to_peers(peers, adapter.clone(), accept);
let added = add_to_peers(peers2, accept);
// wire in a future to timeout the accept after 5 secs
let timed_peer = with_timeout(Box::new(added), &hp);
@ -186,14 +176,13 @@ impl Server {
// timer to regularly check on our peers by pinging them
let adapter = self.adapter.clone();
let peers_inner = self.peers.clone();
let peers_timer = Timer::default()
.interval(Duration::new(20, 0))
.fold((), move |_, _| {
let total_diff = adapter.total_difficulty();
let total_height = adapter.total_height();
check_peers(peers_inner.clone(), total_diff, total_height);
let total_diff = peers_inner.total_difficulty();
let total_height = peers_inner.total_height();
peers_inner.check_all(total_diff, total_height);
Ok(())
});
@ -218,7 +207,8 @@ impl Server {
addr: SocketAddr,
h: reactor::Handle,
) -> Box<Future<Item = Option<Arc<RwLock<Peer>>>, Error = Error>> {
if let Some(p) = self.get_peer(&addr) {
if let Some(p) = self.peers.get_peer(&addr) {
// if we're already connected to the addr, just return the peer
debug!(LOGGER, "connect_peer: already connected {}", addr);
return Box::new(future::ok(Some(p)));
@ -229,7 +219,6 @@ impl Server {
// cloneapalooza
let peers = self.peers.clone();
let handshake = self.handshake.clone();
let adapter = self.adapter.clone();
let capab = self.capabilities.clone();
let self_addr = SocketAddr::new(self.config.host, self.config.port);
@ -245,8 +234,7 @@ impl Server {
let h2 = h.clone();
let request = socket_connect
.and_then(move |socket| {
let peers = peers.clone();
let total_diff = adapter.clone().total_difficulty();
let total_diff = peers.total_difficulty();
// connect to the peer and add it to the server map, wiring it a timeout for
// the handshake
@ -256,9 +244,9 @@ impl Server {
total_diff,
self_addr,
handshake.clone(),
adapter.clone(),
Arc::new(peers.clone()),
);
let added = add_to_peers(peers, adapter, connect);
let added = add_to_peers(peers, connect);
with_timeout(Box::new(added), &h)
})
.and_then(move |(socket, peer)| {
@ -272,256 +260,26 @@ impl Server {
Box::new(request)
}
/// Check if the server already knows this peer (is already connected).
pub fn is_known(&self, addr: &SocketAddr) -> bool {
self.get_peer(addr).is_some()
}
pub fn connected_peers(&self) -> Vec<Arc<RwLock<Peer>>> {
self.peers.read().unwrap().values().map(|p| p.clone()).collect()
}
/// Get a peer we're connected to by address.
pub fn get_peer(&self, addr: &SocketAddr) -> Option<Arc<RwLock<Peer>>> {
self.peers.read().unwrap().get(addr).map(|p| p.clone())
}
/// Have the server iterate over its peer list and prune all peers we have
/// lost connection to or have been deemed problematic.
/// Also avoid connected peer count getting too high.
pub fn clean_peers(&self, desired_count: usize) {
let mut rm = vec![];
// build a list of peers to be cleaned up
for peer in self.connected_peers() {
let peer_inner = peer.read().unwrap();
if peer_inner.is_banned() {
debug!(LOGGER, "cleaning {:?}, peer banned", peer_inner.info.addr);
rm.push(peer.clone());
} else if !peer_inner.is_connected() {
debug!(LOGGER, "cleaning {:?}, not connected", peer_inner.info.addr);
rm.push(peer.clone());
}
}
// now clean up peer map based on the list to remove
{
let mut peers = self.peers.write().unwrap();
for p in rm.clone() {
let p = p.read().unwrap();
peers.remove(&p.info.addr);
}
}
// ensure we do not have too many connected peers
// really fighting with the double layer of rwlocks here...
let excess_count = {
let peer_count = self.peer_count().clone() as usize;
if peer_count > desired_count {
peer_count - desired_count
} else {
0
}
};
// map peers to addrs in a block to bound how long we keep the read lock for
let addrs = {
self.connected_peers().iter().map(|x| {
let p = x.read().unwrap();
p.info.addr.clone()
}).collect::<Vec<_>>()
};
// now remove them taking a short-lived write lock each time
// maybe better to take write lock once and remove them all?
for x in addrs
.iter()
.take(excess_count) {
let mut peers = self.peers.write().unwrap();
peers.remove(x);
}
}
/// Return vec of all peers that currently have the most worked branch,
/// showing the highest total difficulty.
pub fn most_work_peers(&self) -> Vec<Arc<RwLock<Peer>>> {
let peers = self.connected_peers();
if peers.len() == 0 {
return vec![];
}
let max_total_difficulty = peers
.iter()
.map(|x| {
match x.try_read() {
Ok(peer) => peer.info.total_difficulty.clone(),
Err(_) => Difficulty::zero(),
}
})
.max()
.unwrap();
let mut max_peers = peers
.iter()
.filter(|x| {
match x.try_read() {
Ok(peer) => {
peer.info.total_difficulty == max_total_difficulty
},
Err(_) => false,
}
})
.cloned()
.collect::<Vec<_>>();
thread_rng().shuffle(&mut max_peers);
max_peers
}
/// Returns single random peer with the most worked branch, showing the highest total
/// difficulty.
pub fn most_work_peer(&self) -> Option<Arc<RwLock<Peer>>> {
match self.most_work_peers().first() {
Some(x) => Some(x.clone()),
None => None
}
}
/// Returns a random connected peer.
pub fn random_peer(&self) -> Option<Arc<RwLock<Peer>>> {
let peers = self.connected_peers();
Some(thread_rng().choose(&peers).unwrap().clone())
}
/// Broadcasts the provided block to all our peers. A peer implementation
/// may drop the broadcast request if it knows the remote peer already has
/// the block.
pub fn broadcast_block(&self, b: &core::Block) {
let peers = self.connected_peers();
let mut count = 0;
for p in peers {
let p = p.read().unwrap();
if p.is_connected() {
if let Err(e) = p.send_block(b) {
debug!(LOGGER, "Error sending block to peer: {:?}", e);
} else {
count += 1;
}
}
}
debug!(LOGGER, "Broadcasted block {} to {} peers.", b.header.height, count);
}
/// Broadcasts the provided transaction to all our peers. A peer
/// implementation may drop the broadcast request if it knows the
/// remote peer already has the transaction.
pub fn broadcast_transaction(&self, tx: &core::Transaction) {
let peers = self.connected_peers();
for p in peers {
let p = p.read().unwrap();
if p.is_connected() {
if let Err(e) = p.send_transaction(tx) {
debug!(LOGGER, "Error sending block to peer: {:?}", e);
}
}
}
}
/// Number of peers we're currently connected to.
pub fn peer_count(&self) -> u32 {
self.connected_peers().len() as u32
}
/// Bans a peer, disconnecting it if we're currently connected
pub fn ban_peer(&self, peer_addr: &SocketAddr) {
if let Err(e) = self.update_state(peer_addr.clone(), State::Banned) {
error!(LOGGER, "Couldn't ban {}: {:?}", peer_addr, e);
}
if let Some(peer) = self.get_peer(peer_addr) {
debug!(LOGGER, "Banning peer {}", peer_addr);
// setting peer status will get it removed at the next clean_peer
let peer = peer.write().unwrap();
peer.set_banned();
peer.stop();
}
}
/// Stops the server. Disconnect from all peers at the same time.
pub fn stop(self) {
info!(LOGGER, "calling stop on server");
let peers = self.connected_peers();
for peer in peers {
let peer = peer.read().unwrap();
peer.stop();
}
self.peers.stop();
self.stop.into_inner().unwrap().send(()).unwrap();
}
/// All peer information we have in storage
pub fn all_peers(&self) -> Vec<PeerData> {
self.store.all_peers()
}
/// Find peers in store (not necessarily connected) and return their data
pub fn find_peers(&self, state: State, cap: Capabilities, count: usize) -> Vec<PeerData> {
self.store.find_peers(state, cap, count)
}
/// Whether we've already seen a peer with the provided address
pub fn exists_peer(&self, peer_addr: SocketAddr) -> Result<bool, Error> {
self.store.exists_peer(peer_addr).map_err(From::from)
}
/// Saves updated information about a peer
pub fn save_peer(&self, p: &PeerData) -> Result<(), Error> {
self.store.save_peer(p).map_err(From::from)
}
/// Updates the state of a peer in store
pub fn update_state(&self, peer_addr: SocketAddr, new_state: State) -> Result<(), Error> {
self.store.update_state(peer_addr, new_state).map_err(From::from)
}
}
// Adds the peer built by the provided future in the peers map
fn add_to_peers<A>(
peers: Arc<RwLock<HashMap<SocketAddr, Arc<RwLock<Peer>>>>>,
adapter: Arc<NetAdapter>,
peer_fut: A,
) -> Box<Future<Item = Result<(TcpStream, Arc<RwLock<Peer>>), ()>, Error = Error>>
where
A: IntoFuture<Item = (TcpStream, Peer), Error = Error> + 'static,
{
fn add_to_peers<A>(peers: Peers, peer_fut: A)
-> Box<Future<Item = Result<(TcpStream, Arc<RwLock<Peer>>), ()>, Error = Error>>
where A: IntoFuture<Item = (TcpStream, Peer), Error = Error> + 'static {
let peer_add = peer_fut.into_future().map(move |(conn, peer)| {
adapter.peer_connected(&peer.info);
let addr = peer.info.addr.clone();
let apeer = Arc::new(RwLock::new(peer));
{
let mut peers = peers.write().unwrap();
peers.insert(addr, apeer.clone());
}
let apeer = peers.add_connected(peer);
Ok((conn, apeer))
});
Box::new(peer_add)
}
// Ping all our connected peers. Always automatically expects a pong back or
// disconnects. This acts as a liveness test.
fn check_peers(
peers: Arc<RwLock<HashMap<SocketAddr, Arc<RwLock<Peer>>>>>,
total_difficulty: Difficulty,
height: u64
) {
let peers_map = peers.read().unwrap();
for p in peers_map.values() {
let p = p.read().unwrap();
if p.is_connected() {
let _ = p.send_ping(total_difficulty.clone(), height);
}
}
}
// Adds a timeout to a future
fn with_timeout<T: 'static>(
fut: Box<Future<Item = Result<T, ()>, Error = Error>>,

View file

@ -162,7 +162,7 @@ pub trait Protocol {
/// Bridge between the networking layer and the rest of the system. Handles the
/// forwarding or querying of blocks and transactions from the network among
/// other things.
pub trait NetAdapter: Sync + Send {
pub trait ChainAdapter: Sync + Send {
/// Current total difficulty on our chain
fn total_difficulty(&self) -> Difficulty;
@ -172,8 +172,11 @@ pub trait NetAdapter: Sync + Send {
/// A valid transaction has been received from one of our peers
fn transaction_received(&self, tx: core::Transaction);
/// A block has been received from one of our peers
fn block_received(&self, b: core::Block, addr: SocketAddr);
/// A block has been received from one of our peers. Returns true if the
/// block could be handled properly and is not deemed defective by the
/// chain. Returning false means the block will nenver be valid and
/// may result in the peer being banned.
fn block_received(&self, b: core::Block, addr: SocketAddr) -> bool;
/// A set of block header has been received, typically in response to a
/// block
@ -187,7 +190,11 @@ pub trait NetAdapter: Sync + Send {
/// Gets a full block by its hash.
fn get_block(&self, h: Hash) -> Option<core::Block>;
}
/// Additional methods required by the protocol that don't need to be
/// externally implemented.
pub trait NetAdapter: ChainAdapter {
/// Find good peers we know with the provided capability and return their
/// addresses.
fn find_peer_addrs(&self, capab: Capabilities) -> Vec<SocketAddr>;
@ -195,9 +202,6 @@ pub trait NetAdapter: Sync + Send {
/// A list of peers has been received from one of our peers.
fn peer_addrs_received(&self, Vec<SocketAddr>);
/// Network successfully connected to a peer.
fn peer_connected(&self, &PeerInfo);
/// Heard total_difficulty from a connected peer (via ping/pong).
fn peer_difficulty(&self, SocketAddr, Difficulty, u64);
}

View file

@ -84,7 +84,7 @@ fn peer_handshake() {
Ok(())
})
.and_then(|_| {
assert!(server.peer_count() > 0);
assert!(server.peers.peer_count() > 0);
server.stop();
Ok(())
})

View file

@ -40,7 +40,6 @@ use clap::{App, Arg, ArgMatches, SubCommand};
use daemonize::Daemonize;
use config::GlobalConfig;
use wallet::WalletConfig;
use core::global;
use core::core::amount_to_hr_string;
use util::{init_logger, LoggingConfig, LOGGER};
@ -169,11 +168,6 @@ fn main() {
.help("Directory in which to store wallet files (defaults to current \
directory)")
.takes_value(true))
.arg(Arg::with_name("port")
.short("l")
.long("port")
.help("Port on which to run the wallet listener when in listen mode")
.takes_value(true))
.arg(Arg::with_name("external")
.short("e")
.long("external")
@ -197,18 +191,22 @@ fn main() {
.long("key_derivations")
.default_value("1000")
.takes_value(true))
.subcommand(SubCommand::with_name("listen")
.about("Run the wallet in listening mode. If an input file is \
provided, will process it, otherwise runs in server mode waiting \
for send requests.")
.arg(Arg::with_name("input")
.help("Partial transaction to receive, expects as a JSON file.")
.short("i")
.long("input")
.about("Runs the wallet in listening mode waiting for transactions.")
.arg(Arg::with_name("port")
.short("l")
.long("port")
.help("Port on which to run the wallet listener")
.takes_value(true)))
.subcommand(SubCommand::with_name("receive")
.about("Depreciated, use 'listen' instead"))
.about("Processes a JSON transaction file.")
.arg(Arg::with_name("input")
.help("Partial transaction to process, expects a JSON file.")
.short("i")
.long("input")
.takes_value(true)))
.subcommand(SubCommand::with_name("send")
.about("Builds a transaction to send someone some coins. By default, \
@ -372,10 +370,6 @@ fn wallet_command(wallet_args: &ArgMatches, global_config: GlobalConfig) {
// just get defaults from the global config
let mut wallet_config = global_config.members.unwrap().wallet;
if let Some(port) = wallet_args.value_of("port") {
wallet_config.api_listen_port = port.to_string();
}
if wallet_args.is_present("external") {
wallet_config.api_listen_interface = "0.0.0.0".to_string();
}
@ -417,14 +411,22 @@ fn wallet_command(wallet_args: &ArgMatches, global_config: GlobalConfig) {
.expect("Failed to derive keychain from seed file and passphrase.");
match wallet_args.subcommand() {
("listen", Some(listen_args)) => if let Some(f) = listen_args.value_of("input") {
let mut file = File::open(f).expect("Unable to open transaction file.");
("listen", Some(listen_args)) => {
if let Some(port) = listen_args.value_of("port") {
wallet_config.api_listen_port = port.to_string();
}
wallet::server::start_rest_apis(wallet_config, keychain);
},
("receive", Some(receive_args)) => {
let input = receive_args
.value_of("input")
.expect("Input file required");
let mut file = File::open(input)
.expect("Unable to open transaction file.");
let mut contents = String::new();
file.read_to_string(&mut contents)
.expect("Unable to read transaction file.");
wallet::receive_json_tx_str(&wallet_config, &keychain, contents.as_str()).unwrap();
} else {
wallet::server::start_rest_apis(wallet_config, keychain);
},
("send", Some(send_args)) => {
let amount = send_args

View file

@ -163,6 +163,8 @@ struct RemoveLog {
removed: Vec<(u64, u32)>,
// Holds positions temporarily until flush is called.
removed_tmp: Vec<(u64, u32)>,
// Holds truncated removed temporarily until discarded or committed
removed_bak: Vec<(u64, u32)>,
}
impl RemoveLog {
@ -174,6 +176,7 @@ impl RemoveLog {
path: path,
removed: removed,
removed_tmp: vec![],
removed_bak: vec![],
})
}
@ -181,10 +184,14 @@ impl RemoveLog {
fn truncate(&mut self, last_offs: u32) -> io::Result<()> {
// simplifying assumption: we always remove older than what's in tmp
self.removed_tmp = vec![];
// DEBUG
let _ = self.flush_truncate(last_offs);
if last_offs == 0 {
self.removed = vec![];
} else {
// backing it up before truncating
self.removed_bak = self.removed.clone();
self.removed = self.removed
.iter()
.filter(|&&(_, idx)| idx < last_offs)
@ -194,6 +201,15 @@ impl RemoveLog {
Ok(())
}
// DEBUG: saves the remove log to the side before each truncate
fn flush_truncate(&mut self, last_offs: u32) -> io::Result<()> {
let mut file = File::create(format!("{}.{}", self.path.clone(), last_offs))?;
for elmt in &self.removed {
file.write_all(&ser::ser_vec(&elmt).unwrap()[..])?;
}
file.sync_data()
}
/// Append a set of new positions to the remove log. Both adds those
/// positions the ordered in-memory set and to the file.
fn append(&mut self, elmts: Vec<u64>, index: u32) -> io::Result<()> {
@ -223,11 +239,16 @@ impl RemoveLog {
file.write_all(&ser::ser_vec(&elmt).unwrap()[..])?;
}
self.removed_tmp = vec![];
self.removed_bak = vec![];
file.sync_data()
}
/// Discard pending changes
fn discard(&mut self) {
if self.removed_bak.len() > 0 {
self.removed = self.removed_bak.clone();
self.removed_bak = vec![];
}
self.removed_tmp = vec![];
}