refactor the state_sync to handle the long fork (#1902)

* split horizon into two explicit values for cut through and txhashset request

* let node which has 2-7 days of history be able to handle forks larger than 2 days

* add test simulate_long_fork

* add pause/resume feature on p2p for tests

* refactor the state_sync

* ignore the test case simulate_long_fork for normal Travis-CI

* refactor function check_txhashset_needed to be shared with body_sync

* fix: state TxHashsetDone should allow header sync
This commit is contained in:
Gary Yu 2018-11-10 11:27:52 +08:00 committed by GitHub
parent 408fcc386e
commit 9af9ca9518
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
17 changed files with 788 additions and 144 deletions

View file

@ -25,7 +25,7 @@ use util::RwLock;
use lmdb; use lmdb;
use lru_cache::LruCache; use lru_cache::LruCache;
use core::core::hash::{Hash, Hashed}; use core::core::hash::{Hash, Hashed, ZERO_HASH};
use core::core::merkle_proof::MerkleProof; use core::core::merkle_proof::MerkleProof;
use core::core::verifier_cache::VerifierCache; use core::core::verifier_cache::VerifierCache;
use core::core::{Block, BlockHeader, BlockSums, Output, OutputIdentifier, Transaction, TxKernel}; use core::core::{Block, BlockHeader, BlockSums, Output, OutputIdentifier, Transaction, TxKernel};
@ -700,6 +700,73 @@ impl Chain {
Ok(()) Ok(())
} }
/// Check chain status whether a txhashset downloading is needed
pub fn check_txhashset_needed(&self, caller: String, hashes: &mut Option<Vec<Hash>>) -> bool {
let horizon = global::cut_through_horizon() as u64;
let body_head = self.head().unwrap();
let header_head = self.header_head().unwrap();
let sync_head = self.get_sync_head().unwrap();
debug!(
"{}: body_head - {}, {}, header_head - {}, {}, sync_head - {}, {}",
caller,
body_head.last_block_h,
body_head.height,
header_head.last_block_h,
header_head.height,
sync_head.last_block_h,
sync_head.height,
);
if body_head.total_difficulty >= header_head.total_difficulty {
debug!(
"{}: no need. header_head.total_difficulty: {} <= body_head.total_difficulty: {}",
caller, header_head.total_difficulty, body_head.total_difficulty,
);
return false;
}
let mut oldest_height = 0;
let mut oldest_hash = ZERO_HASH;
let mut current = self.get_block_header(&header_head.last_block_h);
if current.is_err() {
error!(
"{}: header_head not found in chain db: {} at {}",
caller, header_head.last_block_h, header_head.height,
);
}
while let Ok(header) = current {
// break out of the while loop when we find a header common
// between the header chain and the current body chain
if let Ok(_) = self.is_on_current_chain(&header) {
break;
}
oldest_height = header.height;
oldest_hash = header.hash();
if let Some(hs) = hashes {
hs.push(oldest_hash);
}
current = self.get_previous_header(&header);
}
if oldest_height < header_head.height.saturating_sub(horizon) {
if oldest_height > 0 {
debug!(
"{}: oldest block which is not on local chain: {} at {}",
caller, oldest_hash, oldest_height,
);
return true;
} else {
error!("{}: something is wrong! oldest_height is 0", caller);
return false;
};
}
return false;
}
/// Writes a reading view on a txhashset state that's been provided to us. /// Writes a reading view on a txhashset state that's been provided to us.
/// If we're willing to accept that new state, the data stream will be /// If we're willing to accept that new state, the data stream will be
/// read as a zip file, unzipped and the resulting state files should be /// read as a zip file, unzipped and the resulting state files should be
@ -712,14 +779,12 @@ impl Chain {
) -> Result<(), Error> { ) -> Result<(), Error> {
status.on_setup(); status.on_setup();
// Initial check based on relative heights of current head and header_head. // Initial check whether this txhashset is needed or not
{ let mut hashes: Option<Vec<Hash>> = None;
let head = self.head().unwrap(); if !self.check_txhashset_needed("txhashset_write".to_owned(), &mut hashes) {
let header_head = self.header_head().unwrap(); warn!("txhashset_write: txhashset received but it's not needed! ignored.");
if header_head.height - head.height < global::cut_through_horizon() as u64 {
return Err(ErrorKind::InvalidTxHashSet("not needed".to_owned()).into()); return Err(ErrorKind::InvalidTxHashSet("not needed".to_owned()).into());
} }
}
let header = self.get_block_header(&h)?; let header = self.get_block_header(&h)?;
txhashset::zip_write(self.db_root.clone(), txhashset_data, &header)?; txhashset::zip_write(self.db_root.clone(), txhashset_data, &header)?;
@ -769,6 +834,9 @@ impl Chain {
batch.save_body_head(&tip)?; batch.save_body_head(&tip)?;
batch.save_header_height(&header)?; batch.save_header_height(&header)?;
batch.build_by_height_index(&header, true)?; batch.build_by_height_index(&header, true)?;
// Reset the body tail to the body head after a txhashset write
batch.save_body_tail(&tip)?;
} }
// Commit all the changes to the db. // Commit all the changes to the db.
@ -819,12 +887,13 @@ impl Chain {
let horizon = global::cut_through_horizon() as u64; let horizon = global::cut_through_horizon() as u64;
let head = self.head()?; let head = self.head()?;
let tail = self.tail()?;
let cutoff = head.height.saturating_sub(horizon); let cutoff = head.height.saturating_sub(horizon);
debug!( debug!(
"compact_blocks_db: head height: {}, horizon: {}, cutoff: {}", "compact_blocks_db: head height: {}, tail height: {}, horizon: {}, cutoff: {}",
head.height, horizon, cutoff, head.height, tail.height, horizon, cutoff,
); );
if cutoff == 0 { if cutoff == 0 {
@ -859,8 +928,13 @@ impl Chain {
Err(e) => return Err(From::from(e)), Err(e) => return Err(From::from(e)),
} }
} }
let tail = batch.get_header_by_height(head.height - horizon)?;
batch.save_body_tail(&Tip::from_header(&tail))?;
batch.commit()?; batch.commit()?;
debug!("compact_blocks_db: removed {} blocks.", count); debug!(
"compact_blocks_db: removed {} blocks. tail height: {}",
count, tail.height
);
Ok(()) Ok(())
} }
@ -943,6 +1017,13 @@ impl Chain {
.map_err(|e| ErrorKind::StoreErr(e, "chain head".to_owned()).into()) .map_err(|e| ErrorKind::StoreErr(e, "chain head".to_owned()).into())
} }
/// Tail of the block chain in this node after compact (cross-block cut-through)
pub fn tail(&self) -> Result<Tip, Error> {
self.store
.tail()
.map_err(|e| ErrorKind::StoreErr(e, "chain tail".to_owned()).into())
}
/// Tip (head) of the header chain. /// Tip (head) of the header chain.
pub fn header_head(&self) -> Result<Tip, Error> { pub fn header_head(&self) -> Result<Tip, Error> {
self.store self.store

View file

@ -199,6 +199,10 @@ pub fn process_block(b: &Block, ctx: &mut BlockContext) -> Result<Option<Tip>, E
// so we can maintain multiple (in progress) forks. // so we can maintain multiple (in progress) forks.
add_block(b, &ctx.batch)?; add_block(b, &ctx.batch)?;
if ctx.batch.tail().is_err() {
update_body_tail(&b.header, &ctx.batch)?;
}
// Update the chain head if total work is increased. // Update the chain head if total work is increased.
let res = update_head(b, ctx)?; let res = update_head(b, ctx)?;
Ok(res) Ok(res)
@ -552,6 +556,16 @@ fn add_block(b: &Block, batch: &store::Batch) -> Result<(), Error> {
Ok(()) Ok(())
} }
/// Update the block chain tail so we can know the exact tail of full blocks in this node
fn update_body_tail(bh: &BlockHeader, batch: &store::Batch) -> Result<(), Error> {
let tip = Tip::from_header(bh);
batch
.save_body_tail(&tip)
.map_err(|e| ErrorKind::StoreErr(e, "pipe save body tail".to_owned()))?;
debug!("body tail {} @ {}", bh.hash(), bh.height);
Ok(())
}
/// Officially adds the block header to our header chain. /// Officially adds the block header to our header chain.
fn add_block_header(bh: &BlockHeader, batch: &store::Batch) -> Result<(), Error> { fn add_block_header(bh: &BlockHeader, batch: &store::Batch) -> Result<(), Error> {
batch batch

View file

@ -36,6 +36,7 @@ const STORE_SUBPATH: &'static str = "chain";
const BLOCK_HEADER_PREFIX: u8 = 'h' as u8; const BLOCK_HEADER_PREFIX: u8 = 'h' as u8;
const BLOCK_PREFIX: u8 = 'b' as u8; const BLOCK_PREFIX: u8 = 'b' as u8;
const HEAD_PREFIX: u8 = 'H' as u8; const HEAD_PREFIX: u8 = 'H' as u8;
const TAIL_PREFIX: u8 = 'T' as u8;
const HEADER_HEAD_PREFIX: u8 = 'I' as u8; const HEADER_HEAD_PREFIX: u8 = 'I' as u8;
const SYNC_HEAD_PREFIX: u8 = 's' as u8; const SYNC_HEAD_PREFIX: u8 = 's' as u8;
const HEADER_HEIGHT_PREFIX: u8 = '8' as u8; const HEADER_HEIGHT_PREFIX: u8 = '8' as u8;
@ -70,6 +71,10 @@ impl ChainStore {
option_to_not_found(self.db.get_ser(&vec![HEAD_PREFIX]), "HEAD") option_to_not_found(self.db.get_ser(&vec![HEAD_PREFIX]), "HEAD")
} }
pub fn tail(&self) -> Result<Tip, Error> {
option_to_not_found(self.db.get_ser(&vec![TAIL_PREFIX]), "TAIL")
}
/// Header of the block at the head of the block chain (not the same thing as header_head). /// Header of the block at the head of the block chain (not the same thing as header_head).
pub fn head_header(&self) -> Result<BlockHeader, Error> { pub fn head_header(&self) -> Result<BlockHeader, Error> {
self.get_block_header(&self.head()?.last_block_h) self.get_block_header(&self.head()?.last_block_h)
@ -225,6 +230,10 @@ impl<'a> Batch<'a> {
option_to_not_found(self.db.get_ser(&vec![HEAD_PREFIX]), "HEAD") option_to_not_found(self.db.get_ser(&vec![HEAD_PREFIX]), "HEAD")
} }
pub fn tail(&self) -> Result<Tip, Error> {
option_to_not_found(self.db.get_ser(&vec![TAIL_PREFIX]), "TAIL")
}
/// Header of the block at the head of the block chain (not the same thing as header_head). /// Header of the block at the head of the block chain (not the same thing as header_head).
pub fn head_header(&self) -> Result<BlockHeader, Error> { pub fn head_header(&self) -> Result<BlockHeader, Error> {
self.get_block_header(&self.head()?.last_block_h) self.get_block_header(&self.head()?.last_block_h)
@ -248,6 +257,10 @@ impl<'a> Batch<'a> {
self.db.put_ser(&vec![HEAD_PREFIX], t) self.db.put_ser(&vec![HEAD_PREFIX], t)
} }
pub fn save_body_tail(&self, t: &Tip) -> Result<(), Error> {
self.db.put_ser(&vec![TAIL_PREFIX], t)
}
pub fn save_header_head(&self, t: &Tip) -> Result<(), Error> { pub fn save_header_head(&self, t: &Tip) -> Result<(), Error> {
self.db.put_ser(&vec![HEADER_HEAD_PREFIX], t) self.db.put_ser(&vec![HEADER_HEAD_PREFIX], t)
} }

View file

@ -92,6 +92,14 @@ pub const MAX_SECONDARY_SCALING: u64 = 8 << 11;
/// easier to reason about. /// easier to reason about.
pub const CUT_THROUGH_HORIZON: u32 = WEEK_HEIGHT as u32; pub const CUT_THROUGH_HORIZON: u32 = WEEK_HEIGHT as u32;
/// Default number of blocks in the past to determine the height where we request
/// a txhashset (and full blocks from). Needs to be long enough to not overlap with
/// a long reorg.
/// Rational behind the value is the longest bitcoin fork was about 30 blocks, so 5h.
/// We add an order of magnitude to be safe and round to 2x24h of blocks to make it
/// easier to reason about.
pub const STATE_SYNC_THRESHOLD: u32 = 2 * DAY_HEIGHT as u32;
/// Weight of an input when counted against the max block weight capacity /// Weight of an input when counted against the max block weight capacity
pub const BLOCK_INPUT_WEIGHT: usize = 1; pub const BLOCK_INPUT_WEIGHT: usize = 1;

View file

@ -20,7 +20,7 @@ use consensus::HeaderInfo;
use consensus::{ use consensus::{
graph_weight, BASE_EDGE_BITS, BLOCK_TIME_SEC, COINBASE_MATURITY, CUT_THROUGH_HORIZON, graph_weight, BASE_EDGE_BITS, BLOCK_TIME_SEC, COINBASE_MATURITY, CUT_THROUGH_HORIZON,
DAY_HEIGHT, DIFFICULTY_ADJUST_WINDOW, INITIAL_DIFFICULTY, PROOFSIZE, SECOND_POW_EDGE_BITS, DAY_HEIGHT, DIFFICULTY_ADJUST_WINDOW, INITIAL_DIFFICULTY, PROOFSIZE, SECOND_POW_EDGE_BITS,
UNIT_DIFFICULTY STATE_SYNC_THRESHOLD, UNIT_DIFFICULTY,
}; };
use pow::{self, CuckatooContext, EdgeType, PoWContext}; use pow::{self, CuckatooContext, EdgeType, PoWContext};
/// An enum collecting sets of parameters used throughout the /// An enum collecting sets of parameters used throughout the
@ -51,7 +51,10 @@ pub const AUTOMATED_TESTING_COINBASE_MATURITY: u64 = 3;
pub const USER_TESTING_COINBASE_MATURITY: u64 = 3; pub const USER_TESTING_COINBASE_MATURITY: u64 = 3;
/// Testing cut through horizon in blocks /// Testing cut through horizon in blocks
pub const TESTING_CUT_THROUGH_HORIZON: u32 = 20; pub const TESTING_CUT_THROUGH_HORIZON: u32 = 70;
/// Testing state sync threshold in blocks
pub const TESTING_STATE_SYNC_THRESHOLD: u32 = 20;
/// Testing initial graph weight /// Testing initial graph weight
pub const TESTING_INITIAL_GRAPH_WEIGHT: u32 = 1; pub const TESTING_INITIAL_GRAPH_WEIGHT: u32 = 1;
@ -237,6 +240,16 @@ pub fn cut_through_horizon() -> u32 {
} }
} }
/// Threshold at which we can request a txhashset (and full blocks from)
pub fn state_sync_threshold() -> u32 {
let param_ref = CHAIN_TYPE.read();
match *param_ref {
ChainTypes::AutomatedTesting => TESTING_STATE_SYNC_THRESHOLD,
ChainTypes::UserTesting => TESTING_STATE_SYNC_THRESHOLD,
_ => STATE_SYNC_THRESHOLD,
}
}
/// Are we in automated testing mode? /// Are we in automated testing mode?
pub fn is_automated_testing_mode() -> bool { pub fn is_automated_testing_mode() -> bool {
let param_ref = CHAIN_TYPE.read(); let param_ref = CHAIN_TYPE.read();

View file

@ -225,6 +225,7 @@ impl Peers {
/// Unban a peer, checks if it exists and banned then unban /// Unban a peer, checks if it exists and banned then unban
pub fn unban_peer(&self, peer_addr: &SocketAddr) { pub fn unban_peer(&self, peer_addr: &SocketAddr) {
debug!("unban_peer: peer {}", peer_addr);
match self.get_peer(*peer_addr) { match self.get_peer(*peer_addr) {
Ok(_) => { Ok(_) => {
if self.is_banned(*peer_addr) { if self.is_banned(*peer_addr) {

View file

@ -39,6 +39,7 @@ pub struct Server {
handshake: Arc<Handshake>, handshake: Arc<Handshake>,
pub peers: Arc<Peers>, pub peers: Arc<Peers>,
stop: Arc<AtomicBool>, stop: Arc<AtomicBool>,
pause: Arc<AtomicBool>,
} }
// TODO TLS // TODO TLS
@ -51,13 +52,15 @@ impl Server {
adapter: Arc<ChainAdapter>, adapter: Arc<ChainAdapter>,
genesis: Hash, genesis: Hash,
stop: Arc<AtomicBool>, stop: Arc<AtomicBool>,
pause: Arc<AtomicBool>,
) -> Result<Server, Error> { ) -> Result<Server, Error> {
Ok(Server { Ok(Server {
config: config.clone(), config: config.clone(),
capabilities: capab, capabilities: capab,
handshake: Arc::new(Handshake::new(genesis, config.clone())), handshake: Arc::new(Handshake::new(genesis, config.clone())),
peers: Arc::new(Peers::new(PeerStore::new(db_env)?, adapter, config)), peers: Arc::new(Peers::new(PeerStore::new(db_env)?, adapter, config)),
stop: stop, stop,
pause,
}) })
} }
@ -71,6 +74,12 @@ impl Server {
let sleep_time = Duration::from_millis(1); let sleep_time = Duration::from_millis(1);
loop { loop {
// Pause peer ingress connection request. Only for tests.
if self.pause.load(Ordering::Relaxed) {
thread::sleep(Duration::from_secs(1));
continue;
}
match listener.accept() { match listener.accept() {
Ok((stream, peer_addr)) => { Ok((stream, peer_addr)) => {
if !self.check_banned(&stream) { if !self.check_banned(&stream) {
@ -185,6 +194,14 @@ impl Server {
self.stop.store(true, Ordering::Relaxed); self.stop.store(true, Ordering::Relaxed);
self.peers.stop(); self.peers.stop();
} }
/// Pause means: stop all the current peers connection, only for tests.
/// Note:
/// 1. must pause the 'seed' thread also, to avoid the new egress peer connection
/// 2. must pause the 'p2p-server' thread also, to avoid the new ingress peer connection.
pub fn pause(&self) {
self.peers.stop();
}
} }
/// A no-op network adapter used for testing. /// A no-op network adapter used for testing.

View file

@ -58,6 +58,7 @@ fn peer_handshake() {
net_adapter.clone(), net_adapter.clone(),
Hash::from_vec(&vec![]), Hash::from_vec(&vec![]),
Arc::new(AtomicBool::new(false)), Arc::new(AtomicBool::new(false)),
Arc::new(AtomicBool::new(false)),
).unwrap(), ).unwrap(),
); );

View file

@ -261,6 +261,8 @@ pub enum SyncStatus {
}, },
/// Finalizing the new state /// Finalizing the new state
TxHashsetSave, TxHashsetSave,
/// State sync finalized
TxHashsetDone,
/// Downloading blocks /// Downloading blocks
BodySync { BodySync {
current_height: u64, current_height: u64,
@ -383,9 +385,6 @@ impl chain::TxHashsetWriteStatus for SyncState {
} }
fn on_done(&self) { fn on_done(&self) {
self.update(SyncStatus::BodySync { self.update(SyncStatus::TxHashsetDone);
current_height: 0,
highest_height: 0,
});
} }
} }

View file

@ -41,6 +41,7 @@ pub fn connect_and_monitor(
seed_list: Box<Fn() -> Vec<SocketAddr> + Send>, seed_list: Box<Fn() -> Vec<SocketAddr> + Send>,
preferred_peers: Option<Vec<SocketAddr>>, preferred_peers: Option<Vec<SocketAddr>>,
stop: Arc<AtomicBool>, stop: Arc<AtomicBool>,
pause: Arc<AtomicBool>,
) { ) {
let _ = thread::Builder::new() let _ = thread::Builder::new()
.name("seed".to_string()) .name("seed".to_string())
@ -65,6 +66,12 @@ pub fn connect_and_monitor(
let mut start_attempt = 0; let mut start_attempt = 0;
while !stop.load(Ordering::Relaxed) { while !stop.load(Ordering::Relaxed) {
// Pause egress peer connection request. Only for tests.
if pause.load(Ordering::Relaxed) {
thread::sleep(time::Duration::from_secs(1));
continue;
}
// Check for and remove expired peers from the storage // Check for and remove expired peers from the storage
if Utc::now() - prev_expire_check > Duration::hours(1) { if Utc::now() - prev_expire_check > Duration::hours(1) {
peers.remove_expired(); peers.remove_expired();

View file

@ -58,6 +58,8 @@ pub struct Server {
state_info: ServerStateInfo, state_info: ServerStateInfo,
/// Stop flag /// Stop flag
pub stop: Arc<AtomicBool>, pub stop: Arc<AtomicBool>,
/// Pause flag
pub pause: Arc<AtomicBool>,
} }
impl Server { impl Server {
@ -111,6 +113,7 @@ impl Server {
}; };
let stop = Arc::new(AtomicBool::new(false)); let stop = Arc::new(AtomicBool::new(false));
let pause = Arc::new(AtomicBool::new(false));
// Shared cache for verification results. // Shared cache for verification results.
// We cache rangeproof verification and kernel signature verification. // We cache rangeproof verification and kernel signature verification.
@ -173,6 +176,7 @@ impl Server {
net_adapter.clone(), net_adapter.clone(),
genesis.hash(), genesis.hash(),
stop.clone(), stop.clone(),
pause.clone(),
)?); )?);
chain_adapter.init(p2p_server.peers.clone()); chain_adapter.init(p2p_server.peers.clone());
pool_net_adapter.init(p2p_server.peers.clone()); pool_net_adapter.init(p2p_server.peers.clone());
@ -203,6 +207,7 @@ impl Server {
seeder, seeder,
peers_preferred, peers_preferred,
stop.clone(), stop.clone(),
pause.clone(),
); );
} }
@ -254,6 +259,7 @@ impl Server {
..Default::default() ..Default::default()
}, },
stop, stop,
pause,
}) })
} }
@ -425,6 +431,18 @@ impl Server {
self.stop.store(true, Ordering::Relaxed); self.stop.store(true, Ordering::Relaxed);
} }
/// Pause the p2p server.
pub fn pause(&self) {
self.pause.store(true, Ordering::Relaxed);
thread::sleep(time::Duration::from_secs(1));
self.p2p.pause();
}
/// Resume the p2p server.
pub fn resume(&self) {
self.pause.store(false, Ordering::Relaxed);
}
/// Stops the test miner without stopping the p2p layer /// Stops the test miner without stopping the p2p layer
pub fn stop_test_miner(&self, stop: Arc<AtomicBool>) { pub fn stop_test_miner(&self, stop: Arc<AtomicBool>) {
stop.store(true, Ordering::Relaxed); stop.store(true, Ordering::Relaxed);

View file

@ -19,8 +19,7 @@ use std::sync::Arc;
use chain; use chain;
use common::types::{SyncState, SyncStatus}; use common::types::{SyncState, SyncStatus};
use core::core::hash::Hashed; use core::core::hash::Hash;
use core::global;
use p2p; use p2p;
pub struct BodySync { pub struct BodySync {
@ -50,83 +49,39 @@ impl BodySync {
} }
} }
/// Check whether a body sync is needed and run it if so /// Check whether a body sync is needed and run it if so.
/// Return true if txhashset download is needed (when requested block is under the horizon).
pub fn check_run(&mut self, head: &chain::Tip, highest_height: u64) -> bool { pub fn check_run(&mut self, head: &chain::Tip, highest_height: u64) -> bool {
// if fast_sync disabled or not needed, run the body_sync every 5s // run the body_sync every 5s
if self.body_sync_due() { if self.body_sync_due() {
self.body_sync(); if self.body_sync() {
return true;
}
self.sync_state.update(SyncStatus::BodySync { self.sync_state.update(SyncStatus::BodySync {
current_height: head.height, current_height: head.height,
highest_height: highest_height, highest_height: highest_height,
}); });
return true;
} }
false false
} }
fn body_sync(&mut self) { /// Return true if txhashset download is needed (when requested block is under the horizon).
let horizon = global::cut_through_horizon() as u64; fn body_sync(&mut self) -> bool {
let body_head = self.chain.head().unwrap(); let mut hashes: Option<Vec<Hash>> = Some(vec![]);
let header_head = self.chain.header_head().unwrap(); if self
let sync_head = self.chain.get_sync_head().unwrap(); .chain
.check_txhashset_needed("body_sync".to_owned(), &mut hashes)
{
debug!( debug!(
"body_sync: body_head - {}, {}, header_head - {}, {}, sync_head - {}, {}", "body_sync: cannot sync full blocks earlier than horizon. will request txhashset",
body_head.last_block_h,
body_head.height,
header_head.last_block_h,
header_head.height,
sync_head.last_block_h,
sync_head.height,
); );
return true;
let mut hashes = vec![];
let mut oldest_height = 0;
if header_head.total_difficulty > body_head.total_difficulty {
let mut current = self.chain.get_block_header(&header_head.last_block_h);
//+ remove me after #1880 root cause found
if current.is_err() {
error!(
"body_sync: header_head not found in chain db: {} at {}",
header_head.last_block_h, header_head.height,
);
}
//-
while let Ok(header) = current {
// break out of the while loop when we find a header common
// between the header chain and the current body chain
if let Ok(_) = self.chain.is_on_current_chain(&header) {
break;
} }
hashes.push(header.hash()); let mut hashes = hashes.unwrap();
oldest_height = header.height;
current = self.chain.get_previous_header(&header);
}
}
//+ remove me after #1880 root cause found
else {
debug!(
"body_sync: header_head.total_difficulty: {}, body_head.total_difficulty: {}",
header_head.total_difficulty, body_head.total_difficulty,
);
}
//-
hashes.reverse(); hashes.reverse();
if oldest_height < header_head.height.saturating_sub(horizon) {
debug!(
"body_sync: cannot sync full blocks earlier than horizon. oldest_height: {}",
oldest_height,
);
return;
}
let peers = self.peers.more_work_peers(); let peers = self.peers.more_work_peers();
// if we have 5 peers to sync from then ask for 50 blocks total (peer_count * // if we have 5 peers to sync from then ask for 50 blocks total (peer_count *
@ -147,6 +102,9 @@ impl BodySync {
.collect::<Vec<_>>(); .collect::<Vec<_>>();
if hashes_to_get.len() > 0 { if hashes_to_get.len() > 0 {
let body_head = self.chain.head().unwrap();
let header_head = self.chain.header_head().unwrap();
debug!( debug!(
"block_sync: {}/{} requesting blocks {:?} from {} peers", "block_sync: {}/{} requesting blocks {:?} from {} peers",
body_head.height, body_head.height,
@ -170,6 +128,7 @@ impl BodySync {
} }
} }
} }
return false;
} }
// Should we run block body sync and ask for more full blocks? // Should we run block body sync and ask for more full blocks?

View file

@ -51,7 +51,9 @@ impl HeaderSync {
} }
let enable_header_sync = match self.sync_state.status() { let enable_header_sync = match self.sync_state.status() {
SyncStatus::BodySync { .. } | SyncStatus::HeaderSync { .. } => true, SyncStatus::BodySync { .. }
| SyncStatus::HeaderSync { .. }
| SyncStatus::TxHashsetDone => true,
SyncStatus::NoSync | SyncStatus::Initial | SyncStatus::AwaitingPeers(_) => { SyncStatus::NoSync | SyncStatus::Initial | SyncStatus::AwaitingPeers(_) => {
// Reset sync_head to header_head on transition to HeaderSync, // Reset sync_head to header_head on transition to HeaderSync,
// but ONLY on initial transition to HeaderSync state. // but ONLY on initial transition to HeaderSync state.

View file

@ -33,8 +33,8 @@ pub struct StateSync {
peers: Arc<p2p::Peers>, peers: Arc<p2p::Peers>,
chain: Arc<chain::Chain>, chain: Arc<chain::Chain>,
prev_fast_sync: Option<DateTime<Utc>>, prev_state_sync: Option<DateTime<Utc>>,
fast_sync_peer: Option<Arc<Peer>>, state_sync_peer: Option<Arc<Peer>>,
} }
impl StateSync { impl StateSync {
@ -47,8 +47,8 @@ impl StateSync {
sync_state, sync_state,
peers, peers,
chain, chain,
prev_fast_sync: None, prev_state_sync: None,
fast_sync_peer: None, state_sync_peer: None,
} }
} }
@ -59,13 +59,12 @@ impl StateSync {
&mut self, &mut self,
header_head: &chain::Tip, header_head: &chain::Tip,
head: &chain::Tip, head: &chain::Tip,
tail: &chain::Tip,
highest_height: u64, highest_height: u64,
) -> bool { ) -> bool {
let need_state_sync = trace!("state_sync: head.height: {}, tail.height: {}. header_head.height: {}, highest_height: {}",
highest_height.saturating_sub(head.height) > global::cut_through_horizon() as u64; head.height, tail.height, header_head.height, highest_height,
if !need_state_sync { );
return false;
}
let mut sync_need_restart = false; let mut sync_need_restart = false;
@ -73,47 +72,63 @@ impl StateSync {
{ {
let clone = self.sync_state.sync_error(); let clone = self.sync_state.sync_error();
if let Some(ref sync_error) = *clone.read() { if let Some(ref sync_error) = *clone.read() {
error!("fast_sync: error = {:?}. restart fast sync", sync_error); error!("state_sync: error = {:?}. restart fast sync", sync_error);
sync_need_restart = true; sync_need_restart = true;
} }
drop(clone); drop(clone);
} }
// check peer connection status of this sync // check peer connection status of this sync
if let Some(ref peer) = self.fast_sync_peer { if let Some(ref peer) = self.state_sync_peer {
if let SyncStatus::TxHashsetDownload { .. } = self.sync_state.status() { if let SyncStatus::TxHashsetDownload { .. } = self.sync_state.status() {
if !peer.is_connected() { if !peer.is_connected() {
sync_need_restart = true; sync_need_restart = true;
info!( info!(
"fast_sync: peer connection lost: {:?}. restart", "state_sync: peer connection lost: {:?}. restart",
peer.info.addr, peer.info.addr,
); );
} }
} }
} }
if sync_need_restart { // if txhashset downloaded and validated successfully, we switch to BodySync state,
self.fast_sync_reset(); // and we need call state_sync_reset() to make it ready for next possible state sync.
let done = if let SyncStatus::TxHashsetDone = self.sync_state.status() {
self.sync_state.update(SyncStatus::BodySync {
current_height: 0,
highest_height: 0,
});
true
} else {
false
};
if sync_need_restart || done {
self.state_sync_reset();
self.sync_state.clear_sync_error(); self.sync_state.clear_sync_error();
} }
if done {
return false;
}
// run fast sync if applicable, normally only run one-time, except restart in error // run fast sync if applicable, normally only run one-time, except restart in error
if header_head.height == highest_height { if header_head.height == highest_height {
let (go, download_timeout) = self.fast_sync_due(); let (go, download_timeout) = self.state_sync_due();
if let SyncStatus::TxHashsetDownload { .. } = self.sync_state.status() { if let SyncStatus::TxHashsetDownload { .. } = self.sync_state.status() {
if download_timeout { if download_timeout {
error!("fast_sync: TxHashsetDownload status timeout in 10 minutes!"); error!("state_sync: TxHashsetDownload status timeout in 10 minutes!");
self.sync_state self.sync_state
.set_sync_error(Error::P2P(p2p::Error::Timeout)); .set_sync_error(Error::P2P(p2p::Error::Timeout));
} }
} }
if go { if go {
self.fast_sync_peer = None; self.state_sync_peer = None;
match self.request_state(&header_head) { match self.request_state(&header_head) {
Ok(peer) => { Ok(peer) => {
self.fast_sync_peer = Some(peer); self.state_sync_peer = Some(peer);
} }
Err(e) => self.sync_state.set_sync_error(Error::P2P(e)), Err(e) => self.sync_state.set_sync_error(Error::P2P(e)),
} }
@ -141,28 +156,27 @@ impl StateSync {
} }
fn request_state(&self, header_head: &chain::Tip) -> Result<Arc<Peer>, p2p::Error> { fn request_state(&self, header_head: &chain::Tip) -> Result<Arc<Peer>, p2p::Error> {
let horizon = global::cut_through_horizon() as u64; let threshold = global::state_sync_threshold() as u64;
if let Some(peer) = self.peers.most_work_peer() { if let Some(peer) = self.peers.most_work_peer() {
// ask for txhashset at 90% of horizon, this still leaves time for download // ask for txhashset at state_sync_threshold
// and validation to happen and stay within horizon
let mut txhashset_head = self let mut txhashset_head = self
.chain .chain
.get_block_header(&header_head.prev_block_h) .get_block_header(&header_head.prev_block_h)
.unwrap(); .unwrap();
for _ in 0..(horizon - horizon / 10) { for _ in 0..threshold {
txhashset_head = self.chain.get_previous_header(&txhashset_head).unwrap(); txhashset_head = self.chain.get_previous_header(&txhashset_head).unwrap();
} }
let bhash = txhashset_head.hash(); let bhash = txhashset_head.hash();
debug!( debug!(
"fast_sync: before txhashset request, header head: {} / {}, txhashset_head: {} / {}", "state_sync: before txhashset request, header head: {} / {}, txhashset_head: {} / {}",
header_head.height, header_head.height,
header_head.last_block_h, header_head.last_block_h,
txhashset_head.height, txhashset_head.height,
bhash bhash
); );
if let Err(e) = peer.send_txhashset_request(txhashset_head.height, bhash) { if let Err(e) = peer.send_txhashset_request(txhashset_head.height, bhash) {
error!("fast_sync: send_txhashset_request err! {:?}", e); error!("state_sync: send_txhashset_request err! {:?}", e);
return Err(e); return Err(e);
} }
return Ok(peer.clone()); return Ok(peer.clone());
@ -171,13 +185,13 @@ impl StateSync {
} }
// For now this is a one-time thing (it can be slow) at initial startup. // For now this is a one-time thing (it can be slow) at initial startup.
fn fast_sync_due(&mut self) -> (bool, bool) { fn state_sync_due(&mut self) -> (bool, bool) {
let now = Utc::now(); let now = Utc::now();
let mut download_timeout = false; let mut download_timeout = false;
match self.prev_fast_sync { match self.prev_state_sync {
None => { None => {
self.prev_fast_sync = Some(now); self.prev_state_sync = Some(now);
(true, download_timeout) (true, download_timeout)
} }
Some(prev) => { Some(prev) => {
@ -189,8 +203,8 @@ impl StateSync {
} }
} }
fn fast_sync_reset(&mut self) { fn state_sync_reset(&mut self) {
self.prev_fast_sync = None; self.prev_state_sync = None;
self.fast_sync_peer = None; self.state_sync_peer = None;
} }
} }

View file

@ -141,13 +141,34 @@ impl SyncRunner {
// if syncing is needed // if syncing is needed
let head = self.chain.head().unwrap(); let head = self.chain.head().unwrap();
let tail = self.chain.tail().unwrap_or_else(|_| head.clone());
let header_head = self.chain.header_head().unwrap(); let header_head = self.chain.header_head().unwrap();
// run each sync stage, each of them deciding whether they're needed // run each sync stage, each of them deciding whether they're needed
// except for body sync that only runs if state sync is off or done // except for state sync that only runs if body sync return true (means txhashset is needed)
header_sync.check_run(&header_head, highest_height); header_sync.check_run(&header_head, highest_height);
if !state_sync.check_run(&header_head, &head, highest_height) {
body_sync.check_run(&head, highest_height); let mut check_state_sync = false;
match self.sync_state.status() {
SyncStatus::TxHashsetDownload { .. }
| SyncStatus::TxHashsetSetup
| SyncStatus::TxHashsetValidation { .. }
| SyncStatus::TxHashsetSave
| SyncStatus::TxHashsetDone => check_state_sync = true,
_ => {
// skip body sync if header chain is not synced.
if header_head.height < highest_height {
continue;
}
if body_sync.check_run(&head, highest_height) {
check_state_sync = true;
}
}
}
if check_state_sync {
state_sync.check_run(&header_head, &head, &tail, highest_height);
} }
} }
} }

View file

@ -25,7 +25,9 @@ extern crate log;
mod framework; mod framework;
use std::cmp;
use std::default::Default; use std::default::Default;
use std::process::exit;
use std::sync::atomic::AtomicBool; use std::sync::atomic::AtomicBool;
use std::sync::Arc; use std::sync::Arc;
use std::{thread, time}; use std::{thread, time};
@ -373,49 +375,509 @@ fn simulate_fast_sync() {
thread::sleep(time::Duration::from_millis(1_000)); thread::sleep(time::Duration::from_millis(1_000));
} }
/// Preparation:
/// Creates 6 disconnected servers: A, B, C, D, E and F, mine 80 blocks on A,
/// Compact server A.
/// Connect all servers, check all get state_sync_threshold full blocks using fast sync.
/// Disconnect all servers from each other.
///
/// Test case 1: nodes that just synced is able to handle forks of up to state_sync_threshold
/// Mine state_sync_threshold-7 blocks on A
/// Mine state_sync_threshold-1 blocks on C (long fork), connect C to server A
/// check server A can sync to C without txhashset download.
///
/// Test case 2: nodes with history in between state_sync_threshold and cut_through_horizon will
/// be able to handle forks larger than state_sync_threshold but not as large as cut_through_horizon.
/// Mine 20 blocks on A (then A has 59 blocks in local chain)
/// Mine cut_through_horizon-1 blocks on D (longer fork), connect D to servers A, then fork point
/// is at A's body head.height - 39, and 20 < 39 < 70.
/// check server A can sync without txhashset download.
///
/// Test case 3: nodes that have enough history is able to handle forks of up to cut_through_horizon
/// Mine cut_through_horizon+10 blocks on E, connect E to servers A and B
/// check server A can sync to E without txhashset download.
/// check server B can sync to E but need txhashset download.
///
/// Test case 4: nodes which had a success state sync can have a new state sync if needed.
/// Mine cut_through_horizon+20 blocks on F (longer fork than E), connect F to servers B
/// check server B can sync to F with txhashset download.
///
/// Test case 5: normal sync (not a fork) should not trigger a txhashset download
/// Mine cut_through_horizon-10 blocks on F, connect F to servers B
/// check server B can sync to F without txhashset download.
///
/// Test case 6: far behind sync (not a fork) should trigger a txhashset download
/// Mine cut_through_horizon+1 blocks on F, connect F to servers B
/// check server B can sync to F with txhashset download.
///
///
#[ignore] #[ignore]
#[test] #[test]
fn simulate_fast_sync_double() { fn simulate_long_fork() {
util::init_test_logger(); util::init_test_logger();
println!("starting simulate_long_fork");
// we actually set the chain_type in the ServerConfig below // we actually set the chain_type in the ServerConfig below
global::set_mining_mode(ChainTypes::AutomatedTesting); global::set_mining_mode(ChainTypes::AutomatedTesting);
framework::clean_all_output("grin-double-fast1"); let test_name_dir = "grin-long-fork";
framework::clean_all_output("grin-double-fast2"); framework::clean_all_output(test_name_dir);
let s1 = servers::Server::new(framework::config(3000, "grin-double-fast1", 3000)).unwrap(); let s = long_fork_test_preparation();
// mine a few blocks on server 1 for si in &s {
s1.start_test_miner(None, s1.stop.clone()); si.pause();
thread::sleep(time::Duration::from_secs(8)); }
thread::sleep(time::Duration::from_millis(1_000));
{ long_fork_test_case_1(&s);
let mut conf = config(3001, "grin-double-fast2", 3000); thread::sleep(time::Duration::from_millis(1_000));
conf.archive_mode = Some(false);
let s2 = servers::Server::new(conf).unwrap();
while s2.head().height != s2.header_head().height || s2.head().height < 20 {
thread::sleep(time::Duration::from_millis(1000));
}
s2.stop();
}
// locks files don't seem to be cleaned properly until process exit
std::fs::remove_file("target/tmp/grin-double-fast2/grin-sync-1001/chain/LOCK").unwrap();
std::fs::remove_file("target/tmp/grin-double-fast2/grin-sync-1001/peers/LOCK").unwrap();
thread::sleep(time::Duration::from_secs(20));
let mut conf = config(3001, "grin-double-fast2", 3000); long_fork_test_case_2(&s);
conf.archive_mode = Some(false); thread::sleep(time::Duration::from_millis(1_000));
let s2 = servers::Server::new(conf).unwrap();
while s2.head().height != s2.header_head().height || s2.head().height < 50 { long_fork_test_case_3(&s);
thread::sleep(time::Duration::from_millis(1000)); thread::sleep(time::Duration::from_millis(1_000));
long_fork_test_case_4(&s);
thread::sleep(time::Duration::from_millis(1_000));
long_fork_test_case_5(&s);
// Clean up
for si in &s {
si.stop();
} }
s1.stop();
s2.stop();
// wait servers fully stop before start next automated test // wait servers fully stop before start next automated test
thread::sleep(time::Duration::from_millis(1_000)); thread::sleep(time::Duration::from_millis(1_000));
} }
fn long_fork_test_preparation() -> Vec<servers::Server> {
println!("preparation: mine 80 blocks, create 6 servers and sync all of them");
let mut s: Vec<servers::Server> = vec![];
// start server A and mine 80 blocks to get beyond the fast sync horizon
let mut conf = framework::config(2100, "grin-long-fork", 2100);
conf.archive_mode = Some(false);
conf.api_secret_path = None;
let s0 = servers::Server::new(conf).unwrap();
thread::sleep(time::Duration::from_millis(1_000));
s.push(s0);
let stop = Arc::new(AtomicBool::new(false));
s[0].start_test_miner(None, stop.clone());
while s[0].head().height < global::cut_through_horizon() as u64 + 10 {
thread::sleep(time::Duration::from_millis(1_000));
}
s[0].stop_test_miner(stop);
thread::sleep(time::Duration::from_millis(1_000));
// Get the current header from s0.
let s0_header = s[0].chain.head().unwrap();
// check the tail after compacting
let _ = s[0].chain.compact();
let s0_tail = s[0].chain.tail().unwrap();
assert_eq!(
s0_header.height - global::cut_through_horizon() as u64,
s0_tail.height
);
for i in 1..6 {
let mut conf = config(2100 + i, "grin-long-fork", 2100);
conf.archive_mode = Some(false);
conf.api_secret_path = None;
let si = servers::Server::new(conf).unwrap();
s.push(si);
}
thread::sleep(time::Duration::from_millis(1_000));
// Wait for s[1..5] to sync up to and including the header from s0.
let mut total_wait = 0;
let mut min_height = 0;
while min_height < s0_header.height {
thread::sleep(time::Duration::from_millis(1_000));
total_wait += 1;
if total_wait >= 60 {
println!(
"simulate_long_fork (preparation) test fail on timeout! minimum height: {}, s0 height: {}",
min_height,
s0_header.height,
);
exit(1);
}
min_height = s0_header.height;
for i in 1..6 {
min_height = cmp::min(s[i].head().height, min_height);
}
}
// Confirm both s0 and s1 see a consistent header at that height.
let s1_header = s[1].chain.head().unwrap();
assert_eq!(s0_header, s1_header);
println!(
"preparation done. all 5 servers head.height: {}",
s0_header.height
);
// Wait for peers fully connection
let mut total_wait = 0;
let mut min_peers = 0;
while min_peers < 4 {
thread::sleep(time::Duration::from_millis(1_000));
total_wait += 1;
if total_wait >= 60 {
println!(
"simulate_long_fork (preparation) test fail on timeout! minimum connected peers: {}",
min_peers,
);
exit(1);
}
min_peers = 4;
for i in 0..5 {
let peers_connected = get_connected_peers(&"127.0.0.1".to_owned(), 22100 + i);
min_peers = cmp::min(min_peers, peers_connected.len());
}
}
return s;
}
fn long_fork_test_mining(blocks: u64, n: u16, s: &servers::Server) {
// Get the current header from node.
let sn_header = s.chain.head().unwrap();
// Mining
let stop = Arc::new(AtomicBool::new(false));
s.start_test_miner(None, stop.clone());
while s.head().height < sn_header.height + blocks {
thread::sleep(time::Duration::from_millis(1));
}
s.stop_test_miner(stop);
thread::sleep(time::Duration::from_millis(1_000));
println!(
"{} blocks mined on s{}. s{}.height: {} (old height: {})",
s.head().height - sn_header.height,
n,
n,
s.head().height,
sn_header.height,
);
let _ = s.chain.compact();
let sn_header = s.chain.head().unwrap();
let sn_tail = s.chain.tail().unwrap();
println!(
"after compacting, s{}.head().height: {}, s{}.tail().height: {}",
n, sn_header.height, n, sn_tail.height,
);
}
fn long_fork_test_case_1(s: &Vec<servers::Server>) {
println!("\ntest case 1 start");
// Mine state_sync_threshold-7 blocks on s0
long_fork_test_mining(global::state_sync_threshold() as u64 - 7, 0, &s[0]);
// Mine state_sync_threshold-1 blocks on s2 (long fork), a fork with more work than s0 chain
long_fork_test_mining(global::state_sync_threshold() as u64 - 1, 2, &s[2]);
let s2_header = s[2].chain.head().unwrap();
let s0_header = s[0].chain.head().unwrap();
let s0_tail = s[0].chain.tail().unwrap();
println!(
"test case 1: s0 start syncing with s2... s0.head().height: {}, s2.head().height: {}",
s0_header.height, s2_header.height,
);
s[0].resume();
s[2].resume();
// Check server s0 can sync to s2 without txhashset download.
let mut total_wait = 0;
while s[0].head().height < s2_header.height {
thread::sleep(time::Duration::from_millis(1_000));
total_wait += 1;
if total_wait >= 120 {
println!(
"test case 1: test fail on timeout! s0 height: {}, s2 height: {}",
s[0].head().height,
s2_header.height,
);
exit(1);
}
}
let s0_tail_new = s[0].chain.tail().unwrap();
assert_eq!(s0_tail_new.height, s0_tail.height);
println!(
"test case 1: s0.head().height: {}, s2_header.height: {}",
s[0].head().height,
s2_header.height,
);
assert_eq!(s[0].head().last_block_h, s2_header.last_block_h);
s[0].pause();
s[2].stop();
println!("test case 1 passed")
}
fn long_fork_test_case_2(s: &Vec<servers::Server>) {
println!("\ntest case 2 start");
// Mine 20 blocks on s0
long_fork_test_mining(20, 0, &s[0]);
// Mine cut_through_horizon-1 blocks on s3 (longer fork)
long_fork_test_mining(global::cut_through_horizon() as u64 - 1, 3, &s[3]);
let s3_header = s[3].chain.head().unwrap();
let s0_header = s[0].chain.head().unwrap();
let s0_tail = s[0].chain.tail().unwrap();
println!(
"test case 2: s0 start syncing with s3. s0.head().height: {}, s3.head().height: {}",
s0_header.height, s3_header.height,
);
s[0].resume();
s[3].resume();
// Check server s0 can sync to s3 without txhashset download.
let mut total_wait = 0;
while s[0].head().height < s3_header.height {
thread::sleep(time::Duration::from_millis(1_000));
total_wait += 1;
if total_wait >= 120 {
println!(
"test case 2: test fail on timeout! s0 height: {}, s3 height: {}",
s[0].head().height,
s3_header.height,
);
exit(1);
}
}
let s0_tail_new = s[0].chain.tail().unwrap();
assert_eq!(s0_tail_new.height, s0_tail.height);
assert_eq!(s[0].head().hash(), s3_header.hash());
let _ = s[0].chain.compact();
let s0_header = s[0].chain.head().unwrap();
let s0_tail = s[0].chain.tail().unwrap();
println!(
"test case 2: after compacting, s0.head().height: {}, s0.tail().height: {}",
s0_header.height, s0_tail.height,
);
s[0].pause();
s[3].stop();
println!("test case 2 passed")
}
fn long_fork_test_case_3(s: &Vec<servers::Server>) {
println!("\ntest case 3 start");
// Mine cut_through_horizon+1 blocks on s4
long_fork_test_mining(global::cut_through_horizon() as u64 + 10, 4, &s[4]);
let s4_header = s[4].chain.head().unwrap();
let s0_header = s[0].chain.head().unwrap();
let s0_tail = s[0].chain.tail().unwrap();
let s1_header = s[1].chain.head().unwrap();
let s1_tail = s[1].chain.tail().unwrap();
println!(
"test case 3: s0/1 start syncing with s4. s0.head().height: {}, s0.tail().height: {}, s1.head().height: {}, s1.tail().height: {}, s4.head().height: {}",
s0_header.height, s0_tail.height,
s1_header.height, s1_tail.height,
s4_header.height,
);
s[0].resume();
s[4].resume();
// Check server s0 can sync to s4.
let mut total_wait = 0;
while s[0].head().height < s4_header.height {
thread::sleep(time::Duration::from_millis(1_000));
total_wait += 1;
if total_wait >= 120 {
println!(
"test case 3: test fail on timeout! s0 height: {}, s4 height: {}",
s[0].head().height,
s4_header.height,
);
exit(1);
}
}
assert_eq!(s[0].head().hash(), s4_header.hash());
s[0].stop();
s[1].resume();
// Check server s1 can sync to s4 but with txhashset download.
let mut total_wait = 0;
while s[1].head().height < s4_header.height {
thread::sleep(time::Duration::from_millis(1_000));
total_wait += 1;
if total_wait >= 120 {
println!(
"test case 3: test fail on timeout! s1 height: {}, s4 height: {}",
s[1].head().height,
s4_header.height,
);
exit(1);
}
}
let s1_tail_new = s[1].chain.tail().unwrap();
println!(
"test case 3: s[1].tail().height: {}, old height: {}",
s1_tail_new.height, s1_tail.height
);
assert_ne!(s1_tail_new.height, s1_tail.height);
assert_eq!(s[1].head().hash(), s4_header.hash());
s[1].pause();
s[4].pause();
println!("test case 3 passed")
}
fn long_fork_test_case_4(s: &Vec<servers::Server>) {
println!("\ntest case 4 start");
let _ = s[1].chain.compact();
// Mine cut_through_horizon+20 blocks on s5 (longer fork than s4)
long_fork_test_mining(global::cut_through_horizon() as u64 + 20, 5, &s[5]);
let s5_header = s[5].chain.head().unwrap();
let s1_header = s[1].chain.head().unwrap();
let s1_tail = s[1].chain.tail().unwrap();
println!(
"test case 4: s1 start syncing with s5. s1.head().height: {}, s1.tail().height: {}, s5.head().height: {}",
s1_header.height, s1_tail.height,
s5_header.height,
);
s[1].resume();
s[5].resume();
// Check server s1 can sync to s5 with a new txhashset download.
let mut total_wait = 0;
while s[1].head().height < s5_header.height {
thread::sleep(time::Duration::from_millis(1_000));
total_wait += 1;
if total_wait >= 120 {
println!(
"test case 4: test fail on timeout! s1 height: {}, s5 height: {}",
s[1].head().height,
s5_header.height,
);
exit(1);
}
}
let s1_tail_new = s[1].chain.tail().unwrap();
println!(
"test case 4: s[1].tail().height: {}, old height: {}",
s1_tail_new.height, s1_tail.height
);
assert_ne!(s1_tail_new.height, s1_tail.height);
assert_eq!(s[1].head().hash(), s5_header.hash());
s[1].pause();
s[5].pause();
println!("test case 4 passed")
}
fn long_fork_test_case_5(s: &Vec<servers::Server>) {
println!("\ntest case 5 start");
let _ = s[1].chain.compact();
// Mine cut_through_horizon-10 blocks on s5
long_fork_test_mining(global::cut_through_horizon() as u64 - 10, 5, &s[5]);
let s5_header = s[5].chain.head().unwrap();
let s1_header = s[1].chain.head().unwrap();
let s1_tail = s[1].chain.tail().unwrap();
println!(
"test case 5: s1 start syncing with s5. s1.head().height: {}, s1.tail().height: {}, s5.head().height: {}",
s1_header.height, s1_tail.height,
s5_header.height,
);
s[1].resume();
s[5].resume();
// Check server s1 can sync to s5 without a txhashset download (normal body sync)
let mut total_wait = 0;
while s[1].head().height < s5_header.height {
thread::sleep(time::Duration::from_millis(1_000));
total_wait += 1;
if total_wait >= 120 {
println!(
"test case 5: test fail on timeout! s1 height: {}, s5 height: {}",
s[1].head().height,
s5_header.height,
);
exit(1);
}
}
let s1_tail_new = s[1].chain.tail().unwrap();
println!(
"test case 5: s[1].tail().height: {}, old height: {}",
s1_tail_new.height, s1_tail.height
);
assert_eq!(s1_tail_new.height, s1_tail.height);
assert_eq!(s[1].head().hash(), s5_header.hash());
s[1].pause();
s[5].pause();
println!("test case 5 passed")
}
fn long_fork_test_case_6(s: &Vec<servers::Server>) {
println!("\ntest case 6 start");
let _ = s[1].chain.compact();
// Mine cut_through_horizon+1 blocks on s5
long_fork_test_mining(global::cut_through_horizon() as u64 + 1, 5, &s[5]);
let s5_header = s[5].chain.head().unwrap();
let s1_header = s[1].chain.head().unwrap();
let s1_tail = s[1].chain.tail().unwrap();
println!(
"test case 6: s1 start syncing with s5. s1.head().height: {}, s1.tail().height: {}, s5.head().height: {}",
s1_header.height, s1_tail.height,
s5_header.height,
);
s[1].resume();
s[5].resume();
// Check server s1 can sync to s5 without a txhashset download (normal body sync)
let mut total_wait = 0;
while s[1].head().height < s5_header.height {
thread::sleep(time::Duration::from_millis(1_000));
total_wait += 1;
if total_wait >= 120 {
println!(
"test case 6: test fail on timeout! s1 height: {}, s5 height: {}",
s[1].head().height,
s5_header.height,
);
exit(1);
}
}
let s1_tail_new = s[1].chain.tail().unwrap();
println!(
"test case 6: s[1].tail().height: {}, old height: {}",
s1_tail_new.height, s1_tail.height
);
assert_eq!(s1_tail_new.height, s1_tail.height);
assert_eq!(s[1].head().hash(), s5_header.hash());
s[1].pause();
s[5].pause();
println!("test case 6 passed")
}
pub fn create_wallet( pub fn create_wallet(
dir: &str, dir: &str,
client: HTTPWalletClient, client: HTTPWalletClient,
@ -528,3 +990,14 @@ fn replicate_tx_fluff_failure() {
Ok(()) Ok(())
}).unwrap(); }).unwrap();
} }
fn get_connected_peers(
base_addr: &String,
api_server_port: u16,
) -> Vec<p2p::types::PeerInfoDisplay> {
let url = format!(
"http://{}:{}/v1/peers/connected",
base_addr, api_server_port
);
api::client::get::<Vec<p2p::types::PeerInfoDisplay>>(url.as_str(), None).unwrap()
}

View file

@ -125,7 +125,7 @@ impl TUIStatusListener for TUIStatusView {
let fin = Utc::now().timestamp_nanos(); let fin = Utc::now().timestamp_nanos();
let dur_ms = (fin - start) as f64 * NANO_TO_MILLIS; let dur_ms = (fin - start) as f64 * NANO_TO_MILLIS;
format!("Downloading {}(MB) chain state for fast sync: {}% at {:.1?}(kB/s), step 2/4", format!("Downloading {}(MB) chain state for state sync: {}% at {:.1?}(kB/s), step 2/4",
total_size / 1_000_000, total_size / 1_000_000,
percent, percent,
if dur_ms > 1.0f64 { downloaded_size as f64 / dur_ms as f64 } else { 0f64 }, if dur_ms > 1.0f64 { downloaded_size as f64 / dur_ms as f64 } else { 0f64 },
@ -135,7 +135,7 @@ impl TUIStatusListener for TUIStatusView {
let fin = Utc::now().timestamp_millis(); let fin = Utc::now().timestamp_millis();
let dur_secs = (fin - start) / 1000; let dur_secs = (fin - start) / 1000;
format!("Downloading chain state for fast sync. Waiting remote peer to start: {}s, step 2/4", format!("Downloading chain state for state sync. Waiting remote peer to start: {}s, step 2/4",
dur_secs, dur_secs,
) )
} }
@ -164,7 +164,10 @@ impl TUIStatusListener for TUIStatusView {
format!("Validating chain state: {}%, step 3/4", percent) format!("Validating chain state: {}%, step 3/4", percent)
} }
SyncStatus::TxHashsetSave => { SyncStatus::TxHashsetSave => {
"Finalizing chain state for fast sync, step 3/4".to_string() "Finalizing chain state for state sync, step 3/4".to_string()
}
SyncStatus::TxHashsetDone => {
"Finalized chain state for state sync, step 3/4".to_string()
} }
SyncStatus::BodySync { SyncStatus::BodySync {
current_height, current_height,