From f8a9526279b37873b53dd55c8e9a346011bce1e2 Mon Sep 17 00:00:00 2001 From: Ignotus Peverell Date: Sun, 3 Dec 2017 12:46:00 +0000 Subject: [PATCH 01/11] Check before borrow, fixes #267 --- grin/src/adapters.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/grin/src/adapters.rs b/grin/src/adapters.rs index 3be4a745a..8462fef8a 100644 --- a/grin/src/adapters.rs +++ b/grin/src/adapters.rs @@ -263,7 +263,7 @@ impl NetAdapter for NetToChainAdapter { self.total_height() ); - if self.p2p_server.is_initialized() { + if diff.into_num() > 0 && self.p2p_server.is_initialized() { if let Some(peer) = self.p2p_server.borrow().get_peer(&addr) { let mut peer = peer.write().unwrap(); peer.info.total_difficulty = diff; From 060be4b82734ea8fb5ecddd8bf1bdf98aafdeb9f Mon Sep 17 00:00:00 2001 From: Ignotus Peverell Date: Thu, 30 Nov 2017 06:41:07 +0000 Subject: [PATCH 02/11] We do not want to sync with old peers anyway --- grin/src/adapters.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/grin/src/adapters.rs b/grin/src/adapters.rs index 8462fef8a..3be4a745a 100644 --- a/grin/src/adapters.rs +++ b/grin/src/adapters.rs @@ -263,7 +263,7 @@ impl NetAdapter for NetToChainAdapter { self.total_height() ); - if diff.into_num() > 0 && self.p2p_server.is_initialized() { + if self.p2p_server.is_initialized() { if let Some(peer) = self.p2p_server.borrow().get_peer(&addr) { let mut peer = peer.write().unwrap(); peer.info.total_difficulty = diff; From 6b48e7840ded968c45f7abe0fd152deb833e1683 Mon Sep 17 00:00:00 2001 From: Ignotus Peverell Date: Tue, 28 Nov 2017 15:37:02 -0500 Subject: [PATCH 03/11] Fix for locator response generation When generating the locator, we just looked for the first block in the locator we had locally. However that block could be on a losing fork, in which case we replied with blocks from that losing fork, making things worse. Now building the locator only based on blocks in our winning fork. 
--- grin/src/adapters.rs | 54 ++++++++++++++++++++++++++++++-------------- 1 file changed, 37 insertions(+), 17 deletions(-) diff --git a/grin/src/adapters.rs b/grin/src/adapters.rs index 3be4a745a..e9749bd9a 100644 --- a/grin/src/adapters.rs +++ b/grin/src/adapters.rs @@ -149,23 +149,9 @@ impl NetAdapter for NetToChainAdapter { locator, ); - if locator.len() == 0 { - return vec![]; - } - - // recursively go back through the locator vector - // and stop when we find a header that we recognize - // this will be a header shared in common between us and the peer - let known = self.chain.get_block_header(&locator[0]); - let header = match known { - Ok(header) => header, - Err(chain::Error::StoreErr(store::Error::NotFoundErr, _)) => { - return self.locate_headers(locator[1..].to_vec()); - } - Err(e) => { - error!(LOGGER, "Could not build header locator: {:?}", e); - return vec![]; - } + let header = match self.find_common_header(locator) { + Some(header) => header, + None => return vec![], }; debug!( @@ -335,6 +321,40 @@ impl NetToChainAdapter { } self.syncing.load(Ordering::Relaxed) } + + // recursively go back through the locator vector and stop when we find + // a header that we recognize this will be a header shared in common + // between us and the peer + fn find_common_header(&self, locator: Vec) -> Option { + if locator.len() == 0 { + return None; + } + + let known = self.chain.get_block_header(&locator[0]); + + match known { + Ok(header) => { + // even if we know the block, it may not be on our winning chain + let known_winning = self.chain.get_header_by_height(header.height); + if let Ok(known_winning) = known_winning { + if known_winning.hash() != header.hash() { + self.find_common_header(locator[1..].to_vec()) + } else { + Some(header) + } + } else { + self.find_common_header(locator[1..].to_vec()) + } + }, + Err(chain::Error::StoreErr(store::Error::NotFoundErr, _)) => { + self.find_common_header(locator[1..].to_vec()) + }, + Err(e) => { + error!(LOGGER, 
"Could not build header locator: {:?}", e); + None + } + } + } /// Prepare options for the chain pipeline fn chain_opts(&self) -> chain::Options { From b893a6c8ee0027981642e0c5152b18d6417ced30 Mon Sep 17 00:00:00 2001 From: Ignotus Peverell Date: Tue, 28 Nov 2017 18:43:02 -0500 Subject: [PATCH 04/11] Back up the removed log before truncating in memory Without a backup, the in-memory data structure stays truncated even if the rewind is abandoned. Also add some logging on our most problematic block in case it still is problematic. --- chain/src/sumtree.rs | 6 +++++- store/src/sumtree.rs | 21 +++++++++++++++++++++ 2 files changed, 26 insertions(+), 1 deletion(-) diff --git a/chain/src/sumtree.rs b/chain/src/sumtree.rs index 1db758c59..090887797 100644 --- a/chain/src/sumtree.rs +++ b/chain/src/sumtree.rs @@ -239,9 +239,13 @@ impl<'a> Extension<'a> { pub fn apply_block(&mut self, b: &Block) -> Result<(), Error> { // doing inputs first guarantees an input can't spend an output in the - // same block, enforcing block cut-through + // same block, enforcing block cut-through for input in &b.inputs { let pos_res = self.commit_index.get_output_pos(&input.commitment()); + if b.hash().to_string() == "f697a877" { + debug!(LOGGER, "input pos: {:?}, commit: {} {:?}", + pos_res, input.commitment().hash(), input.commitment()); + } if let Ok(pos) = pos_res { match self.output_pmmr.prune(pos, b.header.height as u32) { Ok(true) => { diff --git a/store/src/sumtree.rs b/store/src/sumtree.rs index 17454da7c..7d57bbd49 100644 --- a/store/src/sumtree.rs +++ b/store/src/sumtree.rs @@ -163,6 +163,8 @@ struct RemoveLog { removed: Vec<(u64, u32)>, // Holds positions temporarily until flush is called. 
removed_tmp: Vec<(u64, u32)>, + // Holds truncated removed temporarily until discarded or committed + removed_bak: Vec<(u64, u32)>, } impl RemoveLog { @@ -174,6 +176,7 @@ impl RemoveLog { path: path, removed: removed, removed_tmp: vec![], + removed_bak: vec![], }) } @@ -181,10 +184,14 @@ impl RemoveLog { fn truncate(&mut self, last_offs: u32) -> io::Result<()> { // simplifying assumption: we always remove older than what's in tmp self.removed_tmp = vec![]; + // DEBUG + let _ = self.flush_truncate(last_offs); if last_offs == 0 { self.removed = vec![]; } else { + // backing it up before truncating + self.removed_bak = self.removed.clone(); self.removed = self.removed .iter() .filter(|&&(_, idx)| idx < last_offs) @@ -194,6 +201,15 @@ impl RemoveLog { Ok(()) } + // DEBUG: saves the remove log to the side before each truncate + fn flush_truncate(&mut self, last_offs: u32) -> io::Result<()> { + let mut file = File::create(format!("{}.{}", self.path.clone(), last_offs))?; + for elmt in &self.removed { + file.write_all(&ser::ser_vec(&elmt).unwrap()[..])?; + } + file.sync_data() + } + /// Append a set of new positions to the remove log. Both adds those /// positions the ordered in-memory set and to the file. 
fn append(&mut self, elmts: Vec, index: u32) -> io::Result<()> { @@ -223,11 +239,16 @@ impl RemoveLog { file.write_all(&ser::ser_vec(&elmt).unwrap()[..])?; } self.removed_tmp = vec![]; + self.removed_bak = vec![]; file.sync_data() } /// Discard pending changes fn discard(&mut self) { + if self.removed_bak.len() > 0 { + self.removed = self.removed_bak.clone(); + self.removed_bak = vec![]; + } self.removed_tmp = vec![]; } From 7ef752f61c93bebb56b73701dfa7e7ef1cfb0bed Mon Sep 17 00:00:00 2001 From: Andrew Bellenie Date: Wed, 29 Nov 2017 02:26:53 +0000 Subject: [PATCH 05/11] Breakout listen -i to separate method (#337) * Rename wallet 'receive' to 'listen' * Handle use of previous 'receive' command gracefully and clarify docs * Move listen port from main wallet config to subcommand and break input out into own file method * Clean up arg description * Add placeholder to docs for file method in wallet * Tidy wallet help text * Rename pending wallet 'receive' to 'request' * Restore receive method for file based transactions --- doc/wallet.md | 4 ++++ src/bin/grin.rs | 45 ++++++++++++++++++++++++--------------------- 2 files changed, 28 insertions(+), 21 deletions(-) diff --git a/doc/wallet.md b/doc/wallet.md index b49dfde1d..154c13f75 100644 --- a/doc/wallet.md +++ b/doc/wallet.md @@ -53,6 +53,10 @@ Builds a transaction to send someone some coins. Creates and outputs a transacti Replaced by `listen` (see above). The `receive` command might later be recycled to actively accept one or several specific transactions. +### grin wallet request + +(tbd) + ### grin wallet burn *TESTING ONLY*: Burns the provided amount to a known key. 
Similar to send but burns an output to allow single-party diff --git a/src/bin/grin.rs b/src/bin/grin.rs index 71cc66a86..9a5a33df0 100644 --- a/src/bin/grin.rs +++ b/src/bin/grin.rs @@ -169,11 +169,6 @@ fn main() { .help("Directory in which to store wallet files (defaults to current \ directory)") .takes_value(true)) - .arg(Arg::with_name("port") - .short("l") - .long("port") - .help("Port on which to run the wallet listener when in listen mode") - .takes_value(true)) .arg(Arg::with_name("external") .short("e") .long("external") @@ -197,19 +192,23 @@ fn main() { .long("key_derivations") .default_value("1000") .takes_value(true)) + .subcommand(SubCommand::with_name("listen") - .about("Run the wallet in listening mode. If an input file is \ - provided, will process it, otherwise runs in server mode waiting \ - for send requests.") + .about("Runs the wallet in listening mode waiting for transactions.") + .arg(Arg::with_name("port") + .short("l") + .long("port") + .help("Port on which to run the wallet listener") + .takes_value(true))) + + .subcommand(SubCommand::with_name("receive") + .about("Processes a JSON transaction file.") .arg(Arg::with_name("input") - .help("Partial transaction to receive, expects as a JSON file.") + .help("Partial transaction to process, expects a JSON file.") .short("i") .long("input") .takes_value(true))) - - .subcommand(SubCommand::with_name("receive") - .about("Depreciated, use 'listen' instead")) - + .subcommand(SubCommand::with_name("send") .about("Builds a transaction to send someone some coins. By default, \ the transaction will just be printed to stdout. 
If a destination is \ @@ -372,10 +371,6 @@ fn wallet_command(wallet_args: &ArgMatches, global_config: GlobalConfig) { // just get defaults from the global config let mut wallet_config = global_config.members.unwrap().wallet; - if let Some(port) = wallet_args.value_of("port") { - wallet_config.api_listen_port = port.to_string(); - } - if wallet_args.is_present("external") { wallet_config.api_listen_interface = "0.0.0.0".to_string(); } @@ -417,14 +412,22 @@ fn wallet_command(wallet_args: &ArgMatches, global_config: GlobalConfig) { .expect("Failed to derive keychain from seed file and passphrase."); match wallet_args.subcommand() { - ("listen", Some(listen_args)) => if let Some(f) = listen_args.value_of("input") { - let mut file = File::open(f).expect("Unable to open transaction file."); + ("listen", Some(listen_args)) => { + if let Some(port) = listen_args.value_of("port") { + wallet_config.api_listen_port = port.to_string(); + } + wallet::server::start_rest_apis(wallet_config, keychain); + }, + ("receive", Some(receive_args)) => { + let input = receive_args + .value_of("input") + .expect("Input file required"); + let mut file = File::open(input) + .expect("Unable to open transaction file."); let mut contents = String::new(); file.read_to_string(&mut contents) .expect("Unable to read transaction file."); wallet::receive_json_tx_str(&wallet_config, &keychain, contents.as_str()).unwrap(); - } else { - wallet::server::start_rest_apis(wallet_config, keychain); }, ("send", Some(send_args)) => { let amount = send_args From 1a86900bac7ea93cfa211a84a6c3bb83694be86c Mon Sep 17 00:00:00 2001 From: Marco Tanzi Date: Sun, 3 Dec 2017 22:48:46 +0000 Subject: [PATCH 06/11] Removed annoying warning during the build (#420) --- grin/src/miner.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/grin/src/miner.rs b/grin/src/miner.rs index 0e21b14a4..716c01766 100644 --- a/grin/src/miner.rs +++ b/grin/src/miner.rs @@ -17,7 +17,7 @@ use rand::{self, Rng}; use 
std::sync::{Arc, RwLock}; -use std::{str, thread}; +use std::thread; use std; use time; @@ -588,7 +588,7 @@ impl Miner { let result=self.chain.set_sumtree_roots(&mut b); match result { Ok(_) => Ok((b, block_fees)), - //If it's a duplicate commitment, it's likely trying to use + //If it's a duplicate commitment, it's likely trying to use //a key that's already been derived but not in the wallet //for some reason, allow caller to retry Err(chain::Error::DuplicateCommitment(e)) => From cac6181c21ea61a4226a9f77926c149deb907f56 Mon Sep 17 00:00:00 2001 From: Ignotus Peverell Date: Sun, 3 Dec 2017 12:46:00 +0000 Subject: [PATCH 07/11] Check before borrow, fixes #267 --- grin/src/adapters.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/grin/src/adapters.rs b/grin/src/adapters.rs index e9749bd9a..01f5f7975 100644 --- a/grin/src/adapters.rs +++ b/grin/src/adapters.rs @@ -249,7 +249,7 @@ impl NetAdapter for NetToChainAdapter { self.total_height() ); - if self.p2p_server.is_initialized() { + if diff.into_num() > 0 && self.p2p_server.is_initialized() { if let Some(peer) = self.p2p_server.borrow().get_peer(&addr) { let mut peer = peer.write().unwrap(); peer.info.total_difficulty = diff; @@ -321,7 +321,7 @@ impl NetToChainAdapter { } self.syncing.load(Ordering::Relaxed) } - + // recursively go back through the locator vector and stop when we find // a header that we recognize this will be a header shared in common // between us and the peer From be094883de4ab463ea8bab35bc45c1a3447a522e Mon Sep 17 00:00:00 2001 From: Ignotus Peverell Date: Wed, 6 Dec 2017 00:47:26 +0000 Subject: [PATCH 08/11] Improved fix for MMR dup detection --- chain/src/sumtree.rs | 47 +++++++++++++++++-------------------------- core/src/core/pmmr.rs | 6 +++++- 2 files changed, 24 insertions(+), 29 deletions(-) diff --git a/chain/src/sumtree.rs b/chain/src/sumtree.rs index 090887797..12fb8e547 100644 --- a/chain/src/sumtree.rs +++ b/chain/src/sumtree.rs @@ -261,29 +261,22 @@ impl<'a> 
Extension<'a> { } } - // checking any position after the MMR size is useless, catches rewind - // edge cases - let output_max_index = self.output_pmmr.unpruned_size(); - let kernel_max_index = self.kernel_pmmr.unpruned_size(); - for out in &b.outputs { let commit = out.commitment(); if let Ok(pos) = self.commit_index.get_output_pos(&commit) { - if pos <= output_max_index { - // we need to check whether the commitment is in the current MMR view - // as well as the index doesn't support rewind and is non-authoritative - // (non-historical node will have a much smaller one) - // note that this doesn't show the commitment *never* existed, just - // that this is not an existing unspent commitment right now - if let Some(c) = self.output_pmmr.get(pos) { - let hashsum = HashSum::from_summable( - pos, &SumCommit{commit}, Some(out.switch_commit_hash)); - // as we're processing a new fork, we may get a position on the old - // fork that exists but matches a different node, filtering that - // case out - if c.hash == hashsum.hash { - return Err(Error::DuplicateCommitment(out.commitment())); - } + // we need to check whether the commitment is in the current MMR view + // as well as the index doesn't support rewind and is non-authoritative + // (non-historical node will have a much smaller one) + // note that this doesn't show the commitment *never* existed, just + // that this is not an existing unspent commitment right now + if let Some(c) = self.output_pmmr.get(pos) { + let hashsum = HashSum::from_summable( + pos, &SumCommit{commit}, Some(out.switch_commit_hash)); + // as we're processing a new fork, we may get a position on the old + // fork that exists but matches a different node, filtering that + // case out + if c.hash == hashsum.hash { + return Err(Error::DuplicateCommitment(out.commitment())); } } } @@ -307,14 +300,12 @@ impl<'a> Extension<'a> { for kernel in &b.kernels { if let Ok(pos) = self.commit_index.get_kernel_pos(&kernel.excess) { - if pos <= kernel_max_index 
{ - // same as outputs - if let Some(k) = self.kernel_pmmr.get(pos) { - let hashsum = HashSum::from_summable( - pos, &NoSum(kernel), None::); - if k.hash == hashsum.hash { - return Err(Error::DuplicateKernel(kernel.excess.clone())); - } + // same as outputs + if let Some(k) = self.kernel_pmmr.get(pos) { + let hashsum = HashSum::from_summable( + pos, &NoSum(kernel), None::); + if k.hash == hashsum.hash { + return Err(Error::DuplicateKernel(kernel.excess.clone())); } } } diff --git a/core/src/core/pmmr.rs b/core/src/core/pmmr.rs index 7fad68b13..c1ef7756c 100644 --- a/core/src/core/pmmr.rs +++ b/core/src/core/pmmr.rs @@ -347,7 +347,11 @@ where /// Helper function to get the HashSum of a node at a given position from /// the backend. pub fn get(&self, position: u64) -> Option> { - self.backend.get(position) + if position > self.last_pos { + None + } else { + self.backend.get(position) + } } /// Helper function to get the last N nodes inserted, i.e. the last From 4a03b90190881a61e0f57cbfe4b91d633685bd36 Mon Sep 17 00:00:00 2001 From: Ignotus Peverell Date: Sun, 3 Dec 2017 00:05:43 +0000 Subject: [PATCH 09/11] Name all threads --- api/src/handlers.rs | 140 ++++++++++++++++++++++---------------------- grin/src/seed.rs | 3 +- grin/src/server.rs | 20 ++++--- 3 files changed, 84 insertions(+), 79 deletions(-) diff --git a/api/src/handlers.rs b/api/src/handlers.rs index eae64e1c3..69272d795 100644 --- a/api/src/handlers.rs +++ b/api/src/handlers.rs @@ -231,24 +231,24 @@ impl Handler for SumTreeHandler { } pub struct PeersAllHandler { - pub p2p_server: Arc, + pub peers: p2p::Peers, } impl Handler for PeersAllHandler { fn handle(&self, _req: &mut Request) -> IronResult { - let peers = &self.p2p_server.all_peers(); + let peers = &self.peers.all_peers(); json_response_pretty(&peers) } } pub struct PeersConnectedHandler { - pub p2p_server: Arc, + pub peers: p2p::Peers, } impl Handler for PeersConnectedHandler { fn handle(&self, _req: &mut Request) -> IronResult { let mut peers = 
vec![]; - for p in &self.p2p_server.connected_peers() { + for p in &self.peers.connected_peers() { let p = p.read().unwrap(); let peer_info = p.info.clone(); peers.push(peer_info); @@ -262,7 +262,7 @@ impl Handler for PeersConnectedHandler { /// POST /v1/peers/10.12.12.13/ban /// TODO POST /v1/peers/10.12.12.13/unban pub struct PeerHandler { - pub p2p_server: Arc, + pub peers: p2p::Peers, } impl Handler for PeerHandler { @@ -276,7 +276,7 @@ impl Handler for PeerHandler { "ban" => { path_elems.pop(); if let Ok(addr) = path_elems.last().unwrap().parse() { - self.p2p_server.ban_peer(&addr); + self.peers.ban_peer(&addr); Ok(Response::with((status::Ok, ""))) } else { Ok(Response::with((status::BadRequest, ""))) @@ -459,74 +459,76 @@ pub fn start_rest_apis( addr: String, chain: Arc, tx_pool: Arc>>, - p2p_server: Arc, + peers: p2p::Peers, ) where T: pool::BlockChain + Send + Sync + 'static, { - thread::spawn(move || { - // build handlers and register them under the appropriate endpoint - let utxo_handler = UtxoHandler { - chain: chain.clone(), - }; - let block_handler = BlockHandler { - chain: chain.clone(), - }; - let chain_tip_handler = ChainHandler { - chain: chain.clone(), - }; - let sumtree_handler = SumTreeHandler { - chain: chain.clone(), - }; - let pool_info_handler = PoolInfoHandler { - tx_pool: tx_pool.clone(), - }; - let pool_push_handler = PoolPushHandler { - tx_pool: tx_pool.clone(), - }; - let peers_all_handler = PeersAllHandler { - p2p_server: p2p_server.clone(), - }; - let peers_connected_handler = PeersConnectedHandler { - p2p_server: p2p_server.clone(), - }; - let peer_handler = PeerHandler { - p2p_server: p2p_server.clone(), - }; +let _ = thread::Builder::new() + .name("apis".to_string()) + .spawn(move || { + // build handlers and register them under the appropriate endpoint + let utxo_handler = UtxoHandler { + chain: chain.clone(), + }; + let block_handler = BlockHandler { + chain: chain.clone(), + }; + let chain_tip_handler = ChainHandler { + chain: 
chain.clone(), + }; + let sumtree_handler = SumTreeHandler { + chain: chain.clone(), + }; + let pool_info_handler = PoolInfoHandler { + tx_pool: tx_pool.clone(), + }; + let pool_push_handler = PoolPushHandler { + tx_pool: tx_pool.clone(), + }; + let peers_all_handler = PeersAllHandler { + p2p_server: p2p_server.clone(), + }; + let peers_connected_handler = PeersConnectedHandler { + p2p_server: p2p_server.clone(), + }; + let peer_handler = PeerHandler { + p2p_server: p2p_server.clone(), + }; - let route_list = vec!( - "get /".to_string(), - "get /blocks".to_string(), - "get /chain".to_string(), - "get /chain/utxos".to_string(), - "get /sumtrees/roots".to_string(), - "get /sumtrees/lastutxos?n=10".to_string(), - "get /sumtrees/lastrangeproofs".to_string(), - "get /sumtrees/lastkernels".to_string(), - "get /pool".to_string(), - "post /pool/push".to_string(), - "get /peers/all".to_string(), - "get /peers/connected".to_string(), - ); - let index_handler = IndexHandler { list: route_list }; - let router = router!( - index: get "/" => index_handler, - blocks: get "/blocks/*" => block_handler, - chain_tip: get "/chain" => chain_tip_handler, - chain_utxos: get "/chain/utxos/*" => utxo_handler, - sumtree_roots: get "/sumtrees/*" => sumtree_handler, - pool_info: get "/pool" => pool_info_handler, - pool_push: post "/pool/push" => pool_push_handler, - peers_all: get "/peers/all" => peers_all_handler, - peers_connected: get "/peers/connected" => peers_connected_handler, - peer: post "/peers/*" => peer_handler, - ); + let route_list = vec!( + "get /".to_string(), + "get /blocks".to_string(), + "get /chain".to_string(), + "get /chain/utxos".to_string(), + "get /sumtrees/roots".to_string(), + "get /sumtrees/lastutxos?n=10".to_string(), + "get /sumtrees/lastrangeproofs".to_string(), + "get /sumtrees/lastkernels".to_string(), + "get /pool".to_string(), + "post /pool/push".to_string(), + "get /peers/all".to_string(), + "get /peers/connected".to_string(), + ); + let index_handler = 
IndexHandler { list: route_list }; + let router = router!( + index: get "/" => index_handler, + blocks: get "/blocks/*" => block_handler, + chain_tip: get "/chain" => chain_tip_handler, + chain_utxos: get "/chain/utxos/*" => utxo_handler, + sumtree_roots: get "/sumtrees/*" => sumtree_handler, + pool_info: get "/pool" => pool_info_handler, + pool_push: post "/pool/push" => pool_push_handler, + peers_all: get "/peers/all" => peers_all_handler, + peers_connected: get "/peers/connected" => peers_connected_handler, + peer: post "/peers/*" => peer_handler, + ); - let mut apis = ApiServer::new("/v1".to_string()); - apis.register_handler(router); + let mut apis = ApiServer::new("/v1".to_string()); + apis.register_handler(router); - info!(LOGGER, "Starting HTTP API server at {}.", addr); - apis.start(&addr[..]).unwrap_or_else(|e| { - error!(LOGGER, "Failed to start API HTTP server: {}.", e); - }); + info!(LOGGER, "Starting HTTP API server at {}.", addr); + apis.start(&addr[..]).unwrap_or_else(|e| { + error!(LOGGER, "Failed to start API HTTP server: {}.", e); + }); }); } diff --git a/grin/src/seed.rs b/grin/src/seed.rs index ae356e680..6a0196da1 100644 --- a/grin/src/seed.rs +++ b/grin/src/seed.rs @@ -169,7 +169,8 @@ impl Seeder { // a thread pool is required so we don't block the event loop with a // db query - let thread_pool = cpupool::CpuPool::new(1); + let thread_pool = cpupool::Builder::new() + .pool_size(1).name_prefix("seed").create(); let p2p_server = self.p2p.clone(); let seeder = thread_pool .spawn_fn(move || { diff --git a/grin/src/server.rs b/grin/src/server.rs index 61e9660a9..8862fa7f6 100644 --- a/grin/src/server.rs +++ b/grin/src/server.rs @@ -205,15 +205,17 @@ impl Server { let mut miner = miner::Miner::new(config.clone(), self.chain.clone(), self.tx_pool.clone()); miner.set_debug_output_id(format!("Port {}", self.config.p2p_config.unwrap().port)); - thread::spawn(move || { - // TODO push this down in the run loop so miner gets paused anytime we - // decide 
to sync again - let secs_5 = time::Duration::from_secs(5); - while net_adapter.is_syncing() { - thread::sleep(secs_5); - } - miner.run_loop(config.clone(), cuckoo_size as u32, proof_size); - }); + let _ = thread::Builder::new() + .name("miner".to_string()) + .spawn(move || { + // TODO push this down in the run loop so miner gets paused anytime we + // decide to sync again + let secs_5 = time::Duration::from_secs(5); + while net_adapter.is_syncing() { + thread::sleep(secs_5); + } + miner.run_loop(config.clone(), cuckoo_size as u32, proof_size); + }); } /// The chain head From e50703d79e0c91f942d000875111a0096bae3ce0 Mon Sep 17 00:00:00 2001 From: Ignotus Peverell Date: Tue, 12 Dec 2017 16:40:26 +0000 Subject: [PATCH 10/11] Externalized all peers selection and handling (#468) Moved handling to the peer map out of the p2p server and into its own struct. Allowed factoring code from the net adapter and simplification of some interactions. Also removes the need for the adapter to reference the p2p server or peers. 
Fixes #430, #453 and #456 --- api/src/handlers.rs | 6 +- grin/src/adapters.rs | 168 +++------------- grin/src/seed.rs | 53 +++-- grin/src/server.rs | 28 +-- grin/src/sync.rs | 93 ++++++--- p2p/src/lib.rs | 6 +- p2p/src/peer.rs | 10 +- p2p/src/peers.rs | 381 ++++++++++++++++++++++++++++++++++++ p2p/src/server.rs | 309 ++++------------------------- p2p/src/types.rs | 16 +- p2p/tests/peer_handshake.rs | 2 +- src/bin/grin.rs | 1 - 12 files changed, 573 insertions(+), 500 deletions(-) create mode 100644 p2p/src/peers.rs diff --git a/api/src/handlers.rs b/api/src/handlers.rs index 69272d795..5a189a6aa 100644 --- a/api/src/handlers.rs +++ b/api/src/handlers.rs @@ -486,13 +486,13 @@ let _ = thread::Builder::new() tx_pool: tx_pool.clone(), }; let peers_all_handler = PeersAllHandler { - p2p_server: p2p_server.clone(), + peers: peers.clone(), }; let peers_connected_handler = PeersConnectedHandler { - p2p_server: p2p_server.clone(), + peers: peers.clone(), }; let peer_handler = PeerHandler { - p2p_server: p2p_server.clone(), + peers: peers.clone(), }; let route_list = vec!( diff --git a/grin/src/adapters.rs b/grin/src/adapters.rs index 01f5f7975..8f771007b 100644 --- a/grin/src/adapters.rs +++ b/grin/src/adapters.rs @@ -1,4 +1,4 @@ -// Copyright 2016 The Grin Developers +// Copyright 2017 The Grin Developers // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -21,7 +21,7 @@ use core::core::{self, Output}; use core::core::block::BlockHeader; use core::core::hash::{Hash, Hashed}; use core::core::target::Difficulty; -use p2p::{self, NetAdapter, PeerData, State}; +use p2p; use pool; use util::secp::pedersen::Commitment; use util::OneTime; @@ -32,13 +32,12 @@ use util::LOGGER; /// blocks and transactions are received and forwards to the chain and pool /// implementations. 
pub struct NetToChainAdapter { + currently_syncing: Arc, chain: Arc, - p2p_server: OneTime>, tx_pool: Arc>>, - syncing: AtomicBool, } -impl NetAdapter for NetToChainAdapter { +impl p2p::ChainAdapter for NetToChainAdapter { fn total_difficulty(&self) -> Difficulty { self.chain.total_difficulty() } @@ -65,7 +64,7 @@ impl NetAdapter for NetToChainAdapter { } } - fn block_received(&self, b: core::Block, addr: SocketAddr) { + fn block_received(&self, b: core::Block, _: SocketAddr) -> bool { let bhash = b.hash(); debug!( LOGGER, @@ -79,19 +78,16 @@ impl NetAdapter for NetToChainAdapter { if let &Err(ref e) = &res { debug!(LOGGER, "Block {} refused by chain: {:?}", bhash, e); - - // if the peer sent us a block that's intrinsically bad, they're either - // mistaken or manevolent, both of which require a ban if e.is_bad_block() { - self.p2p_server.borrow().ban_peer(&addr); - // - // // header chain should be consistent with the sync head here - // // we just banned the peer that sent a bad block so - // // sync head should resolve itself if/when we find an alternative peer - // // with more work than ourselves - // // we should not need to reset the header head here + // header chain should be consistent with the sync head here + // we just banned the peer that sent a bad block so + // sync head should resolve itself if/when we find an alternative peer + // with more work than ourselves + // we should not need to reset the header head here + return false; } } + true } fn headers_received(&self, bhs: Vec, addr: SocketAddr) { @@ -193,135 +189,21 @@ impl NetAdapter for NetToChainAdapter { } } - /// Find good peers we know with the provided capability and return their - /// addresses. 
- fn find_peer_addrs(&self, capab: p2p::Capabilities) -> Vec { - let peers = self.p2p_server.borrow() - .find_peers(State::Healthy, capab, p2p::MAX_PEER_ADDRS as usize); - debug!(LOGGER, "Got {} peer addrs to send.", peers.len()); - map_vec!(peers, |p| p.addr) - } - - /// A list of peers has been received from one of our peers. - fn peer_addrs_received(&self, peer_addrs: Vec) { - debug!(LOGGER, "Received {} peer addrs, saving.", peer_addrs.len()); - for pa in peer_addrs { - if let Ok(e) = self.p2p_server.borrow().exists_peer(pa) { - if e { - continue; - } - } - let peer = PeerData { - addr: pa, - capabilities: p2p::UNKNOWN, - user_agent: "".to_string(), - flags: State::Healthy, - }; - if let Err(e) = self.p2p_server.borrow().save_peer(&peer) { - error!(LOGGER, "Could not save received peer address: {:?}", e); - } - } - } - - /// Network successfully connected to a peer. - fn peer_connected(&self, pi: &p2p::PeerInfo) { - debug!(LOGGER, "Saving newly connected peer {}.", pi.addr); - let peer = PeerData { - addr: pi.addr, - capabilities: pi.capabilities, - user_agent: pi.user_agent.clone(), - flags: State::Healthy, - }; - if let Err(e) = self.p2p_server.borrow().save_peer(&peer) { - error!(LOGGER, "Could not save connected peer: {:?}", e); - } - } - - fn peer_difficulty(&self, addr: SocketAddr, diff: Difficulty, height: u64) { - debug!( - LOGGER, - "peer total_diff @ height (ping/pong): {}: {} @ {} \ - vs us: {} @ {}", - addr, - diff, - height, - self.total_difficulty(), - self.total_height() - ); - - if diff.into_num() > 0 && self.p2p_server.is_initialized() { - if let Some(peer) = self.p2p_server.borrow().get_peer(&addr) { - let mut peer = peer.write().unwrap(); - peer.info.total_difficulty = diff; - } - } - } } impl NetToChainAdapter { pub fn new( + currently_syncing: Arc, chain_ref: Arc, tx_pool: Arc>>, ) -> NetToChainAdapter { NetToChainAdapter { + currently_syncing: currently_syncing, chain: chain_ref, - p2p_server: OneTime::new(), tx_pool: tx_pool, - syncing: 
AtomicBool::new(true), } } - /// Setup the p2p server on the adapter - pub fn init(&self, p2p: Arc) { - self.p2p_server.init(p2p); - } - - /// Whether we're currently syncing the chain or we're fully caught up and - /// just receiving blocks through gossip. - pub fn is_syncing(&self) -> bool { - let local_diff = self.total_difficulty(); - let peer = self.p2p_server.borrow().most_work_peer(); - - // if we're already syncing, we're caught up if no peer has a higher - // difficulty than us - if self.syncing.load(Ordering::Relaxed) { - if let Some(peer) = peer { - if let Ok(peer) = peer.try_read() { - if peer.info.total_difficulty <= local_diff { - info!(LOGGER, "sync: caught up on most worked chain, disabling sync"); - self.syncing.store(false, Ordering::Relaxed); - } - } - } else { - info!(LOGGER, "sync: no peers available, disabling sync"); - self.syncing.store(false, Ordering::Relaxed); - } - } else { - if let Some(peer) = peer { - if let Ok(peer) = peer.try_read() { - // sum the last 5 difficulties to give us the threshold - let threshold = self.chain - .difficulty_iter() - .filter_map(|x| x.map(|(_, x)| x).ok()) - .take(5) - .fold(Difficulty::zero(), |sum, val| sum + val); - - if peer.info.total_difficulty > local_diff.clone() + threshold.clone() { - info!( - LOGGER, - "sync: total_difficulty {}, peer_difficulty {}, threshold {} (last 5 blocks), enabling sync", - local_diff, - peer.info.total_difficulty, - threshold, - ); - self.syncing.store(true, Ordering::Relaxed); - } - } - } - } - self.syncing.load(Ordering::Relaxed) - } - // recursively go back through the locator vector and stop when we find // a header that we recognize this will be a header shared in common // between us and the peer @@ -358,7 +240,7 @@ impl NetToChainAdapter { /// Prepare options for the chain pipeline fn chain_opts(&self) -> chain::Options { - let opts = if self.is_syncing() { + let opts = if self.currently_syncing.load(Ordering::Relaxed) { chain::SYNC } else { chain::NONE @@ -372,7 
+254,7 @@ impl NetToChainAdapter { /// the network to broadcast the block pub struct ChainToPoolAndNetAdapter { tx_pool: Arc>>, - p2p: OneTime>, + peers: OneTime, } impl ChainAdapter for ChainToPoolAndNetAdapter { @@ -387,7 +269,7 @@ impl ChainAdapter for ChainToPoolAndNetAdapter { ); } } - self.p2p.borrow().broadcast_block(b); + self.peers.borrow().broadcast_block(b); } } @@ -397,23 +279,23 @@ impl ChainToPoolAndNetAdapter { ) -> ChainToPoolAndNetAdapter { ChainToPoolAndNetAdapter { tx_pool: tx_pool, - p2p: OneTime::new(), + peers: OneTime::new(), } } - pub fn init(&self, p2p: Arc) { - self.p2p.init(p2p); + pub fn init(&self, peers: p2p::Peers) { + self.peers.init(peers); } } /// Adapter between the transaction pool and the network, to relay /// transactions that have been accepted. pub struct PoolToNetAdapter { - p2p: OneTime>, + peers: OneTime, } impl pool::PoolAdapter for PoolToNetAdapter { fn tx_accepted(&self, tx: &core::Transaction) { - self.p2p.borrow().broadcast_transaction(tx); + self.peers.borrow().broadcast_transaction(tx); } } @@ -421,13 +303,13 @@ impl PoolToNetAdapter { /// Create a new pool to net adapter pub fn new() -> PoolToNetAdapter { PoolToNetAdapter { - p2p: OneTime::new(), + peers: OneTime::new(), } } /// Setup the p2p server on the adapter - pub fn init(&self, p2p: Arc) { - self.p2p.init(p2p); + pub fn init(&self, peers: p2p::Peers) { + self.peers.init(peers); } } diff --git a/grin/src/seed.rs b/grin/src/seed.rs index 6a0196da1..5ab7971b8 100644 --- a/grin/src/seed.rs +++ b/grin/src/seed.rs @@ -36,18 +36,20 @@ const PEER_PREFERRED_COUNT: u32 = 8; const SEEDS_URL: &'static str = "http://grin-tech.org/seeds.txt"; pub struct Seeder { - p2p: Arc, - + peers: p2p::Peers, + p2p_server: Arc, capabilities: p2p::Capabilities, } impl Seeder { pub fn new( capabilities: p2p::Capabilities, - p2p: Arc, + p2p_server: Arc, + peers: p2p::Peers, ) -> Seeder { Seeder { - p2p: p2p, + peers: peers, + p2p_server: p2p_server, capabilities: capabilities, } } @@ 
-76,7 +78,7 @@ impl Seeder { &self, tx: mpsc::UnboundedSender, ) -> Box> { - let p2p_server = self.p2p.clone(); + let peers = self.peers.clone(); let capabilities = self.capabilities.clone(); // now spawn a new future to regularly check if we need to acquire more peers @@ -84,19 +86,19 @@ impl Seeder { let mon_loop = Timer::default() .interval(time::Duration::from_secs(30)) .for_each(move |_| { - let total_count = p2p_server.all_peers().len(); + let total_count = peers.all_peers().len(); debug!( LOGGER, "monitor_peers: {} most_work_peers, {} connected, {} total known", - p2p_server.most_work_peers().len(), - p2p_server.connected_peers().len(), + peers.most_work_peers().len(), + peers.connected_peers().len(), total_count, ); let mut healthy_count = 0; let mut banned_count = 0; let mut defunct_count = 0; - for x in p2p_server.all_peers() { + for x in peers.all_peers() { if x.flags == p2p::State::Healthy { healthy_count += 1 } else if x.flags == p2p::State::Banned { banned_count += 1 } else if x.flags == p2p::State::Defunct { defunct_count += 1 }; @@ -113,14 +115,14 @@ impl Seeder { // maintenance step first, clean up p2p server peers { - p2p_server.clean_peers(PEER_PREFERRED_COUNT as usize); + peers.clean_peers(PEER_PREFERRED_COUNT as usize); } // not enough peers, getting more from db - if p2p_server.peer_count() < PEER_PREFERRED_COUNT { + if peers.peer_count() < PEER_PREFERRED_COUNT { // loop over connected peers // ask them for their list of peers - for p in p2p_server.connected_peers() { + for p in peers.connected_peers() { if let Ok(p) = p.try_read() { debug!( LOGGER, @@ -138,7 +140,7 @@ impl Seeder { // find some peers from our db // and queue them up for a connection attempt - let peers = p2p_server.find_peers( + let peers = peers.find_peers( p2p::State::Healthy, p2p::UNKNOWN, 100, @@ -171,11 +173,11 @@ impl Seeder { // db query let thread_pool = cpupool::Builder::new() .pool_size(1).name_prefix("seed").create(); - let p2p_server = self.p2p.clone(); + let 
peers = self.peers.clone(); let seeder = thread_pool .spawn_fn(move || { // check if we have some peers in db - let peers = p2p_server.find_peers( + let peers = peers.find_peers( p2p::State::Healthy, p2p::FULL_HIST, 100, @@ -215,21 +217,15 @@ impl Seeder { rx: mpsc::UnboundedReceiver, ) -> Box> { let capab = self.capabilities; - let p2p_server = self.p2p.clone(); + let peers = self.peers.clone(); + let p2p_server = self.p2p_server.clone(); let listener = rx.for_each(move |peer_addr| { debug!(LOGGER, "New peer address to connect to: {}.", peer_addr); let inner_h = h.clone(); - if p2p_server.peer_count() < PEER_MAX_COUNT { - h.spawn( - connect_and_req( - capab, - p2p_server.clone(), - inner_h, - peer_addr, - ) - ) - }; + if peers.peer_count() < PEER_MAX_COUNT { + h.spawn(connect_and_req(capab, p2p_server.clone(), peers.clone(), inner_h, peer_addr)) + } Box::new(future::ok(())) }); Box::new(listener) @@ -293,11 +289,12 @@ pub fn predefined_seeds( fn connect_and_req( capab: p2p::Capabilities, p2p: Arc, + peers: p2p::Peers, h: reactor::Handle, addr: SocketAddr, ) -> Box> { let connect_peer = p2p.connect_peer(addr, h); - let p2p_server = p2p.clone(); + let peers = peers.clone(); let fut = connect_peer.then(move |p| { match p { Ok(Some(p)) => { @@ -311,7 +308,7 @@ fn connect_and_req( }, Err(e) => { debug!(LOGGER, "connect_and_req: {} is Defunct; {:?}", addr, e); - let _ = p2p_server.update_state(addr, p2p::State::Defunct); + let _ = peers.update_state(addr, p2p::State::Defunct); }, } Ok(()) diff --git a/grin/src/server.rs b/grin/src/server.rs index 8862fa7f6..48baa35b8 100644 --- a/grin/src/server.rs +++ b/grin/src/server.rs @@ -18,6 +18,7 @@ use std::net::SocketAddr; use std::sync::{Arc, RwLock}; +use std::sync::atomic::{AtomicBool, Ordering}; use std::thread; use std::time; @@ -51,7 +52,7 @@ pub struct Server { chain: Arc, /// in-memory transaction pool tx_pool: Arc>>, - net_adapter: Arc, + currently_syncing: Arc, } impl Server { @@ -108,7 +109,10 @@ impl Server { 
pool_adapter.set_chain(shared_chain.clone()); + let currently_syncing = Arc::new(AtomicBool::new(true)); + let net_adapter = Arc::new(NetToChainAdapter::new( + currently_syncing.clone(), shared_chain.clone(), tx_pool.clone(), )); @@ -120,11 +124,11 @@ impl Server { net_adapter.clone(), genesis.hash(), )?); - chain_adapter.init(p2p_server.clone()); - pool_net_adapter.init(p2p_server.clone()); - net_adapter.init(p2p_server.clone()); + chain_adapter.init(p2p_server.peers.clone()); + pool_net_adapter.init(p2p_server.peers.clone()); - let seed = seed::Seeder::new(config.capabilities, p2p_server.clone()); + let seed = seed::Seeder::new( + config.capabilities, p2p_server.clone(), p2p_server.peers.clone()); match config.seeding_type.clone() { Seeding::None => { warn!( @@ -152,8 +156,8 @@ impl Server { } sync::run_sync( - net_adapter.clone(), - p2p_server.clone(), + currently_syncing.clone(), + p2p_server.peers.clone(), shared_chain.clone(), ); @@ -165,7 +169,7 @@ impl Server { config.api_http_addr.clone(), shared_chain.clone(), tx_pool.clone(), - p2p_server.clone(), + p2p_server.peers.clone(), ); warn!(LOGGER, "Grin server started."); @@ -175,7 +179,7 @@ impl Server { p2p: p2p_server, chain: shared_chain, tx_pool: tx_pool, - net_adapter: net_adapter, + currently_syncing: currently_syncing, }) } @@ -193,7 +197,7 @@ impl Server { /// Number of peers pub fn peer_count(&self) -> u32 { - self.p2p.peer_count() + self.p2p.peers.peer_count() } /// Start mining for blocks on a separate thread. 
Uses toy miner by default, @@ -201,7 +205,7 @@ impl Server { pub fn start_miner(&self, config: pow::types::MinerConfig) { let cuckoo_size = global::sizeshift(); let proof_size = global::proofsize(); - let net_adapter = self.net_adapter.clone(); + let currently_syncing = self.currently_syncing.clone(); let mut miner = miner::Miner::new(config.clone(), self.chain.clone(), self.tx_pool.clone()); miner.set_debug_output_id(format!("Port {}", self.config.p2p_config.unwrap().port)); @@ -211,7 +215,7 @@ impl Server { // TODO push this down in the run loop so miner gets paused anytime we // decide to sync again let secs_5 = time::Duration::from_secs(5); - while net_adapter.is_syncing() { + while currently_syncing.load(Ordering::Relaxed) { thread::sleep(secs_5); } miner.run_loop(config.clone(), cuckoo_size as u32, proof_size); diff --git a/grin/src/sync.rs b/grin/src/sync.rs index 2d740273b..1571ff426 100644 --- a/grin/src/sync.rs +++ b/grin/src/sync.rs @@ -15,24 +15,24 @@ use std::thread; use std::time::Duration; use std::sync::{Arc, RwLock}; +use std::sync::atomic::{AtomicBool, Ordering}; use time; -use adapters::NetToChainAdapter; use chain; use core::core::hash::{Hash, Hashed}; -use p2p::{self, Peer}; +use core::core::target::Difficulty; +use p2p::{self, Peer, Peers, ChainAdapter}; use types::Error; use util::LOGGER; /// Starts the syncing loop, just spawns two threads that loop forever pub fn run_sync( - adapter: Arc, - p2p_server: Arc, + currently_syncing: Arc, + peers: p2p::Peers, chain: Arc, ) { - let a_inner = adapter.clone(); - let p2p_inner = p2p_server.clone(); - let c_inner = chain.clone(); + + let chain = chain.clone(); let _ = thread::Builder::new() .name("sync".to_string()) .spawn(move || { @@ -43,13 +43,16 @@ pub fn run_sync( thread::sleep(Duration::from_secs(30)); loop { - if a_inner.is_syncing() { + let syncing = needs_syncing( + currently_syncing.clone(), peers.clone(), chain.clone()); + if syncing { + let current_time = time::now_utc(); // run the header 
sync every 10s if current_time - prev_header_sync > time::Duration::seconds(10) { header_sync( - p2p_server.clone(), + peers.clone(), chain.clone(), ); prev_header_sync = current_time; @@ -58,8 +61,8 @@ pub fn run_sync( // run the body_sync every 5s if current_time - prev_body_sync > time::Duration::seconds(5) { body_sync( - p2p_inner.clone(), - c_inner.clone(), + peers.clone(), + chain.clone(), ); prev_body_sync = current_time; } @@ -72,10 +75,8 @@ pub fn run_sync( }); } -fn body_sync( - p2p_server: Arc, - chain: Arc, -) { +fn body_sync(peers: Peers, chain: Arc) { + let body_head: chain::Tip = chain.head().unwrap(); let header_head: chain::Tip = chain.get_header_head().unwrap(); let sync_head: chain::Tip = chain.get_sync_head().unwrap(); @@ -112,7 +113,7 @@ fn body_sync( // if we have 5 most_work_peers then ask for 50 blocks total (peer_count * 10) // max will be 80 if all 8 peers are advertising most_work let peer_count = { - p2p_server.most_work_peers().len() + peers.most_work_peers().len() }; let block_count = peer_count * 10; @@ -134,7 +135,7 @@ fn body_sync( ); for hash in hashes_to_get.clone() { - let peer = p2p_server.most_work_peer(); + let peer = peers.most_work_peer(); if let Some(peer) = peer { if let Ok(peer) = peer.try_read() { let _ = peer.send_block_request(hash); @@ -144,14 +145,11 @@ fn body_sync( } } -pub fn header_sync( - p2p_server: Arc, - chain: Arc, -) { +pub fn header_sync(peers: Peers, chain: Arc) { if let Ok(header_head) = chain.get_header_head() { let difficulty = header_head.total_difficulty; - if let Some(peer) = p2p_server.most_work_peer() { + if let Some(peer) = peers.most_work_peer() { if let Ok(p) = peer.try_read() { let peer_difficulty = p.info.total_difficulty.clone(); if peer_difficulty > difficulty { @@ -189,6 +187,57 @@ fn request_headers( Ok(()) } + +/// Whether we're currently syncing the chain or we're fully caught up and +/// just receiving blocks through gossip. 
+pub fn needs_syncing( + currently_syncing: Arc, + peers: Peers, + chain: Arc) -> bool { + + let local_diff = peers.total_difficulty(); + let peer = peers.most_work_peer(); + + // if we're already syncing, we're caught up if no peer has a higher + // difficulty than us + if currently_syncing.load(Ordering::Relaxed) { + if let Some(peer) = peer { + if let Ok(peer) = peer.try_read() { + if peer.info.total_difficulty <= local_diff { + info!(LOGGER, "sync: caught up on most worked chain, disabling sync"); + currently_syncing.store(false, Ordering::Relaxed); + } + } + } else { + info!(LOGGER, "sync: no peers available, disabling sync"); + currently_syncing.store(false, Ordering::Relaxed); + } + } else { + if let Some(peer) = peer { + if let Ok(peer) = peer.try_read() { + // sum the last 5 difficulties to give us the threshold + let threshold = chain + .difficulty_iter() + .filter_map(|x| x.map(|(_, x)| x).ok()) + .take(5) + .fold(Difficulty::zero(), |sum, val| sum + val); + + if peer.info.total_difficulty > local_diff.clone() + threshold.clone() { + info!( + LOGGER, + "sync: total_difficulty {}, peer_difficulty {}, threshold {} (last 5 blocks), enabling sync", + local_diff, + peer.info.total_difficulty, + threshold, + ); + currently_syncing.store(true, Ordering::Relaxed); + } + } + } + } + currently_syncing.load(Ordering::Relaxed) +} + /// We build a locator based on sync_head. /// Even if sync_head is significantly out of date we will "reset" it once we start getting /// headers back from a peer. 
diff --git a/p2p/src/lib.rs b/p2p/src/lib.rs index cf23fc837..09739187f 100644 --- a/p2p/src/lib.rs +++ b/p2p/src/lib.rs @@ -47,13 +47,15 @@ pub mod handshake; mod rate_limit; mod msg; mod peer; +mod peers; mod protocol; mod server; mod store; mod types; pub use server::{DummyAdapter, Server}; +pub use peers::Peers; pub use peer::Peer; -pub use types::{Capabilities, Error, NetAdapter, P2PConfig, PeerInfo, FULL_HIST, FULL_NODE, - MAX_BLOCK_HEADERS, MAX_PEER_ADDRS, UNKNOWN}; +pub use types::{Capabilities, Error, ChainAdapter, P2PConfig, PeerInfo, FULL_HIST, FULL_NODE, + MAX_BLOCK_HEADERS, MAX_PEER_ADDRS, UNKNOWN}; pub use store::{PeerData, State}; diff --git a/p2p/src/peer.rs b/p2p/src/peer.rs index e7ef3b03a..880579d43 100644 --- a/p2p/src/peer.rs +++ b/p2p/src/peer.rs @@ -221,7 +221,7 @@ impl TrackingAdapter { } } -impl NetAdapter for TrackingAdapter { +impl ChainAdapter for TrackingAdapter { fn total_difficulty(&self) -> Difficulty { self.adapter.total_difficulty() } @@ -235,7 +235,7 @@ impl NetAdapter for TrackingAdapter { self.adapter.transaction_received(tx) } - fn block_received(&self, b: core::Block, addr: SocketAddr) { + fn block_received(&self, b: core::Block, addr: SocketAddr) -> bool { self.push(b.hash()); self.adapter.block_received(b, addr) } @@ -251,7 +251,9 @@ impl NetAdapter for TrackingAdapter { fn get_block(&self, h: Hash) -> Option { self.adapter.get_block(h) } +} +impl NetAdapter for TrackingAdapter { fn find_peer_addrs(&self, capab: Capabilities) -> Vec { self.adapter.find_peer_addrs(capab) } @@ -260,10 +262,6 @@ impl NetAdapter for TrackingAdapter { self.adapter.peer_addrs_received(addrs) } - fn peer_connected(&self, pi: &PeerInfo) { - self.adapter.peer_connected(pi) - } - fn peer_difficulty(&self, addr: SocketAddr, diff: Difficulty, height:u64) { self.adapter.peer_difficulty(addr, diff, height) } diff --git a/p2p/src/peers.rs b/p2p/src/peers.rs new file mode 100644 index 000000000..892d68b7f --- /dev/null +++ b/p2p/src/peers.rs @@ -0,0 +1,381 
@@ +// Copyright 2016 The Grin Developers +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; +use std::net::SocketAddr; +use std::sync::{Arc, RwLock}; + +use rand::{thread_rng, Rng}; + +use core::core; +use core::core::hash::Hash; +use core::core::target::Difficulty; +use util::LOGGER; + +use peer::Peer; +use store::{PeerStore, PeerData, State}; +use types::*; + +#[derive(Clone)] +pub struct Peers { + pub adapter: Arc, + store: Arc, + peers: Arc>>>>, +} + +unsafe impl Send for Peers {} +unsafe impl Sync for Peers {} + +impl Peers { + pub fn new(store: PeerStore, adapter: Arc) -> Peers { + Peers { + adapter: adapter, + store: Arc::new(store), + peers: Arc::new(RwLock::new(HashMap::new())), + } + } + + /// Adds the peer to our internal peer mapping. Note that the peer is still + /// returned so the server can run it. 
+ pub fn add_connected(&self, p: Peer) -> Arc> { + debug!(LOGGER, "Saving newly connected peer {}.", p.info.addr); + let peer_data = PeerData { + addr: p.info.addr, + capabilities: p.info.capabilities, + user_agent: p.info.user_agent.clone(), + flags: State::Healthy, + }; + if let Err(e) = self.save_peer(&peer_data) { + error!(LOGGER, "Could not save connected peer: {:?}", e); + } + + let addr = p.info.addr.clone(); + let apeer = Arc::new(RwLock::new(p)); + { + let mut peers = self.peers.write().unwrap(); + peers.insert(addr, apeer.clone()); + } + apeer.clone() + } + + pub fn is_known(&self, addr: &SocketAddr) -> bool { + self.get_peer(addr).is_some() + } + + pub fn connected_peers(&self) -> Vec>> { + self.peers.read().unwrap().values().map(|p| p.clone()).collect() + } + + /// Get a peer we're connected to by address. + pub fn get_peer(&self, addr: &SocketAddr) -> Option>> { + self.peers.read().unwrap().get(addr).map(|p| p.clone()) + } + + /// Number of peers we're currently connected to. + pub fn peer_count(&self) -> u32 { + self.connected_peers().len() as u32 + } + + /// Return vec of all peers that currently have the most worked branch, + /// showing the highest total difficulty. + pub fn most_work_peers(&self) -> Vec>> { + let peers = self.connected_peers(); + if peers.len() == 0 { + return vec![]; + } + + let max_total_difficulty = peers + .iter() + .map(|x| { + match x.try_read() { + Ok(peer) => peer.info.total_difficulty.clone(), + Err(_) => Difficulty::zero(), + } + }) + .max() + .unwrap(); + + let mut max_peers = peers + .iter() + .filter(|x| { + match x.try_read() { + Ok(peer) => { + peer.info.total_difficulty == max_total_difficulty + }, + Err(_) => false, + } + }) + .cloned() + .collect::>(); + + thread_rng().shuffle(&mut max_peers); + max_peers + } + + /// Returns single random peer with the most worked branch, showing the highest total + /// difficulty. 
+ pub fn most_work_peer(&self) -> Option>> { + match self.most_work_peers().first() { + Some(x) => Some(x.clone()), + None => None + } + } + + /// Returns a random connected peer. + pub fn random_peer(&self) -> Option>> { + let peers = self.connected_peers(); + Some(thread_rng().choose(&peers).unwrap().clone()) + } + + pub fn is_banned(&self, peer_addr: SocketAddr) -> bool { + if let Ok(peer_data) = self.store.get_peer(peer_addr) { + if peer_data.flags == State::Banned { + return true; + } + } + false + } + + /// Bans a peer, disconnecting it if we're currently connected + pub fn ban_peer(&self, peer_addr: &SocketAddr) { + if let Err(e) = self.update_state(peer_addr.clone(), State::Banned) { + error!(LOGGER, "Couldn't ban {}: {:?}", peer_addr, e); + } + + if let Some(peer) = self.get_peer(peer_addr) { + debug!(LOGGER, "Banning peer {}", peer_addr); + // setting peer status will get it removed at the next clean_peer + let peer = peer.write().unwrap(); + peer.set_banned(); + peer.stop(); + } + } + + /// Broadcasts the provided block to all our peers. A peer implementation + /// may drop the broadcast request if it knows the remote peer already has + /// the block. + pub fn broadcast_block(&self, b: &core::Block) { + let peers = self.connected_peers(); + let mut count = 0; + for p in peers { + let p = p.read().unwrap(); + if p.is_connected() { + if let Err(e) = p.send_block(b) { + debug!(LOGGER, "Error sending block to peer: {:?}", e); + } else { + count += 1; + } + } + } + debug!(LOGGER, "Broadcasted block {} to {} peers.", b.header.height, count); + } + + /// Broadcasts the provided transaction to all our peers. A peer + /// implementation may drop the broadcast request if it knows the + /// remote peer already has the transaction. 
+ pub fn broadcast_transaction(&self, tx: &core::Transaction) { + let peers = self.connected_peers(); + for p in peers { + let p = p.read().unwrap(); + if p.is_connected() { + if let Err(e) = p.send_transaction(tx) { + debug!(LOGGER, "Error sending block to peer: {:?}", e); + } + } + } + } + + /// Ping all our connected peers. Always automatically expects a pong back or + /// disconnects. This acts as a liveness test. + pub fn check_all(&self, total_difficulty: Difficulty, height: u64) { + let peers_map = self.peers.read().unwrap(); + for p in peers_map.values() { + let p = p.read().unwrap(); + if p.is_connected() { + let _ = p.send_ping(total_difficulty.clone(), height); + } + } + } + + /// All peer information we have in storage + pub fn all_peers(&self) -> Vec { + self.store.all_peers() + } + + /// Find peers in store (not necessarily connected) and return their data + pub fn find_peers(&self, state: State, cap: Capabilities, count: usize) -> Vec { + self.store.find_peers(state, cap, count) + } + + /// Whether we've already seen a peer with the provided address + pub fn exists_peer(&self, peer_addr: SocketAddr) -> Result { + self.store.exists_peer(peer_addr).map_err(From::from) + } + + /// Saves updated information about a peer + pub fn save_peer(&self, p: &PeerData) -> Result<(), Error> { + self.store.save_peer(p).map_err(From::from) + } + + /// Updates the state of a peer in store + pub fn update_state(&self, peer_addr: SocketAddr, new_state: State) -> Result<(), Error> { + self.store.update_state(peer_addr, new_state).map_err(From::from) + } + + /// Iterate over the peer list and prune all peers we have + /// lost connection to or have been deemed problematic. + /// Also avoid connected peer count getting too high. 
+ pub fn clean_peers(&self, desired_count: usize) { + let mut rm = vec![]; + + // build a list of peers to be cleaned up + for peer in self.connected_peers() { + let peer_inner = peer.read().unwrap(); + if peer_inner.is_banned() { + debug!(LOGGER, "cleaning {:?}, peer banned", peer_inner.info.addr); + rm.push(peer.clone()); + } else if !peer_inner.is_connected() { + debug!(LOGGER, "cleaning {:?}, not connected", peer_inner.info.addr); + rm.push(peer.clone()); + } + } + + // now clean up peer map based on the list to remove + { + let mut peers = self.peers.write().unwrap(); + for p in rm.clone() { + let p = p.read().unwrap(); + peers.remove(&p.info.addr); + } + } + + // ensure we do not have too many connected peers + // really fighting with the double layer of rwlocks here... + let excess_count = { + let peer_count = self.peer_count().clone() as usize; + if peer_count > desired_count { + peer_count - desired_count + } else { + 0 + } + }; + + // map peers to addrs in a block to bound how long we keep the read lock for + let addrs = { + self.connected_peers().iter().map(|x| { + let p = x.read().unwrap(); + p.info.addr.clone() + }).collect::>() + }; + + // now remove them taking a short-lived write lock each time + // maybe better to take write lock once and remove them all? 
+ for x in addrs + .iter() + .take(excess_count) { + let mut peers = self.peers.write().unwrap(); + peers.remove(x); + } + } + + pub fn stop(self) { + let peers = self.connected_peers(); + for peer in peers { + let peer = peer.read().unwrap(); + peer.stop(); + } + } +} + +impl ChainAdapter for Peers { + fn total_difficulty(&self) -> Difficulty { + self.adapter.total_difficulty() + } + fn total_height(&self) -> u64 { + self.adapter.total_height() + } + fn transaction_received(&self, tx: core::Transaction) { + self.adapter.transaction_received(tx) + } + fn block_received(&self, b: core::Block, peer_addr: SocketAddr) -> bool { + if !self.adapter.block_received(b, peer_addr) { + // if the peer sent us a block that's intrinsically bad, they're either + // mistaken or malevolent, both of which require a ban + self.ban_peer(&peer_addr); + false + } else { + true + } + } + fn headers_received(&self, headers: Vec, peer_addr:SocketAddr) { + self.adapter.headers_received(headers, peer_addr) + } + fn locate_headers(&self, hs: Vec) -> Vec { + self.adapter.locate_headers(hs) + } + fn get_block(&self, h: Hash) -> Option { + self.adapter.get_block(h) + } +} + +impl NetAdapter for Peers { + /// Find good peers we know with the provided capability and return their + /// addresses. + fn find_peer_addrs(&self, capab: Capabilities) -> Vec { + let peers = self.find_peers(State::Healthy, capab, MAX_PEER_ADDRS as usize); + debug!(LOGGER, "Got {} peer addrs to send.", peers.len()); + map_vec!(peers, |p| p.addr) + } + + /// A list of peers has been received from one of our peers. 
+ fn peer_addrs_received(&self, peer_addrs: Vec) { + debug!(LOGGER, "Received {} peer addrs, saving.", peer_addrs.len()); + for pa in peer_addrs { + if let Ok(e) = self.exists_peer(pa) { + if e { + continue; + } + } + let peer = PeerData { + addr: pa, + capabilities: UNKNOWN, + user_agent: "".to_string(), + flags: State::Healthy, + }; + if let Err(e) = self.save_peer(&peer) { + error!(LOGGER, "Could not save received peer address: {:?}", e); + } + } + } + + fn peer_difficulty(&self, addr: SocketAddr, diff: Difficulty, height: u64) { + debug!( + LOGGER, + "peer total_diff @ height (ping/pong): {}: {} @ {} \ + vs us: {} @ {}", + addr, + diff, + height, + self.total_difficulty(), + self.total_height() + ); + + if diff.into_num() > 0 { + if let Some(peer) = self.get_peer(&addr) { + let mut peer = peer.write().unwrap(); + peer.info.total_difficulty = diff; + } + } + } +} diff --git a/p2p/src/server.rs b/p2p/src/server.rs index 9cbe3ca72..884d7fd44 100644 --- a/p2p/src/server.rs +++ b/p2p/src/server.rs @@ -16,7 +16,6 @@ //! other peers in the network. use std::cell::RefCell; -use std::collections::HashMap; use std::net::{SocketAddr, Shutdown}; use std::sync::{Arc, RwLock}; use std::time::Duration; @@ -24,7 +23,6 @@ use std::time::Duration; use futures; use futures::{Future, Stream}; use futures::future::{self, IntoFuture}; -use rand::{thread_rng, Rng}; use tokio_core::net::{TcpListener, TcpStream}; use tokio_core::reactor; use tokio_timer::Timer; @@ -34,13 +32,14 @@ use core::core::hash::Hash; use core::core::target::Difficulty; use handshake::Handshake; use peer::Peer; -use store::{PeerStore, PeerData, State}; +use peers::Peers; +use store::PeerStore; use types::*; use util::LOGGER; /// A no-op network adapter used for testing. 
pub struct DummyAdapter {} -impl NetAdapter for DummyAdapter { +impl ChainAdapter for DummyAdapter { fn total_difficulty(&self) -> Difficulty { Difficulty::one() } @@ -48,7 +47,7 @@ impl NetAdapter for DummyAdapter { 0 } fn transaction_received(&self, _: core::Transaction) {} - fn block_received(&self, _: core::Block, _: SocketAddr) {} + fn block_received(&self, _: core::Block, _: SocketAddr) -> bool { true } fn headers_received(&self, _: Vec, _:SocketAddr) {} fn locate_headers(&self, _: Vec) -> Vec { vec![] @@ -56,11 +55,12 @@ impl NetAdapter for DummyAdapter { fn get_block(&self, _: Hash) -> Option { None } +} +impl NetAdapter for DummyAdapter { fn find_peer_addrs(&self, _: Capabilities) -> Vec { vec![] } fn peer_addrs_received(&self, _: Vec) {} - fn peer_connected(&self, _: &PeerInfo) {} fn peer_difficulty(&self, _: SocketAddr, _: Difficulty, _:u64) {} } @@ -69,10 +69,8 @@ impl NetAdapter for DummyAdapter { pub struct Server { config: P2PConfig, capabilities: Capabilities, - store: Arc, - peers: Arc>>>>, handshake: Arc, - adapter: Arc, + pub peers: Peers, stop: RefCell>>, } @@ -86,16 +84,13 @@ impl Server { db_root: String, capab: Capabilities, config: P2PConfig, - adapter: Arc, - genesis: Hash, + adapter: Arc, ) -> Result { Ok(Server { config: config, capabilities: capab, - store: Arc::new(PeerStore::new(db_root)?), - peers: Arc::new(RwLock::new(HashMap::new())), - handshake: Arc::new(Handshake::new(genesis)), - adapter: adapter, + handshake: Arc::new(Handshake::new()), + peers: Peers::new(PeerStore::new(db_root)?, adapter), stop: RefCell::new(None), }) } @@ -109,39 +104,33 @@ impl Server { let handshake = self.handshake.clone(); let peers = self.peers.clone(); - let adapter = self.adapter.clone(); let capab = self.capabilities.clone(); - let store = self.store.clone(); // main peer acceptance future handling handshake let hp = h.clone(); let peers_listen = socket.incoming().map_err(From::from).map(move |(conn, _)| { // aaaand.. 
reclone for the internal closures - let adapter = adapter.clone(); - let store = store.clone(); let peers = peers.clone(); + let peers2 = peers.clone(); let handshake = handshake.clone(); let hp = hp.clone(); future::ok(conn).and_then(move |conn| { // Refuse banned peers connection if let Ok(peer_addr) = conn.peer_addr() { - if let Ok(peer_data) = store.get_peer(peer_addr) { - if peer_data.flags == State::Banned { - debug!(LOGGER, "Peer {} banned, refusing connection.", peer_addr); - if let Err(e) = conn.shutdown(Shutdown::Both) { - debug!(LOGGER, "Error shutting down conn: {:?}", e); - } - return Err(Error::Banned) + if peers.is_banned(peer_addr) { + debug!(LOGGER, "Peer {} banned, refusing connection.", peer_addr); + if let Err(e) = conn.shutdown(Shutdown::Both) { + debug!(LOGGER, "Error shutting down conn: {:?}", e); } + return Err(Error::Banned) } } Ok(conn) }).and_then(move |conn| { - let peers = peers.clone(); - let total_diff = adapter.total_difficulty(); + let total_diff = peers2.total_difficulty(); // accept the peer and add it to the server map let accept = Peer::accept( @@ -149,9 +138,9 @@ impl Server { capab, total_diff, &handshake.clone(), - adapter.clone(), + Arc::new(peers2.clone()), ); - let added = add_to_peers(peers, adapter.clone(), accept); + let added = add_to_peers(peers2, accept); // wire in a future to timeout the accept after 5 secs let timed_peer = with_timeout(Box::new(added), &hp); @@ -186,14 +175,13 @@ impl Server { // timer to regularly check on our peers by pinging them - let adapter = self.adapter.clone(); let peers_inner = self.peers.clone(); let peers_timer = Timer::default() .interval(Duration::new(20, 0)) .fold((), move |_, _| { - let total_diff = adapter.total_difficulty(); - let total_height = adapter.total_height(); - check_peers(peers_inner.clone(), total_diff, total_height); + let total_diff = peers_inner.total_difficulty(); + let total_height = peers_inner.total_height(); + peers_inner.check_all(total_diff, total_height); 
Ok(()) }); @@ -218,7 +206,8 @@ impl Server { addr: SocketAddr, h: reactor::Handle, ) -> Box>>, Error = Error>> { - if let Some(p) = self.get_peer(&addr) { + + if let Some(p) = self.peers.get_peer(&addr) { // if we're already connected to the addr, just return the peer debug!(LOGGER, "connect_peer: already connected {}", addr); return Box::new(future::ok(Some(p))); @@ -229,7 +218,6 @@ impl Server { // cloneapalooza let peers = self.peers.clone(); let handshake = self.handshake.clone(); - let adapter = self.adapter.clone(); let capab = self.capabilities.clone(); let self_addr = SocketAddr::new(self.config.host, self.config.port); @@ -245,8 +233,7 @@ impl Server { let h2 = h.clone(); let request = socket_connect .and_then(move |socket| { - let peers = peers.clone(); - let total_diff = adapter.clone().total_difficulty(); + let total_diff = peers.total_difficulty(); // connect to the peer and add it to the server map, wiring it a timeout for // the handshake @@ -256,9 +243,9 @@ impl Server { total_diff, self_addr, handshake.clone(), - adapter.clone(), + Arc::new(peers.clone()), ); - let added = add_to_peers(peers, adapter, connect); + let added = add_to_peers(peers, connect); with_timeout(Box::new(added), &h) }) .and_then(move |(socket, peer)| { @@ -272,256 +259,26 @@ impl Server { Box::new(request) } - /// Check if the server already knows this peer (is already connected). - pub fn is_known(&self, addr: &SocketAddr) -> bool { - self.get_peer(addr).is_some() - } - - pub fn connected_peers(&self) -> Vec>> { - self.peers.read().unwrap().values().map(|p| p.clone()).collect() - } - - /// Get a peer we're connected to by address. - pub fn get_peer(&self, addr: &SocketAddr) -> Option>> { - self.peers.read().unwrap().get(addr).map(|p| p.clone()) - } - - /// Have the server iterate over its peer list and prune all peers we have - /// lost connection to or have been deemed problematic. - /// Also avoid connected peer count getting too high. 
- pub fn clean_peers(&self, desired_count: usize) { - let mut rm = vec![]; - - // build a list of peers to be cleaned up - for peer in self.connected_peers() { - let peer_inner = peer.read().unwrap(); - if peer_inner.is_banned() { - debug!(LOGGER, "cleaning {:?}, peer banned", peer_inner.info.addr); - rm.push(peer.clone()); - } else if !peer_inner.is_connected() { - debug!(LOGGER, "cleaning {:?}, not connected", peer_inner.info.addr); - rm.push(peer.clone()); - } - } - - // now clean up peer map based on the list to remove - { - let mut peers = self.peers.write().unwrap(); - for p in rm.clone() { - let p = p.read().unwrap(); - peers.remove(&p.info.addr); - } - } - - // ensure we do not have too many connected peers - // really fighting with the double layer of rwlocks here... - let excess_count = { - let peer_count = self.peer_count().clone() as usize; - if peer_count > desired_count { - peer_count - desired_count - } else { - 0 - } - }; - - // map peers to addrs in a block to bound how long we keep the read lock for - let addrs = { - self.connected_peers().iter().map(|x| { - let p = x.read().unwrap(); - p.info.addr.clone() - }).collect::>() - }; - - // now remove them taking a short-lived write lock each time - // maybe better to take write lock once and remove them all? - for x in addrs - .iter() - .take(excess_count) { - let mut peers = self.peers.write().unwrap(); - peers.remove(x); - } - } - - /// Return vec of all peers that currently have the most worked branch, - /// showing the highest total difficulty. 
- pub fn most_work_peers(&self) -> Vec>> { - let peers = self.connected_peers(); - if peers.len() == 0 { - return vec![]; - } - - let max_total_difficulty = peers - .iter() - .map(|x| { - match x.try_read() { - Ok(peer) => peer.info.total_difficulty.clone(), - Err(_) => Difficulty::zero(), - } - }) - .max() - .unwrap(); - - let mut max_peers = peers - .iter() - .filter(|x| { - match x.try_read() { - Ok(peer) => { - peer.info.total_difficulty == max_total_difficulty - }, - Err(_) => false, - } - }) - .cloned() - .collect::>(); - - thread_rng().shuffle(&mut max_peers); - max_peers - } - - /// Returns single random peer with the most worked branch, showing the highest total - /// difficulty. - pub fn most_work_peer(&self) -> Option>> { - match self.most_work_peers().first() { - Some(x) => Some(x.clone()), - None => None - } - } - - /// Returns a random connected peer. - pub fn random_peer(&self) -> Option>> { - let peers = self.connected_peers(); - Some(thread_rng().choose(&peers).unwrap().clone()) - } - - /// Broadcasts the provided block to all our peers. A peer implementation - /// may drop the broadcast request if it knows the remote peer already has - /// the block. - pub fn broadcast_block(&self, b: &core::Block) { - let peers = self.connected_peers(); - let mut count = 0; - for p in peers { - let p = p.read().unwrap(); - if p.is_connected() { - if let Err(e) = p.send_block(b) { - debug!(LOGGER, "Error sending block to peer: {:?}", e); - } else { - count += 1; - } - } - } - debug!(LOGGER, "Broadcasted block {} to {} peers.", b.header.height, count); - } - - /// Broadcasts the provided transaction to all our peers. A peer - /// implementation may drop the broadcast request if it knows the - /// remote peer already has the transaction. 
- pub fn broadcast_transaction(&self, tx: &core::Transaction) { - let peers = self.connected_peers(); - for p in peers { - let p = p.read().unwrap(); - if p.is_connected() { - if let Err(e) = p.send_transaction(tx) { - debug!(LOGGER, "Error sending block to peer: {:?}", e); - } - } - } - } - - /// Number of peers we're currently connected to. - pub fn peer_count(&self) -> u32 { - self.connected_peers().len() as u32 - } - - /// Bans a peer, disconnecting it if we're currently connected - pub fn ban_peer(&self, peer_addr: &SocketAddr) { - if let Err(e) = self.update_state(peer_addr.clone(), State::Banned) { - error!(LOGGER, "Couldn't ban {}: {:?}", peer_addr, e); - } - - if let Some(peer) = self.get_peer(peer_addr) { - debug!(LOGGER, "Banning peer {}", peer_addr); - // setting peer status will get it removed at the next clean_peer - let peer = peer.write().unwrap(); - peer.set_banned(); - peer.stop(); - } - } - /// Stops the server. Disconnect from all peers at the same time. pub fn stop(self) { info!(LOGGER, "calling stop on server"); - let peers = self.connected_peers(); - for peer in peers { - let peer = peer.read().unwrap(); - peer.stop(); - } + self.peers.stop(); self.stop.into_inner().unwrap().send(()).unwrap(); } - - /// All peer information we have in storage - pub fn all_peers(&self) -> Vec { - self.store.all_peers() - } - - /// Find peers in store (not necessarily connected) and return their data - pub fn find_peers(&self, state: State, cap: Capabilities, count: usize) -> Vec { - self.store.find_peers(state, cap, count) - } - - /// Whether we've already seen a peer with the provided address - pub fn exists_peer(&self, peer_addr: SocketAddr) -> Result { - self.store.exists_peer(peer_addr).map_err(From::from) - } - - /// Saves updated information about a peer - pub fn save_peer(&self, p: &PeerData) -> Result<(), Error> { - self.store.save_peer(p).map_err(From::from) - } - - /// Updates the state of a peer in store - pub fn update_state(&self, peer_addr: 
SocketAddr, new_state: State) -> Result<(), Error> { - self.store.update_state(peer_addr, new_state).map_err(From::from) - } } // Adds the peer built by the provided future in the peers map -fn add_to_peers( - peers: Arc>>>>, - adapter: Arc, - peer_fut: A, -) -> Box>), ()>, Error = Error>> -where - A: IntoFuture + 'static, -{ +fn add_to_peers(peers: Peers, peer_fut: A) + -> Box>), ()>, Error = Error>> + where A: IntoFuture + 'static { + let peer_add = peer_fut.into_future().map(move |(conn, peer)| { - adapter.peer_connected(&peer.info); - let addr = peer.info.addr.clone(); - let apeer = Arc::new(RwLock::new(peer)); - { - let mut peers = peers.write().unwrap(); - peers.insert(addr, apeer.clone()); - } + let apeer = peers.add_connected(peer); Ok((conn, apeer)) }); Box::new(peer_add) } -// Ping all our connected peers. Always automatically expects a pong back or -// disconnects. This acts as a liveness test. -fn check_peers( - peers: Arc>>>>, - total_difficulty: Difficulty, - height: u64 -) { - let peers_map = peers.read().unwrap(); - for p in peers_map.values() { - let p = p.read().unwrap(); - if p.is_connected() { - let _ = p.send_ping(total_difficulty.clone(), height); - } - } -} - // Adds a timeout to a future fn with_timeout( fut: Box, Error = Error>>, diff --git a/p2p/src/types.rs b/p2p/src/types.rs index af71951a3..d0c292104 100644 --- a/p2p/src/types.rs +++ b/p2p/src/types.rs @@ -162,7 +162,7 @@ pub trait Protocol { /// Bridge between the networking layer and the rest of the system. Handles the /// forwarding or querying of blocks and transactions from the network among /// other things. 
-pub trait NetAdapter: Sync + Send {
+pub trait ChainAdapter: Sync + Send {
 	/// Current total difficulty on our chain
 	fn total_difficulty(&self) -> Difficulty;
@@ -172,8 +172,11 @@ pub trait NetAdapter: Sync + Send {
 	/// A valid transaction has been received from one of our peers
 	fn transaction_received(&self, tx: core::Transaction);
 
-	/// A block has been received from one of our peers
-	fn block_received(&self, b: core::Block, addr: SocketAddr);
+	/// A block has been received from one of our peers. Returns true if the
+	/// block could be handled properly and is not deemed defective by the
+	/// chain. Returning false means the block will never be valid and
+	/// may result in the peer being banned.
+	fn block_received(&self, b: core::Block, addr: SocketAddr) -> bool;
 
 	/// A set of block header has been received, typically in response to a
 	/// block
@@ -187,7 +190,11 @@ pub trait NetAdapter: Sync + Send {
 	/// Gets a full block by its hash.
 	fn get_block(&self, h: Hash) -> Option<core::Block>;
+}
 
+/// Additional methods required by the protocol that don't need to be
+/// externally implemented.
+pub trait NetAdapter: ChainAdapter {
 	/// Find good peers we know with the provided capability and return their
 	/// addresses.
 	fn find_peer_addrs(&self, capab: Capabilities) -> Vec<SocketAddr>;
@@ -195,9 +202,6 @@ pub trait NetAdapter: Sync + Send {
 	/// A list of peers has been received from one of our peers.
 	fn peer_addrs_received(&self, Vec<SocketAddr>);
 
-	/// Network successfully connected to a peer.
-	fn peer_connected(&self, &PeerInfo);
-
 	/// Heard total_difficulty from a connected peer (via ping/pong).
fn peer_difficulty(&self, SocketAddr, Difficulty, u64); } diff --git a/p2p/tests/peer_handshake.rs b/p2p/tests/peer_handshake.rs index c43cc097a..e872739a9 100644 --- a/p2p/tests/peer_handshake.rs +++ b/p2p/tests/peer_handshake.rs @@ -84,7 +84,7 @@ fn peer_handshake() { Ok(()) }) .and_then(|_| { - assert!(server.peer_count() > 0); + assert!(server.peers.peer_count() > 0); server.stop(); Ok(()) }) diff --git a/src/bin/grin.rs b/src/bin/grin.rs index 9a5a33df0..64dff73c5 100644 --- a/src/bin/grin.rs +++ b/src/bin/grin.rs @@ -40,7 +40,6 @@ use clap::{App, Arg, ArgMatches, SubCommand}; use daemonize::Daemonize; use config::GlobalConfig; -use wallet::WalletConfig; use core::global; use core::core::amount_to_hr_string; use util::{init_logger, LoggingConfig, LOGGER}; From 487e50c3d28df053d8ef9288fed1529df571d02e Mon Sep 17 00:00:00 2001 From: AntiochP <30642645+antiochp@users.noreply.github.com> Date: Mon, 20 Nov 2017 12:35:52 -0500 Subject: [PATCH 11/11] compare genesis during peering handshake (#327) * wip - send genesis in handshake * error if genesis mismatch on handshake * fix the tests * preserve order of existing fields in hand/shake --- p2p/src/server.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/p2p/src/server.rs b/p2p/src/server.rs index 884d7fd44..59c1d570d 100644 --- a/p2p/src/server.rs +++ b/p2p/src/server.rs @@ -85,11 +85,12 @@ impl Server { capab: Capabilities, config: P2PConfig, adapter: Arc, + genesis: Hash, ) -> Result { Ok(Server { config: config, capabilities: capab, - handshake: Arc::new(Handshake::new()), + handshake: Arc::new(Handshake::new(genesis)), peers: Peers::new(PeerStore::new(db_root)?, adapter), stop: RefCell::new(None), })