Get peers as quick as possible ()

* Get peers as quick as possible. Currently we get 2 peers and then wait 20 seconds before trying to get more
* Second pass
* Fix peer_count
This commit is contained in:
hashmap 2018-08-23 03:31:06 +02:00 committed by Ignotus Peverell
parent 9554f93e1d
commit 7dd2888b71
5 changed files with 160 additions and 141 deletions
p2p/src
servers

View file

@ -16,8 +16,8 @@ use std::collections::VecDeque;
use std::net::{SocketAddr, TcpStream};
use std::sync::{Arc, RwLock};
use rand::Rng;
use rand::os::OsRng;
use rand::Rng;
use core::core::hash::Hash;
use core::core::target::Difficulty;

View file

@ -114,6 +114,7 @@ impl Peers {
.read()
.unwrap()
.values()
.filter(|p| p.read().unwrap().is_connected())
.cloned()
.collect::<Vec<_>>();
thread_rng().shuffle(&mut res);
@ -127,7 +128,8 @@ impl Peers {
.filter(|x| match x.try_read() {
Ok(peer) => peer.info.direction == Direction::Outbound,
Err(_) => false,
}).collect::<Vec<_>>();
})
.collect::<Vec<_>>();
res
}
@ -138,7 +140,12 @@ impl Peers {
/// Number of peers we're currently connected to.
pub fn peer_count(&self) -> u32 {
self.peers.read().unwrap().len() as u32
self.peers
.read()
.unwrap()
.values()
.filter(|p| p.read().unwrap().is_connected())
.count() as u32
}
// Return vec of connected peers that currently advertise more work
@ -156,7 +163,8 @@ impl Peers {
.filter(|x| match x.try_read() {
Ok(peer) => peer.info.total_difficulty > total_difficulty,
Err(_) => false,
}).collect::<Vec<_>>();
})
.collect::<Vec<_>>();
thread_rng().shuffle(&mut max_peers);
max_peers
@ -180,7 +188,8 @@ impl Peers {
&& peer.info.capabilities.contains(Capabilities::FULL_HIST)
}
Err(_) => false,
}).collect::<Vec<_>>();
})
.collect::<Vec<_>>();
thread_rng().shuffle(&mut max_peers);
max_peers
@ -209,7 +218,8 @@ impl Peers {
.map(|x| match x.try_read() {
Ok(peer) => peer.info.total_difficulty.clone(),
Err(_) => Difficulty::zero(),
}).max()
})
.max()
.unwrap();
let mut max_peers = peers
@ -217,7 +227,8 @@ impl Peers {
.filter(|x| match x.try_read() {
Ok(peer) => peer.info.total_difficulty == max_total_difficulty,
Err(_) => false,
}).collect::<Vec<_>>();
})
.collect::<Vec<_>>();
thread_rng().shuffle(&mut max_peers);
max_peers
@ -267,10 +278,10 @@ impl Peers {
Ok(_) => {
if self.is_banned(*peer_addr) {
if let Err(e) = self.update_state(*peer_addr, State::Healthy) {
error!(LOGGER, "Couldn't unban {}: {:?}", peer_addr, e)
error!(LOGGER, "Couldn't unban {}: {:?}", peer_addr, e);
}
} else {
error!(LOGGER, "Couldn't unban {}: peer is not banned", peer_addr)
error!(LOGGER, "Couldn't unban {}: peer is not banned", peer_addr);
}
}
Err(e) => error!(LOGGER, "Couldn't unban {}: {:?}", peer_addr, e),
@ -484,7 +495,8 @@ impl Peers {
.map(|x| {
let p = x.read().unwrap();
p.info.addr.clone()
}).collect::<Vec<_>>()
})
.collect::<Vec<_>>()
};
// now remove them taking a short-lived write lock each time

View file

