simplify sync process further - one thread for both body and header sync (#427) (#428)

* simplify sync process further - one thread, try_read on peer for robustness
* add most_work_peers() for convenience, add most_work_peers count to "monitoring peers" log msg
AntiochP committed 2017-12-05 11:42:25 -05:00 (committed by GitHub)
parent 7b9351864a
commit 179f74462a
4 changed files with 111 additions and 72 deletions
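The headline change is swapping blocking read() calls on peer locks for non-blocking try_read(). A minimal sketch of that pattern, assuming a hypothetical Peer struct in place of the real p2p type:

```rust
use std::sync::{Arc, RwLock};

// Hypothetical stand-in for the real p2p peer type.
struct Peer {
    total_difficulty: u64,
}

fn main() {
    let peer = Arc::new(RwLock::new(Peer { total_difficulty: 42 }));

    // try_read() returns Err immediately if the lock is contended instead
    // of blocking, so a busy or wedged peer can never stall the sync loop;
    // the caller just skips it and retries on the next iteration.
    match peer.try_read() {
        Ok(p) => println!("peer difficulty: {}", p.total_difficulty),
        Err(_) => println!("peer lock busy, skipping until next pass"),
    }
}
```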

@@ -80,12 +80,12 @@ impl NetAdapter for NetToChainAdapter {
 		// mistaken or malevolent, both of which require a ban
 		if e.is_bad_block() {
 			self.p2p_server.borrow().ban_peer(&addr);
 
-			// and if we're currently syncing, our header chain is now wrong, it
-			// needs to be reset
-			if self.is_syncing() {
-				self.chain.reset_header_head();
-			}
+			//
+			// header chain should be consistent with the sync head here
+			// we just banned the peer that sent a bad block so
+			// sync head should resolve itself if/when we find an alternative peer
+			// with more work than ourselves
+			// we should not need to reset the header head here
 		}
 	}
 }

@@ -88,7 +88,8 @@ impl Seeder {
 			.for_each(move |_| {
 				debug!(
 					LOGGER,
-					"monitoring peers ({} of {})",
+					"monitoring peers ({} / {} / {})",
+					p2p_server.most_work_peers().len(),
 					p2p_server.connected_peers().len(),
 					p2p_server.all_peers().len(),
 				);

@@ -15,11 +15,12 @@
 use std::thread;
 use std::time::Duration;
 use std::sync::{Arc, RwLock};
+use time;
 
 use adapters::NetToChainAdapter;
 use chain;
 use core::core::hash::{Hash, Hashed};
-use p2p::{self, Peer, NetAdapter};
+use p2p::{self, Peer};
 use types::Error;
 use util::LOGGER;
@@ -33,22 +34,34 @@ pub fn run_sync(
 	let p2p_inner = p2p_server.clone();
 	let c_inner = chain.clone();
 	let _ = thread::Builder::new()
-		.name("body_sync".to_string())
+		.name("sync".to_string())
 		.spawn(move || {
+			let mut prev_body_sync = time::now_utc();
+			let mut prev_header_sync = prev_body_sync.clone();
+
 			loop {
 				if a_inner.is_syncing() {
-					body_sync(p2p_inner.clone(), c_inner.clone());
-				} else {
-					thread::sleep(Duration::from_secs(5));
-				}
-			}
-		});
-
-	let _ = thread::Builder::new()
-		.name("header_sync".to_string())
-		.spawn(move || {
-			loop {
-				if adapter.is_syncing() {
-					header_sync(adapter.clone(), p2p_server.clone(), chain.clone());
+					let current_time = time::now_utc();
+
+					// run the header sync every 5s
+					if current_time - prev_header_sync > time::Duration::seconds(5) {
+						header_sync(
+							p2p_server.clone(),
+							chain.clone(),
+						);
+						prev_header_sync = current_time;
+					}
+
+					// run the body_sync every iteration (1s)
+					if current_time - prev_body_sync > time::Duration::seconds(1) {
+						body_sync(
+							p2p_inner.clone(),
+							c_inner.clone(),
+						);
+						prev_body_sync = current_time;
+					}
+
+					thread::sleep(Duration::from_millis(250));
 				} else {
 					thread::sleep(Duration::from_secs(5));
 				}
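The scheduling idea above can be distilled into a self-contained sketch: one thread, two elapsed-time checks. This version uses std::time::Instant instead of the time crate the diff relies on, and the two stub functions are hypothetical placeholders for the real sync calls:

```rust
use std::thread;
use std::time::{Duration, Instant};

// Placeholder stubs standing in for the real header_sync / body_sync.
fn header_sync() {
    println!("header sync");
}
fn body_sync() {
    println!("body sync");
}

fn main() {
    let mut prev_header_sync = Instant::now();
    let mut prev_body_sync = Instant::now();

    loop {
        let now = Instant::now();

        // header sync fires at most once every 5s
        if now.duration_since(prev_header_sync) > Duration::from_secs(5) {
            header_sync();
            prev_header_sync = now;
        }

        // body sync fires at most once every 1s
        if now.duration_since(prev_body_sync) > Duration::from_secs(1) {
            body_sync();
            prev_body_sync = now;
        }

        // the 250ms tick bounds how late either timer can fire
        thread::sleep(Duration::from_millis(250));
    }
}
```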
@@ -98,10 +111,12 @@ fn body_sync(
 	}
 	hashes.reverse();
 
+	// let peer_count = p2p_server.most_work_peers().len();
 	let hashes_to_get = hashes
 		.iter()
 		.filter(|x| !chain.get_block(&x).is_ok())
 		.take(10)
+		// .take(peer_count * 2)
 		.cloned()
 		.collect::<Vec<_>>();
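The filter/take chain above caps how many block requests go out per pass. A toy version of the same selection, with u64s standing in for block hashes and a HashSet standing in for chain storage (both hypothetical):

```rust
use std::collections::HashSet;

// Keep only hashes whose blocks we don't have yet, capped at `cap`.
fn hashes_to_request(candidates: &[u64], stored: &HashSet<u64>, cap: usize) -> Vec<u64> {
    candidates
        .iter()
        .filter(|h| !stored.contains(*h)) // skip blocks already stored
        .take(cap) // bound the number of in-flight requests
        .cloned()
        .collect()
}

fn main() {
    let stored: HashSet<u64> = vec![1, 2].into_iter().collect();
    let candidates = [1, 2, 3, 4, 5];
    assert_eq!(hashes_to_request(&candidates, &stored, 2), vec![3, 4]);
    println!("ok");
}
```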
@@ -115,48 +130,35 @@ fn body_sync(
 	);
 
 	for hash in hashes_to_get.clone() {
-		// TODO - what condition should we choose most_work_peer v random_peer (if any?)
-		let peer = if hashes_to_get.len() < 100 {
-			p2p_server.most_work_peer()
-		} else {
-			p2p_server.random_peer()
-		};
+		let peer = p2p_server.most_work_peer();
 		if let Some(peer) = peer {
-			let peer = peer.read().unwrap();
-			if let Err(e) = peer.send_block_request(hash) {
-				debug!(LOGGER, "block_sync: error requesting block: {:?}, {:?}", hash, e);
+			if let Ok(peer) = peer.try_read() {
+				let _ = peer.send_block_request(hash);
 			}
 		}
 	}
-	thread::sleep(Duration::from_secs(1));
-	} else {
-		thread::sleep(Duration::from_secs(5));
-	}
 }
 
 pub fn header_sync(
-	adapter: Arc<NetToChainAdapter>,
 	p2p_server: Arc<p2p::Server>,
 	chain: Arc<chain::Chain>,
 ) {
-	debug!(LOGGER, "header_sync: loop");
-
-	let difficulty = adapter.total_difficulty();
-
-	if let Some(peer) = p2p_server.most_work_peer() {
-		let peer = peer.clone();
-		let p = peer.read().unwrap();
-		let peer_difficulty = p.info.total_difficulty.clone();
-
-		if peer_difficulty > difficulty {
-			let _ = request_headers(
-				peer.clone(),
-				chain.clone(),
-			);
-		}
-	}
-	thread::sleep(Duration::from_secs(5));
+	if let Ok(header_head) = chain.get_header_head() {
+		let difficulty = header_head.total_difficulty;
+
+		if let Some(peer) = p2p_server.most_work_peer() {
+			if let Ok(p) = peer.try_read() {
+				let peer_difficulty = p.info.total_difficulty.clone();
+				if peer_difficulty > difficulty {
+					let _ = request_headers(
+						peer.clone(),
+						chain.clone(),
+					);
+				}
+			}
+		}
+	}
 }
 
 /// Request some block headers from a peer to advance us.
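header_sync now only requests headers when the best peer has strictly more cumulative work, so peers at equal difficulty never trade redundant requests. The gate, reduced to plain u64s (hypothetical stand-ins for the real Difficulty type):

```rust
// Request headers only when the best peer provably has more work.
fn needs_headers(our_difficulty: u64, best_peer_difficulty: Option<u64>) -> bool {
    match best_peer_difficulty {
        // strict comparison: equal work means we're already in sync
        Some(peer) => peer > our_difficulty,
        // no connected peers, nothing to ask
        None => false,
    }
}

fn main() {
    assert!(needs_headers(10, Some(11)));
    assert!(!needs_headers(10, Some(10)));
    assert!(!needs_headers(10, None));
    println!("ok");
}
```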
@@ -165,15 +167,26 @@ fn request_headers(
 	chain: Arc<chain::Chain>,
 ) -> Result<(), Error> {
 	let locator = get_locator(chain)?;
-	let peer = peer.read().unwrap();
-	debug!(
-		LOGGER,
-		"sync: asking {} for headers, locator: {:?}",
-		peer.info.addr,
-		locator,
-	);
-	let _ = peer.send_header_request(locator);
-	Ok(())
+	match peer.try_read() {
+		Ok(peer) => {
+			debug!(
+				LOGGER,
+				"sync: request_headers: asking {} for headers, {:?}",
+				peer.info.addr,
+				locator,
+			);
+			let _ = peer.send_header_request(locator);
+			Ok(())
+		},
+		Err(_) => {
+			// not much we can do here, log and try again next time
+			warn!(
+				LOGGER,
+				"sync: request_headers: failed to get read lock on peer",
+			);
+			Ok(())
+		},
+	}
 }
 
 /// We build a locator based on sync_head.

@@ -296,24 +296,49 @@ impl Server {
 		rm
 	}
 
-	/// Returns the peer with the most worked branch, showing the highest total
-	/// difficulty.
-	pub fn most_work_peer(&self) -> Option<Arc<RwLock<Peer>>> {
-		let mut peers = self.connected_peers();
+	/// Return vec of all peers that currently have the most worked branch,
+	/// showing the highest total difficulty.
+	pub fn most_work_peers(&self) -> Vec<Arc<RwLock<Peer>>> {
+		let peers = self.connected_peers();
 		if peers.len() == 0 {
-			return None;
+			return vec![];
 		}
 
-		// we want to randomize which "most_work_peer" we select
-		thread_rng().shuffle(&mut peers[..]);
-
-		peers.sort_by_key(|p| {
-			let p = p.read().unwrap();
-			p.info.total_difficulty.clone()
-		});
-
-		let peer = peers.last().unwrap();
-		Some(peer.clone())
+		let max_total_difficulty = peers
+			.iter()
+			.map(|x| {
+				match x.try_read() {
+					Ok(peer) => peer.info.total_difficulty.clone(),
+					Err(_) => Difficulty::zero(),
+				}
+			})
+			.max()
+			.unwrap();
+
+		let mut max_peers = peers
+			.iter()
+			.filter(|x| {
+				match x.try_read() {
+					Ok(peer) => {
+						peer.info.total_difficulty == max_total_difficulty
+					},
+					Err(_) => false,
+				}
+			})
+			.cloned()
+			.collect::<Vec<_>>();
+
+		thread_rng().shuffle(&mut max_peers);
+		max_peers
+	}
+
+	/// Returns a single random peer with the most worked branch, showing the
+	/// highest total difficulty.
+	pub fn most_work_peer(&self) -> Option<Arc<RwLock<Peer>>> {
+		match self.most_work_peers().first() {
+			Some(x) => Some(x.clone()),
+			None => None,
+		}
 	}
 
 	/// Returns a random connected peer.
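The selection scheme above is: compute the maximum difficulty, keep every peer that ties it, then shuffle so repeated calls spread load across equally-worked peers. The same shape on plain data, with u64 difficulties standing in for peer handles; the shuffle is noted in a comment to keep the sketch dependency-free:

```rust
// Indices of every peer tying the maximum difficulty. The real code then
// shuffles this vec (thread_rng().shuffle) so most_work_peer() returns a
// random member of the tie rather than always the same one.
fn most_work(difficulties: &[u64]) -> Vec<usize> {
    let max = match difficulties.iter().max() {
        Some(m) => *m,
        None => return vec![], // no peers connected
    };
    difficulties
        .iter()
        .enumerate()
        .filter(|(_, d)| **d == max)
        .map(|(i, _)| i)
        .collect()
}

fn main() {
    assert_eq!(most_work(&[5, 9, 9, 3]), vec![1, 2]);
    assert_eq!(most_work(&[]), Vec::<usize>::new());
    println!("ok");
}
```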