Fix and cleanup of fast sync triggering logic (#916)

* Fix and cleanup of fast sync triggering logic
* New txhashset on fast sync has to be applied, not rolled back
* Do not block if peer send buffer is full, fixes #912
Ignotus Peverell 2018-03-30 06:02:40 +00:00 committed by GitHub
parent 3e3fe6cae6
commit 5ba0dbf38d
6 changed files with 90 additions and 66 deletions

View file

@@ -533,11 +533,18 @@ impl Chain {
 		self.store
 			.save_block_marker(&h, &(rewind_to_output, rewind_to_kernel))?;
+		debug!(
+			LOGGER,
+			"Going to validate new txhashset, might take some time..."
+		);
 		let mut txhashset =
 			txhashset::TxHashSet::open(self.db_root.clone(), self.store.clone(), None)?;
 		txhashset::extending(&mut txhashset, |extension| {
 			extension.validate(&header, false)?;
 			// TODO validate kernels and their sums with Outputs
+			// validate rewinds and rollbacks, in this specific case we want to
+			// apply the rewind
+			extension.cancel_rollback();
 			extension.rebuild_index()?;
 			Ok(())
 		})?;
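
A note on the cancel_rollback() call above: txhashset::extending runs the closure against a temporary extension and uses a rollback flag to decide whether the resulting state is kept. Below is a minimal self-contained sketch of that contract, with all names, signatures, and the default flag value assumed rather than taken from grin's actual API:

    #[derive(Debug)]
    struct Error;

    struct Extension {
        rollback: bool,
    }

    impl Extension {
        /// Keep the changes made under this extension instead of discarding them.
        fn cancel_rollback(&mut self) {
            self.rollback = false;
        }
    }

    fn extending<F>(f: F) -> Result<(), Error>
    where
        F: FnOnce(&mut Extension) -> Result<(), Error>,
    {
        // assumed default: validation-only, discard unless the closure opts in
        let mut ext = Extension { rollback: true };
        let res = f(&mut ext);
        if res.is_ok() && !ext.rollback {
            println!("committing extension");
        } else {
            println!("rolling back");
        }
        res
    }

    fn main() {
        // mirrors the hunk above: validate, then apply rather than roll back
        let _ = extending(|ext| {
            // ... header and MMR validation would run here ...
            ext.cancel_rollback();
            Ok(())
        });
    }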

View file

@@ -146,7 +146,7 @@ impl TxHashSet {
 		let output_pmmr: PMMR<OutputIdentifier, _> =
 			PMMR::at(&mut self.output_pmmr_h.backend, self.output_pmmr_h.last_pos);
 		if let Some(hash) = output_pmmr.get_hash(pos) {
-			if hash == output_id.hash_with_index(pos-1) {
+			if hash == output_id.hash_with_index(pos - 1) {
 				Ok(hash)
 			} else {
 				Err(Error::TxHashSetErr(format!("txhashset hash mismatch")))
@@ -386,7 +386,7 @@ impl<'a> Extension<'a> {
 		let commit = input.commitment();
 		let pos_res = self.get_output_pos(&commit);
 		if let Ok(pos) = pos_res {
-			let output_id_hash = OutputIdentifier::from_input(input).hash_with_index(pos-1);
+			let output_id_hash = OutputIdentifier::from_input(input).hash_with_index(pos - 1);
 			if let Some(read_hash) = self.output_pmmr.get_hash(pos) {
 				// check hash from pmmr matches hash from input (or corresponding output)
 				// if not then the input is not being honest about
@@ -398,7 +398,7 @@ impl<'a> Extension<'a> {
 				|| output_id_hash
 					!= read_elem
 						.expect("no output at position")
-						.hash_with_index(pos-1)
+						.hash_with_index(pos - 1)
 			{
 				return Err(Error::TxHashSetErr(format!("output pmmr hash mismatch")));
 			}
@@ -650,6 +650,11 @@ impl<'a> Extension<'a> {
 		self.rollback = true;
 	}

+	/// Cancel a previous rollback, to apply this extension
+	pub fn cancel_rollback(&mut self) {
+		self.rollback = false;
+	}
+
 	/// Dumps the output MMR.
 	/// We use this after compacting for visual confirmation that it worked.
 	pub fn dump_output_pmmr(&self) {

View file

@@ -178,7 +178,7 @@ impl Server {
 			p2p_server.peers.clone(),
 			shared_chain.clone(),
 			skip_sync_wait,
-			!archive_mode,
+			archive_mode,
 			stop.clone(),
 		);

View file

@@ -33,7 +33,7 @@ pub fn run_sync(
 	peers: Arc<p2p::Peers>,
 	chain: Arc<chain::Chain>,
 	skip_sync_wait: bool,
-	fast_sync: bool,
+	archive_mode: bool,
 	stop: Arc<AtomicBool>,
 ) {
 	let chain = chain.clone();
@@ -42,7 +42,8 @@ pub fn run_sync(
 		.spawn(move || {
 			let mut prev_body_sync = time::now_utc();
 			let mut prev_header_sync = prev_body_sync.clone();
-			let mut prev_state_sync = prev_body_sync.clone() - time::Duration::seconds(5 * 60);
+			let mut prev_fast_sync = prev_body_sync.clone() - time::Duration::seconds(5 * 60);
+			let mut highest_height = 0;

 			// initial sleep to give us time to peer with some nodes
 			if !skip_sync_wait {
@@ -64,16 +65,25 @@
 			let head = chain.head().unwrap();
 			let header_head = chain.get_header_head().unwrap();

-			// in archival nodes (no fast sync) we just consider we have the whole
-			// state already
-			let have_txhashset =
-				!fast_sync || header_head.height.saturating_sub(head.height) <= horizon;
-
-			let mut syncing = needs_syncing(
-				currently_syncing.as_ref(),
-				peers.clone(),
-				chain.clone(),
-				!have_txhashset,
-			);
+			// is syncing generally needed when we compare our state with others
+			let (syncing, most_work_height) =
+				needs_syncing(currently_syncing.as_ref(), peers.clone(), chain.clone());
+
+			if most_work_height > 0 {
+				// we can occasionally get a most work height of 0 if read locks fail
+				highest_height = most_work_height;
+			}
+
+			// in archival nodes (no fast sync) we just consider we have the whole
+			// state already, then fast sync triggers if other peers are much
+			// further ahead
+			let fast_sync_enabled =
+				!archive_mode && highest_height.saturating_sub(head.height) > horizon;
+
+			debug!(LOGGER, "syncing: {}, fast: {}", syncing, fast_sync_enabled);
+			debug!(
+				LOGGER,
+				"heights: {}, vs local {}", highest_height, header_head.height
+			);

 			let current_time = time::now_utc();
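
The condition computed above distills to a small predicate. Here is a sketch with a hypothetical standalone helper (the condition itself is verbatim from the hunk): fast sync only triggers on non-archival nodes that are more than a cut-through horizon behind the best known height, and saturating_sub keeps the u64 subtraction from underflowing when the local node is ahead of its peers.

    // hypothetical helper mirroring the fast_sync_enabled condition above
    fn fast_sync_enabled(archive_mode: bool, highest_height: u64, local_height: u64, horizon: u64) -> bool {
        // saturating_sub clamps at zero, so a node ahead of its peers can
        // never underflow and spuriously trigger fast sync
        !archive_mode && highest_height.saturating_sub(local_height) > horizon
    }

    fn main() {
        assert!(fast_sync_enabled(false, 10_000, 100, 5_000)); // far behind: fast sync
        assert!(!fast_sync_enabled(true, 10_000, 100, 5_000)); // archival node: never
        assert!(!fast_sync_enabled(false, 100, 10_000, 5_000)); // ahead of peers: no underflow
        println!("all cases behave as expected");
    }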
@@ -85,42 +95,20 @@
 				}

 				// run the body_sync every 5s
-				if have_txhashset && current_time - prev_body_sync > time::Duration::seconds(5)
+				if !fast_sync_enabled
+					&& current_time - prev_body_sync > time::Duration::seconds(5)
 				{
 					body_sync(peers.clone(), chain.clone());
 					prev_body_sync = current_time;
 				}
-			} else if !have_txhashset && header_head.height > 0 {
-				if current_time - prev_state_sync > time::Duration::seconds(5 * 60) {
-					if let Some(peer) = peers.most_work_peer() {
-						if let Ok(p) = peer.try_read() {
-							// just to handle corner case of a too early start
-							if header_head.height > horizon {
-								debug!(
-									LOGGER,
-									"Header head before txhashset request: {} / {}",
-									header_head.height,
-									header_head.last_block_h
-								);
-								// ask for txhashset at horizon
-								let mut txhashset_head =
-									chain.get_block_header(&header_head.prev_block_h).unwrap();
-								for _ in 0..horizon - 2 {
-									txhashset_head = chain
-										.get_block_header(&txhashset_head.previous)
-										.unwrap();
-								}
-								p.send_txhashset_request(
-									txhashset_head.height,
-									txhashset_head.hash(),
-								).unwrap();
-								prev_state_sync = current_time;
-							}
-						}
-					}
-					syncing = true;
-				}
+
+				// run fast sync if applicable, every 5 min
+				if fast_sync_enabled && header_head.height == highest_height {
+					if current_time - prev_fast_sync > time::Duration::seconds(5 * 60) {
+						fast_sync(peers.clone(), chain.clone(), &header_head);
+						prev_fast_sync = current_time;
+					}
+				}
 			}

 			currently_syncing.store(syncing, Ordering::Relaxed);
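
Both branches use the same throttle pattern: compare the current time against a last-run timestamp and only act once a fixed interval has elapsed (5 seconds for body sync, 5 minutes for fast sync). A standalone sketch using the same time 0.1 crate API as the loop (time = "0.1" in Cargo.toml; the six-minute offset is illustrative):

    extern crate time;

    fn main() {
        // pretend fast sync last ran six minutes ago
        let mut prev_fast_sync = time::now_utc() - time::Duration::seconds(6 * 60);

        let current_time = time::now_utc();
        if current_time - prev_fast_sync > time::Duration::seconds(5 * 60) {
            println!("interval elapsed, running fast sync");
            prev_fast_sync = current_time;
        }
        let _ = prev_fast_sync;
    }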
@@ -198,14 +186,16 @@ fn body_sync(peers: Arc<Peers>, chain: Arc<chain::Chain>) {
 			let peer = peers.more_work_peer();
 			if let Some(peer) = peer {
 				if let Ok(peer) = peer.try_read() {
-					let _ = peer.send_block_request(hash);
+					if let Err(e) = peer.send_block_request(hash) {
+						debug!(LOGGER, "Skipped request to {}: {:?}", peer.info.addr, e);
+					}
 				}
 			}
 		}
 	}
 }

-pub fn header_sync(peers: Arc<Peers>, chain: Arc<chain::Chain>) {
+fn header_sync(peers: Arc<Peers>, chain: Arc<chain::Chain>) {
 	if let Ok(header_head) = chain.get_header_head() {
 		let difficulty = header_head.total_difficulty;
@@ -220,6 +210,29 @@ pub fn header_sync(peers: Arc<Peers>, chain: Arc<chain::Chain>) {
 	}
 }

+fn fast_sync(peers: Arc<Peers>, chain: Arc<chain::Chain>, header_head: &chain::Tip) {
+	let horizon = global::cut_through_horizon() as u64;
+
+	if let Some(peer) = peers.most_work_peer() {
+		if let Ok(p) = peer.try_read() {
+			debug!(
+				LOGGER,
+				"Header head before txhashset request: {} / {}",
+				header_head.height,
+				header_head.last_block_h
+			);
+			// ask for txhashset at horizon
+			let mut txhashset_head = chain.get_block_header(&header_head.prev_block_h).unwrap();
+			for _ in 0..horizon.saturating_sub(20) {
+				txhashset_head = chain.get_block_header(&txhashset_head.previous).unwrap();
+			}
+			p.send_txhashset_request(txhashset_head.height, txhashset_head.hash())
+				.unwrap();
+		}
+	}
+}
+
 /// Request some block headers from a peer to advance us.
 fn request_headers(peer: Arc<RwLock<Peer>>, chain: Arc<chain::Chain>) -> Result<(), Error> {
 	let locator = get_locator(chain)?;
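
The new fast_sync function requests the txhashset at a state somewhat shallower than the header head, found by walking parent hashes back from the head. A toy illustration of that walk, with a HashMap standing in for the chain store and all types here assumed:

    use std::collections::HashMap;

    // toy header: hash == height for simplicity
    #[derive(Clone)]
    struct BlockHeader {
        height: u64,
        previous: u64, // parent header's hash
    }

    fn main() {
        let mut store: HashMap<u64, BlockHeader> = HashMap::new();
        for h in 0..=100u64 {
            store.insert(h, BlockHeader { height: h, previous: h.saturating_sub(1) });
        }

        let horizon: u64 = 30;
        // start from the header head's parent, then walk back horizon - 20 headers
        let mut txhashset_head = store[&99].clone();
        for _ in 0..horizon.saturating_sub(20) {
            txhashset_head = store[&txhashset_head.previous].clone();
        }
        // the real code now calls send_txhashset_request(height, hash)
        println!("would request txhashset at height {}", txhashset_head.height);
    }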
@@ -245,15 +258,11 @@ fn needs_syncing(
 	currently_syncing: &AtomicBool,
 	peers: Arc<Peers>,
 	chain: Arc<chain::Chain>,
-	header_only: bool,
-) -> bool {
-	let local_diff = if header_only {
-		chain.total_header_difficulty().unwrap()
-	} else {
-		chain.total_difficulty()
-	};
+) -> (bool, u64) {
+	let local_diff = chain.total_difficulty();
 	let peer = peers.most_work_peer();
 	let is_syncing = currently_syncing.load(Ordering::Relaxed);
+	let mut most_work_height = 0;

 	// if we're already syncing, we're caught up if no peer has a higher
 	// difficulty than us
@@ -262,8 +271,9 @@
 		if let Ok(peer) = peer.try_read() {
 			debug!(
 				LOGGER,
-				"needs_syncing {} {} {}", local_diff, peer.info.total_difficulty, header_only
+				"needs_syncing {} {}", local_diff, peer.info.total_difficulty
 			);
+			most_work_height = peer.info.height;

 			if peer.info.total_difficulty <= local_diff {
 				let ch = chain.head().unwrap();
@@ -274,19 +284,20 @@
 					ch.height,
 					ch.last_block_h
 				);
-				if !header_only {
-					let _ = chain.reset_head();
-				}
-				return false;
+
+				let _ = chain.reset_head();
+				return (false, 0);
 			}
 		}
 	} else {
 		warn!(LOGGER, "sync: no peers available, disabling sync");
-		return false;
+		return (false, 0);
 	}
 } else {
 	if let Some(peer) = peer {
 		if let Ok(peer) = peer.try_read() {
+			most_work_height = peer.info.height;
+
 			// sum the last 5 difficulties to give us the threshold
 			let threshold = chain
 				.difficulty_iter()
@@ -302,12 +313,12 @@
 				peer.info.total_difficulty,
 				threshold,
 			);
-			return true;
+			return (true, most_work_height);
 		}
 	}
 }
-	is_syncing
+	(is_syncing, most_work_height)
 }

 /// We build a locator based on sync_head.
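
The threshold above means a peer only triggers a sync if its total difficulty exceeds ours by more than the sum of the last five block difficulties, so a node a block or two behind does not flap in and out of sync mode. A toy version follows; the exact comparison against local difficulty is an assumption, since the hunk only shows the values being logged:

    fn needs_syncing_threshold(recent_difficulties: &[u64], local_total: u64, peer_total: u64) -> bool {
        // threshold: sum of the most recent 5 block difficulties
        let threshold: u64 = recent_difficulties.iter().rev().take(5).sum();
        peer_total > local_total + threshold
    }

    fn main() {
        let recent = [10, 12, 11, 13, 12, 14]; // per-block difficulties, newest last
        assert!(needs_syncing_threshold(&recent, 1_000, 1_100)); // clearly more work
        assert!(!needs_syncing_threshold(&recent, 1_000, 1_030)); // within noise, stay put
        println!("threshold checks pass");
    }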

View file

@@ -155,7 +155,7 @@ impl Tracker {
 		T: ser::Writeable,
 	{
 		let buf = write_to_buf(body, msg_type);
-		self.send_channel.send(buf)?;
+		self.send_channel.try_send(buf)?;
 		Ok(())
 	}
 }
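
This one-line change is the #912 fix from the commit message. Assuming the bounded std::sync::mpsc channel the surrounding code appears to use, send() blocks when the peer's buffer is full, stalling the caller, while try_send() returns an error immediately. The behavior in isolation:

    use std::sync::mpsc::{sync_channel, TrySendError};

    fn main() {
        // bounded channel with room for exactly one message
        let (tx, _rx) = sync_channel::<u32>(1);

        tx.try_send(1).unwrap(); // fits in the buffer

        // a blocking send() would hang here; try_send fails fast instead
        match tx.try_send(2) {
            Err(TrySendError::Full(msg)) => println!("buffer full, dropped message {}", msg),
            other => println!("unexpected result: {:?}", other),
        }
    }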

View file

@@ -65,6 +65,7 @@ pub enum Error {
 		us: Hash,
 		peer: Hash,
 	},
+	Send(String),
 }

 impl From<ser::Error> for Error {
@@ -82,9 +83,9 @@
 		Error::Connection(e)
 	}
 }

-impl<T> From<mpsc::SendError<T>> for Error {
-	fn from(_e: mpsc::SendError<T>) -> Error {
-		Error::ConnectionClose
+impl<T> From<mpsc::TrySendError<T>> for Error {
+	fn from(e: mpsc::TrySendError<T>) -> Error {
+		Error::Send(e.to_string())
 	}
 }

 // impl From<TimerError> for Error {
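
Switching to try_send changes the error type from mpsc::SendError (channel closed) to mpsc::TrySendError (closed or full), and the new From impl lets the ? in Tracker::send above convert it into the new Error::Send variant. A self-contained sketch of the same conversion (the send helper is illustrative):

    use std::sync::mpsc;

    #[derive(Debug)]
    enum Error {
        Send(String),
    }

    impl<T> From<mpsc::TrySendError<T>> for Error {
        fn from(e: mpsc::TrySendError<T>) -> Error {
            // TrySendError implements Display, distinguishing "full" from "closed"
            Error::Send(e.to_string())
        }
    }

    fn send(tx: &mpsc::SyncSender<u8>, byte: u8) -> Result<(), Error> {
        tx.try_send(byte)?; // TrySendError converts via the From impl above
        Ok(())
    }

    fn main() {
        let (tx, rx) = mpsc::sync_channel::<u8>(0);
        drop(rx); // close the channel so try_send fails
        println!("{:?}", send(&tx, 1).unwrap_err());
    }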