@ -16,14 +16,14 @@
//! a mining worker implementation
//!
use chrono::prelude::Utc;
use chrono::Duration;
use std::net::{SocketAddr, ToSocketAddrs};
use std::str;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{mpsc, Arc};
use std::thread;
use std::time;
use chrono::prelude::{Utc};
use chrono::Duration;
use api;
@ -60,7 +60,9 @@ pub fn connect_and_monitor(
loop {
let current_time = Utc::now();
if current_time - prev > Duration::seconds(20) {
if peers.peer_count() < p2p_server.config.peer_min_preferred_count()
|| current_time - prev > Duration::seconds(20)
{
// try to connect to any address sent to the channel
listen_for_addrs(peers.clone(), p2p_server.clone(), capabilities, &rx);
@ -122,7 +124,7 @@ fn monitor_peers(
LOGGER,
"monitor_peers: {} connected ({} most_work). \
all {} = {} healthy + {} banned + {} defunct",
peers.connected_peers().len(),
peers.peer_count(),
peers.most_work_peers().len(),
total_count,
healthy_count,
@ -151,8 +153,12 @@ fn monitor_peers(
// find some peers from our db
// and queue them up for a connection attempt
let peers = peers.find_peers(p2p::State::Healthy, p2p::Capabilities::UNKNOWN, 100);
for p in peers {
let new_peers = peers.find_peers(
p2p::State::Healthy,
p2p::Capabilities::UNKNOWN,
config.peer_max_count() as usize,
);
for p in new_peers.iter().filter(|p| !peers.is_known(&p.addr)) {
debug!(LOGGER, "monitor_peers: queue to soon try {}", p.addr);
tx.send(p.addr).unwrap();
}
@ -268,7 +274,8 @@ pub fn dns_seeds() -> Box<Fn() -> Vec<SocketAddr> + Send> {
pub fn web_seeds() -> Box<Fn() -> Vec<SocketAddr> + Send> {
Box::new(|| {
let text: String = api::client::get(SEEDS_URL).expect("Failed to resolve seeds");
let addrs = text.split_whitespace()
let addrs = text
.split_whitespace()
.map(|s| s.parse().unwrap())
.collect::<Vec<_>>();
debug!(LOGGER, "Retrieved seed addresses: {:?}", addrs);

View file

@ -12,12 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time;
use std::{cmp, thread};
use chrono::prelude::{DateTime, Utc};
use chrono::Duration;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time;
use std::{cmp, thread};
use chain;
use common::types::{Error, SyncState, SyncStatus};
@ -68,122 +68,132 @@ pub fn run_sync(
stop: Arc<AtomicBool>,
) {
let chain = chain.clone();
let _ = thread::Builder::new()
.name("sync".to_string())
.spawn(move || {
let mut si = SyncInfo::new();
let _ =
thread::Builder::new()
.name("sync".to_string())
.spawn(move || {
let mut si = SyncInfo::new();
{
// Initial sleep to give us time to peer with some nodes.
// Note: Even if we have "skip_sync_wait" we need to wait a
// short period of time for tests to do the right thing.
let wait_secs = if skip_sync_wait {
3
} else {
30
};
{
// Initial sleep to give us time to peer with some nodes.
// Note: Even if we have "skip_sync_wait" we need to wait a
// short period of time for tests to do the right thing.
let wait_secs = if skip_sync_wait { 3 } else { 30 };
awaiting_peers.store(true, Ordering::Relaxed);
let mut n = 0;
while peers.more_work_peers().len() < 4 && n < wait_secs {
thread::sleep(time::Duration::from_secs(1));
n += 1;
}
awaiting_peers.store(false, Ordering::Relaxed);
}
// fast sync has 3 "states":
// * syncing headers
// * once all headers are sync'd, requesting the txhashset state
// * once we have the state, get blocks after that
//
// full sync gets rid of the middle step and just starts from
// the genesis state
loop {
let horizon = global::cut_through_horizon() as u64;
let head = chain.head().unwrap();
let header_head = chain.get_header_head().unwrap();
// is syncing generally needed when we compare our state with others
let (syncing, most_work_height) =
needs_syncing(sync_state.as_ref(), peers.clone(), chain.clone());
if most_work_height > 0 {
// we can occasionally get a most work height of 0 if read locks fail
si.highest_height = most_work_height;
awaiting_peers.store(true, Ordering::Relaxed);
let mut n = 0;
while peers.more_work_peers().len() < 4 && n < wait_secs {
thread::sleep(time::Duration::from_secs(1));
n += 1;
}
awaiting_peers.store(false, Ordering::Relaxed);
}
if syncing {
let fast_sync_enabled =
!archive_mode && si.highest_height.saturating_sub(head.height) > horizon;
// fast sync has 3 "states":
// * syncing headers
// * once all headers are sync'd, requesting the txhashset state
// * once we have the state, get blocks after that
//
// full sync gets rid of the middle step and just starts from
// the genesis state
// run the header sync every 10s
if si.header_sync_due(&header_head) {
header_sync(peers.clone(), chain.clone());
loop {
let horizon = global::cut_through_horizon() as u64;
let head = chain.head().unwrap();
let header_head = chain.get_header_head().unwrap();
let status = sync_state.status();
match status{
SyncStatus::TxHashsetDownload => (),
_ => {
sync_state.update(SyncStatus::HeaderSync{current_height: header_head.height, highest_height: si.highest_height});
}
};
// is syncing generally needed when we compare our state with others
let (syncing, most_work_height) =
needs_syncing(sync_state.as_ref(), peers.clone(), chain.clone());
if most_work_height > 0 {
// we can occasionally get a most work height of 0 if read locks fail
si.highest_height = most_work_height;
}
if fast_sync_enabled {
if syncing {
let fast_sync_enabled = !archive_mode
&& si.highest_height.saturating_sub(head.height) > horizon;
// check sync error
{
let mut sync_error_need_clear = false;
{
let clone = sync_state.sync_error();
if let Some(ref sync_error) = *clone.read().unwrap() {
error!(LOGGER, "fast_sync: error = {:?}. restart fast sync", sync_error);
si.fast_sync_reset();
sync_error_need_clear = true;
// run the header sync every 10s
if si.header_sync_due(&header_head) {
header_sync(peers.clone(), chain.clone());
let status = sync_state.status();
match status {
SyncStatus::TxHashsetDownload => (),
_ => {
sync_state.update(SyncStatus::HeaderSync {
current_height: header_head.height,
highest_height: si.highest_height,
});
}
drop(clone);
}
if sync_error_need_clear {
sync_state.clear_sync_error();
}
};
}
// run fast sync if applicable, normally only run one-time, except restart in error
if header_head.height == si.highest_height {
let (go,download_timeout) = si.fast_sync_due();
if go {
if let Err(e) = fast_sync(peers.clone(), chain.clone(), &header_head) {
sync_state.set_sync_error(Error::P2P(e));
if fast_sync_enabled {
// check sync error
{
let mut sync_error_need_clear = false;
{
let clone = sync_state.sync_error();
if let Some(ref sync_error) = *clone.read().unwrap() {
error!(
LOGGER,
"fast_sync: error = {:?}. restart fast sync",
sync_error
);
si.fast_sync_reset();
sync_error_need_clear = true;
}
drop(clone);
}
if sync_error_need_clear {
sync_state.clear_sync_error();
}
sync_state.update(SyncStatus::TxHashsetDownload);
}
if SyncStatus::TxHashsetDownload == sync_state.status() && download_timeout {
error!(LOGGER, "fast_sync: TxHashsetDownload status timeout in 10 minutes!");
sync_state.set_sync_error(Error::P2P(p2p::Error::Timeout));
// run fast sync if applicable, normally only run one-time, except restart in error
if header_head.height == si.highest_height {
let (go, download_timeout) = si.fast_sync_due();
if go {
if let Err(e) =
fast_sync(peers.clone(), chain.clone(), &header_head)
{
sync_state.set_sync_error(Error::P2P(e));
}
sync_state.update(SyncStatus::TxHashsetDownload);
}
if SyncStatus::TxHashsetDownload == sync_state.status()
&& download_timeout
{
error!(LOGGER, "fast_sync: TxHashsetDownload status timeout in 10 minutes!");
sync_state.set_sync_error(Error::P2P(p2p::Error::Timeout));
}
}
} else {
// run the body_sync every 5s
if si.body_sync_due(&head) {
body_sync(peers.clone(), chain.clone());
sync_state.update(SyncStatus::BodySync {
current_height: head.height,
highest_height: si.highest_height,
});
}
}
} else {
// run the body_sync every 5s
if si.body_sync_due(&head) {
body_sync(peers.clone(), chain.clone());
sync_state.update(SyncStatus::BodySync{current_height: head.height, highest_height: si.highest_height});
}
sync_state.update(SyncStatus::NoSync);
}
} else {
sync_state.update(SyncStatus::NoSync);
}
thread::sleep(time::Duration::from_secs(1));
thread::sleep(time::Duration::from_secs(1));
if stop.load(Ordering::Relaxed) {
break;
if stop.load(Ordering::Relaxed) {
break;
}
}
}
});
});
}
fn body_sync(peers: Arc<Peers>, chain: Arc<chain::Chain>) {
@ -284,12 +294,15 @@ fn header_sync(peers: Arc<Peers>, chain: Arc<chain::Chain>) {
}
}
fn fast_sync(peers: Arc<Peers>, chain: Arc<chain::Chain>, header_head: &chain::Tip) -> Result<(), p2p::Error> {
fn fast_sync(
peers: Arc<Peers>,
chain: Arc<chain::Chain>,
header_head: &chain::Tip,
) -> Result<(), p2p::Error> {
let horizon = global::cut_through_horizon() as u64;
if let Some(peer) = peers.most_work_peer() {
if let Ok(p) = peer.try_read() {
// ask for txhashset at 90% of horizon, this still leaves time for download
// and validation to happen and stay within horizon
let mut txhashset_head = chain.get_block_header(&header_head.prev_block_h).unwrap();
@ -306,11 +319,7 @@ fn fast_sync(peers: Arc<Peers>, chain: Arc<chain::Chain>, header_head: &chain::T
bhash
);
if let Err(e) = p.send_txhashset_request(txhashset_head.height, bhash) {
error!(
LOGGER,
"fast_sync: send_txhashset_request err! {:?}",
e
);
error!(LOGGER, "fast_sync: send_txhashset_request err! {:?}", e);
return Err(e);
}
return Ok(());
@ -488,20 +497,20 @@ impl SyncInfo {
}
// For now this is a one-time thing (it can be slow) at initial startup.
fn fast_sync_due(&mut self) -> (bool,bool) {
fn fast_sync_due(&mut self) -> (bool, bool) {
let now = Utc::now();
let mut download_timeout = false;
match self.prev_fast_sync {
None => {
self.prev_fast_sync = Some(now);
(true,download_timeout)
(true, download_timeout)
}
Some(prev) => {
if now - prev > Duration::minutes(10) {
download_timeout = true;
}
(false,download_timeout)
(false, download_timeout)
}
}
}
@ -509,7 +518,6 @@ impl SyncInfo {
fn fast_sync_reset(&mut self) {
self.prev_fast_sync = None;
}
}
#[cfg(test)]
@ -533,7 +541,7 @@ mod test {
assert_eq!(
get_locator_heights(10000),
vec![
10000, 9998, 9994, 9986, 9970, 9938, 9874, 9746, 9490, 8978, 7954, 5906, 1810, 0
10000, 9998, 9994, 9986, 9970, 9938, 9874, 9746, 9490, 8978, 7954, 5906, 1810, 0,
]
);
}

View file

@ -36,13 +36,12 @@ use util::{init_test_logger, LOGGER};
#[test]
fn simple_server_wallet() {
init_test_logger();
info!(LOGGER, "starting simple_server_wallet");
let test_name_dir = "test_servers";
core::global::set_mining_mode(core::global::ChainTypes::AutomatedTesting);
framework::clean_all_output(test_name_dir);
init_test_logger();
// Run a separate coinbase wallet for coinbase transactions
let mut coinbase_config = LocalServerContainerConfig::default();
coinbase_config.name = String::from("coinbase_wallet_api");
@ -144,14 +143,13 @@ fn simple_server_wallet() {
/// Creates 2 servers and test P2P API
#[test]
fn test_p2p() {
init_test_logger();
info!(LOGGER, "starting test_p2p");
global::set_mining_mode(ChainTypes::AutomatedTesting);
let test_name_dir = "test_servers";
framework::clean_all_output(test_name_dir);
init_test_logger();
// Spawn server and let it run for a bit
let mut server_config_one = LocalServerContainerConfig::default();
server_config_one.name = String::from("p2p_server_one");
@ -192,14 +190,12 @@ fn test_p2p() {
let mut peers_all = get_all_peers(&base_addr, api_server_port);
assert!(peers_all.is_ok());
let pall = peers_all.unwrap();
println!("Peers: {:?}", &pall);
assert_eq!(pall.len(), 1);
assert_eq!(pall.len(), 2);
// Check that when we get peer connected the peer is here
let peers_connected = get_connected_peers(&base_addr, api_server_port);
assert!(peers_connected.is_ok());
let pc = peers_connected.unwrap();
println!("Peers connected: {:?}", &pc);
assert_eq!(pc.len(), 1);
// Check that the peer status is Healthy
@ -224,7 +220,7 @@ fn test_p2p() {
// Check from peer all
peers_all = get_all_peers(&base_addr, api_server_port);
assert!(peers_all.is_ok());
assert_eq!(peers_all.unwrap().len(), 1);
assert_eq!(peers_all.unwrap().len(), 2);
// Unban
let unban_result = unban_peer(&base_addr, api_server_port, &addr);
@ -233,7 +229,7 @@ fn test_p2p() {
// Check from peer connected
let peers_connected = get_connected_peers(&base_addr, api_server_port);
assert!(peers_connected.is_ok());
assert_eq!(peers_connected.unwrap().len(), 1);
assert_eq!(peers_connected.unwrap().len(), 0);
// Check its status is healthy with get peer
let peer = get_peer(&base_addr, api_server_port, &addr);
@ -327,7 +323,6 @@ fn get_outputs_by_ids2(
ids_string = ids_string + "?id=" + &id;
}
let ids_string = String::from(&ids_string[1..ids_string.len()]);
println!("{}", ids_string);
let url = format!(
"http://{}:{}/v1/chain/outputs/byids?{}",
base_addr, api_server_port, ids_string
@ -459,10 +454,7 @@ pub fn get_peer(
"http://{}:{}/v1/peers/{}",
base_addr, api_server_port, peer_addr
);
api::client::get::<p2p::PeerData>(url.as_str()).map_err(|e| {
println!("got error {:}", e);
Error::API(e)
})
api::client::get::<p2p::PeerData>(url.as_str()).map_err(|e| Error::API(e))
}
pub fn get_connected_peers(