diff --git a/grin/src/adapters.rs b/grin/src/adapters.rs
index adc7c8744..e6818be1a 100644
--- a/grin/src/adapters.rs
+++ b/grin/src/adapters.rs
@@ -80,12 +80,12 @@ impl NetAdapter for NetToChainAdapter {
 			// mistaken or manevolent, both of which require a ban
 			if e.is_bad_block() {
 				self.p2p_server.borrow().ban_peer(&addr);
-
-				// and if we're currently syncing, our header chain is now wrong, it
-				// needs to be reset
-				if self.is_syncing() {
-					self.chain.reset_header_head();
-				}
+				//
+				// // header chain should be consistent with the sync head here
+				// // we just banned the peer that sent a bad block so
+				// // sync head should resolve itself if/when we find an alternative peer
+				// // with more work than ourselves
+				// // we should not need to reset the header head here
 			}
 		}
 	}
diff --git a/grin/src/seed.rs b/grin/src/seed.rs
index ec9b1fcba..b60467afe 100644
--- a/grin/src/seed.rs
+++ b/grin/src/seed.rs
@@ -88,7 +88,8 @@ impl Seeder {
 			.for_each(move |_| {
 				debug!(
 					LOGGER,
-					"monitoring peers ({} of {})",
+					"monitoring peers ({} / {} / {})",
+					p2p_server.most_work_peers().len(),
 					p2p_server.connected_peers().len(),
 					p2p_server.all_peers().len(),
 				);
diff --git a/grin/src/sync.rs b/grin/src/sync.rs
index 331fe388b..000450072 100644
--- a/grin/src/sync.rs
+++ b/grin/src/sync.rs
@@ -15,11 +15,12 @@
 use std::thread;
 use std::time::Duration;
 use std::sync::{Arc, RwLock};
+use time;
 
 use adapters::NetToChainAdapter;
 use chain;
 use core::core::hash::{Hash, Hashed};
-use p2p::{self, Peer, NetAdapter};
+use p2p::{self, Peer};
 use types::Error;
 use util::LOGGER;
 
@@ -33,22 +34,34 @@ pub fn run_sync(
 	let p2p_inner = p2p_server.clone();
 	let c_inner = chain.clone();
 	let _ = thread::Builder::new()
-		.name("body_sync".to_string())
+		.name("sync".to_string())
 		.spawn(move || {
+			let mut prev_body_sync = time::now_utc();
+			let mut prev_header_sync = prev_body_sync.clone();
+
 			loop {
 				if a_inner.is_syncing() {
-					body_sync(p2p_inner.clone(), c_inner.clone());
-				} else {
-					thread::sleep(Duration::from_secs(5));
-				}
-			}
-		});
-	let _ = thread::Builder::new()
-		.name("header_sync".to_string())
-		.spawn(move || {
-			loop {
-				if adapter.is_syncing() {
-					header_sync(adapter.clone(), p2p_server.clone(), chain.clone());
+					let current_time = time::now_utc();
+
+					// run the header sync every 5s
+					if current_time - prev_header_sync > time::Duration::seconds(5) {
+						header_sync(
+							p2p_server.clone(),
+							chain.clone(),
+						);
+						prev_header_sync = current_time;
+					}
+
+					// run the body_sync every iteration (1s)
+					if current_time - prev_body_sync > time::Duration::seconds(1) {
+						body_sync(
+							p2p_inner.clone(),
+							c_inner.clone(),
+						);
+						prev_body_sync = current_time;
+					}
+
+					thread::sleep(Duration::from_millis(250));
 				} else {
 					thread::sleep(Duration::from_secs(5));
 				}
@@ -98,10 +111,12 @@ fn body_sync(
 	}
 	hashes.reverse();
 
+	// let peer_count = p2p_server.most_work_peers().len();
 	let hashes_to_get = hashes
 		.iter()
 		.filter(|x| !chain.get_block(&x).is_ok())
 		.take(10)
+		// .take(peer_count * 2)
 		.cloned()
 		.collect::<Vec<_>>();
 
@@ -115,48 +130,35 @@
 		);
 
 		for hash in hashes_to_get.clone() {
-			// TODO - what condition should we choose most_work_peer v random_peer (if any?)
-			let peer = if hashes_to_get.len() < 100 {
-				p2p_server.most_work_peer()
-			} else {
-				p2p_server.random_peer()
-			};
+			let peer = p2p_server.most_work_peer();
 			if let Some(peer) = peer {
-				let peer = peer.read().unwrap();
-				if let Err(e) = peer.send_block_request(hash) {
-					debug!(LOGGER, "block_sync: error requesting block: {:?}, {:?}", hash, e);
+				if let Ok(peer) = peer.try_read() {
+					let _ = peer.send_block_request(hash);
 				}
 			}
 		}
-		thread::sleep(Duration::from_secs(1));
-	} else {
-		thread::sleep(Duration::from_secs(5));
 	}
 }
 
 pub fn header_sync(
-	adapter: Arc<NetToChainAdapter>,
 	p2p_server: Arc<p2p::Server>,
 	chain: Arc<chain::Chain>,
-	) {
-	debug!(LOGGER, "header_sync: loop");
+) {
+	if let Ok(header_head) = chain.get_header_head() {
+		let difficulty = header_head.total_difficulty;
 
-	let difficulty = adapter.total_difficulty();
-
-	if let Some(peer) = p2p_server.most_work_peer() {
-		let peer = peer.clone();
-		let p = peer.read().unwrap();
-		let peer_difficulty = p.info.total_difficulty.clone();
-
-		if peer_difficulty > difficulty {
-			let _ = request_headers(
-				peer.clone(),
-				chain.clone(),
-			);
+		if let Some(peer) = p2p_server.most_work_peer() {
+			if let Ok(p) = peer.try_read() {
+				let peer_difficulty = p.info.total_difficulty.clone();
+				if peer_difficulty > difficulty {
+					let _ = request_headers(
+						peer.clone(),
+						chain.clone(),
+					);
+				}
+			}
 		}
 	}
-
-	thread::sleep(Duration::from_secs(5));
 }
 
 /// Request some block headers from a peer to advance us.
@@ -165,15 +167,26 @@ fn request_headers(
 	chain: Arc<chain::Chain>,
 ) -> Result<(), Error> {
 	let locator = get_locator(chain)?;
-	let peer = peer.read().unwrap();
-	debug!(
-		LOGGER,
-		"sync: asking {} for headers, locator: {:?}",
-		peer.info.addr,
-		locator,
-	);
-	let _ = peer.send_header_request(locator);
-	Ok(())
+	match peer.try_read() {
+		Ok(peer) => {
+			debug!(
+				LOGGER,
+				"sync: request_headers: asking {} for headers, {:?}",
+				peer.info.addr,
+				locator,
+			);
+			let _ = peer.send_header_request(locator);
+			Ok(())
+		},
+		Err(_) => {
+			// not much we can do here, log and try again next time
+			warn!(
+				LOGGER,
+				"sync: request_headers: failed to get read lock on peer",
+			);
+			Ok(())
+		},
+	}
 }
 
 /// We build a locator based on sync_head.
diff --git a/p2p/src/server.rs b/p2p/src/server.rs
index d7db193bc..49361ce9b 100644
--- a/p2p/src/server.rs
+++ b/p2p/src/server.rs
@@ -296,24 +296,49 @@ impl Server {
 		rm
 	}
 
-	/// Returns the peer with the most worked branch, showing the highest total
-	/// difficulty.
-	pub fn most_work_peer(&self) -> Option<Arc<RwLock<Peer>>> {
-		let mut peers = self.connected_peers();
+	/// Return vec of all peers that currently have the most worked branch,
+	/// showing the highest total difficulty.
+	pub fn most_work_peers(&self) -> Vec<Arc<RwLock<Peer>>> {
+		let peers = self.connected_peers();
 		if peers.len() == 0 {
-			return None;
+			return vec![];
 		}
 
-		// we want to randomize which "most_work_peer" we select
-		thread_rng().shuffle(&mut peers[..]);
+		let max_total_difficulty = peers
+			.iter()
+			.map(|x| {
+				match x.try_read() {
+					Ok(peer) => peer.info.total_difficulty.clone(),
+					Err(_) => Difficulty::zero(),
+				}
+			})
+			.max()
+			.unwrap();
 
-		peers.sort_by_key(|p| {
-			let p = p.read().unwrap();
-			p.info.total_difficulty.clone()
-		});
+		let mut max_peers = peers
+			.iter()
+			.filter(|x| {
+				match x.try_read() {
+					Ok(peer) => {
+						peer.info.total_difficulty == max_total_difficulty
+					},
+					Err(_) => false,
+				}
+			})
+			.cloned()
+			.collect::<Vec<_>>();
 
-		let peer = peers.last().unwrap();
-		Some(peer.clone())
+		thread_rng().shuffle(&mut max_peers);
+		max_peers
+	}
+
+	/// Returns single random peer with the most worked branch, showing the highest total
+	/// difficulty.
+	pub fn most_work_peer(&self) -> Option<Arc<RwLock<Peer>>> {
+		match self.most_work_peers().first() {
+			Some(x) => Some(x.clone()),
+			None => None
+		}
 	}
 
 	/// Returns a random connected peer.
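
The selection rule introduced by `most_work_peers` above is: find the highest advertised total difficulty among connected peers whose lock can be taken with `try_read`, then keep every peer at that value (shuffled so callers get a random one). A minimal, standalone Rust sketch of that rule, not part of the patch and not grin's actual API: `PeerInfo` and the plain `u64` difficulty are simplified stand-ins for grin's `Peer` and `Difficulty` types.

use std::sync::{Arc, RwLock};

#[derive(Debug)]
struct PeerInfo {
    addr: String,
    total_difficulty: u64,
}

// Keep every peer whose advertised total difficulty equals the current maximum,
// skipping peers whose lock cannot be acquired right now.
fn most_work_peers(peers: &[Arc<RwLock<PeerInfo>>]) -> Vec<Arc<RwLock<PeerInfo>>> {
    if peers.is_empty() {
        return vec![];
    }
    // A peer whose lock is held contributes zero difficulty, mirroring the
    // Difficulty::zero() fallback in the patch.
    let max_total_difficulty = peers
        .iter()
        .map(|p| p.try_read().map(|p| p.total_difficulty).unwrap_or(0))
        .max()
        .unwrap();
    peers
        .iter()
        .filter(|p| match p.try_read() {
            Ok(p) => p.total_difficulty == max_total_difficulty,
            Err(_) => false,
        })
        .cloned()
        .collect()
}

fn main() {
    let peers = vec![
        Arc::new(RwLock::new(PeerInfo { addr: "10.0.0.1:13414".into(), total_difficulty: 5 })),
        Arc::new(RwLock::new(PeerInfo { addr: "10.0.0.2:13414".into(), total_difficulty: 9 })),
        Arc::new(RwLock::new(PeerInfo { addr: "10.0.0.3:13414".into(), total_difficulty: 9 })),
    ];
    for p in most_work_peers(&peers) {
        println!("{:?}", p.read().unwrap());
    }
}

Treating a currently locked peer as zero difficulty means a busy peer can never win the comparison and is simply skipped this round, which is the same trade-off the patch makes by preferring `try_read` over blocking on `read().unwrap()`.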