diff --git a/chain/src/chain.rs b/chain/src/chain.rs index d15bb8bf6..c959b0b78 100644 --- a/chain/src/chain.rs +++ b/chain/src/chain.rs @@ -25,7 +25,7 @@ use util::RwLock; use lmdb; use lru_cache::LruCache; -use core::core::hash::{Hash, Hashed}; +use core::core::hash::{Hash, Hashed, ZERO_HASH}; use core::core::merkle_proof::MerkleProof; use core::core::verifier_cache::VerifierCache; use core::core::{Block, BlockHeader, BlockSums, Output, OutputIdentifier, Transaction, TxKernel}; @@ -700,6 +700,73 @@ impl Chain { Ok(()) } + /// Check chain status whether a txhashset downloading is needed + pub fn check_txhashset_needed(&self, caller: String, hashes: &mut Option>) -> bool { + let horizon = global::cut_through_horizon() as u64; + let body_head = self.head().unwrap(); + let header_head = self.header_head().unwrap(); + let sync_head = self.get_sync_head().unwrap(); + + debug!( + "{}: body_head - {}, {}, header_head - {}, {}, sync_head - {}, {}", + caller, + body_head.last_block_h, + body_head.height, + header_head.last_block_h, + header_head.height, + sync_head.last_block_h, + sync_head.height, + ); + + if body_head.total_difficulty >= header_head.total_difficulty { + debug!( + "{}: no need. header_head.total_difficulty: {} <= body_head.total_difficulty: {}", + caller, header_head.total_difficulty, body_head.total_difficulty, + ); + return false; + } + + let mut oldest_height = 0; + let mut oldest_hash = ZERO_HASH; + + let mut current = self.get_block_header(&header_head.last_block_h); + if current.is_err() { + error!( + "{}: header_head not found in chain db: {} at {}", + caller, header_head.last_block_h, header_head.height, + ); + } + + while let Ok(header) = current { + // break out of the while loop when we find a header common + // between the header chain and the current body chain + if let Ok(_) = self.is_on_current_chain(&header) { + break; + } + + oldest_height = header.height; + oldest_hash = header.hash(); + if let Some(hs) = hashes { + hs.push(oldest_hash); + } + current = self.get_previous_header(&header); + } + + if oldest_height < header_head.height.saturating_sub(horizon) { + if oldest_height > 0 { + debug!( + "{}: oldest block which is not on local chain: {} at {}", + caller, oldest_hash, oldest_height, + ); + return true; + } else { + error!("{}: something is wrong! oldest_height is 0", caller); + return false; + }; + } + return false; + } + /// Writes a reading view on a txhashset state that's been provided to us. /// If we're willing to accept that new state, the data stream will be /// read as a zip file, unzipped and the resulting state files should be @@ -712,13 +779,11 @@ impl Chain { ) -> Result<(), Error> { status.on_setup(); - // Initial check based on relative heights of current head and header_head. - { - let head = self.head().unwrap(); - let header_head = self.header_head().unwrap(); - if header_head.height - head.height < global::cut_through_horizon() as u64 { - return Err(ErrorKind::InvalidTxHashSet("not needed".to_owned()).into()); - } + // Initial check whether this txhashset is needed or not + let mut hashes: Option> = None; + if !self.check_txhashset_needed("txhashset_write".to_owned(), &mut hashes) { + warn!("txhashset_write: txhashset received but it's not needed! ignored."); + return Err(ErrorKind::InvalidTxHashSet("not needed".to_owned()).into()); } let header = self.get_block_header(&h)?; @@ -769,6 +834,9 @@ impl Chain { batch.save_body_head(&tip)?; batch.save_header_height(&header)?; batch.build_by_height_index(&header, true)?; + + // Reset the body tail to the body head after a txhashset write + batch.save_body_tail(&tip)?; } // Commit all the changes to the db. @@ -819,12 +887,13 @@ impl Chain { let horizon = global::cut_through_horizon() as u64; let head = self.head()?; + let tail = self.tail()?; let cutoff = head.height.saturating_sub(horizon); debug!( - "compact_blocks_db: head height: {}, horizon: {}, cutoff: {}", - head.height, horizon, cutoff, + "compact_blocks_db: head height: {}, tail height: {}, horizon: {}, cutoff: {}", + head.height, tail.height, horizon, cutoff, ); if cutoff == 0 { @@ -859,8 +928,13 @@ impl Chain { Err(e) => return Err(From::from(e)), } } + let tail = batch.get_header_by_height(head.height - horizon)?; + batch.save_body_tail(&Tip::from_header(&tail))?; batch.commit()?; - debug!("compact_blocks_db: removed {} blocks.", count); + debug!( + "compact_blocks_db: removed {} blocks. tail height: {}", + count, tail.height + ); Ok(()) } @@ -943,6 +1017,13 @@ impl Chain { .map_err(|e| ErrorKind::StoreErr(e, "chain head".to_owned()).into()) } + /// Tail of the block chain in this node after compact (cross-block cut-through) + pub fn tail(&self) -> Result { + self.store + .tail() + .map_err(|e| ErrorKind::StoreErr(e, "chain tail".to_owned()).into()) + } + /// Tip (head) of the header chain. pub fn header_head(&self) -> Result { self.store diff --git a/chain/src/pipe.rs b/chain/src/pipe.rs index 13213df6b..d6304661f 100644 --- a/chain/src/pipe.rs +++ b/chain/src/pipe.rs @@ -199,6 +199,10 @@ pub fn process_block(b: &Block, ctx: &mut BlockContext) -> Result, E // so we can maintain multiple (in progress) forks. add_block(b, &ctx.batch)?; + if ctx.batch.tail().is_err() { + update_body_tail(&b.header, &ctx.batch)?; + } + // Update the chain head if total work is increased. let res = update_head(b, ctx)?; Ok(res) @@ -552,6 +556,16 @@ fn add_block(b: &Block, batch: &store::Batch) -> Result<(), Error> { Ok(()) } +/// Update the block chain tail so we can know the exact tail of full blocks in this node +fn update_body_tail(bh: &BlockHeader, batch: &store::Batch) -> Result<(), Error> { + let tip = Tip::from_header(bh); + batch + .save_body_tail(&tip) + .map_err(|e| ErrorKind::StoreErr(e, "pipe save body tail".to_owned()))?; + debug!("body tail {} @ {}", bh.hash(), bh.height); + Ok(()) +} + /// Officially adds the block header to our header chain. fn add_block_header(bh: &BlockHeader, batch: &store::Batch) -> Result<(), Error> { batch diff --git a/chain/src/store.rs b/chain/src/store.rs index 6b8f1669f..5390f0a1c 100644 --- a/chain/src/store.rs +++ b/chain/src/store.rs @@ -36,6 +36,7 @@ const STORE_SUBPATH: &'static str = "chain"; const BLOCK_HEADER_PREFIX: u8 = 'h' as u8; const BLOCK_PREFIX: u8 = 'b' as u8; const HEAD_PREFIX: u8 = 'H' as u8; +const TAIL_PREFIX: u8 = 'T' as u8; const HEADER_HEAD_PREFIX: u8 = 'I' as u8; const SYNC_HEAD_PREFIX: u8 = 's' as u8; const HEADER_HEIGHT_PREFIX: u8 = '8' as u8; @@ -70,6 +71,10 @@ impl ChainStore { option_to_not_found(self.db.get_ser(&vec![HEAD_PREFIX]), "HEAD") } + pub fn tail(&self) -> Result { + option_to_not_found(self.db.get_ser(&vec![TAIL_PREFIX]), "TAIL") + } + /// Header of the block at the head of the block chain (not the same thing as header_head). pub fn head_header(&self) -> Result { self.get_block_header(&self.head()?.last_block_h) @@ -225,6 +230,10 @@ impl<'a> Batch<'a> { option_to_not_found(self.db.get_ser(&vec![HEAD_PREFIX]), "HEAD") } + pub fn tail(&self) -> Result { + option_to_not_found(self.db.get_ser(&vec![TAIL_PREFIX]), "TAIL") + } + /// Header of the block at the head of the block chain (not the same thing as header_head). pub fn head_header(&self) -> Result { self.get_block_header(&self.head()?.last_block_h) @@ -248,6 +257,10 @@ impl<'a> Batch<'a> { self.db.put_ser(&vec![HEAD_PREFIX], t) } + pub fn save_body_tail(&self, t: &Tip) -> Result<(), Error> { + self.db.put_ser(&vec![TAIL_PREFIX], t) + } + pub fn save_header_head(&self, t: &Tip) -> Result<(), Error> { self.db.put_ser(&vec![HEADER_HEAD_PREFIX], t) } diff --git a/core/src/consensus.rs b/core/src/consensus.rs index 4e1c53137..9c99c7ec3 100644 --- a/core/src/consensus.rs +++ b/core/src/consensus.rs @@ -92,6 +92,14 @@ pub const MAX_SECONDARY_SCALING: u64 = 8 << 11; /// easier to reason about. pub const CUT_THROUGH_HORIZON: u32 = WEEK_HEIGHT as u32; +/// Default number of blocks in the past to determine the height where we request +/// a txhashset (and full blocks from). Needs to be long enough to not overlap with +/// a long reorg. +/// Rational behind the value is the longest bitcoin fork was about 30 blocks, so 5h. +/// We add an order of magnitude to be safe and round to 2x24h of blocks to make it +/// easier to reason about. +pub const STATE_SYNC_THRESHOLD: u32 = 2 * DAY_HEIGHT as u32; + /// Weight of an input when counted against the max block weight capacity pub const BLOCK_INPUT_WEIGHT: usize = 1; diff --git a/core/src/global.rs b/core/src/global.rs index e3a2c6315..d4eb22406 100644 --- a/core/src/global.rs +++ b/core/src/global.rs @@ -20,7 +20,7 @@ use consensus::HeaderInfo; use consensus::{ graph_weight, BASE_EDGE_BITS, BLOCK_TIME_SEC, COINBASE_MATURITY, CUT_THROUGH_HORIZON, DAY_HEIGHT, DIFFICULTY_ADJUST_WINDOW, INITIAL_DIFFICULTY, PROOFSIZE, SECOND_POW_EDGE_BITS, - UNIT_DIFFICULTY + STATE_SYNC_THRESHOLD, UNIT_DIFFICULTY, }; use pow::{self, CuckatooContext, EdgeType, PoWContext}; /// An enum collecting sets of parameters used throughout the @@ -51,7 +51,10 @@ pub const AUTOMATED_TESTING_COINBASE_MATURITY: u64 = 3; pub const USER_TESTING_COINBASE_MATURITY: u64 = 3; /// Testing cut through horizon in blocks -pub const TESTING_CUT_THROUGH_HORIZON: u32 = 20; +pub const TESTING_CUT_THROUGH_HORIZON: u32 = 70; + +/// Testing state sync threshold in blocks +pub const TESTING_STATE_SYNC_THRESHOLD: u32 = 20; /// Testing initial graph weight pub const TESTING_INITIAL_GRAPH_WEIGHT: u32 = 1; @@ -237,6 +240,16 @@ pub fn cut_through_horizon() -> u32 { } } +/// Threshold at which we can request a txhashset (and full blocks from) +pub fn state_sync_threshold() -> u32 { + let param_ref = CHAIN_TYPE.read(); + match *param_ref { + ChainTypes::AutomatedTesting => TESTING_STATE_SYNC_THRESHOLD, + ChainTypes::UserTesting => TESTING_STATE_SYNC_THRESHOLD, + _ => STATE_SYNC_THRESHOLD, + } +} + /// Are we in automated testing mode? pub fn is_automated_testing_mode() -> bool { let param_ref = CHAIN_TYPE.read(); diff --git a/p2p/src/peers.rs b/p2p/src/peers.rs index 73027a79b..446f0bbc8 100644 --- a/p2p/src/peers.rs +++ b/p2p/src/peers.rs @@ -225,6 +225,7 @@ impl Peers { /// Unban a peer, checks if it exists and banned then unban pub fn unban_peer(&self, peer_addr: &SocketAddr) { + debug!("unban_peer: peer {}", peer_addr); match self.get_peer(*peer_addr) { Ok(_) => { if self.is_banned(*peer_addr) { diff --git a/p2p/src/serv.rs b/p2p/src/serv.rs index 1b5115844..2878a8995 100644 --- a/p2p/src/serv.rs +++ b/p2p/src/serv.rs @@ -39,6 +39,7 @@ pub struct Server { handshake: Arc, pub peers: Arc, stop: Arc, + pause: Arc, } // TODO TLS @@ -51,13 +52,15 @@ impl Server { adapter: Arc, genesis: Hash, stop: Arc, + pause: Arc, ) -> Result { Ok(Server { config: config.clone(), capabilities: capab, handshake: Arc::new(Handshake::new(genesis, config.clone())), peers: Arc::new(Peers::new(PeerStore::new(db_env)?, adapter, config)), - stop: stop, + stop, + pause, }) } @@ -71,6 +74,12 @@ impl Server { let sleep_time = Duration::from_millis(1); loop { + // Pause peer ingress connection request. Only for tests. + if self.pause.load(Ordering::Relaxed) { + thread::sleep(Duration::from_secs(1)); + continue; + } + match listener.accept() { Ok((stream, peer_addr)) => { if !self.check_banned(&stream) { @@ -185,6 +194,14 @@ impl Server { self.stop.store(true, Ordering::Relaxed); self.peers.stop(); } + + /// Pause means: stop all the current peers connection, only for tests. + /// Note: + /// 1. must pause the 'seed' thread also, to avoid the new egress peer connection + /// 2. must pause the 'p2p-server' thread also, to avoid the new ingress peer connection. + pub fn pause(&self) { + self.peers.stop(); + } } /// A no-op network adapter used for testing. diff --git a/p2p/tests/peer_handshake.rs b/p2p/tests/peer_handshake.rs index b71725aeb..223a9b4cd 100644 --- a/p2p/tests/peer_handshake.rs +++ b/p2p/tests/peer_handshake.rs @@ -58,6 +58,7 @@ fn peer_handshake() { net_adapter.clone(), Hash::from_vec(&vec![]), Arc::new(AtomicBool::new(false)), + Arc::new(AtomicBool::new(false)), ).unwrap(), ); diff --git a/servers/src/common/types.rs b/servers/src/common/types.rs index fe4a6e93a..e87249869 100644 --- a/servers/src/common/types.rs +++ b/servers/src/common/types.rs @@ -261,6 +261,8 @@ pub enum SyncStatus { }, /// Finalizing the new state TxHashsetSave, + /// State sync finalized + TxHashsetDone, /// Downloading blocks BodySync { current_height: u64, @@ -383,9 +385,6 @@ impl chain::TxHashsetWriteStatus for SyncState { } fn on_done(&self) { - self.update(SyncStatus::BodySync { - current_height: 0, - highest_height: 0, - }); + self.update(SyncStatus::TxHashsetDone); } } diff --git a/servers/src/grin/seed.rs b/servers/src/grin/seed.rs index 5f0f6b1f9..a5ea00db0 100644 --- a/servers/src/grin/seed.rs +++ b/servers/src/grin/seed.rs @@ -41,6 +41,7 @@ pub fn connect_and_monitor( seed_list: Box Vec + Send>, preferred_peers: Option>, stop: Arc, + pause: Arc, ) { let _ = thread::Builder::new() .name("seed".to_string()) @@ -65,6 +66,12 @@ pub fn connect_and_monitor( let mut start_attempt = 0; while !stop.load(Ordering::Relaxed) { + // Pause egress peer connection request. Only for tests. + if pause.load(Ordering::Relaxed) { + thread::sleep(time::Duration::from_secs(1)); + continue; + } + // Check for and remove expired peers from the storage if Utc::now() - prev_expire_check > Duration::hours(1) { peers.remove_expired(); diff --git a/servers/src/grin/server.rs b/servers/src/grin/server.rs index 90e40fffe..255ce1c41 100644 --- a/servers/src/grin/server.rs +++ b/servers/src/grin/server.rs @@ -58,6 +58,8 @@ pub struct Server { state_info: ServerStateInfo, /// Stop flag pub stop: Arc, + /// Pause flag + pub pause: Arc, } impl Server { @@ -111,6 +113,7 @@ impl Server { }; let stop = Arc::new(AtomicBool::new(false)); + let pause = Arc::new(AtomicBool::new(false)); // Shared cache for verification results. // We cache rangeproof verification and kernel signature verification. @@ -173,6 +176,7 @@ impl Server { net_adapter.clone(), genesis.hash(), stop.clone(), + pause.clone(), )?); chain_adapter.init(p2p_server.peers.clone()); pool_net_adapter.init(p2p_server.peers.clone()); @@ -203,6 +207,7 @@ impl Server { seeder, peers_preferred, stop.clone(), + pause.clone(), ); } @@ -254,6 +259,7 @@ impl Server { ..Default::default() }, stop, + pause, }) } @@ -425,6 +431,18 @@ impl Server { self.stop.store(true, Ordering::Relaxed); } + /// Pause the p2p server. + pub fn pause(&self) { + self.pause.store(true, Ordering::Relaxed); + thread::sleep(time::Duration::from_secs(1)); + self.p2p.pause(); + } + + /// Resume the p2p server. + pub fn resume(&self) { + self.pause.store(false, Ordering::Relaxed); + } + /// Stops the test miner without stopping the p2p layer pub fn stop_test_miner(&self, stop: Arc) { stop.store(true, Ordering::Relaxed); diff --git a/servers/src/grin/sync/body_sync.rs b/servers/src/grin/sync/body_sync.rs index 5e4f2994c..783c4aa61 100644 --- a/servers/src/grin/sync/body_sync.rs +++ b/servers/src/grin/sync/body_sync.rs @@ -19,8 +19,7 @@ use std::sync::Arc; use chain; use common::types::{SyncState, SyncStatus}; -use core::core::hash::Hashed; -use core::global; +use core::core::hash::Hash; use p2p; pub struct BodySync { @@ -50,83 +49,39 @@ impl BodySync { } } - /// Check whether a body sync is needed and run it if so + /// Check whether a body sync is needed and run it if so. + /// Return true if txhashset download is needed (when requested block is under the horizon). pub fn check_run(&mut self, head: &chain::Tip, highest_height: u64) -> bool { - // if fast_sync disabled or not needed, run the body_sync every 5s + // run the body_sync every 5s if self.body_sync_due() { - self.body_sync(); + if self.body_sync() { + return true; + } self.sync_state.update(SyncStatus::BodySync { current_height: head.height, highest_height: highest_height, }); - return true; } false } - fn body_sync(&mut self) { - let horizon = global::cut_through_horizon() as u64; - let body_head = self.chain.head().unwrap(); - let header_head = self.chain.header_head().unwrap(); - let sync_head = self.chain.get_sync_head().unwrap(); - - debug!( - "body_sync: body_head - {}, {}, header_head - {}, {}, sync_head - {}, {}", - body_head.last_block_h, - body_head.height, - header_head.last_block_h, - header_head.height, - sync_head.last_block_h, - sync_head.height, - ); - - let mut hashes = vec![]; - let mut oldest_height = 0; - - if header_head.total_difficulty > body_head.total_difficulty { - let mut current = self.chain.get_block_header(&header_head.last_block_h); - - //+ remove me after #1880 root cause found - if current.is_err() { - error!( - "body_sync: header_head not found in chain db: {} at {}", - header_head.last_block_h, header_head.height, - ); - } - //- - - while let Ok(header) = current { - // break out of the while loop when we find a header common - // between the header chain and the current body chain - if let Ok(_) = self.chain.is_on_current_chain(&header) { - break; - } - - hashes.push(header.hash()); - oldest_height = header.height; - current = self.chain.get_previous_header(&header); - } - } - //+ remove me after #1880 root cause found - else { + /// Return true if txhashset download is needed (when requested block is under the horizon). + fn body_sync(&mut self) -> bool { + let mut hashes: Option> = Some(vec![]); + if self + .chain + .check_txhashset_needed("body_sync".to_owned(), &mut hashes) + { debug!( - "body_sync: header_head.total_difficulty: {}, body_head.total_difficulty: {}", - header_head.total_difficulty, body_head.total_difficulty, + "body_sync: cannot sync full blocks earlier than horizon. will request txhashset", ); + return true; } - //- + let mut hashes = hashes.unwrap(); hashes.reverse(); - if oldest_height < header_head.height.saturating_sub(horizon) { - debug!( - "body_sync: cannot sync full blocks earlier than horizon. oldest_height: {}", - oldest_height, - ); - return; - } - let peers = self.peers.more_work_peers(); // if we have 5 peers to sync from then ask for 50 blocks total (peer_count * @@ -147,6 +102,9 @@ impl BodySync { .collect::>(); if hashes_to_get.len() > 0 { + let body_head = self.chain.head().unwrap(); + let header_head = self.chain.header_head().unwrap(); + debug!( "block_sync: {}/{} requesting blocks {:?} from {} peers", body_head.height, @@ -170,6 +128,7 @@ impl BodySync { } } } + return false; } // Should we run block body sync and ask for more full blocks? diff --git a/servers/src/grin/sync/header_sync.rs b/servers/src/grin/sync/header_sync.rs index 9d3049e9e..b5e239322 100644 --- a/servers/src/grin/sync/header_sync.rs +++ b/servers/src/grin/sync/header_sync.rs @@ -51,7 +51,9 @@ impl HeaderSync { } let enable_header_sync = match self.sync_state.status() { - SyncStatus::BodySync { .. } | SyncStatus::HeaderSync { .. } => true, + SyncStatus::BodySync { .. } + | SyncStatus::HeaderSync { .. } + | SyncStatus::TxHashsetDone => true, SyncStatus::NoSync | SyncStatus::Initial | SyncStatus::AwaitingPeers(_) => { // Reset sync_head to header_head on transition to HeaderSync, // but ONLY on initial transition to HeaderSync state. diff --git a/servers/src/grin/sync/state_sync.rs b/servers/src/grin/sync/state_sync.rs index dd6cd121b..e1c3fd022 100644 --- a/servers/src/grin/sync/state_sync.rs +++ b/servers/src/grin/sync/state_sync.rs @@ -33,8 +33,8 @@ pub struct StateSync { peers: Arc, chain: Arc, - prev_fast_sync: Option>, - fast_sync_peer: Option>, + prev_state_sync: Option>, + state_sync_peer: Option>, } impl StateSync { @@ -47,8 +47,8 @@ impl StateSync { sync_state, peers, chain, - prev_fast_sync: None, - fast_sync_peer: None, + prev_state_sync: None, + state_sync_peer: None, } } @@ -59,13 +59,12 @@ impl StateSync { &mut self, header_head: &chain::Tip, head: &chain::Tip, + tail: &chain::Tip, highest_height: u64, ) -> bool { - let need_state_sync = - highest_height.saturating_sub(head.height) > global::cut_through_horizon() as u64; - if !need_state_sync { - return false; - } + trace!("state_sync: head.height: {}, tail.height: {}. header_head.height: {}, highest_height: {}", + head.height, tail.height, header_head.height, highest_height, + ); let mut sync_need_restart = false; @@ -73,47 +72,63 @@ impl StateSync { { let clone = self.sync_state.sync_error(); if let Some(ref sync_error) = *clone.read() { - error!("fast_sync: error = {:?}. restart fast sync", sync_error); + error!("state_sync: error = {:?}. restart fast sync", sync_error); sync_need_restart = true; } drop(clone); } // check peer connection status of this sync - if let Some(ref peer) = self.fast_sync_peer { + if let Some(ref peer) = self.state_sync_peer { if let SyncStatus::TxHashsetDownload { .. } = self.sync_state.status() { if !peer.is_connected() { sync_need_restart = true; info!( - "fast_sync: peer connection lost: {:?}. restart", + "state_sync: peer connection lost: {:?}. restart", peer.info.addr, ); } } } - if sync_need_restart { - self.fast_sync_reset(); + // if txhashset downloaded and validated successfully, we switch to BodySync state, + // and we need call state_sync_reset() to make it ready for next possible state sync. + let done = if let SyncStatus::TxHashsetDone = self.sync_state.status() { + self.sync_state.update(SyncStatus::BodySync { + current_height: 0, + highest_height: 0, + }); + true + } else { + false + }; + + if sync_need_restart || done { + self.state_sync_reset(); self.sync_state.clear_sync_error(); } + if done { + return false; + } + // run fast sync if applicable, normally only run one-time, except restart in error if header_head.height == highest_height { - let (go, download_timeout) = self.fast_sync_due(); + let (go, download_timeout) = self.state_sync_due(); if let SyncStatus::TxHashsetDownload { .. } = self.sync_state.status() { if download_timeout { - error!("fast_sync: TxHashsetDownload status timeout in 10 minutes!"); + error!("state_sync: TxHashsetDownload status timeout in 10 minutes!"); self.sync_state .set_sync_error(Error::P2P(p2p::Error::Timeout)); } } if go { - self.fast_sync_peer = None; + self.state_sync_peer = None; match self.request_state(&header_head) { Ok(peer) => { - self.fast_sync_peer = Some(peer); + self.state_sync_peer = Some(peer); } Err(e) => self.sync_state.set_sync_error(Error::P2P(e)), } @@ -141,28 +156,27 @@ impl StateSync { } fn request_state(&self, header_head: &chain::Tip) -> Result, p2p::Error> { - let horizon = global::cut_through_horizon() as u64; + let threshold = global::state_sync_threshold() as u64; if let Some(peer) = self.peers.most_work_peer() { - // ask for txhashset at 90% of horizon, this still leaves time for download - // and validation to happen and stay within horizon + // ask for txhashset at state_sync_threshold let mut txhashset_head = self .chain .get_block_header(&header_head.prev_block_h) .unwrap(); - for _ in 0..(horizon - horizon / 10) { + for _ in 0..threshold { txhashset_head = self.chain.get_previous_header(&txhashset_head).unwrap(); } let bhash = txhashset_head.hash(); debug!( - "fast_sync: before txhashset request, header head: {} / {}, txhashset_head: {} / {}", + "state_sync: before txhashset request, header head: {} / {}, txhashset_head: {} / {}", header_head.height, header_head.last_block_h, txhashset_head.height, bhash ); if let Err(e) = peer.send_txhashset_request(txhashset_head.height, bhash) { - error!("fast_sync: send_txhashset_request err! {:?}", e); + error!("state_sync: send_txhashset_request err! {:?}", e); return Err(e); } return Ok(peer.clone()); @@ -171,13 +185,13 @@ impl StateSync { } // For now this is a one-time thing (it can be slow) at initial startup. - fn fast_sync_due(&mut self) -> (bool, bool) { + fn state_sync_due(&mut self) -> (bool, bool) { let now = Utc::now(); let mut download_timeout = false; - match self.prev_fast_sync { + match self.prev_state_sync { None => { - self.prev_fast_sync = Some(now); + self.prev_state_sync = Some(now); (true, download_timeout) } Some(prev) => { @@ -189,8 +203,8 @@ impl StateSync { } } - fn fast_sync_reset(&mut self) { - self.prev_fast_sync = None; - self.fast_sync_peer = None; + fn state_sync_reset(&mut self) { + self.prev_state_sync = None; + self.state_sync_peer = None; } } diff --git a/servers/src/grin/sync/syncer.rs b/servers/src/grin/sync/syncer.rs index 77cda73a0..3f6496c8f 100644 --- a/servers/src/grin/sync/syncer.rs +++ b/servers/src/grin/sync/syncer.rs @@ -141,13 +141,34 @@ impl SyncRunner { // if syncing is needed let head = self.chain.head().unwrap(); + let tail = self.chain.tail().unwrap_or_else(|_| head.clone()); let header_head = self.chain.header_head().unwrap(); // run each sync stage, each of them deciding whether they're needed - // except for body sync that only runs if state sync is off or done + // except for state sync that only runs if body sync return true (means txhashset is needed) header_sync.check_run(&header_head, highest_height); - if !state_sync.check_run(&header_head, &head, highest_height) { - body_sync.check_run(&head, highest_height); + + let mut check_state_sync = false; + match self.sync_state.status() { + SyncStatus::TxHashsetDownload { .. } + | SyncStatus::TxHashsetSetup + | SyncStatus::TxHashsetValidation { .. } + | SyncStatus::TxHashsetSave + | SyncStatus::TxHashsetDone => check_state_sync = true, + _ => { + // skip body sync if header chain is not synced. + if header_head.height < highest_height { + continue; + } + + if body_sync.check_run(&head, highest_height) { + check_state_sync = true; + } + } + } + + if check_state_sync { + state_sync.check_run(&header_head, &head, &tail, highest_height); } } } diff --git a/servers/tests/simulnet.rs b/servers/tests/simulnet.rs index ca65b7270..5be41f6b1 100644 --- a/servers/tests/simulnet.rs +++ b/servers/tests/simulnet.rs @@ -25,7 +25,9 @@ extern crate log; mod framework; +use std::cmp; use std::default::Default; +use std::process::exit; use std::sync::atomic::AtomicBool; use std::sync::Arc; use std::{thread, time}; @@ -373,49 +375,509 @@ fn simulate_fast_sync() { thread::sleep(time::Duration::from_millis(1_000)); } +/// Preparation: +/// Creates 6 disconnected servers: A, B, C, D, E and F, mine 80 blocks on A, +/// Compact server A. +/// Connect all servers, check all get state_sync_threshold full blocks using fast sync. +/// Disconnect all servers from each other. +/// +/// Test case 1: nodes that just synced is able to handle forks of up to state_sync_threshold +/// Mine state_sync_threshold-7 blocks on A +/// Mine state_sync_threshold-1 blocks on C (long fork), connect C to server A +/// check server A can sync to C without txhashset download. +/// +/// Test case 2: nodes with history in between state_sync_threshold and cut_through_horizon will +/// be able to handle forks larger than state_sync_threshold but not as large as cut_through_horizon. +/// Mine 20 blocks on A (then A has 59 blocks in local chain) +/// Mine cut_through_horizon-1 blocks on D (longer fork), connect D to servers A, then fork point +/// is at A's body head.height - 39, and 20 < 39 < 70. +/// check server A can sync without txhashset download. +/// +/// Test case 3: nodes that have enough history is able to handle forks of up to cut_through_horizon +/// Mine cut_through_horizon+10 blocks on E, connect E to servers A and B +/// check server A can sync to E without txhashset download. +/// check server B can sync to E but need txhashset download. +/// +/// Test case 4: nodes which had a success state sync can have a new state sync if needed. +/// Mine cut_through_horizon+20 blocks on F (longer fork than E), connect F to servers B +/// check server B can sync to F with txhashset download. +/// +/// Test case 5: normal sync (not a fork) should not trigger a txhashset download +/// Mine cut_through_horizon-10 blocks on F, connect F to servers B +/// check server B can sync to F without txhashset download. +/// +/// Test case 6: far behind sync (not a fork) should trigger a txhashset download +/// Mine cut_through_horizon+1 blocks on F, connect F to servers B +/// check server B can sync to F with txhashset download. +/// +/// #[ignore] #[test] -fn simulate_fast_sync_double() { +fn simulate_long_fork() { util::init_test_logger(); + println!("starting simulate_long_fork"); // we actually set the chain_type in the ServerConfig below global::set_mining_mode(ChainTypes::AutomatedTesting); - framework::clean_all_output("grin-double-fast1"); - framework::clean_all_output("grin-double-fast2"); + let test_name_dir = "grin-long-fork"; + framework::clean_all_output(test_name_dir); - let s1 = servers::Server::new(framework::config(3000, "grin-double-fast1", 3000)).unwrap(); - // mine a few blocks on server 1 - s1.start_test_miner(None, s1.stop.clone()); - thread::sleep(time::Duration::from_secs(8)); - - { - let mut conf = config(3001, "grin-double-fast2", 3000); - conf.archive_mode = Some(false); - let s2 = servers::Server::new(conf).unwrap(); - while s2.head().height != s2.header_head().height || s2.head().height < 20 { - thread::sleep(time::Duration::from_millis(1000)); - } - s2.stop(); + let s = long_fork_test_preparation(); + for si in &s { + si.pause(); } - // locks files don't seem to be cleaned properly until process exit - std::fs::remove_file("target/tmp/grin-double-fast2/grin-sync-1001/chain/LOCK").unwrap(); - std::fs::remove_file("target/tmp/grin-double-fast2/grin-sync-1001/peers/LOCK").unwrap(); - thread::sleep(time::Duration::from_secs(20)); + thread::sleep(time::Duration::from_millis(1_000)); - let mut conf = config(3001, "grin-double-fast2", 3000); - conf.archive_mode = Some(false); - let s2 = servers::Server::new(conf).unwrap(); - while s2.head().height != s2.header_head().height || s2.head().height < 50 { - thread::sleep(time::Duration::from_millis(1000)); + long_fork_test_case_1(&s); + thread::sleep(time::Duration::from_millis(1_000)); + + long_fork_test_case_2(&s); + thread::sleep(time::Duration::from_millis(1_000)); + + long_fork_test_case_3(&s); + thread::sleep(time::Duration::from_millis(1_000)); + + long_fork_test_case_4(&s); + thread::sleep(time::Duration::from_millis(1_000)); + + long_fork_test_case_5(&s); + + // Clean up + for si in &s { + si.stop(); } - s1.stop(); - s2.stop(); // wait servers fully stop before start next automated test thread::sleep(time::Duration::from_millis(1_000)); } +fn long_fork_test_preparation() -> Vec { + println!("preparation: mine 80 blocks, create 6 servers and sync all of them"); + + let mut s: Vec = vec![]; + + // start server A and mine 80 blocks to get beyond the fast sync horizon + let mut conf = framework::config(2100, "grin-long-fork", 2100); + conf.archive_mode = Some(false); + conf.api_secret_path = None; + let s0 = servers::Server::new(conf).unwrap(); + thread::sleep(time::Duration::from_millis(1_000)); + s.push(s0); + let stop = Arc::new(AtomicBool::new(false)); + s[0].start_test_miner(None, stop.clone()); + + while s[0].head().height < global::cut_through_horizon() as u64 + 10 { + thread::sleep(time::Duration::from_millis(1_000)); + } + s[0].stop_test_miner(stop); + thread::sleep(time::Duration::from_millis(1_000)); + + // Get the current header from s0. + let s0_header = s[0].chain.head().unwrap(); + + // check the tail after compacting + let _ = s[0].chain.compact(); + let s0_tail = s[0].chain.tail().unwrap(); + assert_eq!( + s0_header.height - global::cut_through_horizon() as u64, + s0_tail.height + ); + + for i in 1..6 { + let mut conf = config(2100 + i, "grin-long-fork", 2100); + conf.archive_mode = Some(false); + conf.api_secret_path = None; + let si = servers::Server::new(conf).unwrap(); + s.push(si); + } + thread::sleep(time::Duration::from_millis(1_000)); + + // Wait for s[1..5] to sync up to and including the header from s0. + let mut total_wait = 0; + let mut min_height = 0; + while min_height < s0_header.height { + thread::sleep(time::Duration::from_millis(1_000)); + total_wait += 1; + if total_wait >= 60 { + println!( + "simulate_long_fork (preparation) test fail on timeout! minimum height: {}, s0 height: {}", + min_height, + s0_header.height, + ); + exit(1); + } + min_height = s0_header.height; + for i in 1..6 { + min_height = cmp::min(s[i].head().height, min_height); + } + } + + // Confirm both s0 and s1 see a consistent header at that height. + let s1_header = s[1].chain.head().unwrap(); + assert_eq!(s0_header, s1_header); + println!( + "preparation done. all 5 servers head.height: {}", + s0_header.height + ); + + // Wait for peers fully connection + let mut total_wait = 0; + let mut min_peers = 0; + while min_peers < 4 { + thread::sleep(time::Duration::from_millis(1_000)); + total_wait += 1; + if total_wait >= 60 { + println!( + "simulate_long_fork (preparation) test fail on timeout! minimum connected peers: {}", + min_peers, + ); + exit(1); + } + min_peers = 4; + for i in 0..5 { + let peers_connected = get_connected_peers(&"127.0.0.1".to_owned(), 22100 + i); + min_peers = cmp::min(min_peers, peers_connected.len()); + } + } + + return s; +} + +fn long_fork_test_mining(blocks: u64, n: u16, s: &servers::Server) { + // Get the current header from node. + let sn_header = s.chain.head().unwrap(); + + // Mining + let stop = Arc::new(AtomicBool::new(false)); + s.start_test_miner(None, stop.clone()); + + while s.head().height < sn_header.height + blocks { + thread::sleep(time::Duration::from_millis(1)); + } + s.stop_test_miner(stop); + thread::sleep(time::Duration::from_millis(1_000)); + println!( + "{} blocks mined on s{}. s{}.height: {} (old height: {})", + s.head().height - sn_header.height, + n, + n, + s.head().height, + sn_header.height, + ); + + let _ = s.chain.compact(); + let sn_header = s.chain.head().unwrap(); + let sn_tail = s.chain.tail().unwrap(); + println!( + "after compacting, s{}.head().height: {}, s{}.tail().height: {}", + n, sn_header.height, n, sn_tail.height, + ); +} + +fn long_fork_test_case_1(s: &Vec) { + println!("\ntest case 1 start"); + + // Mine state_sync_threshold-7 blocks on s0 + long_fork_test_mining(global::state_sync_threshold() as u64 - 7, 0, &s[0]); + + // Mine state_sync_threshold-1 blocks on s2 (long fork), a fork with more work than s0 chain + long_fork_test_mining(global::state_sync_threshold() as u64 - 1, 2, &s[2]); + + let s2_header = s[2].chain.head().unwrap(); + let s0_header = s[0].chain.head().unwrap(); + let s0_tail = s[0].chain.tail().unwrap(); + println!( + "test case 1: s0 start syncing with s2... s0.head().height: {}, s2.head().height: {}", + s0_header.height, s2_header.height, + ); + s[0].resume(); + s[2].resume(); + + // Check server s0 can sync to s2 without txhashset download. + let mut total_wait = 0; + while s[0].head().height < s2_header.height { + thread::sleep(time::Duration::from_millis(1_000)); + total_wait += 1; + if total_wait >= 120 { + println!( + "test case 1: test fail on timeout! s0 height: {}, s2 height: {}", + s[0].head().height, + s2_header.height, + ); + exit(1); + } + } + let s0_tail_new = s[0].chain.tail().unwrap(); + assert_eq!(s0_tail_new.height, s0_tail.height); + println!( + "test case 1: s0.head().height: {}, s2_header.height: {}", + s[0].head().height, + s2_header.height, + ); + assert_eq!(s[0].head().last_block_h, s2_header.last_block_h); + + s[0].pause(); + s[2].stop(); + println!("test case 1 passed") +} + +fn long_fork_test_case_2(s: &Vec) { + println!("\ntest case 2 start"); + + // Mine 20 blocks on s0 + long_fork_test_mining(20, 0, &s[0]); + + // Mine cut_through_horizon-1 blocks on s3 (longer fork) + long_fork_test_mining(global::cut_through_horizon() as u64 - 1, 3, &s[3]); + let s3_header = s[3].chain.head().unwrap(); + let s0_header = s[0].chain.head().unwrap(); + let s0_tail = s[0].chain.tail().unwrap(); + println!( + "test case 2: s0 start syncing with s3. s0.head().height: {}, s3.head().height: {}", + s0_header.height, s3_header.height, + ); + s[0].resume(); + s[3].resume(); + + // Check server s0 can sync to s3 without txhashset download. + let mut total_wait = 0; + while s[0].head().height < s3_header.height { + thread::sleep(time::Duration::from_millis(1_000)); + total_wait += 1; + if total_wait >= 120 { + println!( + "test case 2: test fail on timeout! s0 height: {}, s3 height: {}", + s[0].head().height, + s3_header.height, + ); + exit(1); + } + } + let s0_tail_new = s[0].chain.tail().unwrap(); + assert_eq!(s0_tail_new.height, s0_tail.height); + assert_eq!(s[0].head().hash(), s3_header.hash()); + + let _ = s[0].chain.compact(); + let s0_header = s[0].chain.head().unwrap(); + let s0_tail = s[0].chain.tail().unwrap(); + println!( + "test case 2: after compacting, s0.head().height: {}, s0.tail().height: {}", + s0_header.height, s0_tail.height, + ); + + s[0].pause(); + s[3].stop(); + println!("test case 2 passed") +} + +fn long_fork_test_case_3(s: &Vec) { + println!("\ntest case 3 start"); + + // Mine cut_through_horizon+1 blocks on s4 + long_fork_test_mining(global::cut_through_horizon() as u64 + 10, 4, &s[4]); + + let s4_header = s[4].chain.head().unwrap(); + let s0_header = s[0].chain.head().unwrap(); + let s0_tail = s[0].chain.tail().unwrap(); + let s1_header = s[1].chain.head().unwrap(); + let s1_tail = s[1].chain.tail().unwrap(); + println!( + "test case 3: s0/1 start syncing with s4. s0.head().height: {}, s0.tail().height: {}, s1.head().height: {}, s1.tail().height: {}, s4.head().height: {}", + s0_header.height, s0_tail.height, + s1_header.height, s1_tail.height, + s4_header.height, + ); + s[0].resume(); + s[4].resume(); + + // Check server s0 can sync to s4. + let mut total_wait = 0; + while s[0].head().height < s4_header.height { + thread::sleep(time::Duration::from_millis(1_000)); + total_wait += 1; + if total_wait >= 120 { + println!( + "test case 3: test fail on timeout! s0 height: {}, s4 height: {}", + s[0].head().height, + s4_header.height, + ); + exit(1); + } + } + assert_eq!(s[0].head().hash(), s4_header.hash()); + + s[0].stop(); + s[1].resume(); + + // Check server s1 can sync to s4 but with txhashset download. + let mut total_wait = 0; + while s[1].head().height < s4_header.height { + thread::sleep(time::Duration::from_millis(1_000)); + total_wait += 1; + if total_wait >= 120 { + println!( + "test case 3: test fail on timeout! s1 height: {}, s4 height: {}", + s[1].head().height, + s4_header.height, + ); + exit(1); + } + } + let s1_tail_new = s[1].chain.tail().unwrap(); + println!( + "test case 3: s[1].tail().height: {}, old height: {}", + s1_tail_new.height, s1_tail.height + ); + assert_ne!(s1_tail_new.height, s1_tail.height); + assert_eq!(s[1].head().hash(), s4_header.hash()); + + s[1].pause(); + s[4].pause(); + println!("test case 3 passed") +} + +fn long_fork_test_case_4(s: &Vec) { + println!("\ntest case 4 start"); + + let _ = s[1].chain.compact(); + + // Mine cut_through_horizon+20 blocks on s5 (longer fork than s4) + long_fork_test_mining(global::cut_through_horizon() as u64 + 20, 5, &s[5]); + + let s5_header = s[5].chain.head().unwrap(); + let s1_header = s[1].chain.head().unwrap(); + let s1_tail = s[1].chain.tail().unwrap(); + println!( + "test case 4: s1 start syncing with s5. s1.head().height: {}, s1.tail().height: {}, s5.head().height: {}", + s1_header.height, s1_tail.height, + s5_header.height, + ); + s[1].resume(); + s[5].resume(); + + // Check server s1 can sync to s5 with a new txhashset download. + let mut total_wait = 0; + while s[1].head().height < s5_header.height { + thread::sleep(time::Duration::from_millis(1_000)); + total_wait += 1; + if total_wait >= 120 { + println!( + "test case 4: test fail on timeout! s1 height: {}, s5 height: {}", + s[1].head().height, + s5_header.height, + ); + exit(1); + } + } + let s1_tail_new = s[1].chain.tail().unwrap(); + println!( + "test case 4: s[1].tail().height: {}, old height: {}", + s1_tail_new.height, s1_tail.height + ); + assert_ne!(s1_tail_new.height, s1_tail.height); + assert_eq!(s[1].head().hash(), s5_header.hash()); + + s[1].pause(); + s[5].pause(); + + println!("test case 4 passed") +} + +fn long_fork_test_case_5(s: &Vec) { + println!("\ntest case 5 start"); + + let _ = s[1].chain.compact(); + + // Mine cut_through_horizon-10 blocks on s5 + long_fork_test_mining(global::cut_through_horizon() as u64 - 10, 5, &s[5]); + + let s5_header = s[5].chain.head().unwrap(); + let s1_header = s[1].chain.head().unwrap(); + let s1_tail = s[1].chain.tail().unwrap(); + println!( + "test case 5: s1 start syncing with s5. s1.head().height: {}, s1.tail().height: {}, s5.head().height: {}", + s1_header.height, s1_tail.height, + s5_header.height, + ); + s[1].resume(); + s[5].resume(); + + // Check server s1 can sync to s5 without a txhashset download (normal body sync) + let mut total_wait = 0; + while s[1].head().height < s5_header.height { + thread::sleep(time::Duration::from_millis(1_000)); + total_wait += 1; + if total_wait >= 120 { + println!( + "test case 5: test fail on timeout! s1 height: {}, s5 height: {}", + s[1].head().height, + s5_header.height, + ); + exit(1); + } + } + let s1_tail_new = s[1].chain.tail().unwrap(); + println!( + "test case 5: s[1].tail().height: {}, old height: {}", + s1_tail_new.height, s1_tail.height + ); + assert_eq!(s1_tail_new.height, s1_tail.height); + assert_eq!(s[1].head().hash(), s5_header.hash()); + + s[1].pause(); + s[5].pause(); + + println!("test case 5 passed") +} + +fn long_fork_test_case_6(s: &Vec) { + println!("\ntest case 6 start"); + + let _ = s[1].chain.compact(); + + // Mine cut_through_horizon+1 blocks on s5 + long_fork_test_mining(global::cut_through_horizon() as u64 + 1, 5, &s[5]); + + let s5_header = s[5].chain.head().unwrap(); + let s1_header = s[1].chain.head().unwrap(); + let s1_tail = s[1].chain.tail().unwrap(); + println!( + "test case 6: s1 start syncing with s5. s1.head().height: {}, s1.tail().height: {}, s5.head().height: {}", + s1_header.height, s1_tail.height, + s5_header.height, + ); + s[1].resume(); + s[5].resume(); + + // Check server s1 can sync to s5 without a txhashset download (normal body sync) + let mut total_wait = 0; + while s[1].head().height < s5_header.height { + thread::sleep(time::Duration::from_millis(1_000)); + total_wait += 1; + if total_wait >= 120 { + println!( + "test case 6: test fail on timeout! s1 height: {}, s5 height: {}", + s[1].head().height, + s5_header.height, + ); + exit(1); + } + } + let s1_tail_new = s[1].chain.tail().unwrap(); + println!( + "test case 6: s[1].tail().height: {}, old height: {}", + s1_tail_new.height, s1_tail.height + ); + assert_eq!(s1_tail_new.height, s1_tail.height); + assert_eq!(s[1].head().hash(), s5_header.hash()); + + s[1].pause(); + s[5].pause(); + + println!("test case 6 passed") +} + pub fn create_wallet( dir: &str, client: HTTPWalletClient, @@ -528,3 +990,14 @@ fn replicate_tx_fluff_failure() { Ok(()) }).unwrap(); } + +fn get_connected_peers( + base_addr: &String, + api_server_port: u16, +) -> Vec { + let url = format!( + "http://{}:{}/v1/peers/connected", + base_addr, api_server_port + ); + api::client::get::>(url.as_str(), None).unwrap() +} diff --git a/src/bin/tui/status.rs b/src/bin/tui/status.rs index 1d1794988..d123b9520 100644 --- a/src/bin/tui/status.rs +++ b/src/bin/tui/status.rs @@ -125,7 +125,7 @@ impl TUIStatusListener for TUIStatusView { let fin = Utc::now().timestamp_nanos(); let dur_ms = (fin - start) as f64 * NANO_TO_MILLIS; - format!("Downloading {}(MB) chain state for fast sync: {}% at {:.1?}(kB/s), step 2/4", + format!("Downloading {}(MB) chain state for state sync: {}% at {:.1?}(kB/s), step 2/4", total_size / 1_000_000, percent, if dur_ms > 1.0f64 { downloaded_size as f64 / dur_ms as f64 } else { 0f64 }, @@ -135,7 +135,7 @@ impl TUIStatusListener for TUIStatusView { let fin = Utc::now().timestamp_millis(); let dur_secs = (fin - start) / 1000; - format!("Downloading chain state for fast sync. Waiting remote peer to start: {}s, step 2/4", + format!("Downloading chain state for state sync. Waiting remote peer to start: {}s, step 2/4", dur_secs, ) } @@ -164,7 +164,10 @@ impl TUIStatusListener for TUIStatusView { format!("Validating chain state: {}%, step 3/4", percent) } SyncStatus::TxHashsetSave => { - "Finalizing chain state for fast sync, step 3/4".to_string() + "Finalizing chain state for state sync, step 3/4".to_string() + } + SyncStatus::TxHashsetDone => { + "Finalized chain state for state sync, step 3/4".to_string() } SyncStatus::BodySync { current_height,