diff --git a/api/src/handlers.rs b/api/src/handlers.rs index eae64e1c3..5a189a6aa 100644 --- a/api/src/handlers.rs +++ b/api/src/handlers.rs @@ -231,24 +231,24 @@ impl Handler for SumTreeHandler { } pub struct PeersAllHandler { - pub p2p_server: Arc, + pub peers: p2p::Peers, } impl Handler for PeersAllHandler { fn handle(&self, _req: &mut Request) -> IronResult { - let peers = &self.p2p_server.all_peers(); + let peers = &self.peers.all_peers(); json_response_pretty(&peers) } } pub struct PeersConnectedHandler { - pub p2p_server: Arc, + pub peers: p2p::Peers, } impl Handler for PeersConnectedHandler { fn handle(&self, _req: &mut Request) -> IronResult { let mut peers = vec![]; - for p in &self.p2p_server.connected_peers() { + for p in &self.peers.connected_peers() { let p = p.read().unwrap(); let peer_info = p.info.clone(); peers.push(peer_info); @@ -262,7 +262,7 @@ impl Handler for PeersConnectedHandler { /// POST /v1/peers/10.12.12.13/ban /// TODO POST /v1/peers/10.12.12.13/unban pub struct PeerHandler { - pub p2p_server: Arc, + pub peers: p2p::Peers, } impl Handler for PeerHandler { @@ -276,7 +276,7 @@ impl Handler for PeerHandler { "ban" => { path_elems.pop(); if let Ok(addr) = path_elems.last().unwrap().parse() { - self.p2p_server.ban_peer(&addr); + self.peers.ban_peer(&addr); Ok(Response::with((status::Ok, ""))) } else { Ok(Response::with((status::BadRequest, ""))) @@ -459,74 +459,76 @@ pub fn start_rest_apis( addr: String, chain: Arc, tx_pool: Arc>>, - p2p_server: Arc, + peers: p2p::Peers, ) where T: pool::BlockChain + Send + Sync + 'static, { - thread::spawn(move || { - // build handlers and register them under the appropriate endpoint - let utxo_handler = UtxoHandler { - chain: chain.clone(), - }; - let block_handler = BlockHandler { - chain: chain.clone(), - }; - let chain_tip_handler = ChainHandler { - chain: chain.clone(), - }; - let sumtree_handler = SumTreeHandler { - chain: chain.clone(), - }; - let pool_info_handler = PoolInfoHandler { - tx_pool: tx_pool.clone(), - }; - let pool_push_handler = PoolPushHandler { - tx_pool: tx_pool.clone(), - }; - let peers_all_handler = PeersAllHandler { - p2p_server: p2p_server.clone(), - }; - let peers_connected_handler = PeersConnectedHandler { - p2p_server: p2p_server.clone(), - }; - let peer_handler = PeerHandler { - p2p_server: p2p_server.clone(), - }; +let _ = thread::Builder::new() + .name("apis".to_string()) + .spawn(move || { + // build handlers and register them under the appropriate endpoint + let utxo_handler = UtxoHandler { + chain: chain.clone(), + }; + let block_handler = BlockHandler { + chain: chain.clone(), + }; + let chain_tip_handler = ChainHandler { + chain: chain.clone(), + }; + let sumtree_handler = SumTreeHandler { + chain: chain.clone(), + }; + let pool_info_handler = PoolInfoHandler { + tx_pool: tx_pool.clone(), + }; + let pool_push_handler = PoolPushHandler { + tx_pool: tx_pool.clone(), + }; + let peers_all_handler = PeersAllHandler { + peers: peers.clone(), + }; + let peers_connected_handler = PeersConnectedHandler { + peers: peers.clone(), + }; + let peer_handler = PeerHandler { + peers: peers.clone(), + }; - let route_list = vec!( - "get /".to_string(), - "get /blocks".to_string(), - "get /chain".to_string(), - "get /chain/utxos".to_string(), - "get /sumtrees/roots".to_string(), - "get /sumtrees/lastutxos?n=10".to_string(), - "get /sumtrees/lastrangeproofs".to_string(), - "get /sumtrees/lastkernels".to_string(), - "get /pool".to_string(), - "post /pool/push".to_string(), - "get /peers/all".to_string(), - "get /peers/connected".to_string(), - ); - let index_handler = IndexHandler { list: route_list }; - let router = router!( - index: get "/" => index_handler, - blocks: get "/blocks/*" => block_handler, - chain_tip: get "/chain" => chain_tip_handler, - chain_utxos: get "/chain/utxos/*" => utxo_handler, - sumtree_roots: get "/sumtrees/*" => sumtree_handler, - pool_info: get "/pool" => pool_info_handler, - pool_push: post "/pool/push" => pool_push_handler, - peers_all: get "/peers/all" => peers_all_handler, - peers_connected: get "/peers/connected" => peers_connected_handler, - peer: post "/peers/*" => peer_handler, - ); + let route_list = vec!( + "get /".to_string(), + "get /blocks".to_string(), + "get /chain".to_string(), + "get /chain/utxos".to_string(), + "get /sumtrees/roots".to_string(), + "get /sumtrees/lastutxos?n=10".to_string(), + "get /sumtrees/lastrangeproofs".to_string(), + "get /sumtrees/lastkernels".to_string(), + "get /pool".to_string(), + "post /pool/push".to_string(), + "get /peers/all".to_string(), + "get /peers/connected".to_string(), + ); + let index_handler = IndexHandler { list: route_list }; + let router = router!( + index: get "/" => index_handler, + blocks: get "/blocks/*" => block_handler, + chain_tip: get "/chain" => chain_tip_handler, + chain_utxos: get "/chain/utxos/*" => utxo_handler, + sumtree_roots: get "/sumtrees/*" => sumtree_handler, + pool_info: get "/pool" => pool_info_handler, + pool_push: post "/pool/push" => pool_push_handler, + peers_all: get "/peers/all" => peers_all_handler, + peers_connected: get "/peers/connected" => peers_connected_handler, + peer: post "/peers/*" => peer_handler, + ); - let mut apis = ApiServer::new("/v1".to_string()); - apis.register_handler(router); + let mut apis = ApiServer::new("/v1".to_string()); + apis.register_handler(router); - info!(LOGGER, "Starting HTTP API server at {}.", addr); - apis.start(&addr[..]).unwrap_or_else(|e| { - error!(LOGGER, "Failed to start API HTTP server: {}.", e); - }); + info!(LOGGER, "Starting HTTP API server at {}.", addr); + apis.start(&addr[..]).unwrap_or_else(|e| { + error!(LOGGER, "Failed to start API HTTP server: {}.", e); + }); }); } diff --git a/chain/src/sumtree.rs b/chain/src/sumtree.rs index 1db758c59..12fb8e547 100644 --- a/chain/src/sumtree.rs +++ b/chain/src/sumtree.rs @@ -239,9 +239,13 @@ impl<'a> Extension<'a> { pub fn apply_block(&mut self, b: &Block) -> Result<(), Error> { // doing inputs first guarantees an input can't spend an output in the - // same block, enforcing block cut-through + // same block, enforcing block cut-through for input in &b.inputs { let pos_res = self.commit_index.get_output_pos(&input.commitment()); + if b.hash().to_string() == "f697a877" { + debug!(LOGGER, "input pos: {:?}, commit: {} {:?}", + pos_res, input.commitment().hash(), input.commitment()); + } if let Ok(pos) = pos_res { match self.output_pmmr.prune(pos, b.header.height as u32) { Ok(true) => { @@ -257,29 +261,22 @@ impl<'a> Extension<'a> { } } - // checking any position after the MMR size is useless, catches rewind - // edge cases - let output_max_index = self.output_pmmr.unpruned_size(); - let kernel_max_index = self.kernel_pmmr.unpruned_size(); - for out in &b.outputs { let commit = out.commitment(); if let Ok(pos) = self.commit_index.get_output_pos(&commit) { - if pos <= output_max_index { - // we need to check whether the commitment is in the current MMR view - // as well as the index doesn't support rewind and is non-authoritative - // (non-historical node will have a much smaller one) - // note that this doesn't show the commitment *never* existed, just - // that this is not an existing unspent commitment right now - if let Some(c) = self.output_pmmr.get(pos) { - let hashsum = HashSum::from_summable( - pos, &SumCommit{commit}, Some(out.switch_commit_hash)); - // as we're processing a new fork, we may get a position on the old - // fork that exists but matches a different node, filtering that - // case out - if c.hash == hashsum.hash { - return Err(Error::DuplicateCommitment(out.commitment())); - } + // we need to check whether the commitment is in the current MMR view + // as well as the index doesn't support rewind and is non-authoritative + // (non-historical node will have a much smaller one) + // note that this doesn't show the commitment *never* existed, just + // that this is not an existing unspent commitment right now + if let Some(c) = self.output_pmmr.get(pos) { + let hashsum = HashSum::from_summable( + pos, &SumCommit{commit}, Some(out.switch_commit_hash)); + // as we're processing a new fork, we may get a position on the old + // fork that exists but matches a different node, filtering that + // case out + if c.hash == hashsum.hash { + return Err(Error::DuplicateCommitment(out.commitment())); } } } @@ -303,14 +300,12 @@ impl<'a> Extension<'a> { for kernel in &b.kernels { if let Ok(pos) = self.commit_index.get_kernel_pos(&kernel.excess) { - if pos <= kernel_max_index { - // same as outputs - if let Some(k) = self.kernel_pmmr.get(pos) { - let hashsum = HashSum::from_summable( - pos, &NoSum(kernel), None::); - if k.hash == hashsum.hash { - return Err(Error::DuplicateKernel(kernel.excess.clone())); - } + // same as outputs + if let Some(k) = self.kernel_pmmr.get(pos) { + let hashsum = HashSum::from_summable( + pos, &NoSum(kernel), None::); + if k.hash == hashsum.hash { + return Err(Error::DuplicateKernel(kernel.excess.clone())); } } } diff --git a/core/src/core/pmmr.rs b/core/src/core/pmmr.rs index 7fad68b13..c1ef7756c 100644 --- a/core/src/core/pmmr.rs +++ b/core/src/core/pmmr.rs @@ -347,7 +347,11 @@ where /// Helper function to get the HashSum of a node at a given position from /// the backend. pub fn get(&self, position: u64) -> Option> { - self.backend.get(position) + if position > self.last_pos { + None + } else { + self.backend.get(position) + } } /// Helper function to get the last N nodes inserted, i.e. the last diff --git a/doc/wallet.md b/doc/wallet.md index b49dfde1d..154c13f75 100644 --- a/doc/wallet.md +++ b/doc/wallet.md @@ -53,6 +53,10 @@ Builds a transaction to send someone some coins. Creates and outputs a transacti Replaced by `listen` (see above). The `receive` command might later be recycled to actively accept one or several specific transactions. +### grin wallet request + +(tbd) + ### grin wallet burn *TESTING ONLY*: Burns the provided amount to a known key. Similar to send but burns an output to allow single-party diff --git a/grin/src/adapters.rs b/grin/src/adapters.rs index 3be4a745a..8f771007b 100644 --- a/grin/src/adapters.rs +++ b/grin/src/adapters.rs @@ -1,4 +1,4 @@ -// Copyright 2016 The Grin Developers +// Copyright 2017 The Grin Developers // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -21,7 +21,7 @@ use core::core::{self, Output}; use core::core::block::BlockHeader; use core::core::hash::{Hash, Hashed}; use core::core::target::Difficulty; -use p2p::{self, NetAdapter, PeerData, State}; +use p2p; use pool; use util::secp::pedersen::Commitment; use util::OneTime; @@ -32,13 +32,12 @@ use util::LOGGER; /// blocks and transactions are received and forwards to the chain and pool /// implementations. pub struct NetToChainAdapter { + currently_syncing: Arc, chain: Arc, - p2p_server: OneTime>, tx_pool: Arc>>, - syncing: AtomicBool, } -impl NetAdapter for NetToChainAdapter { +impl p2p::ChainAdapter for NetToChainAdapter { fn total_difficulty(&self) -> Difficulty { self.chain.total_difficulty() } @@ -65,7 +64,7 @@ impl NetAdapter for NetToChainAdapter { } } - fn block_received(&self, b: core::Block, addr: SocketAddr) { + fn block_received(&self, b: core::Block, _: SocketAddr) -> bool { let bhash = b.hash(); debug!( LOGGER, @@ -79,19 +78,16 @@ impl NetAdapter for NetToChainAdapter { if let &Err(ref e) = &res { debug!(LOGGER, "Block {} refused by chain: {:?}", bhash, e); - - // if the peer sent us a block that's intrinsically bad, they're either - // mistaken or manevolent, both of which require a ban if e.is_bad_block() { - self.p2p_server.borrow().ban_peer(&addr); - // - // // header chain should be consistent with the sync head here - // // we just banned the peer that sent a bad block so - // // sync head should resolve itself if/when we find an alternative peer - // // with more work than ourselves - // // we should not need to reset the header head here + // header chain should be consistent with the sync head here + // we just banned the peer that sent a bad block so + // sync head should resolve itself if/when we find an alternative peer + // with more work than ourselves + // we should not need to reset the header head here + return false; } } + true } fn headers_received(&self, bhs: Vec, addr: SocketAddr) { @@ -149,23 +145,9 @@ impl NetAdapter for NetToChainAdapter { locator, ); - if locator.len() == 0 { - return vec![]; - } - - // recursively go back through the locator vector - // and stop when we find a header that we recognize - // this will be a header shared in common between us and the peer - let known = self.chain.get_block_header(&locator[0]); - let header = match known { - Ok(header) => header, - Err(chain::Error::StoreErr(store::Error::NotFoundErr, _)) => { - return self.locate_headers(locator[1..].to_vec()); - } - Err(e) => { - error!(LOGGER, "Could not build header locator: {:?}", e); - return vec![]; - } + let header = match self.find_common_header(locator) { + Some(header) => header, + None => return vec![], }; debug!( @@ -207,138 +189,58 @@ impl NetAdapter for NetToChainAdapter { } } - /// Find good peers we know with the provided capability and return their - /// addresses. - fn find_peer_addrs(&self, capab: p2p::Capabilities) -> Vec { - let peers = self.p2p_server.borrow() - .find_peers(State::Healthy, capab, p2p::MAX_PEER_ADDRS as usize); - debug!(LOGGER, "Got {} peer addrs to send.", peers.len()); - map_vec!(peers, |p| p.addr) - } - - /// A list of peers has been received from one of our peers. - fn peer_addrs_received(&self, peer_addrs: Vec) { - debug!(LOGGER, "Received {} peer addrs, saving.", peer_addrs.len()); - for pa in peer_addrs { - if let Ok(e) = self.p2p_server.borrow().exists_peer(pa) { - if e { - continue; - } - } - let peer = PeerData { - addr: pa, - capabilities: p2p::UNKNOWN, - user_agent: "".to_string(), - flags: State::Healthy, - }; - if let Err(e) = self.p2p_server.borrow().save_peer(&peer) { - error!(LOGGER, "Could not save received peer address: {:?}", e); - } - } - } - - /// Network successfully connected to a peer. - fn peer_connected(&self, pi: &p2p::PeerInfo) { - debug!(LOGGER, "Saving newly connected peer {}.", pi.addr); - let peer = PeerData { - addr: pi.addr, - capabilities: pi.capabilities, - user_agent: pi.user_agent.clone(), - flags: State::Healthy, - }; - if let Err(e) = self.p2p_server.borrow().save_peer(&peer) { - error!(LOGGER, "Could not save connected peer: {:?}", e); - } - } - - fn peer_difficulty(&self, addr: SocketAddr, diff: Difficulty, height: u64) { - debug!( - LOGGER, - "peer total_diff @ height (ping/pong): {}: {} @ {} \ - vs us: {} @ {}", - addr, - diff, - height, - self.total_difficulty(), - self.total_height() - ); - - if self.p2p_server.is_initialized() { - if let Some(peer) = self.p2p_server.borrow().get_peer(&addr) { - let mut peer = peer.write().unwrap(); - peer.info.total_difficulty = diff; - } - } - } } impl NetToChainAdapter { pub fn new( + currently_syncing: Arc, chain_ref: Arc, tx_pool: Arc>>, ) -> NetToChainAdapter { NetToChainAdapter { + currently_syncing: currently_syncing, chain: chain_ref, - p2p_server: OneTime::new(), tx_pool: tx_pool, - syncing: AtomicBool::new(true), } } - /// Setup the p2p server on the adapter - pub fn init(&self, p2p: Arc) { - self.p2p_server.init(p2p); - } + // recursively go back through the locator vector and stop when we find + // a header that we recognize this will be a header shared in common + // between us and the peer + fn find_common_header(&self, locator: Vec) -> Option { + if locator.len() == 0 { + return None; + } - /// Whether we're currently syncing the chain or we're fully caught up and - /// just receiving blocks through gossip. - pub fn is_syncing(&self) -> bool { - let local_diff = self.total_difficulty(); - let peer = self.p2p_server.borrow().most_work_peer(); + let known = self.chain.get_block_header(&locator[0]); - // if we're already syncing, we're caught up if no peer has a higher - // difficulty than us - if self.syncing.load(Ordering::Relaxed) { - if let Some(peer) = peer { - if let Ok(peer) = peer.try_read() { - if peer.info.total_difficulty <= local_diff { - info!(LOGGER, "sync: caught up on most worked chain, disabling sync"); - self.syncing.store(false, Ordering::Relaxed); - } - } - } else { - info!(LOGGER, "sync: no peers available, disabling sync"); - self.syncing.store(false, Ordering::Relaxed); - } - } else { - if let Some(peer) = peer { - if let Ok(peer) = peer.try_read() { - // sum the last 5 difficulties to give us the threshold - let threshold = self.chain - .difficulty_iter() - .filter_map(|x| x.map(|(_, x)| x).ok()) - .take(5) - .fold(Difficulty::zero(), |sum, val| sum + val); - - if peer.info.total_difficulty > local_diff.clone() + threshold.clone() { - info!( - LOGGER, - "sync: total_difficulty {}, peer_difficulty {}, threshold {} (last 5 blocks), enabling sync", - local_diff, - peer.info.total_difficulty, - threshold, - ); - self.syncing.store(true, Ordering::Relaxed); + match known { + Ok(header) => { + // even if we know the block, it may not be on our winning chain + let known_winning = self.chain.get_header_by_height(header.height); + if let Ok(known_winning) = known_winning { + if known_winning.hash() != header.hash() { + self.find_common_header(locator[1..].to_vec()) + } else { + Some(header) } + } else { + self.find_common_header(locator[1..].to_vec()) } + }, + Err(chain::Error::StoreErr(store::Error::NotFoundErr, _)) => { + self.find_common_header(locator[1..].to_vec()) + }, + Err(e) => { + error!(LOGGER, "Could not build header locator: {:?}", e); + None } } - self.syncing.load(Ordering::Relaxed) } /// Prepare options for the chain pipeline fn chain_opts(&self) -> chain::Options { - let opts = if self.is_syncing() { + let opts = if self.currently_syncing.load(Ordering::Relaxed) { chain::SYNC } else { chain::NONE @@ -352,7 +254,7 @@ impl NetToChainAdapter { /// the network to broadcast the block pub struct ChainToPoolAndNetAdapter { tx_pool: Arc>>, - p2p: OneTime>, + peers: OneTime, } impl ChainAdapter for ChainToPoolAndNetAdapter { @@ -367,7 +269,7 @@ impl ChainAdapter for ChainToPoolAndNetAdapter { ); } } - self.p2p.borrow().broadcast_block(b); + self.peers.borrow().broadcast_block(b); } } @@ -377,23 +279,23 @@ impl ChainToPoolAndNetAdapter { ) -> ChainToPoolAndNetAdapter { ChainToPoolAndNetAdapter { tx_pool: tx_pool, - p2p: OneTime::new(), + peers: OneTime::new(), } } - pub fn init(&self, p2p: Arc) { - self.p2p.init(p2p); + pub fn init(&self, peers: p2p::Peers) { + self.peers.init(peers); } } /// Adapter between the transaction pool and the network, to relay /// transactions that have been accepted. pub struct PoolToNetAdapter { - p2p: OneTime>, + peers: OneTime, } impl pool::PoolAdapter for PoolToNetAdapter { fn tx_accepted(&self, tx: &core::Transaction) { - self.p2p.borrow().broadcast_transaction(tx); + self.peers.borrow().broadcast_transaction(tx); } } @@ -401,13 +303,13 @@ impl PoolToNetAdapter { /// Create a new pool to net adapter pub fn new() -> PoolToNetAdapter { PoolToNetAdapter { - p2p: OneTime::new(), + peers: OneTime::new(), } } /// Setup the p2p server on the adapter - pub fn init(&self, p2p: Arc) { - self.p2p.init(p2p); + pub fn init(&self, peers: p2p::Peers) { + self.peers.init(peers); } } diff --git a/grin/src/miner.rs b/grin/src/miner.rs index 0e21b14a4..716c01766 100644 --- a/grin/src/miner.rs +++ b/grin/src/miner.rs @@ -17,7 +17,7 @@ use rand::{self, Rng}; use std::sync::{Arc, RwLock}; -use std::{str, thread}; +use std::thread; use std; use time; @@ -588,7 +588,7 @@ impl Miner { let result=self.chain.set_sumtree_roots(&mut b); match result { Ok(_) => Ok((b, block_fees)), - //If it's a duplicate commitment, it's likely trying to use + //If it's a duplicate commitment, it's likely trying to use //a key that's already been derived but not in the wallet //for some reason, allow caller to retry Err(chain::Error::DuplicateCommitment(e)) => diff --git a/grin/src/seed.rs b/grin/src/seed.rs index ae356e680..5ab7971b8 100644 --- a/grin/src/seed.rs +++ b/grin/src/seed.rs @@ -36,18 +36,20 @@ const PEER_PREFERRED_COUNT: u32 = 8; const SEEDS_URL: &'static str = "http://grin-tech.org/seeds.txt"; pub struct Seeder { - p2p: Arc, - + peers: p2p::Peers, + p2p_server: Arc, capabilities: p2p::Capabilities, } impl Seeder { pub fn new( capabilities: p2p::Capabilities, - p2p: Arc, + p2p_server: Arc, + peers: p2p::Peers, ) -> Seeder { Seeder { - p2p: p2p, + peers: peers, + p2p_server: p2p_server, capabilities: capabilities, } } @@ -76,7 +78,7 @@ impl Seeder { &self, tx: mpsc::UnboundedSender, ) -> Box> { - let p2p_server = self.p2p.clone(); + let peers = self.peers.clone(); let capabilities = self.capabilities.clone(); // now spawn a new future to regularly check if we need to acquire more peers @@ -84,19 +86,19 @@ impl Seeder { let mon_loop = Timer::default() .interval(time::Duration::from_secs(30)) .for_each(move |_| { - let total_count = p2p_server.all_peers().len(); + let total_count = peers.all_peers().len(); debug!( LOGGER, "monitor_peers: {} most_work_peers, {} connected, {} total known", - p2p_server.most_work_peers().len(), - p2p_server.connected_peers().len(), + peers.most_work_peers().len(), + peers.connected_peers().len(), total_count, ); let mut healthy_count = 0; let mut banned_count = 0; let mut defunct_count = 0; - for x in p2p_server.all_peers() { + for x in peers.all_peers() { if x.flags == p2p::State::Healthy { healthy_count += 1 } else if x.flags == p2p::State::Banned { banned_count += 1 } else if x.flags == p2p::State::Defunct { defunct_count += 1 }; @@ -113,14 +115,14 @@ impl Seeder { // maintenance step first, clean up p2p server peers { - p2p_server.clean_peers(PEER_PREFERRED_COUNT as usize); + peers.clean_peers(PEER_PREFERRED_COUNT as usize); } // not enough peers, getting more from db - if p2p_server.peer_count() < PEER_PREFERRED_COUNT { + if peers.peer_count() < PEER_PREFERRED_COUNT { // loop over connected peers // ask them for their list of peers - for p in p2p_server.connected_peers() { + for p in peers.connected_peers() { if let Ok(p) = p.try_read() { debug!( LOGGER, @@ -138,7 +140,7 @@ impl Seeder { // find some peers from our db // and queue them up for a connection attempt - let peers = p2p_server.find_peers( + let peers = peers.find_peers( p2p::State::Healthy, p2p::UNKNOWN, 100, @@ -169,12 +171,13 @@ impl Seeder { // a thread pool is required so we don't block the event loop with a // db query - let thread_pool = cpupool::CpuPool::new(1); - let p2p_server = self.p2p.clone(); + let thread_pool = cpupool::Builder::new() + .pool_size(1).name_prefix("seed").create(); + let peers = self.peers.clone(); let seeder = thread_pool .spawn_fn(move || { // check if we have some peers in db - let peers = p2p_server.find_peers( + let peers = peers.find_peers( p2p::State::Healthy, p2p::FULL_HIST, 100, @@ -214,21 +217,15 @@ impl Seeder { rx: mpsc::UnboundedReceiver, ) -> Box> { let capab = self.capabilities; - let p2p_server = self.p2p.clone(); + let peers = self.peers.clone(); + let p2p_server = self.p2p_server.clone(); let listener = rx.for_each(move |peer_addr| { debug!(LOGGER, "New peer address to connect to: {}.", peer_addr); let inner_h = h.clone(); - if p2p_server.peer_count() < PEER_MAX_COUNT { - h.spawn( - connect_and_req( - capab, - p2p_server.clone(), - inner_h, - peer_addr, - ) - ) - }; + if peers.peer_count() < PEER_MAX_COUNT { + h.spawn(connect_and_req(capab, p2p_server.clone(), peers.clone(), inner_h, peer_addr)) + } Box::new(future::ok(())) }); Box::new(listener) @@ -292,11 +289,12 @@ pub fn predefined_seeds( fn connect_and_req( capab: p2p::Capabilities, p2p: Arc, + peers: p2p::Peers, h: reactor::Handle, addr: SocketAddr, ) -> Box> { let connect_peer = p2p.connect_peer(addr, h); - let p2p_server = p2p.clone(); + let peers = peers.clone(); let fut = connect_peer.then(move |p| { match p { Ok(Some(p)) => { @@ -310,7 +308,7 @@ fn connect_and_req( }, Err(e) => { debug!(LOGGER, "connect_and_req: {} is Defunct; {:?}", addr, e); - let _ = p2p_server.update_state(addr, p2p::State::Defunct); + let _ = peers.update_state(addr, p2p::State::Defunct); }, } Ok(()) diff --git a/grin/src/server.rs b/grin/src/server.rs index 61e9660a9..48baa35b8 100644 --- a/grin/src/server.rs +++ b/grin/src/server.rs @@ -18,6 +18,7 @@ use std::net::SocketAddr; use std::sync::{Arc, RwLock}; +use std::sync::atomic::{AtomicBool, Ordering}; use std::thread; use std::time; @@ -51,7 +52,7 @@ pub struct Server { chain: Arc, /// in-memory transaction pool tx_pool: Arc>>, - net_adapter: Arc, + currently_syncing: Arc, } impl Server { @@ -108,7 +109,10 @@ impl Server { pool_adapter.set_chain(shared_chain.clone()); + let currently_syncing = Arc::new(AtomicBool::new(true)); + let net_adapter = Arc::new(NetToChainAdapter::new( + currently_syncing.clone(), shared_chain.clone(), tx_pool.clone(), )); @@ -120,11 +124,11 @@ impl Server { net_adapter.clone(), genesis.hash(), )?); - chain_adapter.init(p2p_server.clone()); - pool_net_adapter.init(p2p_server.clone()); - net_adapter.init(p2p_server.clone()); + chain_adapter.init(p2p_server.peers.clone()); + pool_net_adapter.init(p2p_server.peers.clone()); - let seed = seed::Seeder::new(config.capabilities, p2p_server.clone()); + let seed = seed::Seeder::new( + config.capabilities, p2p_server.clone(), p2p_server.peers.clone()); match config.seeding_type.clone() { Seeding::None => { warn!( @@ -152,8 +156,8 @@ impl Server { } sync::run_sync( - net_adapter.clone(), - p2p_server.clone(), + currently_syncing.clone(), + p2p_server.peers.clone(), shared_chain.clone(), ); @@ -165,7 +169,7 @@ impl Server { config.api_http_addr.clone(), shared_chain.clone(), tx_pool.clone(), - p2p_server.clone(), + p2p_server.peers.clone(), ); warn!(LOGGER, "Grin server started."); @@ -175,7 +179,7 @@ impl Server { p2p: p2p_server, chain: shared_chain, tx_pool: tx_pool, - net_adapter: net_adapter, + currently_syncing: currently_syncing, }) } @@ -193,7 +197,7 @@ impl Server { /// Number of peers pub fn peer_count(&self) -> u32 { - self.p2p.peer_count() + self.p2p.peers.peer_count() } /// Start mining for blocks on a separate thread. Uses toy miner by default, @@ -201,19 +205,21 @@ impl Server { pub fn start_miner(&self, config: pow::types::MinerConfig) { let cuckoo_size = global::sizeshift(); let proof_size = global::proofsize(); - let net_adapter = self.net_adapter.clone(); + let currently_syncing = self.currently_syncing.clone(); let mut miner = miner::Miner::new(config.clone(), self.chain.clone(), self.tx_pool.clone()); miner.set_debug_output_id(format!("Port {}", self.config.p2p_config.unwrap().port)); - thread::spawn(move || { - // TODO push this down in the run loop so miner gets paused anytime we - // decide to sync again - let secs_5 = time::Duration::from_secs(5); - while net_adapter.is_syncing() { - thread::sleep(secs_5); - } - miner.run_loop(config.clone(), cuckoo_size as u32, proof_size); - }); + let _ = thread::Builder::new() + .name("miner".to_string()) + .spawn(move || { + // TODO push this down in the run loop so miner gets paused anytime we + // decide to sync again + let secs_5 = time::Duration::from_secs(5); + while currently_syncing.load(Ordering::Relaxed) { + thread::sleep(secs_5); + } + miner.run_loop(config.clone(), cuckoo_size as u32, proof_size); + }); } /// The chain head diff --git a/grin/src/sync.rs b/grin/src/sync.rs index 2d740273b..1571ff426 100644 --- a/grin/src/sync.rs +++ b/grin/src/sync.rs @@ -15,24 +15,24 @@ use std::thread; use std::time::Duration; use std::sync::{Arc, RwLock}; +use std::sync::atomic::{AtomicBool, Ordering}; use time; -use adapters::NetToChainAdapter; use chain; use core::core::hash::{Hash, Hashed}; -use p2p::{self, Peer}; +use core::core::target::Difficulty; +use p2p::{self, Peer, Peers, ChainAdapter}; use types::Error; use util::LOGGER; /// Starts the syncing loop, just spawns two threads that loop forever pub fn run_sync( - adapter: Arc, - p2p_server: Arc, + currently_syncing: Arc, + peers: p2p::Peers, chain: Arc, ) { - let a_inner = adapter.clone(); - let p2p_inner = p2p_server.clone(); - let c_inner = chain.clone(); + + let chain = chain.clone(); let _ = thread::Builder::new() .name("sync".to_string()) .spawn(move || { @@ -43,13 +43,16 @@ pub fn run_sync( thread::sleep(Duration::from_secs(30)); loop { - if a_inner.is_syncing() { + let syncing = needs_syncing( + currently_syncing.clone(), peers.clone(), chain.clone()); + if syncing { + let current_time = time::now_utc(); // run the header sync every 10s if current_time - prev_header_sync > time::Duration::seconds(10) { header_sync( - p2p_server.clone(), + peers.clone(), chain.clone(), ); prev_header_sync = current_time; @@ -58,8 +61,8 @@ pub fn run_sync( // run the body_sync every 5s if current_time - prev_body_sync > time::Duration::seconds(5) { body_sync( - p2p_inner.clone(), - c_inner.clone(), + peers.clone(), + chain.clone(), ); prev_body_sync = current_time; } @@ -72,10 +75,8 @@ pub fn run_sync( }); } -fn body_sync( - p2p_server: Arc, - chain: Arc, -) { +fn body_sync(peers: Peers, chain: Arc) { + let body_head: chain::Tip = chain.head().unwrap(); let header_head: chain::Tip = chain.get_header_head().unwrap(); let sync_head: chain::Tip = chain.get_sync_head().unwrap(); @@ -112,7 +113,7 @@ fn body_sync( // if we have 5 most_work_peers then ask for 50 blocks total (peer_count * 10) // max will be 80 if all 8 peers are advertising most_work let peer_count = { - p2p_server.most_work_peers().len() + peers.most_work_peers().len() }; let block_count = peer_count * 10; @@ -134,7 +135,7 @@ fn body_sync( ); for hash in hashes_to_get.clone() { - let peer = p2p_server.most_work_peer(); + let peer = peers.most_work_peer(); if let Some(peer) = peer { if let Ok(peer) = peer.try_read() { let _ = peer.send_block_request(hash); @@ -144,14 +145,11 @@ fn body_sync( } } -pub fn header_sync( - p2p_server: Arc, - chain: Arc, -) { +pub fn header_sync(peers: Peers, chain: Arc) { if let Ok(header_head) = chain.get_header_head() { let difficulty = header_head.total_difficulty; - if let Some(peer) = p2p_server.most_work_peer() { + if let Some(peer) = peers.most_work_peer() { if let Ok(p) = peer.try_read() { let peer_difficulty = p.info.total_difficulty.clone(); if peer_difficulty > difficulty { @@ -189,6 +187,57 @@ fn request_headers( Ok(()) } + +/// Whether we're currently syncing the chain or we're fully caught up and +/// just receiving blocks through gossip. +pub fn needs_syncing( + currently_syncing: Arc, + peers: Peers, + chain: Arc) -> bool { + + let local_diff = peers.total_difficulty(); + let peer = peers.most_work_peer(); + + // if we're already syncing, we're caught up if no peer has a higher + // difficulty than us + if currently_syncing.load(Ordering::Relaxed) { + if let Some(peer) = peer { + if let Ok(peer) = peer.try_read() { + if peer.info.total_difficulty <= local_diff { + info!(LOGGER, "sync: caught up on most worked chain, disabling sync"); + currently_syncing.store(false, Ordering::Relaxed); + } + } + } else { + info!(LOGGER, "sync: no peers available, disabling sync"); + currently_syncing.store(false, Ordering::Relaxed); + } + } else { + if let Some(peer) = peer { + if let Ok(peer) = peer.try_read() { + // sum the last 5 difficulties to give us the threshold + let threshold = chain + .difficulty_iter() + .filter_map(|x| x.map(|(_, x)| x).ok()) + .take(5) + .fold(Difficulty::zero(), |sum, val| sum + val); + + if peer.info.total_difficulty > local_diff.clone() + threshold.clone() { + info!( + LOGGER, + "sync: total_difficulty {}, peer_difficulty {}, threshold {} (last 5 blocks), enabling sync", + local_diff, + peer.info.total_difficulty, + threshold, + ); + currently_syncing.store(true, Ordering::Relaxed); + } + } + } + } + currently_syncing.load(Ordering::Relaxed) +} + /// We build a locator based on sync_head. /// Even if sync_head is significantly out of date we will "reset" it once we start getting /// headers back from a peer. diff --git a/p2p/src/lib.rs b/p2p/src/lib.rs index cf23fc837..09739187f 100644 --- a/p2p/src/lib.rs +++ b/p2p/src/lib.rs @@ -47,13 +47,15 @@ pub mod handshake; mod rate_limit; mod msg; mod peer; +mod peers; mod protocol; mod server; mod store; mod types; pub use server::{DummyAdapter, Server}; +pub use peers::Peers; pub use peer::Peer; -pub use types::{Capabilities, Error, NetAdapter, P2PConfig, PeerInfo, FULL_HIST, FULL_NODE, - MAX_BLOCK_HEADERS, MAX_PEER_ADDRS, UNKNOWN}; +pub use types::{Capabilities, Error, ChainAdapter, P2PConfig, PeerInfo, FULL_HIST, FULL_NODE, + MAX_BLOCK_HEADERS, MAX_PEER_ADDRS, UNKNOWN}; pub use store::{PeerData, State}; diff --git a/p2p/src/peer.rs b/p2p/src/peer.rs index e7ef3b03a..880579d43 100644 --- a/p2p/src/peer.rs +++ b/p2p/src/peer.rs @@ -221,7 +221,7 @@ impl TrackingAdapter { } } -impl NetAdapter for TrackingAdapter { +impl ChainAdapter for TrackingAdapter { fn total_difficulty(&self) -> Difficulty { self.adapter.total_difficulty() } @@ -235,7 +235,7 @@ impl NetAdapter for TrackingAdapter { self.adapter.transaction_received(tx) } - fn block_received(&self, b: core::Block, addr: SocketAddr) { + fn block_received(&self, b: core::Block, addr: SocketAddr) -> bool { self.push(b.hash()); self.adapter.block_received(b, addr) } @@ -251,7 +251,9 @@ impl NetAdapter for TrackingAdapter { fn get_block(&self, h: Hash) -> Option { self.adapter.get_block(h) } +} +impl NetAdapter for TrackingAdapter { fn find_peer_addrs(&self, capab: Capabilities) -> Vec { self.adapter.find_peer_addrs(capab) } @@ -260,10 +262,6 @@ impl NetAdapter for TrackingAdapter { self.adapter.peer_addrs_received(addrs) } - fn peer_connected(&self, pi: &PeerInfo) { - self.adapter.peer_connected(pi) - } - fn peer_difficulty(&self, addr: SocketAddr, diff: Difficulty, height:u64) { self.adapter.peer_difficulty(addr, diff, height) } diff --git a/p2p/src/peers.rs b/p2p/src/peers.rs new file mode 100644 index 000000000..892d68b7f --- /dev/null +++ b/p2p/src/peers.rs @@ -0,0 +1,381 @@ +// Copyright 2016 The Grin Developers +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; +use std::net::SocketAddr; +use std::sync::{Arc, RwLock}; + +use rand::{thread_rng, Rng}; + +use core::core; +use core::core::hash::Hash; +use core::core::target::Difficulty; +use util::LOGGER; + +use peer::Peer; +use store::{PeerStore, PeerData, State}; +use types::*; + +#[derive(Clone)] +pub struct Peers { + pub adapter: Arc, + store: Arc, + peers: Arc>>>>, +} + +unsafe impl Send for Peers {} +unsafe impl Sync for Peers {} + +impl Peers { + pub fn new(store: PeerStore, adapter: Arc) -> Peers { + Peers { + adapter: adapter, + store: Arc::new(store), + peers: Arc::new(RwLock::new(HashMap::new())), + } + } + + /// Adds the peer to our internal peer mapping. Note that the peer is still + /// returned so the server can run it. + pub fn add_connected(&self, p: Peer) -> Arc> { + debug!(LOGGER, "Saving newly connected peer {}.", p.info.addr); + let peer_data = PeerData { + addr: p.info.addr, + capabilities: p.info.capabilities, + user_agent: p.info.user_agent.clone(), + flags: State::Healthy, + }; + if let Err(e) = self.save_peer(&peer_data) { + error!(LOGGER, "Could not save connected peer: {:?}", e); + } + + let addr = p.info.addr.clone(); + let apeer = Arc::new(RwLock::new(p)); + { + let mut peers = self.peers.write().unwrap(); + peers.insert(addr, apeer.clone()); + } + apeer.clone() + } + + pub fn is_known(&self, addr: &SocketAddr) -> bool { + self.get_peer(addr).is_some() + } + + pub fn connected_peers(&self) -> Vec>> { + self.peers.read().unwrap().values().map(|p| p.clone()).collect() + } + + /// Get a peer we're connected to by address. + pub fn get_peer(&self, addr: &SocketAddr) -> Option>> { + self.peers.read().unwrap().get(addr).map(|p| p.clone()) + } + + /// Number of peers we're currently connected to. + pub fn peer_count(&self) -> u32 { + self.connected_peers().len() as u32 + } + + /// Return vec of all peers that currently have the most worked branch, + /// showing the highest total difficulty. + pub fn most_work_peers(&self) -> Vec>> { + let peers = self.connected_peers(); + if peers.len() == 0 { + return vec![]; + } + + let max_total_difficulty = peers + .iter() + .map(|x| { + match x.try_read() { + Ok(peer) => peer.info.total_difficulty.clone(), + Err(_) => Difficulty::zero(), + } + }) + .max() + .unwrap(); + + let mut max_peers = peers + .iter() + .filter(|x| { + match x.try_read() { + Ok(peer) => { + peer.info.total_difficulty == max_total_difficulty + }, + Err(_) => false, + } + }) + .cloned() + .collect::>(); + + thread_rng().shuffle(&mut max_peers); + max_peers + } + + /// Returns single random peer with the most worked branch, showing the highest total + /// difficulty. + pub fn most_work_peer(&self) -> Option>> { + match self.most_work_peers().first() { + Some(x) => Some(x.clone()), + None => None + } + } + + /// Returns a random connected peer. + pub fn random_peer(&self) -> Option>> { + let peers = self.connected_peers(); + Some(thread_rng().choose(&peers).unwrap().clone()) + } + + pub fn is_banned(&self, peer_addr: SocketAddr) -> bool { + if let Ok(peer_data) = self.store.get_peer(peer_addr) { + if peer_data.flags == State::Banned { + return true; + } + } + false + } + + /// Bans a peer, disconnecting it if we're currently connected + pub fn ban_peer(&self, peer_addr: &SocketAddr) { + if let Err(e) = self.update_state(peer_addr.clone(), State::Banned) { + error!(LOGGER, "Couldn't ban {}: {:?}", peer_addr, e); + } + + if let Some(peer) = self.get_peer(peer_addr) { + debug!(LOGGER, "Banning peer {}", peer_addr); + // setting peer status will get it removed at the next clean_peer + let peer = peer.write().unwrap(); + peer.set_banned(); + peer.stop(); + } + } + + /// Broadcasts the provided block to all our peers. A peer implementation + /// may drop the broadcast request if it knows the remote peer already has + /// the block. + pub fn broadcast_block(&self, b: &core::Block) { + let peers = self.connected_peers(); + let mut count = 0; + for p in peers { + let p = p.read().unwrap(); + if p.is_connected() { + if let Err(e) = p.send_block(b) { + debug!(LOGGER, "Error sending block to peer: {:?}", e); + } else { + count += 1; + } + } + } + debug!(LOGGER, "Broadcasted block {} to {} peers.", b.header.height, count); + } + + /// Broadcasts the provided transaction to all our peers. A peer + /// implementation may drop the broadcast request if it knows the + /// remote peer already has the transaction. + pub fn broadcast_transaction(&self, tx: &core::Transaction) { + let peers = self.connected_peers(); + for p in peers { + let p = p.read().unwrap(); + if p.is_connected() { + if let Err(e) = p.send_transaction(tx) { + debug!(LOGGER, "Error sending block to peer: {:?}", e); + } + } + } + } + + /// Ping all our connected peers. Always automatically expects a pong back or + /// disconnects. This acts as a liveness test. + pub fn check_all(&self, total_difficulty: Difficulty, height: u64) { + let peers_map = self.peers.read().unwrap(); + for p in peers_map.values() { + let p = p.read().unwrap(); + if p.is_connected() { + let _ = p.send_ping(total_difficulty.clone(), height); + } + } + } + + /// All peer information we have in storage + pub fn all_peers(&self) -> Vec { + self.store.all_peers() + } + + /// Find peers in store (not necessarily connected) and return their data + pub fn find_peers(&self, state: State, cap: Capabilities, count: usize) -> Vec { + self.store.find_peers(state, cap, count) + } + + /// Whether we've already seen a peer with the provided address + pub fn exists_peer(&self, peer_addr: SocketAddr) -> Result { + self.store.exists_peer(peer_addr).map_err(From::from) + } + + /// Saves updated information about a peer + pub fn save_peer(&self, p: &PeerData) -> Result<(), Error> { + self.store.save_peer(p).map_err(From::from) + } + + /// Updates the state of a peer in store + pub fn update_state(&self, peer_addr: SocketAddr, new_state: State) -> Result<(), Error> { + self.store.update_state(peer_addr, new_state).map_err(From::from) + } + + /// Iterate over the peer list and prune all peers we have + /// lost connection to or have been deemed problematic. + /// Also avoid connected peer count getting too high. + pub fn clean_peers(&self, desired_count: usize) { + let mut rm = vec![]; + + // build a list of peers to be cleaned up + for peer in self.connected_peers() { + let peer_inner = peer.read().unwrap(); + if peer_inner.is_banned() { + debug!(LOGGER, "cleaning {:?}, peer banned", peer_inner.info.addr); + rm.push(peer.clone()); + } else if !peer_inner.is_connected() { + debug!(LOGGER, "cleaning {:?}, not connected", peer_inner.info.addr); + rm.push(peer.clone()); + } + } + + // now clean up peer map based on the list to remove + { + let mut peers = self.peers.write().unwrap(); + for p in rm.clone() { + let p = p.read().unwrap(); + peers.remove(&p.info.addr); + } + } + + // ensure we do not have too many connected peers + // really fighting with the double layer of rwlocks here... + let excess_count = { + let peer_count = self.peer_count().clone() as usize; + if peer_count > desired_count { + peer_count - desired_count + } else { + 0 + } + }; + + // map peers to addrs in a block to bound how long we keep the read lock for + let addrs = { + self.connected_peers().iter().map(|x| { + let p = x.read().unwrap(); + p.info.addr.clone() + }).collect::>() + }; + + // now remove them taking a short-lived write lock each time + // maybe better to take write lock once and remove them all? + for x in addrs + .iter() + .take(excess_count) { + let mut peers = self.peers.write().unwrap(); + peers.remove(x); + } + } + + pub fn stop(self) { + let peers = self.connected_peers(); + for peer in peers { + let peer = peer.read().unwrap(); + peer.stop(); + } + } +} + +impl ChainAdapter for Peers { + fn total_difficulty(&self) -> Difficulty { + self.adapter.total_difficulty() + } + fn total_height(&self) -> u64 { + self.adapter.total_height() + } + fn transaction_received(&self, tx: core::Transaction) { + self.adapter.transaction_received(tx) + } + fn block_received(&self, b: core::Block, peer_addr: SocketAddr) -> bool { + if !self.adapter.block_received(b, peer_addr) { + // if the peer sent us a block that's intrinsically bad, they're either + // mistaken or manevolent, both of which require a ban + self.ban_peer(&peer_addr); + false + } else { + true + } + } + fn headers_received(&self, headers: Vec, peer_addr:SocketAddr) { + self.adapter.headers_received(headers, peer_addr) + } + fn locate_headers(&self, hs: Vec) -> Vec { + self.adapter.locate_headers(hs) + } + fn get_block(&self, h: Hash) -> Option { + self.adapter.get_block(h) + } +} + +impl NetAdapter for Peers { + /// Find good peers we know with the provided capability and return their + /// addresses. + fn find_peer_addrs(&self, capab: Capabilities) -> Vec { + let peers = self.find_peers(State::Healthy, capab, MAX_PEER_ADDRS as usize); + debug!(LOGGER, "Got {} peer addrs to send.", peers.len()); + map_vec!(peers, |p| p.addr) + } + + /// A list of peers has been received from one of our peers. + fn peer_addrs_received(&self, peer_addrs: Vec) { + debug!(LOGGER, "Received {} peer addrs, saving.", peer_addrs.len()); + for pa in peer_addrs { + if let Ok(e) = self.exists_peer(pa) { + if e { + continue; + } + } + let peer = PeerData { + addr: pa, + capabilities: UNKNOWN, + user_agent: "".to_string(), + flags: State::Healthy, + }; + if let Err(e) = self.save_peer(&peer) { + error!(LOGGER, "Could not save received peer address: {:?}", e); + } + } + } + + fn peer_difficulty(&self, addr: SocketAddr, diff: Difficulty, height: u64) { + debug!( + LOGGER, + "peer total_diff @ height (ping/pong): {}: {} @ {} \ + vs us: {} @ {}", + addr, + diff, + height, + self.total_difficulty(), + self.total_height() + ); + + if diff.into_num() > 0 { + if let Some(peer) = self.get_peer(&addr) { + let mut peer = peer.write().unwrap(); + peer.info.total_difficulty = diff; + } + } + } +} diff --git a/p2p/src/server.rs b/p2p/src/server.rs index 9cbe3ca72..59c1d570d 100644 --- a/p2p/src/server.rs +++ b/p2p/src/server.rs @@ -16,7 +16,6 @@ //! other peers in the network. use std::cell::RefCell; -use std::collections::HashMap; use std::net::{SocketAddr, Shutdown}; use std::sync::{Arc, RwLock}; use std::time::Duration; @@ -24,7 +23,6 @@ use std::time::Duration; use futures; use futures::{Future, Stream}; use futures::future::{self, IntoFuture}; -use rand::{thread_rng, Rng}; use tokio_core::net::{TcpListener, TcpStream}; use tokio_core::reactor; use tokio_timer::Timer; @@ -34,13 +32,14 @@ use core::core::hash::Hash; use core::core::target::Difficulty; use handshake::Handshake; use peer::Peer; -use store::{PeerStore, PeerData, State}; +use peers::Peers; +use store::PeerStore; use types::*; use util::LOGGER; /// A no-op network adapter used for testing. pub struct DummyAdapter {} -impl NetAdapter for DummyAdapter { +impl ChainAdapter for DummyAdapter { fn total_difficulty(&self) -> Difficulty { Difficulty::one() } @@ -48,7 +47,7 @@ impl NetAdapter for DummyAdapter { 0 } fn transaction_received(&self, _: core::Transaction) {} - fn block_received(&self, _: core::Block, _: SocketAddr) {} + fn block_received(&self, _: core::Block, _: SocketAddr) -> bool { true } fn headers_received(&self, _: Vec, _:SocketAddr) {} fn locate_headers(&self, _: Vec) -> Vec { vec![] @@ -56,11 +55,12 @@ impl NetAdapter for DummyAdapter { fn get_block(&self, _: Hash) -> Option { None } +} +impl NetAdapter for DummyAdapter { fn find_peer_addrs(&self, _: Capabilities) -> Vec { vec![] } fn peer_addrs_received(&self, _: Vec) {} - fn peer_connected(&self, _: &PeerInfo) {} fn peer_difficulty(&self, _: SocketAddr, _: Difficulty, _:u64) {} } @@ -69,10 +69,8 @@ impl NetAdapter for DummyAdapter { pub struct Server { config: P2PConfig, capabilities: Capabilities, - store: Arc, - peers: Arc>>>>, handshake: Arc, - adapter: Arc, + pub peers: Peers, stop: RefCell>>, } @@ -86,16 +84,14 @@ impl Server { db_root: String, capab: Capabilities, config: P2PConfig, - adapter: Arc, + adapter: Arc, genesis: Hash, ) -> Result { Ok(Server { config: config, capabilities: capab, - store: Arc::new(PeerStore::new(db_root)?), - peers: Arc::new(RwLock::new(HashMap::new())), handshake: Arc::new(Handshake::new(genesis)), - adapter: adapter, + peers: Peers::new(PeerStore::new(db_root)?, adapter), stop: RefCell::new(None), }) } @@ -109,39 +105,33 @@ impl Server { let handshake = self.handshake.clone(); let peers = self.peers.clone(); - let adapter = self.adapter.clone(); let capab = self.capabilities.clone(); - let store = self.store.clone(); // main peer acceptance future handling handshake let hp = h.clone(); let peers_listen = socket.incoming().map_err(From::from).map(move |(conn, _)| { // aaaand.. reclone for the internal closures - let adapter = adapter.clone(); - let store = store.clone(); let peers = peers.clone(); + let peers2 = peers.clone(); let handshake = handshake.clone(); let hp = hp.clone(); future::ok(conn).and_then(move |conn| { // Refuse banned peers connection if let Ok(peer_addr) = conn.peer_addr() { - if let Ok(peer_data) = store.get_peer(peer_addr) { - if peer_data.flags == State::Banned { - debug!(LOGGER, "Peer {} banned, refusing connection.", peer_addr); - if let Err(e) = conn.shutdown(Shutdown::Both) { - debug!(LOGGER, "Error shutting down conn: {:?}", e); - } - return Err(Error::Banned) + if peers.is_banned(peer_addr) { + debug!(LOGGER, "Peer {} banned, refusing connection.", peer_addr); + if let Err(e) = conn.shutdown(Shutdown::Both) { + debug!(LOGGER, "Error shutting down conn: {:?}", e); } + return Err(Error::Banned) } } Ok(conn) }).and_then(move |conn| { - let peers = peers.clone(); - let total_diff = adapter.total_difficulty(); + let total_diff = peers2.total_difficulty(); // accept the peer and add it to the server map let accept = Peer::accept( @@ -149,9 +139,9 @@ impl Server { capab, total_diff, &handshake.clone(), - adapter.clone(), + Arc::new(peers2.clone()), ); - let added = add_to_peers(peers, adapter.clone(), accept); + let added = add_to_peers(peers2, accept); // wire in a future to timeout the accept after 5 secs let timed_peer = with_timeout(Box::new(added), &hp); @@ -186,14 +176,13 @@ impl Server { // timer to regularly check on our peers by pinging them - let adapter = self.adapter.clone(); let peers_inner = self.peers.clone(); let peers_timer = Timer::default() .interval(Duration::new(20, 0)) .fold((), move |_, _| { - let total_diff = adapter.total_difficulty(); - let total_height = adapter.total_height(); - check_peers(peers_inner.clone(), total_diff, total_height); + let total_diff = peers_inner.total_difficulty(); + let total_height = peers_inner.total_height(); + peers_inner.check_all(total_diff, total_height); Ok(()) }); @@ -218,7 +207,8 @@ impl Server { addr: SocketAddr, h: reactor::Handle, ) -> Box>>, Error = Error>> { - if let Some(p) = self.get_peer(&addr) { + + if let Some(p) = self.peers.get_peer(&addr) { // if we're already connected to the addr, just return the peer debug!(LOGGER, "connect_peer: already connected {}", addr); return Box::new(future::ok(Some(p))); @@ -229,7 +219,6 @@ impl Server { // cloneapalooza let peers = self.peers.clone(); let handshake = self.handshake.clone(); - let adapter = self.adapter.clone(); let capab = self.capabilities.clone(); let self_addr = SocketAddr::new(self.config.host, self.config.port); @@ -245,8 +234,7 @@ impl Server { let h2 = h.clone(); let request = socket_connect .and_then(move |socket| { - let peers = peers.clone(); - let total_diff = adapter.clone().total_difficulty(); + let total_diff = peers.total_difficulty(); // connect to the peer and add it to the server map, wiring it a timeout for // the handshake @@ -256,9 +244,9 @@ impl Server { total_diff, self_addr, handshake.clone(), - adapter.clone(), + Arc::new(peers.clone()), ); - let added = add_to_peers(peers, adapter, connect); + let added = add_to_peers(peers, connect); with_timeout(Box::new(added), &h) }) .and_then(move |(socket, peer)| { @@ -272,256 +260,26 @@ impl Server { Box::new(request) } - /// Check if the server already knows this peer (is already connected). - pub fn is_known(&self, addr: &SocketAddr) -> bool { - self.get_peer(addr).is_some() - } - - pub fn connected_peers(&self) -> Vec>> { - self.peers.read().unwrap().values().map(|p| p.clone()).collect() - } - - /// Get a peer we're connected to by address. - pub fn get_peer(&self, addr: &SocketAddr) -> Option>> { - self.peers.read().unwrap().get(addr).map(|p| p.clone()) - } - - /// Have the server iterate over its peer list and prune all peers we have - /// lost connection to or have been deemed problematic. - /// Also avoid connected peer count getting too high. - pub fn clean_peers(&self, desired_count: usize) { - let mut rm = vec![]; - - // build a list of peers to be cleaned up - for peer in self.connected_peers() { - let peer_inner = peer.read().unwrap(); - if peer_inner.is_banned() { - debug!(LOGGER, "cleaning {:?}, peer banned", peer_inner.info.addr); - rm.push(peer.clone()); - } else if !peer_inner.is_connected() { - debug!(LOGGER, "cleaning {:?}, not connected", peer_inner.info.addr); - rm.push(peer.clone()); - } - } - - // now clean up peer map based on the list to remove - { - let mut peers = self.peers.write().unwrap(); - for p in rm.clone() { - let p = p.read().unwrap(); - peers.remove(&p.info.addr); - } - } - - // ensure we do not have too many connected peers - // really fighting with the double layer of rwlocks here... - let excess_count = { - let peer_count = self.peer_count().clone() as usize; - if peer_count > desired_count { - peer_count - desired_count - } else { - 0 - } - }; - - // map peers to addrs in a block to bound how long we keep the read lock for - let addrs = { - self.connected_peers().iter().map(|x| { - let p = x.read().unwrap(); - p.info.addr.clone() - }).collect::>() - }; - - // now remove them taking a short-lived write lock each time - // maybe better to take write lock once and remove them all? - for x in addrs - .iter() - .take(excess_count) { - let mut peers = self.peers.write().unwrap(); - peers.remove(x); - } - } - - /// Return vec of all peers that currently have the most worked branch, - /// showing the highest total difficulty. - pub fn most_work_peers(&self) -> Vec>> { - let peers = self.connected_peers(); - if peers.len() == 0 { - return vec![]; - } - - let max_total_difficulty = peers - .iter() - .map(|x| { - match x.try_read() { - Ok(peer) => peer.info.total_difficulty.clone(), - Err(_) => Difficulty::zero(), - } - }) - .max() - .unwrap(); - - let mut max_peers = peers - .iter() - .filter(|x| { - match x.try_read() { - Ok(peer) => { - peer.info.total_difficulty == max_total_difficulty - }, - Err(_) => false, - } - }) - .cloned() - .collect::>(); - - thread_rng().shuffle(&mut max_peers); - max_peers - } - - /// Returns single random peer with the most worked branch, showing the highest total - /// difficulty. - pub fn most_work_peer(&self) -> Option>> { - match self.most_work_peers().first() { - Some(x) => Some(x.clone()), - None => None - } - } - - /// Returns a random connected peer. - pub fn random_peer(&self) -> Option>> { - let peers = self.connected_peers(); - Some(thread_rng().choose(&peers).unwrap().clone()) - } - - /// Broadcasts the provided block to all our peers. A peer implementation - /// may drop the broadcast request if it knows the remote peer already has - /// the block. - pub fn broadcast_block(&self, b: &core::Block) { - let peers = self.connected_peers(); - let mut count = 0; - for p in peers { - let p = p.read().unwrap(); - if p.is_connected() { - if let Err(e) = p.send_block(b) { - debug!(LOGGER, "Error sending block to peer: {:?}", e); - } else { - count += 1; - } - } - } - debug!(LOGGER, "Broadcasted block {} to {} peers.", b.header.height, count); - } - - /// Broadcasts the provided transaction to all our peers. A peer - /// implementation may drop the broadcast request if it knows the - /// remote peer already has the transaction. - pub fn broadcast_transaction(&self, tx: &core::Transaction) { - let peers = self.connected_peers(); - for p in peers { - let p = p.read().unwrap(); - if p.is_connected() { - if let Err(e) = p.send_transaction(tx) { - debug!(LOGGER, "Error sending block to peer: {:?}", e); - } - } - } - } - - /// Number of peers we're currently connected to. - pub fn peer_count(&self) -> u32 { - self.connected_peers().len() as u32 - } - - /// Bans a peer, disconnecting it if we're currently connected - pub fn ban_peer(&self, peer_addr: &SocketAddr) { - if let Err(e) = self.update_state(peer_addr.clone(), State::Banned) { - error!(LOGGER, "Couldn't ban {}: {:?}", peer_addr, e); - } - - if let Some(peer) = self.get_peer(peer_addr) { - debug!(LOGGER, "Banning peer {}", peer_addr); - // setting peer status will get it removed at the next clean_peer - let peer = peer.write().unwrap(); - peer.set_banned(); - peer.stop(); - } - } - /// Stops the server. Disconnect from all peers at the same time. pub fn stop(self) { info!(LOGGER, "calling stop on server"); - let peers = self.connected_peers(); - for peer in peers { - let peer = peer.read().unwrap(); - peer.stop(); - } + self.peers.stop(); self.stop.into_inner().unwrap().send(()).unwrap(); } - - /// All peer information we have in storage - pub fn all_peers(&self) -> Vec { - self.store.all_peers() - } - - /// Find peers in store (not necessarily connected) and return their data - pub fn find_peers(&self, state: State, cap: Capabilities, count: usize) -> Vec { - self.store.find_peers(state, cap, count) - } - - /// Whether we've already seen a peer with the provided address - pub fn exists_peer(&self, peer_addr: SocketAddr) -> Result { - self.store.exists_peer(peer_addr).map_err(From::from) - } - - /// Saves updated information about a peer - pub fn save_peer(&self, p: &PeerData) -> Result<(), Error> { - self.store.save_peer(p).map_err(From::from) - } - - /// Updates the state of a peer in store - pub fn update_state(&self, peer_addr: SocketAddr, new_state: State) -> Result<(), Error> { - self.store.update_state(peer_addr, new_state).map_err(From::from) - } } // Adds the peer built by the provided future in the peers map -fn add_to_peers( - peers: Arc>>>>, - adapter: Arc, - peer_fut: A, -) -> Box>), ()>, Error = Error>> -where - A: IntoFuture + 'static, -{ +fn add_to_peers(peers: Peers, peer_fut: A) + -> Box>), ()>, Error = Error>> + where A: IntoFuture + 'static { + let peer_add = peer_fut.into_future().map(move |(conn, peer)| { - adapter.peer_connected(&peer.info); - let addr = peer.info.addr.clone(); - let apeer = Arc::new(RwLock::new(peer)); - { - let mut peers = peers.write().unwrap(); - peers.insert(addr, apeer.clone()); - } + let apeer = peers.add_connected(peer); Ok((conn, apeer)) }); Box::new(peer_add) } -// Ping all our connected peers. Always automatically expects a pong back or -// disconnects. This acts as a liveness test. -fn check_peers( - peers: Arc>>>>, - total_difficulty: Difficulty, - height: u64 -) { - let peers_map = peers.read().unwrap(); - for p in peers_map.values() { - let p = p.read().unwrap(); - if p.is_connected() { - let _ = p.send_ping(total_difficulty.clone(), height); - } - } -} - // Adds a timeout to a future fn with_timeout( fut: Box, Error = Error>>, diff --git a/p2p/src/types.rs b/p2p/src/types.rs index af71951a3..d0c292104 100644 --- a/p2p/src/types.rs +++ b/p2p/src/types.rs @@ -162,7 +162,7 @@ pub trait Protocol { /// Bridge between the networking layer and the rest of the system. Handles the /// forwarding or querying of blocks and transactions from the network among /// other things. -pub trait NetAdapter: Sync + Send { +pub trait ChainAdapter: Sync + Send { /// Current total difficulty on our chain fn total_difficulty(&self) -> Difficulty; @@ -172,8 +172,11 @@ pub trait NetAdapter: Sync + Send { /// A valid transaction has been received from one of our peers fn transaction_received(&self, tx: core::Transaction); - /// A block has been received from one of our peers - fn block_received(&self, b: core::Block, addr: SocketAddr); + /// A block has been received from one of our peers. Returns true if the + /// block could be handled properly and is not deemed defective by the + /// chain. Returning false means the block will nenver be valid and + /// may result in the peer being banned. + fn block_received(&self, b: core::Block, addr: SocketAddr) -> bool; /// A set of block header has been received, typically in response to a /// block @@ -187,7 +190,11 @@ pub trait NetAdapter: Sync + Send { /// Gets a full block by its hash. fn get_block(&self, h: Hash) -> Option; +} +/// Additional methods required by the protocol that don't need to be +/// externally implemented. +pub trait NetAdapter: ChainAdapter { /// Find good peers we know with the provided capability and return their /// addresses. fn find_peer_addrs(&self, capab: Capabilities) -> Vec; @@ -195,9 +202,6 @@ pub trait NetAdapter: Sync + Send { /// A list of peers has been received from one of our peers. fn peer_addrs_received(&self, Vec); - /// Network successfully connected to a peer. - fn peer_connected(&self, &PeerInfo); - /// Heard total_difficulty from a connected peer (via ping/pong). fn peer_difficulty(&self, SocketAddr, Difficulty, u64); } diff --git a/p2p/tests/peer_handshake.rs b/p2p/tests/peer_handshake.rs index c43cc097a..e872739a9 100644 --- a/p2p/tests/peer_handshake.rs +++ b/p2p/tests/peer_handshake.rs @@ -84,7 +84,7 @@ fn peer_handshake() { Ok(()) }) .and_then(|_| { - assert!(server.peer_count() > 0); + assert!(server.peers.peer_count() > 0); server.stop(); Ok(()) }) diff --git a/src/bin/grin.rs b/src/bin/grin.rs index 71cc66a86..64dff73c5 100644 --- a/src/bin/grin.rs +++ b/src/bin/grin.rs @@ -40,7 +40,6 @@ use clap::{App, Arg, ArgMatches, SubCommand}; use daemonize::Daemonize; use config::GlobalConfig; -use wallet::WalletConfig; use core::global; use core::core::amount_to_hr_string; use util::{init_logger, LoggingConfig, LOGGER}; @@ -169,11 +168,6 @@ fn main() { .help("Directory in which to store wallet files (defaults to current \ directory)") .takes_value(true)) - .arg(Arg::with_name("port") - .short("l") - .long("port") - .help("Port on which to run the wallet listener when in listen mode") - .takes_value(true)) .arg(Arg::with_name("external") .short("e") .long("external") @@ -197,19 +191,23 @@ fn main() { .long("key_derivations") .default_value("1000") .takes_value(true)) + .subcommand(SubCommand::with_name("listen") - .about("Run the wallet in listening mode. If an input file is \ - provided, will process it, otherwise runs in server mode waiting \ - for send requests.") + .about("Runs the wallet in listening mode waiting for transactions.") + .arg(Arg::with_name("port") + .short("l") + .long("port") + .help("Port on which to run the wallet listener") + .takes_value(true))) + + .subcommand(SubCommand::with_name("receive") + .about("Processes a JSON transaction file.") .arg(Arg::with_name("input") - .help("Partial transaction to receive, expects as a JSON file.") + .help("Partial transaction to process, expects a JSON file.") .short("i") .long("input") .takes_value(true))) - - .subcommand(SubCommand::with_name("receive") - .about("Depreciated, use 'listen' instead")) - + .subcommand(SubCommand::with_name("send") .about("Builds a transaction to send someone some coins. By default, \ the transaction will just be printed to stdout. If a destination is \ @@ -372,10 +370,6 @@ fn wallet_command(wallet_args: &ArgMatches, global_config: GlobalConfig) { // just get defaults from the global config let mut wallet_config = global_config.members.unwrap().wallet; - if let Some(port) = wallet_args.value_of("port") { - wallet_config.api_listen_port = port.to_string(); - } - if wallet_args.is_present("external") { wallet_config.api_listen_interface = "0.0.0.0".to_string(); } @@ -417,14 +411,22 @@ fn wallet_command(wallet_args: &ArgMatches, global_config: GlobalConfig) { .expect("Failed to derive keychain from seed file and passphrase."); match wallet_args.subcommand() { - ("listen", Some(listen_args)) => if let Some(f) = listen_args.value_of("input") { - let mut file = File::open(f).expect("Unable to open transaction file."); + ("listen", Some(listen_args)) => { + if let Some(port) = listen_args.value_of("port") { + wallet_config.api_listen_port = port.to_string(); + } + wallet::server::start_rest_apis(wallet_config, keychain); + }, + ("receive", Some(receive_args)) => { + let input = receive_args + .value_of("input") + .expect("Input file required"); + let mut file = File::open(input) + .expect("Unable to open transaction file."); let mut contents = String::new(); file.read_to_string(&mut contents) .expect("Unable to read transaction file."); wallet::receive_json_tx_str(&wallet_config, &keychain, contents.as_str()).unwrap(); - } else { - wallet::server::start_rest_apis(wallet_config, keychain); }, ("send", Some(send_args)) => { let amount = send_args diff --git a/store/src/sumtree.rs b/store/src/sumtree.rs index 17454da7c..7d57bbd49 100644 --- a/store/src/sumtree.rs +++ b/store/src/sumtree.rs @@ -163,6 +163,8 @@ struct RemoveLog { removed: Vec<(u64, u32)>, // Holds positions temporarily until flush is called. removed_tmp: Vec<(u64, u32)>, + // Holds truncated removed temporarily until discarded or committed + removed_bak: Vec<(u64, u32)>, } impl RemoveLog { @@ -174,6 +176,7 @@ impl RemoveLog { path: path, removed: removed, removed_tmp: vec![], + removed_bak: vec![], }) } @@ -181,10 +184,14 @@ impl RemoveLog { fn truncate(&mut self, last_offs: u32) -> io::Result<()> { // simplifying assumption: we always remove older than what's in tmp self.removed_tmp = vec![]; + // DEBUG + let _ = self.flush_truncate(last_offs); if last_offs == 0 { self.removed = vec![]; } else { + // backing it up before truncating + self.removed_bak = self.removed.clone(); self.removed = self.removed .iter() .filter(|&&(_, idx)| idx < last_offs) @@ -194,6 +201,15 @@ impl RemoveLog { Ok(()) } + // DEBUG: saves the remove log to the side before each truncate + fn flush_truncate(&mut self, last_offs: u32) -> io::Result<()> { + let mut file = File::create(format!("{}.{}", self.path.clone(), last_offs))?; + for elmt in &self.removed { + file.write_all(&ser::ser_vec(&elmt).unwrap()[..])?; + } + file.sync_data() + } + /// Append a set of new positions to the remove log. Both adds those /// positions the ordered in-memory set and to the file. fn append(&mut self, elmts: Vec, index: u32) -> io::Result<()> { @@ -223,11 +239,16 @@ impl RemoveLog { file.write_all(&ser::ser_vec(&elmt).unwrap()[..])?; } self.removed_tmp = vec![]; + self.removed_bak = vec![]; file.sync_data() } /// Discard pending changes fn discard(&mut self) { + if self.removed_bak.len() > 0 { + self.removed = self.removed_bak.clone(); + self.removed_bak = vec![]; + } self.removed_tmp = vec![]; }