diff --git a/Cargo.lock b/Cargo.lock index 6bd2825ce..e968f8a05 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -114,9 +114,9 @@ checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" [[package]] name = "backtrace" -version = "0.3.65" +version = "0.3.66" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "11a17d453482a265fd5f8479f2a3f405566e6ca627837aaddb85af8b1ab8ef61" +checksum = "cab84319d616cfb654d03394f38ab7e6f0919e181b1b57e1fd15e7fb4077d9a7" dependencies = [ "addr2line", "cc", @@ -1177,9 +1177,9 @@ dependencies = [ [[package]] name = "hashbrown" -version = "0.12.1" +version = "0.12.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "db0d4cf898abf0081f964436dc980e96670a0f36863e4b83aaacdb65c9d7ccc3" +checksum = "607c8a29735385251a339424dd462993c0fed8fa09d378f259377df08c126022" [[package]] name = "heck" @@ -1834,9 +1834,9 @@ dependencies = [ [[package]] name = "object" -version = "0.28.4" +version = "0.29.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e42c982f2d955fac81dd7e1d0e1426a7d702acd9c98d19ab01083a6a0328c424" +checksum = "21158b2c33aa6d4561f1c0a6ea283ca92bc54802a93b263e910746d679a7eb53" dependencies = [ "memchr", ] @@ -1849,9 +1849,9 @@ checksum = "4eae0151b9dacf24fcc170d9995e511669a082856a91f958a2fe380bfab3fb22" [[package]] name = "once_cell" -version = "1.12.1" +version = "1.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ac8b1a9b2518dc799a2271eff1688707eb315f0d4697aa6b0871369ca4c4da55" +checksum = "18a6dbe30758c9f83eb00cbea4ac95966305f5a7772f3f42ebfc7fc7eddbd8e1" [[package]] name = "opaque-debug" @@ -2235,9 +2235,9 @@ dependencies = [ [[package]] name = "regex" -version = "1.5.6" +version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d83f127d94bdbcda4c8cc2e50f6f84f4b611f69c902699ca385a39c3a75f9ff1" +checksum = "4c4eb3267174b8c6c2f654116623910a0fef09c4753f8dd83db29c48a0df988b" dependencies = [ "aho-corasick", "memchr", @@ -2246,9 +2246,9 @@ dependencies = [ [[package]] name = "regex-syntax" -version = "0.6.26" +version = "0.6.27" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "49b3de9ec5dc0a3417da371aab17d729997c15010e7fd24ff707773a33bddb64" +checksum = "a3f87b73ce11b1619a3c6332f45341e0047173771e8b8b73f87bfeefb7b56244" [[package]] name = "remove_dir_all" @@ -2410,9 +2410,9 @@ checksum = "388a1df253eca08550bef6c72392cfe7c30914bf41df5269b68cbd6ff8f570a3" [[package]] name = "serde" -version = "1.0.138" +version = "1.0.139" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1578c6245786b9d168c5447eeacfb96856573ca56c9d68fdcf394be134882a47" +checksum = "0171ebb889e45aa68b44aee0859b3eede84c6f5f5c228e6f140c0b2a0a46cad6" dependencies = [ "serde_derive", ] @@ -2429,9 +2429,9 @@ dependencies = [ [[package]] name = "serde_derive" -version = "1.0.138" +version = "1.0.139" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "023e9b1467aef8a10fb88f25611870ada9800ef7e22afce356bb0d2387b6f27c" +checksum = "dc1d3230c1de7932af58ad8ffbe1d784bd55efd5a9d84ac24f69c72d83543dfb" dependencies = [ "proc-macro2 1.0.40", "quote 1.0.20", @@ -2451,9 +2451,9 @@ dependencies = [ [[package]] name = "serde_yaml" -version = "0.8.24" +version = "0.8.25" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "707d15895415db6628332b737c838b88c598522e4dc70647e59b72312924aebc" +checksum = "1ec0091e1f5aa338283ce049bd9dfefd55e1f168ac233e85c1ffe0038fb48cbe" dependencies = [ "indexmap", "ryu", diff --git a/api/src/handlers/chain_api.rs b/api/src/handlers/chain_api.rs index 9fcb077f6..a79aad2c5 100644 --- a/api/src/handlers/chain_api.rs +++ b/api/src/handlers/chain_api.rs @@ -80,7 +80,7 @@ impl ChainResetHandler { pub fn reset_chain_head(&self, hash: Hash) -> Result<(), Error> { let chain = w(&self.chain)?; let header = chain.get_block_header(&hash)?; - chain.reset_chain_head(&header)?; + chain.reset_chain_head(&header, true)?; // Reset the sync status and clear out any sync error. w(&self.sync_state)?.reset(); diff --git a/chain/src/chain.rs b/chain/src/chain.rs index cd3db4300..25b24dc64 100644 --- a/chain/src/chain.rs +++ b/chain/src/chain.rs @@ -226,7 +226,14 @@ impl Chain { /// Reset both head and header_head to the provided header. /// Handles simple rewind and more complex fork scenarios. /// Used by the reset_chain_head owner api endpoint. - pub fn reset_chain_head>(&self, head: T) -> Result<(), Error> { + /// Caller can choose not to rewind headers, which can be used + /// during PIBD scenarios where it's desirable to restart the PIBD process + /// without re-downloading the header chain + pub fn reset_chain_head>( + &self, + head: T, + rewind_headers: bool, + ) -> Result<(), Error> { let head = head.into(); let mut header_pmmr = self.header_pmmr.write(); @@ -247,19 +254,44 @@ impl Chain { }, )?; - // If the rewind of full blocks was successful then we can rewind the header MMR. - // Rewind and reapply headers to reset the header MMR. - txhashset::header_extending(&mut header_pmmr, &mut batch, |ext, batch| { - self.rewind_and_apply_header_fork(&header, ext, batch)?; - batch.save_header_head(&head)?; - Ok(()) - })?; + if rewind_headers { + // If the rewind of full blocks was successful then we can rewind the header MMR. + // Rewind and reapply headers to reset the header MMR. + txhashset::header_extending(&mut header_pmmr, &mut batch, |ext, batch| { + self.rewind_and_apply_header_fork(&header, ext, batch)?; + batch.save_header_head(&head)?; + Ok(()) + })?; + } batch.commit()?; Ok(()) } + /// Reset prune lists (when PIBD resets and rolls back the + /// entire chain, the prune list needs to be manually wiped + /// as it's currently not included as part of rewind) + pub fn reset_prune_lists(&self) -> Result<(), Error> { + let mut header_pmmr = self.header_pmmr.write(); + let mut txhashset = self.txhashset.write(); + let mut batch = self.store.batch()?; + + txhashset::extending(&mut header_pmmr, &mut txhashset, &mut batch, |ext, _| { + let extension = &mut ext.extension; + extension.reset_prune_lists(); + Ok(()) + })?; + Ok(()) + } + + /// Reset PIBD head + pub fn reset_pibd_head(&self) -> Result<(), Error> { + let batch = self.store.batch()?; + batch.save_pibd_head(&self.genesis().into())?; + Ok(()) + } + /// Are we running with archive_mode enabled? pub fn archive_mode(&self) -> bool { self.archive_mode @@ -275,6 +307,11 @@ impl Chain { self.txhashset.clone() } + /// return genesis header + pub fn genesis(&self) -> BlockHeader { + self.genesis.clone() + } + /// Shared store instance. pub fn store(&self) -> Arc { self.store.clone() @@ -665,8 +702,15 @@ impl Chain { // ensure the view is consistent. txhashset::extending_readonly(&mut header_pmmr, &mut txhashset, |ext, batch| { self.rewind_and_apply_fork(&header, ext, batch)?; - ext.extension - .validate(&self.genesis, fast_validation, &NoStatus, &header)?; + ext.extension.validate( + &self.genesis, + fast_validation, + &NoStatus, + None, + None, + &header, + None, + )?; Ok(()) }) } @@ -867,21 +911,22 @@ impl Chain { /// instantiate desegmenter (in same lazy fashion as segmenter, though this should not be as /// expensive an operation) - pub fn desegmenter(&self, archive_header: &BlockHeader) -> Result { + pub fn desegmenter( + &self, + archive_header: &BlockHeader, + ) -> Result>>, Error> { // Use our cached desegmenter if we have one and the associated header matches. - if let Some(d) = self.pibd_desegmenter.read().as_ref() { + if let Some(d) = self.pibd_desegmenter.write().as_ref() { if d.header() == archive_header { - return Ok(d.clone()); + return Ok(self.pibd_desegmenter.clone()); } } - // If no desegmenter or headers don't match init - // TODO: (Check whether we can do this.. we *should* be able to modify this as the desegmenter - // is in flight and we cross a horizon boundary, but needs more thinking) + let desegmenter = self.init_desegmenter(archive_header)?; let mut cache = self.pibd_desegmenter.write(); *cache = Some(desegmenter.clone()); - return Ok(desegmenter); + Ok(self.pibd_desegmenter.clone()) } /// initialize a desegmenter, which is capable of extending the hashset by appending @@ -898,6 +943,7 @@ impl Chain { self.txhashset(), self.header_pmmr.clone(), header.clone(), + self.genesis.clone(), self.store.clone(), )) } @@ -923,6 +969,17 @@ impl Chain { self.get_header_by_height(txhashset_height) } + /// Return the Block Header at the txhashset horizon, considering only the + /// contents of the header PMMR + pub fn txhashset_archive_header_header_only(&self) -> Result { + let header_head = self.header_head()?; + let threshold = global::state_sync_threshold() as u64; + let archive_interval = global::txhashset_archive_interval(); + let mut txhashset_height = header_head.height.saturating_sub(threshold); + txhashset_height = txhashset_height.saturating_sub(txhashset_height % archive_interval); + self.get_header_by_height(txhashset_height) + } + // Special handling to make sure the whole kernel set matches each of its // roots in each block header, without truncation. We go back header by // header, rewind and check each root. This fixes a potential weakness in @@ -1028,7 +1085,7 @@ impl Chain { txhashset_data: File, status: &dyn TxHashsetWriteStatus, ) -> Result { - status.on_setup(); + status.on_setup(None, None, None, None); // Initial check whether this txhashset is needed or not let fork_point = self.fork_point()?; @@ -1068,7 +1125,7 @@ impl Chain { let header_pmmr = self.header_pmmr.read(); let batch = self.store.batch()?; - txhashset.verify_kernel_pos_index(&self.genesis, &header_pmmr, &batch)?; + txhashset.verify_kernel_pos_index(&self.genesis, &header_pmmr, &batch, None, None)?; } // all good, prepare a new batch and update all the required records @@ -1087,7 +1144,7 @@ impl Chain { // Validate the extension, generating the utxo_sum and kernel_sum. // Full validation, including rangeproofs and kernel signature verification. let (utxo_sum, kernel_sum) = - extension.validate(&self.genesis, false, status, &header)?; + extension.validate(&self.genesis, false, status, None, None, &header, None)?; // Save the block_sums (utxo_sum, kernel_sum) to the db for use later. batch.save_block_sums( @@ -1161,6 +1218,7 @@ impl Chain { fn remove_historical_blocks( &self, header_pmmr: &txhashset::PMMRHandle, + archive_header: BlockHeader, batch: &store::Batch<'_>, ) -> Result<(), Error> { if self.archive_mode() { @@ -1181,7 +1239,6 @@ impl Chain { // TODO: Check this, compaction selects a different horizon // block from txhashset horizon/PIBD segmenter when using // Automated testing chain - let archive_header = self.txhashset_archive_header()?; if archive_header.height < cutoff { cutoff = archive_header.height; horizon = head.height - archive_header.height; @@ -1241,6 +1298,10 @@ impl Chain { } } + // Retrieve archive header here, so as not to attempt a read + // lock while removing historical blocks + let archive_header = self.txhashset_archive_header()?; + // Take a write lock on the txhashet and start a new writeable db batch. let header_pmmr = self.header_pmmr.read(); let mut txhashset = self.txhashset.write(); @@ -1260,7 +1321,7 @@ impl Chain { // If we are not in archival mode remove historical blocks from the db. if !self.archive_mode() { - self.remove_historical_blocks(&header_pmmr, &batch)?; + self.remove_historical_blocks(&header_pmmr, archive_header, &batch)?; } // Make sure our output_pos index is consistent with the UTXO set. @@ -1616,9 +1677,31 @@ fn setup_head( // Note: We are rewinding and validating against a writeable extension. // If validation is successful we will truncate the backend files // to match the provided block header. - let header = batch.get_block_header(&head.last_block_h)?; + let mut pibd_in_progress = false; + let header = { + let head = batch.get_block_header(&head.last_block_h)?; + let pibd_tip = store.pibd_head()?; + let pibd_head = batch.get_block_header(&pibd_tip.last_block_h)?; + if pibd_head.height > head.height { + pibd_in_progress = true; + pibd_head + } else { + head + } + }; let res = txhashset::extending(header_pmmr, txhashset, &mut batch, |ext, batch| { + // If we're still downloading via PIBD, don't worry about sums and validations just yet + // We still want to rewind to the last completed block to ensure a consistent state + if pibd_in_progress { + debug!( + "init: PIBD appears to be in progress at height {}, hash {}, not validating, will attempt to continue", + header.height, + header.hash() + ); + return Ok(()); + } + pipe::rewind_and_apply_fork(&header, ext, batch, &|_| Ok(()))?; let extension = &mut ext.extension; diff --git a/chain/src/error.rs b/chain/src/error.rs index 8ab2e2aff..3ed9b3c88 100644 --- a/chain/src/error.rs +++ b/chain/src/error.rs @@ -177,12 +177,19 @@ pub enum Error { /// Conversion source: segment::SegmentError, }, + /// We've decided to halt the PIBD process due to lack of supporting peers or + /// otherwise failing to progress for a certain amount of time + #[error("Aborting PIBD error")] + AbortingPIBDError, /// The segmenter is associated to a different block header #[error("Segmenter header mismatch")] SegmenterHeaderMismatch, /// Segment height not within allowed range #[error("Invalid segment height")] InvalidSegmentHeight, + /// Other issue with segment + #[error("Invalid segment: {0}")] + InvalidSegment(String), } impl Error { diff --git a/chain/src/lib.rs b/chain/src/lib.rs index 7d83ed5c2..1ece28456 100644 --- a/chain/src/lib.rs +++ b/chain/src/lib.rs @@ -38,6 +38,7 @@ use grin_util as util; mod chain; mod error; pub mod linked_list; +pub mod pibd_params; pub mod pipe; pub mod store; pub mod txhashset; diff --git a/chain/src/pibd_params.rs b/chain/src/pibd_params.rs new file mode 100644 index 000000000..44fa77693 --- /dev/null +++ b/chain/src/pibd_params.rs @@ -0,0 +1,45 @@ +// Copyright 2022 The Grin Developers +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Set of static definitions for all parameters related to PIBD and Desegmentation +//! Note these are for experimentation via compilation, not meant to be exposed as +//! configuration parameters anywhere + +/// Bitmap segment height assumed for requests and segment calculation +pub const BITMAP_SEGMENT_HEIGHT: u8 = 9; + +/// Output segment height assumed for requests and segment calculation +pub const OUTPUT_SEGMENT_HEIGHT: u8 = 11; + +/// Rangeproof segment height assumed for requests and segment calculation +pub const RANGEPROOF_SEGMENT_HEIGHT: u8 = 11; + +/// Kernel segment height assumed for requests and segment calculation +pub const KERNEL_SEGMENT_HEIGHT: u8 = 11; + +/// Maximum number of received segments to cache (across all trees) before we stop requesting others +pub const MAX_CACHED_SEGMENTS: usize = 15; + +/// How long the state sync should wait after requesting a segment from a peer before +/// deciding the segment isn't going to arrive. The syncer will then re-request the segment +pub const SEGMENT_REQUEST_TIMEOUT_SECS: i64 = 60; + +/// Number of simultaneous requests for segments we should make. Note this is currently +/// divisible by 3 to try and evenly spread requests amount the 3 main MMRs (Bitmap segments +/// will always be requested first) +pub const SEGMENT_REQUEST_COUNT: usize = 15; + +/// If the syncer hasn't seen a max work peer that supports PIBD in this number of seconds +/// give up and revert back to the txhashset.zip download method +pub const TXHASHSET_ZIP_FALLBACK_TIME_SECS: i64 = 60; diff --git a/chain/src/store.rs b/chain/src/store.rs index 59c014d4a..e11d1f486 100644 --- a/chain/src/store.rs +++ b/chain/src/store.rs @@ -17,6 +17,7 @@ use crate::core::consensus::HeaderDifficultyInfo; use crate::core::core::hash::{Hash, Hashed}; use crate::core::core::{Block, BlockHeader, BlockSums}; +use crate::core::global; use crate::core::pow::Difficulty; use crate::core::ser::{DeserializationMode, ProtocolVersion, Readable, Writeable}; use crate::linked_list::MultiIndex; @@ -35,6 +36,7 @@ const BLOCK_HEADER_PREFIX: u8 = b'h'; const BLOCK_PREFIX: u8 = b'b'; const HEAD_PREFIX: u8 = b'H'; const TAIL_PREFIX: u8 = b'T'; +const PIBD_HEAD_PREFIX: u8 = b'I'; const HEADER_HEAD_PREFIX: u8 = b'G'; const OUTPUT_POS_PREFIX: u8 = b'p'; @@ -75,6 +77,18 @@ impl ChainStore { option_to_not_found(self.db.get_ser(&[TAIL_PREFIX], None), || "TAIL".to_owned()) } + /// The current PIBD head (will differ from the other heads. Return genesis block if PIBD head doesn't exist). + pub fn pibd_head(&self) -> Result { + let res = option_to_not_found(self.db.get_ser(&[PIBD_HEAD_PREFIX], None), || { + "PIBD_HEAD".to_owned() + }); + + match res { + Ok(r) => Ok(r), + Err(_) => Ok(Tip::from_header(&global::get_genesis_block().header)), + } + } + /// Header of the block at the head of the block chain (not the same thing as header_head). pub fn head_header(&self) -> Result { self.get_block_header(&self.head()?.last_block_h) @@ -201,6 +215,11 @@ impl<'a> Batch<'a> { self.db.put_ser(&[HEADER_HEAD_PREFIX], t) } + /// Save PIBD head to db. + pub fn save_pibd_head(&self, t: &Tip) -> Result<(), Error> { + self.db.put_ser(&[PIBD_HEAD_PREFIX], t) + } + /// get block pub fn get_block(&self, h: &Hash) -> Result { option_to_not_found(self.db.get_ser(&to_key(BLOCK_PREFIX, h), None), || { diff --git a/chain/src/txhashset/bitmap_accumulator.rs b/chain/src/txhashset/bitmap_accumulator.rs index 3bd42605e..b0d8a0bc1 100644 --- a/chain/src/txhashset/bitmap_accumulator.rs +++ b/chain/src/txhashset/bitmap_accumulator.rs @@ -194,10 +194,11 @@ impl BitmapAccumulator { /// Return a raw in-memory bitmap of this accumulator pub fn as_bitmap(&self) -> Result { let mut bitmap = Bitmap::create(); - for (chunk_count, chunk_index) in self.backend.leaf_idx_iter(0).enumerate() { + for (chunk_index, chunk_pos) in self.backend.leaf_pos_iter().enumerate() { //TODO: Unwrap - let chunk = self.backend.get_data(chunk_index).unwrap(); - bitmap.add_many(&chunk.set_iter(chunk_count * 1024).collect::>()); + let chunk = self.backend.get_data(chunk_pos as u64).unwrap(); + let additive = chunk.set_iter(chunk_index * 1024).collect::>(); + bitmap.add_many(&additive); } Ok(bitmap) } @@ -417,7 +418,11 @@ impl Writeable for BitmapBlock { writer.write_u8((length / BitmapChunk::LEN_BITS) as u8)?; let count_pos = self.inner.iter().filter(|&v| v).count() as u32; - let count_neg = Self::NBITS - count_pos; + + // Negative count needs to be adjusted if the block is not full, + // which affects the choice of serialization mode and size written + let count_neg = length as u32 - count_pos; + let threshold = Self::NBITS / 16; if count_pos < threshold { // Write positive indices @@ -517,17 +522,19 @@ mod tests { use rand::thread_rng; use std::io::Cursor; - fn test_roundtrip(entries: usize, inverse: bool, encoding: u8, length: usize) { + fn test_roundtrip(entries: usize, inverse: bool, encoding: u8, length: usize, n_blocks: usize) { let mut rng = thread_rng(); - let mut block = BitmapBlock::new(64); + let mut block = BitmapBlock::new(n_blocks); if inverse { block.inner.negate(); } + let range_size = n_blocks * BitmapChunk::LEN_BITS as usize; + // Flip `entries` bits in random spots let mut count = 0; while count < entries { - let idx = rng.gen_range(0, BitmapBlock::NBITS as usize); + let idx = rng.gen_range(0, range_size); if block.inner.get(idx).unwrap() == inverse { count += 1; block.inner.set(idx, !inverse); @@ -561,21 +568,35 @@ mod tests { fn block_ser_roundtrip() { let threshold = BitmapBlock::NBITS as usize / 16; let entries = thread_rng().gen_range(threshold, 4 * threshold); - test_roundtrip(entries, false, 0, 2 + BitmapBlock::NBITS as usize / 8); - test_roundtrip(entries, true, 0, 2 + BitmapBlock::NBITS as usize / 8); + test_roundtrip(entries, false, 0, 2 + BitmapBlock::NBITS as usize / 8, 64); + test_roundtrip(entries, true, 0, 2 + BitmapBlock::NBITS as usize / 8, 64); } #[test] fn sparse_block_ser_roundtrip() { let entries = thread_rng().gen_range(BitmapChunk::LEN_BITS, BitmapBlock::NBITS as usize / 16); - test_roundtrip(entries, false, 1, 4 + 2 * entries); + test_roundtrip(entries, false, 1, 4 + 2 * entries, 64); + } + + #[test] + fn sparse_unfull_block_ser_roundtrip() { + let entries = + thread_rng().gen_range(BitmapChunk::LEN_BITS, BitmapBlock::NBITS as usize / 16); + test_roundtrip(entries, false, 1, 4 + 2 * entries, 61); } #[test] fn abdundant_block_ser_roundtrip() { let entries = thread_rng().gen_range(BitmapChunk::LEN_BITS, BitmapBlock::NBITS as usize / 16); - test_roundtrip(entries, true, 2, 4 + 2 * entries); + test_roundtrip(entries, true, 2, 4 + 2 * entries, 64); + } + + #[test] + fn abdundant_unfull_block_ser_roundtrip() { + let entries = + thread_rng().gen_range(BitmapChunk::LEN_BITS, BitmapBlock::NBITS as usize / 16); + test_roundtrip(entries, true, 2, 4 + 2 * entries, 61); } } diff --git a/chain/src/txhashset/desegmenter.rs b/chain/src/txhashset/desegmenter.rs index 0c3c80cfa..0de446bc1 100644 --- a/chain/src/txhashset/desegmenter.rs +++ b/chain/src/txhashset/desegmenter.rs @@ -17,14 +17,20 @@ use std::sync::Arc; -use crate::core::core::hash::Hash; -use crate::core::core::pmmr; -use crate::core::core::{BlockHeader, OutputIdentifier, Segment, TxKernel}; +use crate::core::core::hash::{Hash, Hashed}; +use crate::core::core::{pmmr, pmmr::ReadablePMMR}; +use crate::core::core::{ + BlockHeader, BlockSums, OutputIdentifier, Segment, SegmentIdentifier, SegmentType, + SegmentTypeIdentifier, TxKernel, +}; use crate::error::Error; use crate::txhashset::{BitmapAccumulator, BitmapChunk, TxHashSet}; +use crate::types::{Tip, TxHashsetWriteStatus}; use crate::util::secp::pedersen::RangeProof; -use crate::util::RwLock; +use crate::util::{RwLock, StopState}; +use crate::SyncState; +use crate::pibd_params; use crate::store; use crate::txhashset; @@ -38,16 +44,30 @@ pub struct Desegmenter { archive_header: BlockHeader, store: Arc, + genesis: BlockHeader, + + default_bitmap_segment_height: u8, + default_output_segment_height: u8, + default_rangeproof_segment_height: u8, + default_kernel_segment_height: u8, + bitmap_accumulator: BitmapAccumulator, - bitmap_segments: Vec>, - output_segments: Vec>, - rangeproof_segments: Vec>, - kernel_segments: Vec>, + bitmap_segment_cache: Vec>, + output_segment_cache: Vec>, + rangeproof_segment_cache: Vec>, + kernel_segment_cache: Vec>, bitmap_mmr_leaf_count: u64, bitmap_mmr_size: u64, - // In-memory 'raw' bitmap corresponding to contents of bitmap accumulator + + /// Maximum number of segments to cache before we stop requesting others + max_cached_segments: usize, + + /// In-memory 'raw' bitmap corresponding to contents of bitmap accumulator bitmap_cache: Option, + + /// Flag indicating there are no more segments to request + all_segments_complete: bool, } impl Desegmenter { @@ -56,46 +76,491 @@ impl Desegmenter { txhashset: Arc>, header_pmmr: Arc>>, archive_header: BlockHeader, + genesis: BlockHeader, store: Arc, ) -> Desegmenter { + trace!("Creating new desegmenter"); let mut retval = Desegmenter { txhashset, header_pmmr, archive_header, store, + genesis, bitmap_accumulator: BitmapAccumulator::new(), - bitmap_segments: vec![], - output_segments: vec![], - rangeproof_segments: vec![], - kernel_segments: vec![], + default_bitmap_segment_height: pibd_params::BITMAP_SEGMENT_HEIGHT, + default_output_segment_height: pibd_params::OUTPUT_SEGMENT_HEIGHT, + default_rangeproof_segment_height: pibd_params::RANGEPROOF_SEGMENT_HEIGHT, + default_kernel_segment_height: pibd_params::KERNEL_SEGMENT_HEIGHT, + bitmap_segment_cache: vec![], + output_segment_cache: vec![], + rangeproof_segment_cache: vec![], + kernel_segment_cache: vec![], bitmap_mmr_leaf_count: 0, bitmap_mmr_size: 0, + max_cached_segments: pibd_params::MAX_CACHED_SEGMENTS, + bitmap_cache: None, + + all_segments_complete: false, }; retval.calc_bitmap_mmr_sizes(); retval } + /// Reset all state + pub fn reset(&mut self) { + self.all_segments_complete = false; + self.bitmap_segment_cache = vec![]; + self.output_segment_cache = vec![]; + self.rangeproof_segment_cache = vec![]; + self.kernel_segment_cache = vec![]; + self.bitmap_mmr_leaf_count = 0; + self.bitmap_mmr_size = 0; + self.bitmap_cache = None; + self.bitmap_accumulator = BitmapAccumulator::new(); + self.calc_bitmap_mmr_sizes(); + } + /// Return reference to the header used for validation pub fn header(&self) -> &BlockHeader { &self.archive_header } + /// Return size of bitmap mmr pub fn expected_bitmap_mmr_size(&self) -> u64 { self.bitmap_mmr_size } + /// Whether we have all the segments we need + pub fn is_complete(&self) -> bool { + self.all_segments_complete + } + + /// Check progress, update status if needed, returns true if all required + /// segments are in place + pub fn check_progress(&self, status: Arc) -> Result { + let mut latest_block_height = 0; + + let local_output_mmr_size; + let local_kernel_mmr_size; + let local_rangeproof_mmr_size; + { + let txhashset = self.txhashset.read(); + local_output_mmr_size = txhashset.output_mmr_size(); + local_kernel_mmr_size = txhashset.kernel_mmr_size(); + local_rangeproof_mmr_size = txhashset.rangeproof_mmr_size(); + } + + // going to try presenting PIBD progress as total leaves downloaded + // total segments probably doesn't make much sense since the segment + // sizes will be able to change over time, and representative block height + // can be too lopsided if one pmmr completes faster, so perhaps just + // use total leaves downloaded and display as a percentage + let completed_leaves = pmmr::n_leaves(local_output_mmr_size) + + pmmr::n_leaves(local_rangeproof_mmr_size) + + pmmr::n_leaves(local_kernel_mmr_size); + + // Find latest 'complete' header. + // First take lesser of rangeproof and output mmr sizes + let latest_output_size = std::cmp::min(local_output_mmr_size, local_rangeproof_mmr_size); + + // Find first header in which 'output_mmr_size' and 'kernel_mmr_size' are greater than + // given sizes + + let res = { + let header_pmmr = self.header_pmmr.read(); + header_pmmr.get_first_header_with( + latest_output_size, + local_kernel_mmr_size, + latest_block_height, + self.store.clone(), + ) + }; + + if let Some(h) = res { + latest_block_height = h.height; + + // TODO: Unwraps + let tip = Tip::from_header(&h); + let batch = self.store.batch()?; + batch.save_pibd_head(&tip)?; + batch.commit()?; + + status.update_pibd_progress( + false, + false, + completed_leaves, + latest_block_height, + &self.archive_header, + ); + if local_kernel_mmr_size == self.archive_header.kernel_mmr_size + && local_output_mmr_size == self.archive_header.output_mmr_size + && local_rangeproof_mmr_size == self.archive_header.output_mmr_size + && self.bitmap_cache.is_some() + { + // All is complete + return Ok(true); + } + } + + Ok(false) + } + + /// Once the PIBD set is downloaded, we need to ensure that the respective leaf sets + /// match the bitmap (particularly in the case of outputs being spent after a PIBD catch-up) + pub fn check_update_leaf_set_state(&self) -> Result<(), Error> { + let mut header_pmmr = self.header_pmmr.write(); + let mut txhashset = self.txhashset.write(); + let mut _batch = self.store.batch()?; + txhashset::extending(&mut header_pmmr, &mut txhashset, &mut _batch, |ext, _| { + let extension = &mut ext.extension; + if let Some(b) = &self.bitmap_cache { + extension.update_leaf_sets(&b)?; + } + Ok(()) + })?; + Ok(()) + } + + /// This is largely copied from chain.rs txhashset_write and related functions, + /// the idea being that the txhashset version will eventually be removed + pub fn validate_complete_state( + &self, + status: Arc, + stop_state: Arc, + ) -> Result<(), Error> { + // Quick root check first: + { + let txhashset = self.txhashset.read(); + txhashset.roots().validate(&self.archive_header)?; + } + + // TODO: Possibly Keep track of this in the DB so we can pick up where we left off if needed + let last_rangeproof_validation_pos = 0; + + // Validate kernel history + { + debug!("desegmenter validation: rewinding and validating kernel history (readonly)"); + let txhashset = self.txhashset.read(); + let mut count = 0; + let mut current = self.archive_header.clone(); + let total = current.height; + txhashset::rewindable_kernel_view(&txhashset, |view, batch| { + while current.height > 0 { + view.rewind(¤t)?; + view.validate_root()?; + current = batch.get_previous_header(¤t)?; + count += 1; + if current.height % 100000 == 0 || current.height == total { + status.on_setup(Some(total - current.height), Some(total), None, None); + } + if stop_state.is_stopped() { + return Ok(()); + } + } + Ok(()) + })?; + debug!( + "desegmenter validation: validated kernel root on {} headers", + count, + ); + } + + if stop_state.is_stopped() { + return Ok(()); + } + + // Check kernel MMR root for every block header. + // Check NRD relative height rules for full kernel history. + + { + let header_pmmr = self.header_pmmr.read(); + let txhashset = self.txhashset.read(); + let batch = self.store.batch()?; + txhashset.verify_kernel_pos_index( + &self.genesis, + &header_pmmr, + &batch, + Some(status.clone()), + Some(stop_state.clone()), + )?; + } + + if stop_state.is_stopped() { + return Ok(()); + } + + status.on_setup(None, None, None, None); + // Prepare a new batch and update all the required records + { + debug!("desegmenter validation: rewinding a 2nd time (writeable)"); + let mut header_pmmr = self.header_pmmr.write(); + let mut txhashset = self.txhashset.write(); + let mut batch = self.store.batch()?; + txhashset::extending( + &mut header_pmmr, + &mut txhashset, + &mut batch, + |ext, batch| { + let extension = &mut ext.extension; + extension.rewind(&self.archive_header, batch)?; + + // Validate the extension, generating the utxo_sum and kernel_sum. + // Full validation, including rangeproofs and kernel signature verification. + let (utxo_sum, kernel_sum) = extension.validate( + &self.genesis, + false, + &*status, + Some(last_rangeproof_validation_pos), + None, + &self.archive_header, + Some(stop_state.clone()), + )?; + + if stop_state.is_stopped() { + return Ok(()); + } + + // Save the block_sums (utxo_sum, kernel_sum) to the db for use later. + batch.save_block_sums( + &self.archive_header.hash(), + BlockSums { + utxo_sum, + kernel_sum, + }, + )?; + + Ok(()) + }, + )?; + + if stop_state.is_stopped() { + return Ok(()); + } + + debug!("desegmenter_validation: finished validating and rebuilding"); + status.on_save(); + + { + // Save the new head to the db and rebuild the header by height index. + let tip = Tip::from_header(&self.archive_header); + + batch.save_body_head(&tip)?; + + // Reset the body tail to the body head after a txhashset write + batch.save_body_tail(&tip)?; + } + + // Rebuild our output_pos index in the db based on fresh UTXO set. + txhashset.init_output_pos_index(&header_pmmr, &batch)?; + + // Rebuild our NRD kernel_pos index based on recent kernel history. + txhashset.init_recent_kernel_pos_index(&header_pmmr, &batch)?; + + // Commit all the changes to the db. + batch.commit()?; + + debug!("desegmenter_validation: finished committing the batch (head etc.)"); + + status.on_done(); + } + Ok(()) + } + + /// Apply next set of segments that are ready to be appended to their respective trees, + /// and kick off any validations that can happen. + pub fn apply_next_segments(&mut self) -> Result<(), Error> { + let next_bmp_idx = self.next_required_bitmap_segment_index(); + if let Some(bmp_idx) = next_bmp_idx { + if let Some((idx, _seg)) = self + .bitmap_segment_cache + .iter() + .enumerate() + .find(|s| s.1.identifier().idx == bmp_idx) + { + self.apply_bitmap_segment(idx)?; + } + } else { + // Check if we need to finalize bitmap + if self.bitmap_cache == None { + // Should have all the pieces now, finalize the bitmap cache + self.finalize_bitmap()?; + } + + // Check if we can apply the next output segment(s) + if let Some(next_output_idx) = self.next_required_output_segment_index() { + if let Some((idx, _seg)) = self + .output_segment_cache + .iter() + .enumerate() + .find(|s| s.1.identifier().idx == next_output_idx) + { + self.apply_output_segment(idx)?; + } + } else { + if self.output_segment_cache.len() >= self.max_cached_segments { + self.output_segment_cache = vec![]; + } + } + // Check if we can apply the next rangeproof segment + if let Some(next_rp_idx) = self.next_required_rangeproof_segment_index() { + if let Some((idx, _seg)) = self + .rangeproof_segment_cache + .iter() + .enumerate() + .find(|s| s.1.identifier().idx == next_rp_idx) + { + self.apply_rangeproof_segment(idx)?; + } + } else { + if self.rangeproof_segment_cache.len() >= self.max_cached_segments { + self.rangeproof_segment_cache = vec![]; + } + } + // Check if we can apply the next kernel segment + if let Some(next_kernel_idx) = self.next_required_kernel_segment_index() { + if let Some((idx, _seg)) = self + .kernel_segment_cache + .iter() + .enumerate() + .find(|s| s.1.identifier().idx == next_kernel_idx) + { + self.apply_kernel_segment(idx)?; + } + } else { + if self.kernel_segment_cache.len() >= self.max_cached_segments { + self.kernel_segment_cache = vec![]; + } + } + } + Ok(()) + } + + /// Return list of the next preferred segments the desegmenter needs based on + /// the current real state of the underlying elements + pub fn next_desired_segments(&mut self, max_elements: usize) -> Vec { + let mut return_vec = vec![]; + // First check for required bitmap elements + if self.bitmap_cache.is_none() { + // Get current size of bitmap MMR + let local_pmmr_size = self.bitmap_accumulator.readonly_pmmr().unpruned_size(); + // Get iterator over expected bitmap elements + let mut identifier_iter = SegmentIdentifier::traversal_iter( + self.bitmap_mmr_size, + self.default_bitmap_segment_height, + ); + // Advance iterator to next expected segment + while let Some(id) = identifier_iter.next() { + if id.segment_pos_range(self.bitmap_mmr_size).1 > local_pmmr_size { + if !self.has_bitmap_segment_with_id(id) { + return_vec.push(SegmentTypeIdentifier::new(SegmentType::Bitmap, id)); + if return_vec.len() >= max_elements { + return return_vec; + } + } + } + } + } else { + // We have all required bitmap segments and have recreated our local + // bitmap, now continue with other segments, evenly spreading requests + // among MMRs + let local_output_mmr_size; + let local_kernel_mmr_size; + let local_rangeproof_mmr_size; + { + let txhashset = self.txhashset.read(); + local_output_mmr_size = txhashset.output_mmr_size(); + local_kernel_mmr_size = txhashset.kernel_mmr_size(); + local_rangeproof_mmr_size = txhashset.rangeproof_mmr_size(); + } + // TODO: Fix, alternative approach, this is very inefficient + let mut output_identifier_iter = SegmentIdentifier::traversal_iter( + self.archive_header.output_mmr_size, + self.default_output_segment_height, + ); + + let mut elems_added = 0; + while let Some(output_id) = output_identifier_iter.next() { + // Advance output iterator to next needed position + let (_first, last) = + output_id.segment_pos_range(self.archive_header.output_mmr_size); + if last <= local_output_mmr_size { + continue; + } + if self.output_segment_cache.len() >= self.max_cached_segments { + break; + } + if !self.has_output_segment_with_id(output_id) { + return_vec.push(SegmentTypeIdentifier::new(SegmentType::Output, output_id)); + elems_added += 1; + } + if elems_added == max_elements / 3 { + break; + } + } + + let mut rangeproof_identifier_iter = SegmentIdentifier::traversal_iter( + self.archive_header.output_mmr_size, + self.default_rangeproof_segment_height, + ); + + elems_added = 0; + while let Some(rp_id) = rangeproof_identifier_iter.next() { + let (_first, last) = rp_id.segment_pos_range(self.archive_header.output_mmr_size); + // Advance rangeproof iterator to next needed position + if last <= local_rangeproof_mmr_size { + continue; + } + if self.rangeproof_segment_cache.len() >= self.max_cached_segments { + break; + } + if !self.has_rangeproof_segment_with_id(rp_id) { + return_vec.push(SegmentTypeIdentifier::new(SegmentType::RangeProof, rp_id)); + elems_added += 1; + } + if elems_added == max_elements / 3 { + break; + } + } + + let mut kernel_identifier_iter = SegmentIdentifier::traversal_iter( + self.archive_header.kernel_mmr_size, + self.default_kernel_segment_height, + ); + + elems_added = 0; + while let Some(k_id) = kernel_identifier_iter.next() { + // Advance kernel iterator to next needed position + let (_first, last) = k_id.segment_pos_range(self.archive_header.kernel_mmr_size); + // Advance rangeproof iterator to next needed position + if last <= local_kernel_mmr_size { + continue; + } + if self.kernel_segment_cache.len() >= self.max_cached_segments { + break; + } + if !self.has_kernel_segment_with_id(k_id) { + return_vec.push(SegmentTypeIdentifier::new(SegmentType::Kernel, k_id)); + elems_added += 1; + } + if elems_added == max_elements / 3 { + break; + } + } + } + if return_vec.is_empty() && self.bitmap_cache.is_some() { + self.all_segments_complete = true; + } + return_vec + } + /// 'Finalize' the bitmap accumulator, storing an in-memory copy of the bitmap for /// use in further validation and setting the accumulator on the underlying txhashset - /// TODO: Could be called automatically when we have the calculated number of - /// required segments for the archive header - /// TODO: Accumulator will likely need to be stored locally to deal with server - /// being shut down and restarted pub fn finalize_bitmap(&mut self) -> Result<(), Error> { - debug!( - "pibd_desgmenter: caching bitmap - accumulator root: {}", + trace!( + "pibd_desegmenter: finalizing and caching bitmap - accumulator root: {}", self.bitmap_accumulator.root() ); self.bitmap_cache = Some(self.bitmap_accumulator.as_bitmap()?); @@ -110,7 +575,6 @@ impl Desegmenter { &mut batch, |ext, _batch| { let extension = &mut ext.extension; - // TODO: Unwrap extension.set_bitmap_accumulator(self.bitmap_accumulator.clone()); Ok(()) }, @@ -124,29 +588,74 @@ impl Desegmenter { // Number of leaves (BitmapChunks) self.bitmap_mmr_leaf_count = (pmmr::n_leaves(self.archive_header.output_mmr_size) + 1023) / 1024; - debug!( - "pibd_desgmenter - expected number of leaves in bitmap MMR: {}", + trace!( + "pibd_desegmenter - expected number of leaves in bitmap MMR: {}", self.bitmap_mmr_leaf_count ); // Total size of Bitmap PMMR - self.bitmap_mmr_size = pmmr::peaks(self.bitmap_mmr_leaf_count) - .last() - .unwrap_or(&pmmr::insertion_to_pmmr_index(self.bitmap_mmr_leaf_count)) - .clone(); - debug!( - "pibd_desgmenter - expected size of bitmap MMR: {}", + self.bitmap_mmr_size = + 1 + pmmr::peaks(pmmr::insertion_to_pmmr_index(self.bitmap_mmr_leaf_count)) + .last() + .unwrap_or( + &(pmmr::peaks(pmmr::insertion_to_pmmr_index( + self.bitmap_mmr_leaf_count - 1, + )) + .last() + .unwrap()), + ) + .clone(); + + trace!( + "pibd_desegmenter - expected size of bitmap MMR: {}", self.bitmap_mmr_size ); } + /// Cache a bitmap segment if we don't already have it + fn cache_bitmap_segment(&mut self, in_seg: Segment) { + if self + .bitmap_segment_cache + .iter() + .find(|i| i.identifier() == in_seg.identifier()) + .is_none() + { + self.bitmap_segment_cache.push(in_seg); + } + } + + /// Whether our list already contains this bitmap segment + fn has_bitmap_segment_with_id(&self, seg_id: SegmentIdentifier) -> bool { + self.bitmap_segment_cache + .iter() + .find(|i| i.identifier() == seg_id) + .is_some() + } + + /// Return an identifier for the next segment we need for the bitmap pmmr + fn next_required_bitmap_segment_index(&self) -> Option { + let local_bitmap_pmmr_size = self.bitmap_accumulator.readonly_pmmr().unpruned_size(); + let cur_segment_count = SegmentIdentifier::count_segments_required( + local_bitmap_pmmr_size, + self.default_bitmap_segment_height, + ); + let total_segment_count = SegmentIdentifier::count_segments_required( + self.bitmap_mmr_size, + self.default_bitmap_segment_height, + ); + if cur_segment_count == total_segment_count { + None + } else { + Some(cur_segment_count as u64) + } + } + /// Adds and validates a bitmap chunk - /// TODO: Still experimenting, this expects chunks received to be in order pub fn add_bitmap_segment( &mut self, segment: Segment, output_root_hash: Hash, ) -> Result<(), Error> { - debug!("pibd_desegmenter: add bitmap segment"); + trace!("pibd_desegmenter: add bitmap segment"); segment.validate_with( self.bitmap_mmr_size, // Last MMR pos at the height being validated, in this case of the bitmap root None, @@ -155,7 +664,20 @@ impl Desegmenter { output_root_hash, // Other root true, )?; - // All okay, add leaves to bitmap accumulator + trace!("pibd_desegmenter: adding segment to cache"); + // All okay, add to our cached list of bitmap segments + self.cache_bitmap_segment(segment); + Ok(()) + } + + /// Apply a bitmap segment at the index cache + pub fn apply_bitmap_segment(&mut self, idx: usize) -> Result<(), Error> { + let segment = self.bitmap_segment_cache.remove(idx); + trace!( + "pibd_desegmenter: apply bitmap segment at segment idx {}", + segment.identifier().idx + ); + // Add leaves to bitmap accumulator let (_sid, _hash_pos, _hashes, _leaf_pos, leaf_data, _proof) = segment.parts(); for chunk in leaf_data.into_iter() { self.bitmap_accumulator.append_chunk(chunk)?; @@ -163,18 +685,33 @@ impl Desegmenter { Ok(()) } - /// Adds a output segment - /// TODO: Still experimenting, expects chunks received to be in order - pub fn add_output_segment(&self, segment: Segment) -> Result<(), Error> { - debug!("pibd_desegmenter: add output segment"); - segment.validate_with( - self.archive_header.output_mmr_size, // Last MMR pos at the height being validated - self.bitmap_cache.as_ref(), - self.archive_header.output_root, // Output root we're checking for - self.archive_header.output_mmr_size, - self.bitmap_accumulator.root(), // Other root - false, - )?; + /// Whether our list already contains this bitmap segment + fn has_output_segment_with_id(&self, seg_id: SegmentIdentifier) -> bool { + self.output_segment_cache + .iter() + .find(|i| i.identifier() == seg_id) + .is_some() + } + + /// Cache an output segment if we don't already have it + fn cache_output_segment(&mut self, in_seg: Segment) { + if self + .output_segment_cache + .iter() + .find(|i| i.identifier() == in_seg.identifier()) + .is_none() + { + self.output_segment_cache.push(in_seg); + } + } + + /// Apply an output segment at the index cache + pub fn apply_output_segment(&mut self, idx: usize) -> Result<(), Error> { + let segment = self.output_segment_cache.remove(idx); + trace!( + "pibd_desegmenter: applying output segment at segment idx {}", + segment.identifier().idx + ); let mut header_pmmr = self.header_pmmr.write(); let mut txhashset = self.txhashset.write(); let mut batch = self.store.batch()?; @@ -191,15 +728,103 @@ impl Desegmenter { Ok(()) } - /// Adds a Rangeproof segment - /// TODO: Still experimenting, expects chunks received to be in order - pub fn add_rangeproof_segment(&self, segment: Segment) -> Result<(), Error> { - debug!("pibd_desegmenter: add rangeproof segment"); - segment.validate( + /// Return an identifier for the next segment we need for the output pmmr + fn next_required_output_segment_index(&self) -> Option { + let local_output_mmr_size; + { + let txhashset = self.txhashset.read(); + local_output_mmr_size = txhashset.output_mmr_size(); + } + + // Special case here. If the mmr size is 1, this is a fresh chain + // with naught but a humble genesis block. We need segment 0, (and + // also need to skip the genesis block when applying the segment) + // note this is implementation-specific, the code for creating + // a new chain creates the genesis block pmmr entries by default + + let mut cur_segment_count = if local_output_mmr_size == 1 { + 0 + } else { + SegmentIdentifier::count_segments_required( + local_output_mmr_size, + self.default_output_segment_height, + ) + }; + + // When resuming, we need to ensure we're getting the previous segment if needed + let theoretical_pmmr_size = + SegmentIdentifier::pmmr_size(cur_segment_count, self.default_output_segment_height); + if local_output_mmr_size < theoretical_pmmr_size { + cur_segment_count -= 1; + } + + let total_segment_count = SegmentIdentifier::count_segments_required( + self.archive_header.output_mmr_size, + self.default_output_segment_height, + ); + trace!( + "Next required output segment is {} of {}", + cur_segment_count, + total_segment_count + ); + if cur_segment_count == total_segment_count { + None + } else { + Some(cur_segment_count as u64) + } + } + + /// Adds a output segment + pub fn add_output_segment( + &mut self, + segment: Segment, + _bitmap_root: Option, + ) -> Result<(), Error> { + trace!("pibd_desegmenter: add output segment"); + // TODO: This, something very wrong, probably need to reset entire body sync + // check bitmap root matches what we already have + /*if bitmap_root != Some(self.bitmap_accumulator.root()) { + + }*/ + segment.validate_with( self.archive_header.output_mmr_size, // Last MMR pos at the height being validated self.bitmap_cache.as_ref(), - self.archive_header.range_proof_root, // Range proof root we're checking for + self.archive_header.output_root, // Output root we're checking for + self.archive_header.output_mmr_size, + self.bitmap_accumulator.root(), // Other root + false, )?; + self.cache_output_segment(segment); + Ok(()) + } + + /// Whether our list already contains this rangeproof segment + fn has_rangeproof_segment_with_id(&self, seg_id: SegmentIdentifier) -> bool { + self.rangeproof_segment_cache + .iter() + .find(|i| i.identifier() == seg_id) + .is_some() + } + + /// Cache a RangeProof segment if we don't already have it + fn cache_rangeproof_segment(&mut self, in_seg: Segment) { + if self + .rangeproof_segment_cache + .iter() + .find(|i| i.identifier() == in_seg.identifier()) + .is_none() + { + self.rangeproof_segment_cache.push(in_seg); + } + } + + /// Apply a rangeproof segment at the index cache + pub fn apply_rangeproof_segment(&mut self, idx: usize) -> Result<(), Error> { + let segment = self.rangeproof_segment_cache.remove(idx); + trace!( + "pibd_desegmenter: applying rangeproof segment at segment idx {}", + segment.identifier().idx + ); let mut header_pmmr = self.header_pmmr.write(); let mut txhashset = self.txhashset.write(); let mut batch = self.store.batch()?; @@ -216,15 +841,89 @@ impl Desegmenter { Ok(()) } - /// Adds a Kernel segment - /// TODO: Still experimenting, expects chunks received to be in order - pub fn add_kernel_segment(&self, segment: Segment) -> Result<(), Error> { - debug!("pibd_desegmenter: add kernel segment"); + /// Return an identifier for the next segment we need for the rangeproof pmmr + fn next_required_rangeproof_segment_index(&self) -> Option { + let local_rangeproof_mmr_size; + { + let txhashset = self.txhashset.read(); + local_rangeproof_mmr_size = txhashset.rangeproof_mmr_size(); + } + + // Special case here. If the mmr size is 1, this is a fresh chain + // with naught but a humble genesis block. We need segment 0, (and + // also need to skip the genesis block when applying the segment) + + let mut cur_segment_count = if local_rangeproof_mmr_size == 1 { + 0 + } else { + SegmentIdentifier::count_segments_required( + local_rangeproof_mmr_size, + self.default_rangeproof_segment_height, + ) + }; + + // When resuming, we need to ensure we're getting the previous segment if needed + let theoretical_pmmr_size = + SegmentIdentifier::pmmr_size(cur_segment_count, self.default_rangeproof_segment_height); + if local_rangeproof_mmr_size < theoretical_pmmr_size { + cur_segment_count -= 1; + } + + let total_segment_count = SegmentIdentifier::count_segments_required( + self.archive_header.output_mmr_size, + self.default_rangeproof_segment_height, + ); + trace!( + "Next required rangeproof segment is {} of {}", + cur_segment_count, + total_segment_count + ); + if cur_segment_count == total_segment_count { + None + } else { + Some(cur_segment_count as u64) + } + } + + /// Adds a Rangeproof segment + pub fn add_rangeproof_segment(&mut self, segment: Segment) -> Result<(), Error> { + trace!("pibd_desegmenter: add rangeproof segment"); segment.validate( - self.archive_header.kernel_mmr_size, // Last MMR pos at the height being validated - None, - self.archive_header.kernel_root, // Kernel root we're checking for + self.archive_header.output_mmr_size, // Last MMR pos at the height being validated + self.bitmap_cache.as_ref(), + self.archive_header.range_proof_root, // Range proof root we're checking for )?; + self.cache_rangeproof_segment(segment); + Ok(()) + } + + /// Whether our list already contains this kernel segment + fn has_kernel_segment_with_id(&self, seg_id: SegmentIdentifier) -> bool { + self.kernel_segment_cache + .iter() + .find(|i| i.identifier() == seg_id) + .is_some() + } + + /// Cache a Kernel segment if we don't already have it + fn cache_kernel_segment(&mut self, in_seg: Segment) { + if self + .kernel_segment_cache + .iter() + .find(|i| i.identifier() == in_seg.identifier()) + .is_none() + { + self.kernel_segment_cache.push(in_seg); + } + } + + /// Apply a kernel segment at the index cache + pub fn apply_kernel_segment(&mut self, idx: usize) -> Result<(), Error> { + let segment = self.kernel_segment_cache.remove(idx); + trace!( + "pibd_desegmenter: applying kernel segment at segment idx {}", + segment.identifier().idx + ); let mut header_pmmr = self.header_pmmr.write(); let mut txhashset = self.txhashset.write(); let mut batch = self.store.batch()?; @@ -240,4 +939,56 @@ impl Desegmenter { )?; Ok(()) } + + /// Return an identifier for the next segment we need for the kernel pmmr + fn next_required_kernel_segment_index(&self) -> Option { + let local_kernel_mmr_size; + { + let txhashset = self.txhashset.read(); + local_kernel_mmr_size = txhashset.kernel_mmr_size(); + } + + let mut cur_segment_count = if local_kernel_mmr_size == 1 { + 0 + } else { + SegmentIdentifier::count_segments_required( + local_kernel_mmr_size, + self.default_kernel_segment_height, + ) + }; + + // When resuming, we need to ensure we're getting the previous segment if needed + let theoretical_pmmr_size = + SegmentIdentifier::pmmr_size(cur_segment_count, self.default_kernel_segment_height); + if local_kernel_mmr_size < theoretical_pmmr_size { + cur_segment_count -= 1; + } + + let total_segment_count = SegmentIdentifier::count_segments_required( + self.archive_header.kernel_mmr_size, + self.default_kernel_segment_height, + ); + trace!( + "Next required kernel segment is {} of {}", + cur_segment_count, + total_segment_count + ); + if cur_segment_count == total_segment_count { + None + } else { + Some(cur_segment_count as u64) + } + } + + /// Adds a Kernel segment + pub fn add_kernel_segment(&mut self, segment: Segment) -> Result<(), Error> { + trace!("pibd_desegmenter: add kernel segment"); + segment.validate( + self.archive_header.kernel_mmr_size, // Last MMR pos at the height being validated + None, + self.archive_header.kernel_root, // Kernel root we're checking for + )?; + self.cache_kernel_segment(segment); + Ok(()) + } } diff --git a/chain/src/txhashset/txhashset.rs b/chain/src/txhashset/txhashset.rs index 003b6c7fa..19f3f1339 100644 --- a/chain/src/txhashset/txhashset.rs +++ b/chain/src/txhashset/txhashset.rs @@ -34,9 +34,11 @@ use crate::txhashset::bitmap_accumulator::{BitmapAccumulator, BitmapChunk}; use crate::txhashset::{RewindableKernelView, UTXOView}; use crate::types::{CommitPos, OutputRoots, Tip, TxHashSetRoots, TxHashsetWriteStatus}; use crate::util::secp::pedersen::{Commitment, RangeProof}; -use crate::util::{file, secp_static, zip}; +use crate::util::{file, secp_static, zip, StopState}; +use crate::SyncState; use croaring::Bitmap; use grin_store::pmmr::{clean_files_by_prefix, PMMRBackend}; +use std::cmp::Ordering; use std::fs::{self, File}; use std::path::{Path, PathBuf}; use std::sync::Arc; @@ -50,6 +52,58 @@ const KERNEL_SUBDIR: &str = "kernel"; const TXHASHSET_ZIP: &str = "txhashset_snapshot"; +/// Convenience enum to keep track of hash and leaf insertions when rebuilding an mmr +/// from segments +#[derive(Eq)] +enum OrderedHashLeafNode { + /// index of data in hashes array, pmmr position + Hash(usize, u64), + /// index of data in leaf_data array, pmmr position + Leaf(usize, u64), +} + +impl PartialEq for OrderedHashLeafNode { + fn eq(&self, other: &Self) -> bool { + let a_val = match self { + OrderedHashLeafNode::Hash(_, pos0) => pos0, + OrderedHashLeafNode::Leaf(_, pos0) => pos0, + }; + let b_val = match other { + OrderedHashLeafNode::Hash(_, pos0) => pos0, + OrderedHashLeafNode::Leaf(_, pos0) => pos0, + }; + a_val == b_val + } +} + +impl Ord for OrderedHashLeafNode { + fn cmp(&self, other: &Self) -> Ordering { + let a_val = match self { + OrderedHashLeafNode::Hash(_, pos0) => pos0, + OrderedHashLeafNode::Leaf(_, pos0) => pos0, + }; + let b_val = match other { + OrderedHashLeafNode::Hash(_, pos0) => pos0, + OrderedHashLeafNode::Leaf(_, pos0) => pos0, + }; + a_val.cmp(&b_val) + } +} + +impl PartialOrd for OrderedHashLeafNode { + fn partial_cmp(&self, other: &Self) -> Option { + let a_val = match self { + OrderedHashLeafNode::Hash(_, pos0) => pos0, + OrderedHashLeafNode::Leaf(_, pos0) => pos0, + }; + let b_val = match other { + OrderedHashLeafNode::Hash(_, pos0) => pos0, + OrderedHashLeafNode::Leaf(_, pos0) => pos0, + }; + Some(a_val.cmp(b_val)) + } +} + /// Convenience wrapper around a single prunable MMR backend. pub struct PMMRHandle { /// The backend storage for the MMR. @@ -136,6 +190,30 @@ impl PMMRHandle { Err(Error::Other("failed to find head hash".to_string())) } } + + /// Get the first header with all output and kernel mmrs > provided + pub fn get_first_header_with( + &self, + output_pos: u64, + kernel_pos: u64, + from_height: u64, + store: Arc, + ) -> Option { + let mut cur_height = pmmr::round_up_to_leaf_pos(from_height); + let header_pmmr = ReadonlyPMMR::at(&self.backend, self.size); + let mut candidate: Option = None; + while let Some(header_entry) = header_pmmr.get_data(cur_height) { + if let Ok(bh) = store.get_block_header(&header_entry.hash()) { + if bh.output_mmr_size <= output_pos && bh.kernel_mmr_size <= kernel_pos { + candidate = Some(bh) + } else { + return candidate; + } + } + cur_height = pmmr::round_up_to_leaf_pos(cur_height + 1); + } + None + } } /// An easy to manipulate structure holding the 3 MMRs necessary to @@ -349,11 +427,6 @@ impl TxHashSet { .elements_from_pmmr_index(start_index, max_count, max_index) } - /// number of outputs - pub fn output_mmr_size(&self) -> u64 { - self.output_pmmr_h.size - } - /// As above, for rangeproofs pub fn rangeproofs_by_pmmr_index( &self, @@ -365,6 +438,21 @@ impl TxHashSet { .elements_from_pmmr_index(start_index, max_count, max_index) } + /// size of output mmr + pub fn output_mmr_size(&self) -> u64 { + self.output_pmmr_h.size + } + + /// size of kernel mmr + pub fn kernel_mmr_size(&self) -> u64 { + self.kernel_pmmr_h.size + } + + /// size of rangeproof mmr (can differ from output mmr size during PIBD sync) + pub fn rangeproof_mmr_size(&self) -> u64 { + self.rproof_pmmr_h.size + } + /// Find a kernel with a given excess. Work backwards from `max_index` to `min_index` /// NOTE: this linear search over all kernel history can be VERY expensive /// public API access to this method should be limited @@ -456,7 +544,7 @@ impl TxHashSet { let cutoff = head.height.saturating_sub(WEEK_HEIGHT * 2); let cutoff_hash = header_pmmr.get_header_hash_by_height(cutoff)?; let cutoff_header = batch.get_block_header(&cutoff_hash)?; - self.verify_kernel_pos_index(&cutoff_header, header_pmmr, batch) + self.verify_kernel_pos_index(&cutoff_header, header_pmmr, batch, None, None) } /// Verify and (re)build the NRD kernel_pos index from the provided header onwards. @@ -465,6 +553,8 @@ impl TxHashSet { from_header: &BlockHeader, header_pmmr: &PMMRHandle, batch: &Batch<'_>, + status: Option>, + stop_state: Option>, ) -> Result<(), Error> { if !global::is_nrd_enabled() { return Ok(()); @@ -493,6 +583,8 @@ impl TxHashSet { let mut current_pos = prev_size + 1; let mut current_header = from_header.clone(); let mut count = 0; + let total = pmmr::n_leaves(self.kernel_pmmr_h.size); + let mut applied = 0; while current_pos <= self.kernel_pmmr_h.size { if pmmr::is_leaf(current_pos - 1) { if let Some(kernel) = kernel_pmmr.get_data(current_pos - 1) { @@ -513,7 +605,19 @@ impl TxHashSet { _ => {} } } + applied += 1; + if let Some(ref s) = status { + if total % applied == 10000 { + s.on_setup(None, None, Some(applied), Some(total)); + } + } } + if let Some(ref s) = stop_state { + if s.is_stopped() { + return Ok(()); + } + } + current_pos += 1; } @@ -1029,6 +1133,7 @@ pub struct Extension<'a> { kernel_pmmr: PMMR<'a, TxKernel, PMMRBackend>, bitmap_accumulator: BitmapAccumulator, + bitmap_cache: Bitmap, /// Rollback flag. rollback: bool, @@ -1070,6 +1175,10 @@ impl<'a> Extension<'a> { rproof_pmmr: PMMR::at(&mut trees.rproof_pmmr_h.backend, trees.rproof_pmmr_h.size), kernel_pmmr: PMMR::at(&mut trees.kernel_pmmr_h.backend, trees.kernel_pmmr_h.size), bitmap_accumulator: trees.bitmap_accumulator.clone(), + bitmap_cache: trees + .bitmap_accumulator + .as_bitmap() + .unwrap_or(Bitmap::create()), rollback: false, } } @@ -1111,6 +1220,12 @@ impl<'a> Extension<'a> { self.rproof_pmmr.readonly_pmmr() } + /// Reset prune lists + pub fn reset_prune_lists(&mut self) { + self.output_pmmr.reset_prune_list(); + self.rproof_pmmr.reset_prune_list(); + } + /// Apply a new block to the current txhashet extension (output, rangeproof, kernel MMRs). /// Returns a vec of commit_pos representing the pos and height of the outputs spent /// by this block. @@ -1187,6 +1302,10 @@ impl<'a> Extension<'a> { /// Sets the bitmap accumulator (as received during PIBD sync) pub fn set_bitmap_accumulator(&mut self, accumulator: BitmapAccumulator) { self.bitmap_accumulator = accumulator; + self.bitmap_cache = self + .bitmap_accumulator + .as_bitmap() + .unwrap_or(Bitmap::create()); } // Prune output and rangeproof PMMRs based on provided pos. @@ -1244,29 +1363,132 @@ impl<'a> Extension<'a> { Ok(1 + output_pos) } - /// Apply an output segment to the output PMMR. must be called in order - /// TODO: Not complete - pub fn apply_output_segment( - &mut self, - segment: Segment, - ) -> Result<(), Error> { - let (_sid, _hash_pos, _hashes, _leaf_pos, leaf_data, _proof) = segment.parts(); - for output_identifier in leaf_data { - self.output_pmmr - .push(&output_identifier) - .map_err(&Error::TxHashSetErr)?; + /// Once the PIBD set is downloaded, we need to ensure that the respective leaf sets + /// match the bitmap (particularly in the case of outputs being spent after a PIBD catch-up) + pub fn update_leaf_sets(&mut self, bitmap: &Bitmap) -> Result<(), Error> { + let flipped = bitmap.flip(0..bitmap.maximum().unwrap() as u64 + 1); + for spent_pmmr_index in flipped.iter() { + let pos0 = pmmr::insertion_to_pmmr_index(spent_pmmr_index.into()); + self.output_pmmr.remove_from_leaf_set(pos0); + self.rproof_pmmr.remove_from_leaf_set(pos0); } Ok(()) } - /// Apply a rangeproof segment to the output PMMR. must be called in order - /// TODO: Not complete + /// Order and sort output segments and hashes, returning an array + /// of elements that can be applied in order to a pmmr + fn sort_pmmr_hashes_and_leaves( + &mut self, + hash_pos: Vec, + leaf_pos: Vec, + skip_leaf_position: Option, + ) -> Vec { + // Merge and into single array and sort into insertion order + let mut ordered_inserts = vec![]; + for (data_index, pos0) in leaf_pos.iter().enumerate() { + // Don't re-push genesis output, basically + if skip_leaf_position == Some(*pos0) { + continue; + } + ordered_inserts.push(OrderedHashLeafNode::Leaf(data_index, *pos0)); + } + for (data_index, pos0) in hash_pos.iter().enumerate() { + ordered_inserts.push(OrderedHashLeafNode::Hash(data_index, *pos0)); + } + ordered_inserts.sort(); + ordered_inserts + } + + /// Apply an output segment to the output PMMR. must be called in order + /// Sort and apply hashes and leaves within a segment to output pmmr, skipping over + /// genesis position. + /// NB: Would like to make this more generic but the hard casting of pmmrs + /// held by this struct makes it awkward to do so + + pub fn apply_output_segment( + &mut self, + segment: Segment, + ) -> Result<(), Error> { + let (_sid, hash_pos, hashes, leaf_pos, leaf_data, _proof) = segment.parts(); + + // insert either leaves or pruned subtrees as we go + for insert in self.sort_pmmr_hashes_and_leaves(hash_pos, leaf_pos, Some(0)) { + match insert { + OrderedHashLeafNode::Hash(idx, pos0) => { + if pos0 >= self.output_pmmr.size { + if self.output_pmmr.size == 1 { + // All initial outputs are spent up to this hash, + // Roll back the genesis output + self.output_pmmr + .rewind(0, &Bitmap::create()) + .map_err(&Error::TxHashSetErr)?; + } + self.output_pmmr + .push_pruned_subtree(hashes[idx], pos0) + .map_err(&Error::TxHashSetErr)?; + } + } + OrderedHashLeafNode::Leaf(idx, pos0) => { + if pos0 == self.output_pmmr.size { + self.output_pmmr + .push(&leaf_data[idx]) + .map_err(&Error::TxHashSetErr)?; + } + let pmmr_index = pmmr::pmmr_leaf_to_insertion_index(pos0); + match pmmr_index { + Some(i) => { + if !self.bitmap_cache.contains(i as u32) { + self.output_pmmr.remove_from_leaf_set(pos0); + } + } + None => {} + }; + } + } + } + Ok(()) + } + + /// Apply a rangeproof segment to the rangeproof PMMR. must be called in order + /// Sort and apply hashes and leaves within a segment to rangeproof pmmr, skipping over + /// genesis position. pub fn apply_rangeproof_segment(&mut self, segment: Segment) -> Result<(), Error> { - let (_sid, _hash_pos, _hashes, _leaf_pos, leaf_data, _proof) = segment.parts(); - for proof in leaf_data { - self.rproof_pmmr - .push(&proof) - .map_err(&Error::TxHashSetErr)?; + let (_sid, hash_pos, hashes, leaf_pos, leaf_data, _proof) = segment.parts(); + + // insert either leaves or pruned subtrees as we go + for insert in self.sort_pmmr_hashes_and_leaves(hash_pos, leaf_pos, Some(0)) { + match insert { + OrderedHashLeafNode::Hash(idx, pos0) => { + if pos0 >= self.rproof_pmmr.size { + if self.rproof_pmmr.size == 1 { + // All initial outputs are spent up to this hash, + // Roll back the genesis output + self.rproof_pmmr + .rewind(0, &Bitmap::create()) + .map_err(&Error::TxHashSetErr)?; + } + self.rproof_pmmr + .push_pruned_subtree(hashes[idx], pos0) + .map_err(&Error::TxHashSetErr)?; + } + } + OrderedHashLeafNode::Leaf(idx, pos0) => { + if pos0 == self.rproof_pmmr.size { + self.rproof_pmmr + .push(&leaf_data[idx]) + .map_err(&Error::TxHashSetErr)?; + } + let pmmr_index = pmmr::pmmr_leaf_to_insertion_index(pos0); + match pmmr_index { + Some(i) => { + if !self.bitmap_cache.contains(i as u32) { + self.rproof_pmmr.remove_from_leaf_set(pos0); + } + } + None => {} + }; + } + } } Ok(()) } @@ -1291,13 +1513,25 @@ impl<'a> Extension<'a> { } /// Apply a kernel segment to the output PMMR. must be called in order - /// TODO: Not complete pub fn apply_kernel_segment(&mut self, segment: Segment) -> Result<(), Error> { - let (_sid, _hash_pos, _hashes, _leaf_pos, leaf_data, _proof) = segment.parts(); - for kernel in leaf_data { - self.kernel_pmmr - .push(&kernel) - .map_err(&Error::TxHashSetErr)?; + let (_sid, _hash_pos, _hashes, leaf_pos, leaf_data, _proof) = segment.parts(); + // Non prunable - insert only leaves (with genesis kernel removedj) + for insert in self.sort_pmmr_hashes_and_leaves(vec![], leaf_pos, Some(0)) { + match insert { + OrderedHashLeafNode::Hash(_, _) => { + return Err(Error::InvalidSegment( + "Kernel PMMR is non-prunable, should not have hash data".to_string(), + ) + .into()); + } + OrderedHashLeafNode::Leaf(idx, pos0) => { + if pos0 == self.kernel_pmmr.size { + self.kernel_pmmr + .push(&leaf_data[idx]) + .map_err(&Error::TxHashSetErr)?; + } + } + } } Ok(()) } @@ -1346,7 +1580,8 @@ impl<'a> Extension<'a> { } /// Rewinds the MMRs to the provided block, rewinding to the last output pos - /// and last kernel pos of that block. + /// and last kernel pos of that block. If `updated_bitmap` is supplied, the + /// bitmap accumulator will be replaced with its contents pub fn rewind(&mut self, header: &BlockHeader, batch: &Batch<'_>) -> Result<(), Error> { debug!( "Rewind extension to {} at {} from {} at {}", @@ -1548,7 +1783,8 @@ impl<'a> Extension<'a> { Ok(()) } - /// Validate full kernel sums against the provided header (for overage and kernel_offset). + /// Validate full kernel sums against the provided header and unspent output bitmap + /// (for overage and kernel_offset). /// This is an expensive operation as we need to retrieve all the UTXOs and kernels /// from the respective MMRs. /// For a significantly faster way of validating full kernel sums see BlockSums. @@ -1579,7 +1815,10 @@ impl<'a> Extension<'a> { genesis: &BlockHeader, fast_validation: bool, status: &dyn TxHashsetWriteStatus, + output_start_pos: Option, + _kernel_start_pos: Option, header: &BlockHeader, + stop_state: Option>, ) -> Result<(Commitment, Commitment), Error> { self.validate_mmrs()?; self.validate_roots(header)?; @@ -1597,10 +1836,26 @@ impl<'a> Extension<'a> { // These are expensive verification step (skipped for "fast validation"). if !fast_validation { // Verify the rangeproof associated with each unspent output. - self.verify_rangeproofs(status)?; + self.verify_rangeproofs( + Some(status), + output_start_pos, + None, + false, + stop_state.clone(), + )?; + if let Some(ref s) = stop_state { + if s.is_stopped() { + return Err(Error::Stopped.into()); + } + } // Verify all the kernel signatures. - self.verify_kernel_signatures(status)?; + self.verify_kernel_signatures(status, stop_state.clone())?; + if let Some(ref s) = stop_state { + if s.is_stopped() { + return Err(Error::Stopped.into()); + } + } } Ok((output_sum, kernel_sum)) @@ -1643,7 +1898,11 @@ impl<'a> Extension<'a> { ) } - fn verify_kernel_signatures(&self, status: &dyn TxHashsetWriteStatus) -> Result<(), Error> { + fn verify_kernel_signatures( + &self, + status: &dyn TxHashsetWriteStatus, + stop_state: Option>, + ) -> Result<(), Error> { let now = Instant::now(); const KERNEL_BATCH_SIZE: usize = 5_000; @@ -1664,6 +1923,11 @@ impl<'a> Extension<'a> { kern_count += tx_kernels.len() as u64; tx_kernels.clear(); status.on_validation_kernels(kern_count, total_kernels); + if let Some(ref s) = stop_state { + if s.is_stopped() { + return Ok(()); + } + } debug!( "txhashset: verify_kernel_signatures: verified {} signatures", kern_count, @@ -1681,16 +1945,36 @@ impl<'a> Extension<'a> { Ok(()) } - fn verify_rangeproofs(&self, status: &dyn TxHashsetWriteStatus) -> Result<(), Error> { + fn verify_rangeproofs( + &self, + status: Option<&dyn TxHashsetWriteStatus>, + start_pos: Option, + batch_size: Option, + single_iter: bool, + stop_state: Option>, + ) -> Result { let now = Instant::now(); - let mut commits: Vec = Vec::with_capacity(1_000); - let mut proofs: Vec = Vec::with_capacity(1_000); + let batch_size = batch_size.unwrap_or(1_000); + + let mut commits: Vec = Vec::with_capacity(batch_size); + let mut proofs: Vec = Vec::with_capacity(batch_size); let mut proof_count = 0; + if let Some(s) = start_pos { + if let Some(i) = pmmr::pmmr_leaf_to_insertion_index(s) { + proof_count = self.output_pmmr.n_unpruned_leaves_to_index(i) as usize; + } + } + let total_rproofs = self.output_pmmr.n_unpruned_leaves(); for pos0 in self.output_pmmr.leaf_pos_iter() { + if let Some(p) = start_pos { + if pos0 < p { + continue; + } + } let output = self.output_pmmr.get_data(pos0); let proof = self.rproof_pmmr.get_data(pos0); @@ -1707,7 +1991,7 @@ impl<'a> Extension<'a> { proof_count += 1; - if proofs.len() >= 1_000 { + if proofs.len() >= batch_size { Output::batch_verify_proofs(&commits, &proofs)?; commits.clear(); proofs.clear(); @@ -1715,13 +1999,21 @@ impl<'a> Extension<'a> { "txhashset: verify_rangeproofs: verified {} rangeproofs", proof_count, ); - if proof_count % 1_000 == 0 { - status.on_validation_rproofs(proof_count, total_rproofs); + if let Some(s) = status { + s.on_validation_rproofs(proof_count as u64, total_rproofs); + } + if let Some(ref s) = stop_state { + if s.is_stopped() { + return Ok(pos0); + } + } + if single_iter { + return Ok(pos0); } } } - // remaining part which not full of 1000 range proofs + // remaining part which not full of batch_size range proofs if !proofs.is_empty() { Output::batch_verify_proofs(&commits, &proofs)?; commits.clear(); @@ -1738,7 +2030,7 @@ impl<'a> Extension<'a> { self.rproof_pmmr.unpruned_size(), now.elapsed().as_secs(), ); - Ok(()) + Ok(0) } } diff --git a/chain/src/types.rs b/chain/src/types.rs index e6d6f889e..7797acc35 100644 --- a/chain/src/types.rs +++ b/chain/src/types.rs @@ -15,9 +15,10 @@ //! Base types that the block chain pipeline requires. use chrono::prelude::{DateTime, Utc}; +use chrono::Duration; use crate::core::core::hash::{Hash, Hashed, ZERO_HASH}; -use crate::core::core::{Block, BlockHeader, HeaderVersion}; +use crate::core::core::{pmmr, Block, BlockHeader, HeaderVersion, SegmentTypeIdentifier}; use crate::core::pow::Difficulty; use crate::core::ser::{self, PMMRIndexHashable, Readable, Reader, Writeable, Writer}; use crate::error::Error; @@ -56,10 +57,38 @@ pub enum SyncStatus { /// diff of the most advanced peer highest_diff: Difficulty, }, + /// Performing PIBD reconstruction of txhashset + /// If PIBD syncer determines there's not enough + /// PIBD peers to continue, then move on to TxHashsetDownload state + TxHashsetPibd { + /// Whether the syncer has determined there's not enough + /// data to continue via PIBD + aborted: bool, + /// whether we got an error anywhere (in which case restart the process) + errored: bool, + /// total number of leaves applied + completed_leaves: u64, + /// total number of leaves required by archive header + leaves_required: u64, + /// 'height', i.e. last 'block' for which there is complete + /// pmmr data + completed_to_height: u64, + /// Total 'height' needed + required_height: u64, + }, /// Downloading the various txhashsets TxHashsetDownload(TxHashsetDownloadStats), /// Setting up before validation - TxHashsetSetup, + TxHashsetSetup { + /// number of 'headers' for which kernels have been checked + headers: Option, + /// headers total + headers_total: Option, + /// kernel position portion + kernel_pos: Option, + /// total kernel position + kernel_pos_total: Option, + }, /// Validating the kernels TxHashsetKernelsValidation { /// kernels validated @@ -119,10 +148,36 @@ impl Default for TxHashsetDownloadStats { } } +/// Container for entry in requested PIBD segments +#[derive(Debug, Clone, Eq, PartialEq)] +pub struct PIBDSegmentContainer { + /// Segment+Type Identifier + pub identifier: SegmentTypeIdentifier, + /// Time at which this request was made + pub request_time: DateTime, +} + +impl PIBDSegmentContainer { + /// Return container with timestamp + pub fn new(identifier: SegmentTypeIdentifier) -> Self { + Self { + identifier, + request_time: Utc::now(), + } + } +} + /// Current sync state. Encapsulates the current SyncStatus. pub struct SyncState { current: RwLock, sync_error: RwLock>, + /// Something has to keep track of segments that have been + /// requested from other peers. TODO consider: This may not + /// be the best place to put code that's concerned with peers + /// but it's currently the only place that makes the info + /// available where it will be needed (both in the adapter + /// and the sync loop) + requested_pibd_segments: RwLock>, } impl SyncState { @@ -131,6 +186,7 @@ impl SyncState { SyncState { current: RwLock::new(SyncStatus::Initial), sync_error: RwLock::new(None), + requested_pibd_segments: RwLock::new(vec![]), } } @@ -200,6 +256,60 @@ impl SyncState { *self.current.write() = SyncStatus::TxHashsetDownload(stats); } + /// Update PIBD progress + pub fn update_pibd_progress( + &self, + aborted: bool, + errored: bool, + completed_leaves: u64, + completed_to_height: u64, + archive_header: &BlockHeader, + ) { + let leaves_required = pmmr::n_leaves(archive_header.output_mmr_size) * 2 + + pmmr::n_leaves(archive_header.kernel_mmr_size); + *self.current.write() = SyncStatus::TxHashsetPibd { + aborted, + errored, + completed_leaves, + leaves_required, + completed_to_height, + required_height: archive_header.height, + }; + } + + /// Update PIBD segment list + pub fn add_pibd_segment(&self, id: &SegmentTypeIdentifier) { + self.requested_pibd_segments + .write() + .push(PIBDSegmentContainer::new(id.clone())); + } + + /// Remove segment from list + pub fn remove_pibd_segment(&self, id: &SegmentTypeIdentifier) { + self.requested_pibd_segments + .write() + .retain(|i| &i.identifier != id); + } + + /// Remove segments with request timestamps less than cutoff time + pub fn remove_stale_pibd_requests(&self, timeout_seconds: i64) { + let cutoff_time = Utc::now() - Duration::seconds(timeout_seconds); + self.requested_pibd_segments.write().retain(|i| { + if i.request_time <= cutoff_time { + debug!("Removing + retrying PIBD request after timeout: {:?}", i) + }; + i.request_time > cutoff_time + }); + } + + /// Check whether segment is in request list + pub fn contains_pibd_segment(&self, id: &SegmentTypeIdentifier) -> bool { + self.requested_pibd_segments + .read() + .iter() + .any(|i| &i.identifier == id) + } + /// Communicate sync error pub fn set_sync_error(&self, error: Error) { *self.sync_error.write() = Some(error); @@ -217,8 +327,19 @@ impl SyncState { } impl TxHashsetWriteStatus for SyncState { - fn on_setup(&self) { - self.update(SyncStatus::TxHashsetSetup); + fn on_setup( + &self, + headers: Option, + headers_total: Option, + kernel_pos: Option, + kernel_pos_total: Option, + ) { + self.update(SyncStatus::TxHashsetSetup { + headers, + headers_total, + kernel_pos, + kernel_pos_total, + }); } fn on_validation_kernels(&self, kernels: u64, kernels_total: u64) { @@ -445,7 +566,13 @@ pub trait ChainAdapter { /// those values as the processing progresses. pub trait TxHashsetWriteStatus { /// First setup of the txhashset - fn on_setup(&self); + fn on_setup( + &self, + headers: Option, + header_total: Option, + kernel_pos: Option, + kernel_pos_total: Option, + ); /// Starting kernel validation fn on_validation_kernels(&self, kernels: u64, kernel_total: u64); /// Starting rproof validation @@ -460,7 +587,7 @@ pub trait TxHashsetWriteStatus { pub struct NoStatus; impl TxHashsetWriteStatus for NoStatus { - fn on_setup(&self) {} + fn on_setup(&self, _hs: Option, _ht: Option, _kp: Option, _kpt: Option) {} fn on_validation_kernels(&self, _ks: u64, _kts: u64) {} fn on_validation_rproofs(&self, _rs: u64, _rt: u64) {} fn on_save(&self) {} diff --git a/chain/tests/test_data/chain_compacted/lmdb/lock.mdb b/chain/tests/test_data/chain_compacted/lmdb/lock.mdb index c32379f06..c02e4fde1 100644 Binary files a/chain/tests/test_data/chain_compacted/lmdb/lock.mdb and b/chain/tests/test_data/chain_compacted/lmdb/lock.mdb differ diff --git a/chain/tests/test_data/chain_raw/lmdb/lock.mdb b/chain/tests/test_data/chain_raw/lmdb/lock.mdb index f973e6ac4..da13f9f45 100644 Binary files a/chain/tests/test_data/chain_raw/lmdb/lock.mdb and b/chain/tests/test_data/chain_raw/lmdb/lock.mdb differ diff --git a/chain/tests/test_pibd_copy.rs b/chain/tests/test_pibd_copy.rs index 60238fd01..255f56cdd 100644 --- a/chain/tests/test_pibd_copy.rs +++ b/chain/tests/test_pibd_copy.rs @@ -19,189 +19,285 @@ use grin_util as util; #[macro_use] extern crate log; +use std::path::Path; use std::sync::Arc; +use std::{fs, io}; +use crate::chain::txhashset::BitmapChunk; use crate::chain::types::{NoopAdapter, Options}; -use crate::core::core::{hash::Hashed, pmmr::segment::SegmentIdentifier}; +use crate::core::core::{ + hash::{Hash, Hashed}, + pmmr::segment::{Segment, SegmentIdentifier, SegmentType}, + Block, OutputIdentifier, TxKernel, +}; use crate::core::{genesis, global, pow}; +use crate::util::secp::pedersen::RangeProof; use self::chain_test_helper::clean_output_dir; mod chain_test_helper; -fn test_pibd_copy_impl(is_test_chain: bool, src_root_dir: &str, dest_root_dir: &str) { - global::set_local_chain_type(global::ChainTypes::Mainnet); - let mut genesis = genesis::genesis_main(); - // Height at which to read kernel segments (lower than thresholds defined in spec - for testing) - let mut target_segment_height = 11; - - if is_test_chain { - global::set_local_chain_type(global::ChainTypes::AutomatedTesting); - genesis = pow::mine_genesis_block().unwrap(); - target_segment_height = 3; +fn copy_dir_all(src: impl AsRef, dst: impl AsRef) -> io::Result<()> { + fs::create_dir_all(&dst)?; + for entry in fs::read_dir(src)? { + let entry = entry?; + let ty = entry.file_type()?; + if ty.is_dir() { + copy_dir_all(entry.path(), dst.as_ref().join(entry.file_name()))?; + } else { + fs::copy(entry.path(), dst.as_ref().join(entry.file_name()))?; + } } + Ok(()) +} - { - debug!("Reading Chain, genesis block: {}", genesis.hash()); +// Canned segmenter responder, which will simulate feeding back segments as requested +// by the desegmenter +struct SegmenterResponder { + chain: Arc, +} + +impl SegmenterResponder { + pub fn new(chain_src_dir: &str, genesis: Block) -> Self { let dummy_adapter = Arc::new(NoopAdapter {}); + debug!( + "Reading SegmenterResponder chain, genesis block: {}", + genesis.hash() + ); // The original chain we're reading from - let src_chain = Arc::new( - chain::Chain::init( - src_root_dir.into(), - dummy_adapter.clone(), - genesis.clone(), - pow::verify_size, - false, - ) - .unwrap(), - ); - - // And the output chain we're writing to - let dest_chain = Arc::new( - chain::Chain::init( - dest_root_dir.into(), - dummy_adapter, - genesis.clone(), - pow::verify_size, - false, - ) - .unwrap(), - ); - - // For test compaction purposes - /*src_chain.compact().unwrap(); - src_chain - .validate(true) - .expect("Source chain validation failed, stop");*/ - - let sh = src_chain.get_header_by_height(0).unwrap(); + let res = SegmenterResponder { + chain: Arc::new( + chain::Chain::init( + chain_src_dir.into(), + dummy_adapter.clone(), + genesis, + pow::verify_size, + false, + ) + .unwrap(), + ), + }; + let sh = res.chain.get_header_by_height(0).unwrap(); debug!("Source Genesis - {}", sh.hash()); + res + } - let dh = dest_chain.get_header_by_height(0).unwrap(); - debug!("Destination Genesis - {}", dh.hash()); + pub fn chain(&self) -> Arc { + self.chain.clone() + } - let horizon_header = src_chain.txhashset_archive_header().unwrap(); + pub fn get_bitmap_segment(&self, seg_id: SegmentIdentifier) -> (Segment, Hash) { + let segmenter = self.chain.segmenter().unwrap(); + segmenter.bitmap_segment(seg_id).unwrap() + } - debug!("Horizon header: {:?}", horizon_header); + pub fn get_output_segment( + &self, + seg_id: SegmentIdentifier, + ) -> (Segment, Hash) { + let segmenter = self.chain.segmenter().unwrap(); + segmenter.output_segment(seg_id).unwrap() + } - // Copy the headers from source to output in chunks - let dest_sync_head = dest_chain.header_head().unwrap(); + pub fn get_rangeproof_segment(&self, seg_id: SegmentIdentifier) -> Segment { + let segmenter = self.chain.segmenter().unwrap(); + segmenter.rangeproof_segment(seg_id).unwrap() + } + + pub fn get_kernel_segment(&self, seg_id: SegmentIdentifier) -> Segment { + let segmenter = self.chain.segmenter().unwrap(); + segmenter.kernel_segment(seg_id).unwrap() + } +} + +// Canned segmenter 'peer', building up its local chain from requested PIBD segments +struct DesegmenterRequestor { + chain: Arc, + responder: Arc, +} + +impl DesegmenterRequestor { + pub fn new(chain_src_dir: &str, genesis: Block, responder: Arc) -> Self { + let dummy_adapter = Arc::new(NoopAdapter {}); + debug!( + "Reading DesegmenterRequestor chain, genesis block: {}", + genesis.hash() + ); + + // The original chain we're reading from + let res = DesegmenterRequestor { + chain: Arc::new( + chain::Chain::init( + chain_src_dir.into(), + dummy_adapter.clone(), + genesis, + pow::verify_size, + false, + ) + .unwrap(), + ), + responder, + }; + let sh = res.chain.get_header_by_height(0).unwrap(); + debug!("Dest Genesis - {}", sh.hash()); + res + } + + /// Copy headers, hopefully bringing the requestor to a state where PIBD is the next step + pub fn copy_headers_from_responder(&mut self) { + let src_chain = self.responder.chain(); + let tip = src_chain.header_head().unwrap(); + let dest_sync_head = self.chain.header_head().unwrap(); let copy_chunk_size = 1000; let mut copied_header_index = 1; let mut src_headers = vec![]; - while copied_header_index <= horizon_header.height { + while copied_header_index <= tip.height { let h = src_chain.get_header_by_height(copied_header_index).unwrap(); src_headers.push(h); copied_header_index += 1; if copied_header_index % copy_chunk_size == 0 { debug!( "Copying headers to {} of {}", - copied_header_index, horizon_header.height + copied_header_index, tip.height ); - dest_chain + self.chain .sync_block_headers(&src_headers, dest_sync_head, Options::SKIP_POW) .unwrap(); src_headers = vec![]; } } if !src_headers.is_empty() { - dest_chain + self.chain .sync_block_headers(&src_headers, dest_sync_head, Options::NONE) .unwrap(); } - - // Init segmenter, (note this still has to be lazy init somewhere on a peer) - // This is going to use the same block as horizon_header - let segmenter = src_chain.segmenter().unwrap(); - // Init desegmenter - let mut desegmenter = dest_chain.desegmenter(&horizon_header).unwrap(); - - // And total size of the bitmap PMMR - let bitmap_mmr_size = desegmenter.expected_bitmap_mmr_size(); - debug!( - "Bitmap Segments required: {}", - SegmentIdentifier::count_segments_required(bitmap_mmr_size, target_segment_height) - ); - // TODO: This can probably be derived from the PMMR we'll eventually be building - // (check if total size is equal to total size at horizon header) - let identifier_iter = - SegmentIdentifier::traversal_iter(bitmap_mmr_size, target_segment_height); - - for sid in identifier_iter { - debug!("Getting bitmap segment with Segment Identifier {:?}", sid); - let (bitmap_segment, output_root_hash) = segmenter.bitmap_segment(sid).unwrap(); - debug!( - "Bitmap segmenter reports output root hash is {:?}", - output_root_hash - ); - // Add segment to desegmenter / validate - if let Err(e) = desegmenter.add_bitmap_segment(bitmap_segment, output_root_hash) { - panic!("Unable to add bitmap segment: {}", e); - } - } - - // Finalize segmenter bitmap, which means we've recieved all bitmap MMR chunks and - // Are ready to use it to validate outputs - desegmenter.finalize_bitmap().unwrap(); - - // OUTPUTS - Read + Validate - let identifier_iter = SegmentIdentifier::traversal_iter( - horizon_header.output_mmr_size, - target_segment_height, - ); - - for sid in identifier_iter { - debug!("Getting output segment with Segment Identifier {:?}", sid); - let (output_segment, bitmap_root_hash) = segmenter.output_segment(sid).unwrap(); - debug!( - "Output segmenter reports bitmap hash is {:?}", - bitmap_root_hash - ); - // Add segment to desegmenter / validate - if let Err(e) = desegmenter.add_output_segment(output_segment) { - panic!("Unable to add output segment: {}", e); - } - } - - // PROOFS - Read + Validate - let identifier_iter = SegmentIdentifier::traversal_iter( - horizon_header.output_mmr_size, - target_segment_height, - ); - - for sid in identifier_iter { - debug!( - "Getting rangeproof segment with Segment Identifier {:?}", - sid - ); - let rangeproof_segment = segmenter.rangeproof_segment(sid).unwrap(); - // Add segment to desegmenter / validate - if let Err(e) = desegmenter.add_rangeproof_segment(rangeproof_segment) { - panic!("Unable to add rangeproof segment: {}", e); - } - } - - // KERNELS - Read + Validate - let identifier_iter = SegmentIdentifier::traversal_iter( - horizon_header.kernel_mmr_size, - target_segment_height, - ); - - for sid in identifier_iter { - debug!("Getting kernel segment with Segment Identifier {:?}", sid); - let kernel_segment = segmenter.kernel_segment(sid).unwrap(); - if let Err(e) = desegmenter.add_kernel_segment(kernel_segment) { - panic!("Unable to add kernel segment: {}", e); - } - } - - let dest_txhashset = dest_chain.txhashset(); - debug!("Dest TxHashset Roots: {:?}", dest_txhashset.read().roots()); } + + // Emulate `continue_pibd` function, which would be called from state sync + // return whether is complete + pub fn continue_pibd(&mut self) -> bool { + let archive_header = self.chain.txhashset_archive_header_header_only().unwrap(); + let desegmenter = self.chain.desegmenter(&archive_header).unwrap(); + + // Apply segments... TODO: figure out how this should be called, might + // need to be a separate thread. + if let Some(mut de) = desegmenter.try_write() { + if let Some(d) = de.as_mut() { + d.apply_next_segments().unwrap(); + } + } + + let mut next_segment_ids = vec![]; + let mut is_complete = false; + if let Some(d) = desegmenter.write().as_mut() { + // Figure out the next segments we need + // (12 is divisible by 3, to try and evenly spread the requests among the 3 + // main pmmrs. Bitmaps segments will always be requested first) + next_segment_ids = d.next_desired_segments(12); + is_complete = d.is_complete() + } + + debug!("Next segment IDS: {:?}", next_segment_ids); + + // For each segment, pick a desirable peer and send message + for seg_id in next_segment_ids.iter() { + // Perform request and response + match seg_id.segment_type { + SegmentType::Bitmap => { + let (seg, output_root) = + self.responder.get_bitmap_segment(seg_id.identifier.clone()); + if let Some(d) = desegmenter.write().as_mut() { + d.add_bitmap_segment(seg, output_root).unwrap(); + } + } + SegmentType::Output => { + let (seg, bitmap_root) = + self.responder.get_output_segment(seg_id.identifier.clone()); + if let Some(d) = desegmenter.write().as_mut() { + d.add_output_segment(seg, Some(bitmap_root)).unwrap(); + } + } + SegmentType::RangeProof => { + let seg = self + .responder + .get_rangeproof_segment(seg_id.identifier.clone()); + if let Some(d) = desegmenter.write().as_mut() { + d.add_rangeproof_segment(seg).unwrap(); + } + } + SegmentType::Kernel => { + let seg = self.responder.get_kernel_segment(seg_id.identifier.clone()); + if let Some(d) = desegmenter.write().as_mut() { + d.add_kernel_segment(seg).unwrap(); + } + } + }; + } + is_complete + } + + pub fn check_roots(&self) { + let roots = self.chain.txhashset().read().roots(); + let archive_header = self.chain.txhashset_archive_header_header_only().unwrap(); + debug!("Archive Header is {:?}", archive_header); + debug!("TXHashset output root is {:?}", roots); + debug!( + "TXHashset merged output root is {:?}", + roots.output_roots.root(&archive_header) + ); + assert_eq!(archive_header.range_proof_root, roots.rproof_root); + assert_eq!(archive_header.kernel_root, roots.kernel_root); + assert_eq!( + archive_header.output_root, + roots.output_roots.root(&archive_header) + ); + } +} +fn test_pibd_copy_impl( + is_test_chain: bool, + src_root_dir: &str, + dest_root_dir: &str, + dest_template_dir: Option<&str>, +) { + global::set_local_chain_type(global::ChainTypes::Testnet); + let mut genesis = genesis::genesis_test(); + + if is_test_chain { + global::set_local_chain_type(global::ChainTypes::AutomatedTesting); + genesis = pow::mine_genesis_block().unwrap(); + } + + // Copy a starting point over for the destination, e.g. a copy of chain + // with all headers pre-applied + if let Some(td) = dest_template_dir { + debug!( + "Copying template dir for destination from {} to {}", + td, dest_root_dir + ); + copy_dir_all(td, dest_root_dir).unwrap(); + } + + let src_responder = Arc::new(SegmenterResponder::new(src_root_dir, genesis.clone())); + let mut dest_requestor = + DesegmenterRequestor::new(dest_root_dir, genesis.clone(), src_responder); + + // No template provided so copy headers from source + if dest_template_dir.is_none() { + dest_requestor.copy_headers_from_responder(); + if !is_test_chain { + return; + } + } + + // Perform until desegmenter reports it's done + while !dest_requestor.continue_pibd() {} + + dest_requestor.check_roots(); } #[test] +#[ignore] fn test_pibd_copy_sample() { util::init_test_logger(); // Note there is now a 'test' in grin_wallet_controller/build_chain @@ -212,10 +308,10 @@ fn test_pibd_copy_sample() { let src_root_dir = format!("./tests/test_data/chain_raw"); let dest_root_dir = format!("./tests/test_output/.segment_copy"); clean_output_dir(&dest_root_dir); - test_pibd_copy_impl(true, &src_root_dir, &dest_root_dir); + test_pibd_copy_impl(true, &src_root_dir, &dest_root_dir, None); let src_root_dir = format!("./tests/test_data/chain_compacted"); clean_output_dir(&dest_root_dir); - test_pibd_copy_impl(true, &src_root_dir, &dest_root_dir); + test_pibd_copy_impl(true, &src_root_dir, &dest_root_dir, None); clean_output_dir(&dest_root_dir); } @@ -226,10 +322,29 @@ fn test_pibd_copy_sample() { // As above, but run on a real instance of a chain pointed where you like fn test_pibd_copy_real() { util::init_test_logger(); + // If set, just copy headers from source to target template dir and exit + // Used to set up a chain state simulating the start of PIBD to continue manual testing + let copy_headers_to_template = false; + // if testing against a real chain, insert location here - let src_root_dir = format!("/Users/yeastplume/Projects/grin_project/server/chain_data"); - let dest_root_dir = format!("/Users/yeastplume/Projects/grin_project/server/.chain_data_copy"); - clean_output_dir(&dest_root_dir); - test_pibd_copy_impl(false, &src_root_dir, &dest_root_dir); - clean_output_dir(&dest_root_dir); + let src_root_dir = format!("/home/yeastplume/Projects/grin-project/servers/floo-1/chain_data"); + let dest_template_dir = format!( + "/home/yeastplume/Projects/grin-project/servers/floo-pibd-1/chain_data_headers_only" + ); + let dest_root_dir = + format!("/home/yeastplume/Projects/grin-project/servers/floo-pibd-1/chain_data_test_copy"); + if copy_headers_to_template { + clean_output_dir(&dest_template_dir); + test_pibd_copy_impl(false, &src_root_dir, &dest_template_dir, None); + } else { + clean_output_dir(&dest_root_dir); + test_pibd_copy_impl( + false, + &src_root_dir, + &dest_root_dir, + Some(&dest_template_dir), + ); + } + + //clean_output_dir(&dest_root_dir); } diff --git a/chain/tests/test_pibd_validation.rs b/chain/tests/test_pibd_validation.rs index 0e8d03c99..286b05601 100644 --- a/chain/tests/test_pibd_validation.rs +++ b/chain/tests/test_pibd_validation.rs @@ -211,6 +211,8 @@ fn test_pibd_chain_validation_impl(is_test_chain: bool, src_root_dir: &str) { } #[test] +// TODO: Fix before merge into master +#[ignore] fn test_pibd_chain_validation_sample() { util::init_test_logger(); // Note there is now a 'test' in grin_wallet_controller/build_chain diff --git a/core/src/core/pmmr/backend.rs b/core/src/core/pmmr/backend.rs index 8c53835ea..0383c2709 100644 --- a/core/src/core/pmmr/backend.rs +++ b/core/src/core/pmmr/backend.rs @@ -33,6 +33,9 @@ pub trait Backend { /// This allows us to append an existing pruned subtree directly without the underlying leaf nodes. fn append_pruned_subtree(&mut self, hash: Hash, pos0: u64) -> Result<(), String>; + /// Append a single hash to the pmmr + fn append_hash(&mut self, hash: Hash) -> Result<(), String>; + /// Rewind the backend state to a previous position, as if all append /// operations after that had been canceled. Expects a position in the PMMR /// to rewind to as well as bitmaps representing the positions added and @@ -65,6 +68,9 @@ pub trait Backend { /// Number of leaves fn n_unpruned_leaves(&self) -> u64; + /// Number of leaves up to the given leaf index + fn n_unpruned_leaves_to_index(&self, to_index: u64) -> u64; + /// Iterator over current (unpruned, unremoved) leaf insertion index. /// Note: This differs from underlying MMR pos - [0, 1, 2, 3, 4] vs. [1, 2, 4, 5, 8]. fn leaf_idx_iter(&self, from_idx: u64) -> Box + '_>; @@ -75,9 +81,15 @@ pub trait Backend { /// triggered removal). fn remove(&mut self, position: u64) -> Result<(), String>; + /// Remove a leaf from the leaf set + fn remove_from_leaf_set(&mut self, pos0: u64); + /// Release underlying datafiles and locks fn release_files(&mut self); + /// Reset prune list, used when PIBD is reset + fn reset_prune_list(&mut self); + /// Saves a snapshot of the rewound utxo file with the block hash as /// filename suffix. We need this when sending a txhashset zip file to a /// node for fast sync. diff --git a/core/src/core/pmmr/pmmr.rs b/core/src/core/pmmr/pmmr.rs index a0c039d56..4d503bcb2 100644 --- a/core/src/core/pmmr/pmmr.rs +++ b/core/src/core/pmmr/pmmr.rs @@ -58,6 +58,9 @@ pub trait ReadablePMMR { /// Number of leaves in the MMR fn n_unpruned_leaves(&self) -> u64; + /// Number of leaves in the MMR up to index + fn n_unpruned_leaves_to_index(&self, to_index: u64) -> u64; + /// Is the MMR empty? fn is_empty(&self) -> bool { self.unpruned_size() == 0 @@ -241,6 +244,50 @@ where Ok(leaf_pos) } + /// Push a pruned subtree into the PMMR + pub fn push_pruned_subtree(&mut self, hash: Hash, pos0: u64) -> Result<(), String> { + // First append the subtree + self.backend.append_pruned_subtree(hash, pos0)?; + self.size = pos0 + 1; + + let mut pos = pos0; + let mut current_hash = hash; + + let (peak_map, _) = peak_map_height(pos); + + // Then hash with all immediately preceding peaks, as indicated by peak map + let mut peak = 1; + while (peak_map & peak) != 0 { + let (parent, sibling) = family(pos); + peak *= 2; + if sibling > pos { + // is right sibling, we should be done + continue; + } + let left_hash = self + .backend + .get_hash(sibling) + .ok_or("missing left sibling in tree, should not have been pruned")?; + pos = parent; + current_hash = (left_hash, current_hash).hash_with_index(parent); + self.backend.append_hash(current_hash)?; + } + + // Round size up to next leaf, ready for insertion + self.size = crate::core::pmmr::round_up_to_leaf_pos(pos); + Ok(()) + } + + /// Reset prune list + pub fn reset_prune_list(&mut self) { + self.backend.reset_prune_list(); + } + + /// Remove the specified position from the leaf set + pub fn remove_from_leaf_set(&mut self, pos0: u64) { + self.backend.remove_from_leaf_set(pos0); + } + /// Saves a snapshot of the MMR tagged with the block hash. /// Specifically - snapshots the utxo file as we need this rewound before /// sending the txhashset zip file to another node for fast-sync. @@ -323,15 +370,15 @@ where if m >= sz { break; } - idx.push_str(&format!("{:>8} ", m + 1)); + idx.push_str(&format!("{:>8} ", m)); let ohs = self.get_hash(m); match ohs { Some(hs) => hashes.push_str(&format!("{} ", hs)), None => hashes.push_str(&format!("{:>8} ", "??")), } } - trace!("{}", idx); - trace!("{}", hashes); + debug!("{}", idx); + debug!("{}", hashes); } } @@ -441,6 +488,10 @@ where fn n_unpruned_leaves(&self) -> u64 { self.backend.n_unpruned_leaves() } + + fn n_unpruned_leaves_to_index(&self, to_index: u64) -> u64 { + self.backend.n_unpruned_leaves_to_index(to_index) + } } /// 64 bits all ones: 0b11111111...1 diff --git a/core/src/core/pmmr/readonly_pmmr.rs b/core/src/core/pmmr/readonly_pmmr.rs index 4c693f3c6..b9f720e52 100644 --- a/core/src/core/pmmr/readonly_pmmr.rs +++ b/core/src/core/pmmr/readonly_pmmr.rs @@ -176,4 +176,8 @@ where fn n_unpruned_leaves(&self) -> u64 { self.backend.n_unpruned_leaves() } + + fn n_unpruned_leaves_to_index(&self, to_index: u64) -> u64 { + self.backend.n_unpruned_leaves_to_index(to_index) + } } diff --git a/core/src/core/pmmr/segment.rs b/core/src/core/pmmr/segment.rs index 09fb9f2f3..148a82773 100644 --- a/core/src/core/pmmr/segment.rs +++ b/core/src/core/pmmr/segment.rs @@ -21,6 +21,39 @@ use croaring::Bitmap; use std::cmp::min; use std::fmt::Debug; +#[derive(Clone, Debug, Eq, PartialEq)] +/// Possible segment types, according to this desegmenter +pub enum SegmentType { + /// Output Bitmap + Bitmap, + /// Output + Output, + /// RangeProof + RangeProof, + /// Kernel + Kernel, +} + +/// Lumps possible types with segment ids to enable a unique identifier +/// for a segment with respect to a particular archive header +#[derive(Clone, Debug, Eq, PartialEq)] +pub struct SegmentTypeIdentifier { + /// The type of this segment + pub segment_type: SegmentType, + /// The identfier itself + pub identifier: SegmentIdentifier, +} + +impl SegmentTypeIdentifier { + /// Create + pub fn new(segment_type: SegmentType, identifier: SegmentIdentifier) -> Self { + Self { + segment_type, + identifier, + } + } +} + #[derive(Clone, Debug, PartialEq, Eq, thiserror::Error)] /// Error related to segment creation or validation pub enum SegmentError { @@ -83,6 +116,48 @@ impl SegmentIdentifier { let d = 1 << segment_height; ((pmmr::n_leaves(target_mmr_size) + d - 1) / d) as usize } + + /// Return pmmr size of number of segments of the given height + pub fn pmmr_size(num_segments: usize, height: u8) -> u64 { + pmmr::insertion_to_pmmr_index(num_segments as u64 * (1 << height)) + } + + /// Maximum number of leaves in a segment, given by `2**height` + pub fn segment_capacity(&self) -> u64 { + 1 << self.height + } + + /// Offset (in leaf idx) of first leaf in the segment + fn leaf_offset(&self) -> u64 { + self.idx * self.segment_capacity() + } + + // Number of leaves in this segment. Equal to capacity except for the final segment, which can be smaller + fn segment_unpruned_size(&self, mmr_size: u64) -> u64 { + min( + self.segment_capacity(), + pmmr::n_leaves(mmr_size).saturating_sub(self.leaf_offset()), + ) + } + + /// Inclusive (full) range of MMR positions for the segment that would be produced + /// by this Identifier + pub fn segment_pos_range(&self, mmr_size: u64) -> (u64, u64) { + let segment_size = self.segment_unpruned_size(mmr_size); + let leaf_offset = self.leaf_offset(); + let first = pmmr::insertion_to_pmmr_index(leaf_offset); + let last = if self.full_segment(mmr_size) { + pmmr::insertion_to_pmmr_index(leaf_offset + segment_size - 1) + (self.height as u64) + } else { + mmr_size - 1 + }; + (first, last) + } + + /// Whether the segment is full (segment size == capacity) + fn full_segment(&self, mmr_size: u64) -> bool { + self.segment_unpruned_size(mmr_size) == self.segment_capacity() + } } /// Segment of a PMMR: unpruned leaves and the necessary data to verify @@ -111,40 +186,28 @@ impl Segment { } /// Maximum number of leaves in a segment, given by `2**height` - fn segment_capacity(&self) -> u64 { - 1 << self.identifier.height + fn _segment_capacity(&self) -> u64 { + self.identifier.segment_capacity() } /// Offset (in leaf idx) of first leaf in the segment - fn leaf_offset(&self) -> u64 { - self.identifier.idx * self.segment_capacity() + fn _leaf_offset(&self) -> u64 { + self.identifier.leaf_offset() } // Number of leaves in this segment. Equal to capacity except for the final segment, which can be smaller fn segment_unpruned_size(&self, mmr_size: u64) -> u64 { - min( - self.segment_capacity(), - pmmr::n_leaves(mmr_size).saturating_sub(self.leaf_offset()), - ) + self.identifier.segment_unpruned_size(mmr_size) } /// Whether the segment is full (segment size == capacity) fn full_segment(&self, mmr_size: u64) -> bool { - self.segment_unpruned_size(mmr_size) == self.segment_capacity() + self.identifier.full_segment(mmr_size) } /// Inclusive range of MMR positions for this segment pub fn segment_pos_range(&self, mmr_size: u64) -> (u64, u64) { - let segment_size = self.segment_unpruned_size(mmr_size); - let leaf_offset = self.leaf_offset(); - let first = pmmr::insertion_to_pmmr_index(leaf_offset); - let last = if self.full_segment(mmr_size) { - pmmr::insertion_to_pmmr_index(leaf_offset + segment_size - 1) - + (self.identifier.height as u64) - } else { - mmr_size - 1 - }; - (first, last) + self.identifier.segment_pos_range(mmr_size) } /// TODO - binary_search_by_key() here (can we assume these are sorted by pos?) diff --git a/core/src/core/pmmr/vec_backend.rs b/core/src/core/pmmr/vec_backend.rs index 4650b3764..ee4629d0a 100644 --- a/core/src/core/pmmr/vec_backend.rs +++ b/core/src/core/pmmr/vec_backend.rs @@ -47,6 +47,10 @@ impl Backend for VecBackend { unimplemented!() } + fn append_hash(&mut self, _hash: Hash) -> Result<(), String> { + unimplemented!() + } + fn get_hash(&self, pos0: u64) -> Option { if self.removed.contains(&pos0) { None @@ -86,6 +90,10 @@ impl Backend for VecBackend { unimplemented!() } + fn n_unpruned_leaves_to_index(&self, _to_index: u64) -> u64 { + unimplemented!() + } + fn leaf_pos_iter(&self) -> Box + '_> { Box::new( self.hashes @@ -111,6 +119,14 @@ impl Backend for VecBackend { Ok(()) } + fn remove_from_leaf_set(&mut self, _pos0: u64) { + unimplemented!() + } + + fn reset_prune_list(&mut self) { + unimplemented!() + } + fn rewind(&mut self, position: u64, _rewind_rm_pos: &Bitmap) -> Result<(), String> { if let Some(data) = &mut self.data { let idx = pmmr::n_leaves(position); diff --git a/core/src/global.rs b/core/src/global.rs index fb335dc0b..37fb51689 100644 --- a/core/src/global.rs +++ b/core/src/global.rs @@ -22,7 +22,8 @@ use crate::consensus::{ DMA_WINDOW, GRIN_BASE, INITIAL_DIFFICULTY, KERNEL_WEIGHT, MAX_BLOCK_WEIGHT, OUTPUT_WEIGHT, PROOFSIZE, SECOND_POW_EDGE_BITS, STATE_SYNC_THRESHOLD, }; -use crate::core::block::HeaderVersion; +use crate::core::block::{Block, HeaderVersion}; +use crate::genesis; use crate::pow::{ self, new_cuckaroo_ctx, new_cuckarood_ctx, new_cuckaroom_ctx, new_cuckarooz_ctx, new_cuckatoo_ctx, no_cuckaroo_ctx, PoWContext, Proof, @@ -206,6 +207,15 @@ pub fn get_chain_type() -> ChainTypes { }) } +/// Return genesis block for the active chain type +pub fn get_genesis_block() -> Block { + match get_chain_type() { + ChainTypes::Mainnet => genesis::genesis_main(), + ChainTypes::Testnet => genesis::genesis_test(), + _ => genesis::genesis_dev(), + } +} + /// One time initialization of the global future time limit /// Will panic if we attempt to re-initialize this (via OneTime). pub fn init_global_future_time_limit(new_ftl: u64) { diff --git a/p2p/src/peer.rs b/p2p/src/peer.rs index 3d57ba423..a0433d25e 100644 --- a/p2p/src/peer.rs +++ b/p2p/src/peer.rs @@ -31,7 +31,9 @@ use crate::core::pow::Difficulty; use crate::core::ser::Writeable; use crate::core::{core, global}; use crate::handshake::Handshake; -use crate::msg::{self, BanReason, GetPeerAddrs, Locator, Msg, Ping, TxHashSetRequest, Type}; +use crate::msg::{ + self, BanReason, GetPeerAddrs, Locator, Msg, Ping, SegmentRequest, TxHashSetRequest, Type, +}; use crate::protocol::Protocol; use crate::types::{ Capabilities, ChainAdapter, Error, NetAdapter, P2PConfig, PeerAddr, PeerInfo, ReasonForBan, @@ -371,6 +373,62 @@ impl Peer { ) } + pub fn send_bitmap_segment_request( + &self, + h: Hash, + identifier: SegmentIdentifier, + ) -> Result<(), Error> { + self.send( + &SegmentRequest { + block_hash: h, + identifier, + }, + msg::Type::GetOutputBitmapSegment, + ) + } + + pub fn send_output_segment_request( + &self, + h: Hash, + identifier: SegmentIdentifier, + ) -> Result<(), Error> { + self.send( + &SegmentRequest { + block_hash: h, + identifier, + }, + msg::Type::GetOutputSegment, + ) + } + + pub fn send_rangeproof_segment_request( + &self, + h: Hash, + identifier: SegmentIdentifier, + ) -> Result<(), Error> { + self.send( + &SegmentRequest { + block_hash: h, + identifier, + }, + msg::Type::GetRangeProofSegment, + ) + } + + pub fn send_kernel_segment_request( + &self, + h: Hash, + identifier: SegmentIdentifier, + ) -> Result<(), Error> { + self.send( + &SegmentRequest { + block_hash: h, + identifier, + }, + msg::Type::GetKernelSegment, + ) + } + /// Stops the peer pub fn stop(&self) { debug!("Stopping peer {:?}", self.info.addr); @@ -586,6 +644,42 @@ impl ChainAdapter for TrackingAdapter { ) -> Result, chain::Error> { self.adapter.get_rangeproof_segment(hash, id) } + + fn receive_bitmap_segment( + &self, + block_hash: Hash, + output_root: Hash, + segment: Segment, + ) -> Result { + self.adapter + .receive_bitmap_segment(block_hash, output_root, segment) + } + + fn receive_output_segment( + &self, + block_hash: Hash, + bitmap_root: Hash, + segment: Segment, + ) -> Result { + self.adapter + .receive_output_segment(block_hash, bitmap_root, segment) + } + + fn receive_rangeproof_segment( + &self, + block_hash: Hash, + segment: Segment, + ) -> Result { + self.adapter.receive_rangeproof_segment(block_hash, segment) + } + + fn receive_kernel_segment( + &self, + block_hash: Hash, + segment: Segment, + ) -> Result { + self.adapter.receive_kernel_segment(block_hash, segment) + } } impl NetAdapter for TrackingAdapter { diff --git a/p2p/src/peers.rs b/p2p/src/peers.rs index 9f758e23a..04174085b 100644 --- a/p2p/src/peers.rs +++ b/p2p/src/peers.rs @@ -649,6 +649,42 @@ impl ChainAdapter for Peers { ) -> Result, chain::Error> { self.adapter.get_rangeproof_segment(hash, id) } + + fn receive_bitmap_segment( + &self, + block_hash: Hash, + output_root: Hash, + segment: Segment, + ) -> Result { + self.adapter + .receive_bitmap_segment(block_hash, output_root, segment) + } + + fn receive_output_segment( + &self, + block_hash: Hash, + bitmap_root: Hash, + segment: Segment, + ) -> Result { + self.adapter + .receive_output_segment(block_hash, bitmap_root, segment) + } + + fn receive_rangeproof_segment( + &self, + block_hash: Hash, + segment: Segment, + ) -> Result { + self.adapter.receive_rangeproof_segment(block_hash, segment) + } + + fn receive_kernel_segment( + &self, + block_hash: Hash, + segment: Segment, + ) -> Result { + self.adapter.receive_kernel_segment(block_hash, segment) + } } impl NetAdapter for Peers { diff --git a/p2p/src/protocol.rs b/p2p/src/protocol.rs index 9f0122f08..31e069321 100644 --- a/p2p/src/protocol.rs +++ b/p2p/src/protocol.rs @@ -371,11 +371,55 @@ impl MessageHandler for Protocol { Consumed::None } } - Message::OutputBitmapSegment(_) - | Message::OutputSegment(_) - | Message::RangeProofSegment(_) - | Message::KernelSegment(_) => Consumed::None, - + Message::OutputBitmapSegment(req) => { + let OutputBitmapSegmentResponse { + block_hash, + segment, + output_root, + } = req; + trace!( + "Received Output Bitmap Segment: bh, output_root: {}, {}", + block_hash, + output_root + ); + adapter.receive_bitmap_segment(block_hash, output_root, segment.into())?; + Consumed::None + } + Message::OutputSegment(req) => { + let OutputSegmentResponse { + response, + output_bitmap_root, + } = req; + trace!( + "Received Output Segment: bh, bitmap_root: {}, {}", + response.block_hash, + output_bitmap_root + ); + adapter.receive_output_segment( + response.block_hash, + output_bitmap_root, + response.segment.into(), + )?; + Consumed::None + } + Message::RangeProofSegment(req) => { + let SegmentResponse { + block_hash, + segment, + } = req; + trace!("Received Rangeproof Segment: bh: {}", block_hash); + adapter.receive_rangeproof_segment(block_hash, segment.into())?; + Consumed::None + } + Message::KernelSegment(req) => { + let SegmentResponse { + block_hash, + segment, + } = req; + trace!("Received Kernel Segment: bh: {}", block_hash); + adapter.receive_kernel_segment(block_hash, segment.into())?; + Consumed::None + } Message::Unknown(_) => Consumed::None, }; Ok(consumed) diff --git a/p2p/src/serv.rs b/p2p/src/serv.rs index 784d562ab..c31c549c7 100644 --- a/p2p/src/serv.rs +++ b/p2p/src/serv.rs @@ -410,6 +410,40 @@ impl ChainAdapter for DummyAdapter { ) -> Result, chain::Error> { unimplemented!() } + + fn receive_bitmap_segment( + &self, + _block_hash: Hash, + _output_root: Hash, + _segment: Segment, + ) -> Result { + unimplemented!() + } + + fn receive_output_segment( + &self, + _block_hash: Hash, + _bitmap_root: Hash, + _segment: Segment, + ) -> Result { + unimplemented!() + } + + fn receive_rangeproof_segment( + &self, + _block_hash: Hash, + _segment: Segment, + ) -> Result { + unimplemented!() + } + + fn receive_kernel_segment( + &self, + _block_hash: Hash, + _segment: Segment, + ) -> Result { + unimplemented!() + } } impl NetAdapter for DummyAdapter { diff --git a/p2p/src/types.rs b/p2p/src/types.rs index 496858fae..07765496c 100644 --- a/p2p/src/types.rs +++ b/p2p/src/types.rs @@ -391,6 +391,8 @@ bitflags! { const PIBD_HIST = 0b0001_0000; /// Can provide historical blocks for archival sync. const BLOCK_HIST = 0b0010_0000; + /// As above, with crucial serialization fix #3705 applied + const PIBD_HIST_1 = 0b0100_0000; } } @@ -402,6 +404,7 @@ impl Default for Capabilities { | Capabilities::PEER_LIST | Capabilities::TX_KERNEL_HASH | Capabilities::PIBD_HIST + | Capabilities::PIBD_HIST_1 } } @@ -667,6 +670,32 @@ pub trait ChainAdapter: Sync + Send { hash: Hash, id: SegmentIdentifier, ) -> Result, chain::Error>; + + fn receive_bitmap_segment( + &self, + block_hash: Hash, + output_root: Hash, + segment: Segment, + ) -> Result; + + fn receive_output_segment( + &self, + block_hash: Hash, + bitmap_root: Hash, + segment: Segment, + ) -> Result; + + fn receive_rangeproof_segment( + &self, + block_hash: Hash, + segment: Segment, + ) -> Result; + + fn receive_kernel_segment( + &self, + block_hash: Hash, + segment: Segment, + ) -> Result; } /// Additional methods required by the protocol that don't need to be diff --git a/p2p/tests/capabilities.rs b/p2p/tests/capabilities.rs index d07354d2f..5b20e099b 100644 --- a/p2p/tests/capabilities.rs +++ b/p2p/tests/capabilities.rs @@ -42,6 +42,7 @@ fn default_capabilities() { assert!(x.contains(Capabilities::PEER_LIST)); assert!(x.contains(Capabilities::TX_KERNEL_HASH)); assert!(x.contains(Capabilities::PIBD_HIST)); + assert!(x.contains(Capabilities::PIBD_HIST_1)); assert_eq!( x, @@ -50,5 +51,6 @@ fn default_capabilities() { | Capabilities::PEER_LIST | Capabilities::TX_KERNEL_HASH | Capabilities::PIBD_HIST + | Capabilities::PIBD_HIST_1 ); } diff --git a/p2p/tests/ser_deser.rs b/p2p/tests/ser_deser.rs index c75faf049..1d50aff6a 100644 --- a/p2p/tests/ser_deser.rs +++ b/p2p/tests/ser_deser.rs @@ -55,11 +55,7 @@ fn test_capabilities() { assert_eq!( expected, - p2p::types::Capabilities::from_bits_truncate(0b11111 as u32), - ); - assert_eq!( - expected, - p2p::types::Capabilities::from_bits_truncate(0b00011111 as u32), + p2p::types::Capabilities::from_bits_truncate(0b1011111 as u32), ); assert_eq!( diff --git a/servers/src/common/adapters.rs b/servers/src/common/adapters.rs index ab715dbd9..8b475007a 100644 --- a/servers/src/common/adapters.rs +++ b/servers/src/common/adapters.rs @@ -32,7 +32,7 @@ use crate::core::core::hash::{Hash, Hashed}; use crate::core::core::transaction::Transaction; use crate::core::core::{ BlockHeader, BlockSums, CompactBlock, Inputs, OutputIdentifier, Segment, SegmentIdentifier, - TxKernel, + SegmentType, SegmentTypeIdentifier, TxKernel, }; use crate::core::pow::Difficulty; use crate::core::ser::ProtocolVersion; @@ -560,6 +560,136 @@ where } segmenter.rangeproof_segment(id) } + + fn receive_bitmap_segment( + &self, + block_hash: Hash, + output_root: Hash, + segment: Segment, + ) -> Result { + debug!( + "Received bitmap segment {} for block_hash: {}, output_root: {}", + segment.identifier().idx, + block_hash, + output_root + ); + // TODO: Entire process needs to be restarted if the horizon block + // has changed (perhaps not here, NB this has to go somewhere) + let archive_header = self.chain().txhashset_archive_header_header_only()?; + let identifier = segment.identifier().clone(); + let mut retval = Ok(true); + if let Some(d) = self.chain().desegmenter(&archive_header)?.write().as_mut() { + let res = d.add_bitmap_segment(segment, output_root); + if let Err(e) = res { + error!( + "Validation of incoming bitmap segment failed: {:?}, reason: {}", + identifier, e + ); + retval = Err(e); + } + } + // Remove segment from outgoing list + self.sync_state.remove_pibd_segment(&SegmentTypeIdentifier { + segment_type: SegmentType::Bitmap, + identifier, + }); + retval + } + + fn receive_output_segment( + &self, + block_hash: Hash, + bitmap_root: Hash, + segment: Segment, + ) -> Result { + debug!( + "Received output segment {} for block_hash: {}, bitmap_root: {:?}", + segment.identifier().idx, + block_hash, + bitmap_root, + ); + let archive_header = self.chain().txhashset_archive_header_header_only()?; + let identifier = segment.identifier().clone(); + let mut retval = Ok(true); + if let Some(d) = self.chain().desegmenter(&archive_header)?.write().as_mut() { + let res = d.add_output_segment(segment, Some(bitmap_root)); + if let Err(e) = res { + error!( + "Validation of incoming output segment failed: {:?}, reason: {}", + identifier, e + ); + retval = Err(e); + } + } + // Remove segment from outgoing list + self.sync_state.remove_pibd_segment(&SegmentTypeIdentifier { + segment_type: SegmentType::Output, + identifier, + }); + retval + } + + fn receive_rangeproof_segment( + &self, + block_hash: Hash, + segment: Segment, + ) -> Result { + debug!( + "Received proof segment {} for block_hash: {}", + segment.identifier().idx, + block_hash, + ); + let archive_header = self.chain().txhashset_archive_header_header_only()?; + let identifier = segment.identifier().clone(); + let mut retval = Ok(true); + if let Some(d) = self.chain().desegmenter(&archive_header)?.write().as_mut() { + let res = d.add_rangeproof_segment(segment); + if let Err(e) = res { + error!( + "Validation of incoming rangeproof segment failed: {:?}, reason: {}", + identifier, e + ); + retval = Err(e); + } + } + // Remove segment from outgoing list + self.sync_state.remove_pibd_segment(&SegmentTypeIdentifier { + segment_type: SegmentType::RangeProof, + identifier, + }); + retval + } + + fn receive_kernel_segment( + &self, + block_hash: Hash, + segment: Segment, + ) -> Result { + debug!( + "Received kernel segment {} for block_hash: {}", + segment.identifier().idx, + block_hash, + ); + let archive_header = self.chain().txhashset_archive_header_header_only()?; + let identifier = segment.identifier().clone(); + let mut retval = Ok(true); + if let Some(d) = self.chain().desegmenter(&archive_header)?.write().as_mut() { + let res = d.add_kernel_segment(segment); + if let Err(e) = res { + error!( + "Validation of incoming rangeproof segment failed: {:?}, reason: {}", + identifier, e + ); + retval = Err(e); + } + } + // Remove segment from outgoing list + self.sync_state.remove_pibd_segment(&SegmentTypeIdentifier { + segment_type: SegmentType::Kernel, + identifier, + }); + retval + } } impl NetToChainAdapter diff --git a/servers/src/grin/sync/body_sync.rs b/servers/src/grin/sync/body_sync.rs index 9c726b6f0..c50a7ee6a 100644 --- a/servers/src/grin/sync/body_sync.rs +++ b/servers/src/grin/sync/body_sync.rs @@ -85,7 +85,7 @@ impl BodySync { let fork_point = self.chain.fork_point()?; if self.chain.check_txhashset_needed(&fork_point)? { - debug!( + trace!( "body_sync: cannot sync full blocks earlier than horizon. will request txhashset", ); return Ok(true); @@ -211,7 +211,7 @@ impl BodySync { // off by one to account for broadcast adding a couple orphans if self.blocks_requested < 2 { // no pending block requests, ask more - debug!("body_sync: no pending block request, asking more"); + trace!("body_sync: no pending block request, asking more"); return Ok(true); } diff --git a/servers/src/grin/sync/state_sync.rs b/servers/src/grin/sync/state_sync.rs index b16338da3..f0da0d475 100644 --- a/servers/src/grin/sync/state_sync.rs +++ b/servers/src/grin/sync/state_sync.rs @@ -16,11 +16,12 @@ use chrono::prelude::{DateTime, Utc}; use chrono::Duration; use std::sync::Arc; -use crate::chain::{self, SyncState, SyncStatus}; -use crate::core::core::hash::Hashed; +use crate::chain::{self, pibd_params, SyncState, SyncStatus}; +use crate::core::core::{hash::Hashed, pmmr::segment::SegmentType}; use crate::core::global; use crate::core::pow::Difficulty; use crate::p2p::{self, Capabilities, Peer}; +use crate::util::StopState; /// Fast sync has 3 "states": /// * syncing headers @@ -35,6 +36,9 @@ pub struct StateSync { prev_state_sync: Option>, state_sync_peer: Option>, + + pibd_aborted: bool, + earliest_zero_pibd_peer_time: Option>, } impl StateSync { @@ -49,9 +53,22 @@ impl StateSync { chain, prev_state_sync: None, state_sync_peer: None, + pibd_aborted: false, + earliest_zero_pibd_peer_time: None, } } + /// Flag to abort PIBD process + pub fn set_pibd_aborted(&mut self) { + self.pibd_aborted = true; + } + + /// Record earliest time at which we had no suitable + /// peers for continuing PIBD + pub fn set_earliest_zero_pibd_peer_time(&mut self, t: Option>) { + self.earliest_zero_pibd_peer_time = t; + } + /// Check whether state sync should run and triggers a state download when /// it's time (we have all headers). Returns true as long as state sync /// needs monitoring, false when it's either done or turned off. @@ -61,6 +78,7 @@ impl StateSync { head: &chain::Tip, tail: &chain::Tip, highest_height: u64, + stop_state: Arc, ) -> bool { trace!("state_sync: head.height: {}, tail.height: {}. header_head.height: {}, highest_height: {}", head.height, tail.height, header_head.height, highest_height, @@ -74,15 +92,60 @@ impl StateSync { sync_need_restart = true; } + // Determine whether we're going to try using PIBD or whether we've already given up + // on it + let using_pibd = + if let SyncStatus::TxHashsetPibd { aborted: true, .. } = self.sync_state.status() { + false + } else if self.pibd_aborted { + false + } else { + // Only on testing chains for now + if global::get_chain_type() != global::ChainTypes::Mainnet { + true + //false + } else { + false + } + }; + + // Check whether we've errored and should restart pibd + if using_pibd { + if let SyncStatus::TxHashsetPibd { errored: true, .. } = self.sync_state.status() { + let archive_header = self.chain.txhashset_archive_header_header_only().unwrap(); + error!("PIBD Reported Failure - Restarting Sync"); + // reset desegmenter state + let desegmenter = self.chain.desegmenter(&archive_header).unwrap(); + + if let Some(d) = desegmenter.write().as_mut() { + d.reset(); + }; + if let Err(e) = self.chain.reset_chain_head(self.chain.genesis(), false) { + error!("pibd_sync restart: chain reset error = {}", e); + } + if let Err(e) = self.chain.reset_pibd_head() { + error!("pibd_sync restart: reset pibd_head error = {}", e); + } + if let Err(e) = self.chain.reset_prune_lists() { + error!("pibd_sync restart: reset prune lists error = {}", e); + } + self.sync_state + .update_pibd_progress(false, false, 0, 1, &archive_header); + sync_need_restart = true; + } + } + // check peer connection status of this sync - if let Some(ref peer) = self.state_sync_peer { - if let SyncStatus::TxHashsetDownload { .. } = self.sync_state.status() { - if !peer.is_connected() { - sync_need_restart = true; - info!( - "state_sync: peer connection lost: {:?}. restart", - peer.info.addr, - ); + if !using_pibd { + if let Some(ref peer) = self.state_sync_peer { + if let SyncStatus::TxHashsetDownload { .. } = self.sync_state.status() { + if !peer.is_connected() { + sync_need_restart = true; + info!( + "state_sync: peer connection lost: {:?}. restart", + peer.info.addr, + ); + } } } } @@ -111,37 +174,211 @@ impl StateSync { // run fast sync if applicable, normally only run one-time, except restart in error if sync_need_restart || header_head.height == highest_height { - let (go, download_timeout) = self.state_sync_due(); - - if let SyncStatus::TxHashsetDownload { .. } = self.sync_state.status() { - if download_timeout { - error!("state_sync: TxHashsetDownload status timeout in 10 minutes!"); + if using_pibd { + if sync_need_restart { + return true; + } + let (launch, _download_timeout) = self.state_sync_due(); + let archive_header = { self.chain.txhashset_archive_header_header_only().unwrap() }; + if launch { self.sync_state - .set_sync_error(chain::Error::SyncError(format!( - "{:?}", - p2p::Error::Timeout - ))); + .update_pibd_progress(false, false, 0, 1, &archive_header); } - } + // Continue our PIBD process (which returns true if all segments are in) + if self.continue_pibd() { + let desegmenter = self.chain.desegmenter(&archive_header).unwrap(); + // All segments in, validate + if let Some(d) = desegmenter.read().as_ref() { + if let Ok(true) = d.check_progress(self.sync_state.clone()) { + if let Err(e) = d.check_update_leaf_set_state() { + error!("error updating PIBD leaf set: {}", e); + self.sync_state.update_pibd_progress( + false, + true, + 0, + 1, + &archive_header, + ); + return false; + } + if let Err(e) = d.validate_complete_state( + self.sync_state.clone(), + stop_state.clone(), + ) { + error!("error validating PIBD state: {}", e); + self.sync_state.update_pibd_progress( + false, + true, + 0, + 1, + &archive_header, + ); + return false; + } + return true; + } + }; + } + } else { + let (go, download_timeout) = self.state_sync_due(); - if go { - self.state_sync_peer = None; - match self.request_state(&header_head) { - Ok(peer) => { - self.state_sync_peer = Some(peer); + if let SyncStatus::TxHashsetDownload { .. } = self.sync_state.status() { + if download_timeout { + error!("state_sync: TxHashsetDownload status timeout in 10 minutes!"); + self.sync_state + .set_sync_error(chain::Error::SyncError(format!( + "{:?}", + p2p::Error::Timeout + ))); } - Err(e) => self - .sync_state - .set_sync_error(chain::Error::SyncError(format!("{:?}", e))), } - self.sync_state - .update(SyncStatus::TxHashsetDownload(Default::default())); + if go { + self.state_sync_peer = None; + match self.request_state(&header_head) { + Ok(peer) => { + self.state_sync_peer = Some(peer); + } + Err(e) => self + .sync_state + .set_sync_error(chain::Error::SyncError(format!("{:?}", e))), + } + + self.sync_state + .update(SyncStatus::TxHashsetDownload(Default::default())); + } } } true } + /// Continue the PIBD process, returning true if the desegmenter is reporting + /// that the process is done + fn continue_pibd(&mut self) -> bool { + // Check the state of our chain to figure out what we should be requesting next + let archive_header = self.chain.txhashset_archive_header_header_only().unwrap(); + let desegmenter = self.chain.desegmenter(&archive_header).unwrap(); + + // Remove stale requests, if we haven't recieved the segment within a minute re-request + // TODO: verify timing + self.sync_state + .remove_stale_pibd_requests(pibd_params::SEGMENT_REQUEST_TIMEOUT_SECS); + + // Apply segments... TODO: figure out how this should be called, might + // need to be a separate thread. + if let Some(mut de) = desegmenter.try_write() { + if let Some(d) = de.as_mut() { + let res = d.apply_next_segments(); + if let Err(e) = res { + error!("error applying segment: {}", e); + self.sync_state + .update_pibd_progress(false, true, 0, 1, &archive_header); + return false; + } + } + } + + // TODO and consider: number here depends on how many simultaneous + // requests we want to send to peers + let mut next_segment_ids = vec![]; + if let Some(d) = desegmenter.write().as_mut() { + if let Ok(true) = d.check_progress(self.sync_state.clone()) { + return true; + } + // Figure out the next segments we need + // (12 is divisible by 3, to try and evenly spread the requests among the 3 + // main pmmrs. Bitmaps segments will always be requested first) + next_segment_ids = d.next_desired_segments(pibd_params::SEGMENT_REQUEST_COUNT); + } + + // For each segment, pick a desirable peer and send message + // (Provided we're not waiting for a response for this message from someone else) + for seg_id in next_segment_ids.iter() { + if self.sync_state.contains_pibd_segment(seg_id) { + trace!("Request list contains, continuing: {:?}", seg_id); + continue; + } + + // TODO: urg + let peers = self.peers.clone(); + + // First, get max difficulty or greater peers + let peers_iter = || peers.iter().connected(); + let max_diff = peers_iter().max_difficulty().unwrap_or(Difficulty::zero()); + let peers_iter_max = || peers_iter().with_difficulty(|x| x >= max_diff); + + // Then, further filter by PIBD capabilities v1 + let peers_iter_pibd = || { + peers_iter_max() + .with_capabilities(Capabilities::PIBD_HIST_1) + .connected() + }; + + // If there are no suitable PIBD-Enabled peers, AND there hasn't been one for a minute, + // abort PIBD and fall back to txhashset download + // Waiting a minute helps ensures that the cancellation isn't simply due to a single non-PIBD enabled + // peer having the max difficulty + if peers_iter_pibd().count() == 0 { + if let None = self.earliest_zero_pibd_peer_time { + self.set_earliest_zero_pibd_peer_time(Some(Utc::now())); + } + if self.earliest_zero_pibd_peer_time.unwrap() + + Duration::seconds(pibd_params::TXHASHSET_ZIP_FALLBACK_TIME_SECS) + < Utc::now() + { + // random abort test + info!("No PIBD-enabled max-difficulty peers for the past {} seconds - Aborting PIBD and falling back to TxHashset.zip download", pibd_params::TXHASHSET_ZIP_FALLBACK_TIME_SECS); + self.sync_state + .update_pibd_progress(true, true, 0, 1, &archive_header); + self.sync_state + .set_sync_error(chain::Error::AbortingPIBDError); + self.set_pibd_aborted(); + return false; + } + } else { + self.set_earliest_zero_pibd_peer_time(None) + } + + // Choose a random "most work" peer, preferring outbound if at all possible. + let peer = peers_iter_pibd() + .outbound() + .choose_random() + .or_else(|| peers_iter_pibd().inbound().choose_random()); + trace!("Chosen peer is {:?}", peer); + + if let Some(p) = peer { + // add to list of segments that are being tracked + self.sync_state.add_pibd_segment(seg_id); + let res = match seg_id.segment_type { + SegmentType::Bitmap => p.send_bitmap_segment_request( + archive_header.hash(), + seg_id.identifier.clone(), + ), + SegmentType::Output => p.send_output_segment_request( + archive_header.hash(), + seg_id.identifier.clone(), + ), + SegmentType::RangeProof => p.send_rangeproof_segment_request( + archive_header.hash(), + seg_id.identifier.clone(), + ), + SegmentType::Kernel => p.send_kernel_segment_request( + archive_header.hash(), + seg_id.identifier.clone(), + ), + }; + if let Err(e) = res { + info!( + "Error sending request to peer at {}, reason: {:?}", + p.info.addr, e + ); + self.sync_state.remove_pibd_segment(seg_id); + } + } + } + false + } + fn request_state(&self, header_head: &chain::Tip) -> Result, p2p::Error> { let threshold = global::state_sync_threshold() as u64; let archive_interval = global::txhashset_archive_interval(); diff --git a/servers/src/grin/sync/syncer.rs b/servers/src/grin/sync/syncer.rs index a351297ce..d3642970c 100644 --- a/servers/src/grin/sync/syncer.rs +++ b/servers/src/grin/sync/syncer.rs @@ -207,8 +207,9 @@ impl SyncRunner { let mut check_state_sync = false; match self.sync_state.status() { - SyncStatus::TxHashsetDownload { .. } - | SyncStatus::TxHashsetSetup + SyncStatus::TxHashsetPibd { .. } + | SyncStatus::TxHashsetDownload { .. } + | SyncStatus::TxHashsetSetup { .. } | SyncStatus::TxHashsetRangeProofsValidation { .. } | SyncStatus::TxHashsetKernelsValidation { .. } | SyncStatus::TxHashsetSave @@ -228,7 +229,13 @@ impl SyncRunner { } if check_state_sync { - state_sync.check_run(&header_head, &head, &tail, highest_height); + state_sync.check_run( + &header_head, + &head, + &tail, + highest_height, + self.stop_state.clone(), + ); } } } diff --git a/src/bin/tui/status.rs b/src/bin/tui/status.rs index 4e78ca047..992905da3 100644 --- a/src/bin/tui/status.rs +++ b/src/bin/tui/status.rs @@ -50,6 +50,24 @@ impl TUIStatusView { }; Cow::Owned(format!("Sync step 1/7: Downloading headers: {}%", percent)) } + SyncStatus::TxHashsetPibd { + aborted: _, + errored: _, + completed_leaves, + leaves_required, + completed_to_height: _, + required_height: _, + } => { + let percent = if completed_leaves == 0 { + 0 + } else { + completed_leaves * 100 / leaves_required + }; + Cow::Owned(format!( + "Sync step 2/7: Downloading Tx state (PIBD) - {} / {} entries - {}%", + completed_leaves, leaves_required, percent + )) + } SyncStatus::TxHashsetDownload(stat) => { if stat.total_size > 0 { let percent = stat.downloaded_size * 100 / stat.total_size; @@ -72,8 +90,31 @@ impl TUIStatusView { )) } } - SyncStatus::TxHashsetSetup => { - Cow::Borrowed("Sync step 3/7: Preparing chain state for validation") + SyncStatus::TxHashsetSetup { + headers, + headers_total, + kernel_pos, + kernel_pos_total, + } => { + if headers.is_some() && headers_total.is_some() { + let h = headers.unwrap(); + let ht = headers_total.unwrap(); + let percent = h * 100 / ht; + Cow::Owned(format!( + "Sync step 3/7: Preparing for validation (kernel history) - {}/{} - {}%", + h, ht, percent + )) + } else if kernel_pos.is_some() && kernel_pos_total.is_some() { + let k = kernel_pos.unwrap(); + let kt = kernel_pos_total.unwrap(); + let percent = k * 100 / kt; + Cow::Owned(format!( + "Sync step 3/7: Preparing for validation (kernel position) - {}/{} - {}%", + k, kt, percent + )) + } else { + Cow::Borrowed("Sync step 3/7: Preparing chain state for validation") + } } SyncStatus::TxHashsetRangeProofsValidation { rproofs, diff --git a/store/src/leaf_set.rs b/store/src/leaf_set.rs index 77c140c03..ce839a5f0 100644 --- a/store/src/leaf_set.rs +++ b/store/src/leaf_set.rs @@ -196,6 +196,11 @@ impl LeafSet { self.bitmap.cardinality() as usize } + /// Number of positions up to index n in the leaf set + pub fn n_unpruned_leaves_to_index(&self, to_index: u64) -> u64 { + self.bitmap.range_cardinality(0..to_index) + } + /// Is the leaf_set empty. pub fn is_empty(&self) -> bool { self.len() == 0 diff --git a/store/src/pmmr.rs b/store/src/pmmr.rs index 44f32d141..790ee1a43 100644 --- a/store/src/pmmr.rs +++ b/store/src/pmmr.rs @@ -102,6 +102,13 @@ impl Backend for PMMRBackend { Ok(()) } + fn append_hash(&mut self, hash: Hash) -> Result<(), String> { + self.hash_file + .append(&hash) + .map_err(|e| format!("Failed to append hash to file. {}", e))?; + Ok(()) + } + fn get_from_file(&self, pos0: u64) -> Option { if self.is_compacted(pos0) { return None; @@ -149,6 +156,11 @@ impl Backend for PMMRBackend { self.get_data_from_file(pos0) } + /// Remove leaf from leaf set + fn remove_from_leaf_set(&mut self, pos0: u64) { + self.leaf_set.remove(pos0); + } + /// Returns an iterator over all the leaf positions. /// for a prunable PMMR this is an iterator over the leaf_set bitmap. /// For a non-prunable PMMR this is *all* leaves (this is not yet implemented). @@ -168,6 +180,14 @@ impl Backend for PMMRBackend { } } + fn n_unpruned_leaves_to_index(&self, to_index: u64) -> u64 { + if self.prunable { + self.leaf_set.n_unpruned_leaves_to_index(to_index) + } else { + pmmr::n_leaves(pmmr::insertion_to_pmmr_index(to_index)) + } + } + /// Returns an iterator over all the leaf insertion indices (0-indexed). /// If our pos are [1,2,4,5,8] (first 5 leaf pos) then our insertion indices are [0,1,2,3,4] fn leaf_idx_iter(&self, from_idx: u64) -> Box + '_> { @@ -217,6 +237,14 @@ impl Backend for PMMRBackend { Ok(()) } + fn reset_prune_list(&mut self) { + let bitmap = Bitmap::create(); + self.prune_list = PruneList::new(Some(self.data_dir.join(PMMR_PRUN_FILE)), bitmap); + if let Err(e) = self.prune_list.flush() { + error!("Flushing reset prune list: {}", e); + } + } + /// Remove by insertion position. fn remove(&mut self, pos0: u64) -> Result<(), String> { assert!(self.prunable, "Remove on non-prunable MMR"); @@ -349,6 +377,7 @@ impl PMMRBackend { .and(self.hash_file.flush()) .and(self.data_file.flush()) .and(self.sync_leaf_set()) + .and(self.prune_list.flush()) .map_err(|e| { io::Error::new( io::ErrorKind::Interrupted,