From d36a0b29ef8fc8de1b9994b1dde4c29498f870ed Mon Sep 17 00:00:00 2001 From: Gary Yu Date: Fri, 30 Aug 2019 00:14:39 +0800 Subject: [PATCH] store both mmr index and block height into database for output (#2903) * store both mmr index and block height into database for output * rustfmt * fix: mmr position is 1-based instead of 0-based * (Hash, u64, u64) deserves a type --- api/src/handlers/transactions_api.rs | 4 +- api/src/handlers/utils.rs | 27 ++++-- api/src/types.rs | 9 +- chain/src/chain.rs | 126 +++++++++++++++++---------- chain/src/store.rs | 85 ++++++++++++++++-- chain/src/txhashset/txhashset.rs | 24 +++-- chain/src/types.rs | 12 +++ servers/src/grin/server.rs | 3 + store/src/lmdb.rs | 2 +- 9 files changed, 216 insertions(+), 76 deletions(-) diff --git a/api/src/handlers/transactions_api.rs b/api/src/handlers/transactions_api.rs index 878db7f1a..193755222 100644 --- a/api/src/handlers/transactions_api.rs +++ b/api/src/handlers/transactions_api.rs @@ -70,8 +70,8 @@ impl TxHashSetHandler { // allows traversal of utxo set fn outputs(&self, start_index: u64, mut max: u64) -> Result { //set a limit here - if max > 1000 { - max = 1000; + if max > 10_000 { + max = 10_000; } let chain = w(&self.chain)?; let outputs = chain diff --git a/api/src/handlers/utils.rs b/api/src/handlers/utils.rs index 1aa309e98..2eea81904 100644 --- a/api/src/handlers/utils.rs +++ b/api/src/handlers/utils.rs @@ -51,15 +51,24 @@ pub fn get_output( let chain = w(chain)?; - for x in outputs.iter().filter(|x| chain.is_unspent(x).is_ok()) { - let block_height = chain - .get_header_for_output(&x) - .context(ErrorKind::Internal( - "Can't get header for output".to_owned(), - ))? - .height; - let output_pos = chain.get_output_pos(&x.commit).unwrap_or(0); - return Ok((Output::new(&commit, block_height, output_pos), x.clone())); + for x in outputs.iter() { + let res = chain.is_unspent(x); + match res { + Ok(output_pos) => { + return Ok(( + Output::new(&commit, output_pos.height, output_pos.position), + x.clone(), + )); + } + Err(e) => { + trace!( + "get_output: err: {} for commit: {:?} with feature: {:?}", + e.to_string(), + x.commit, + x.features + ); + } + } } Err(ErrorKind::NotFound)? } diff --git a/api/src/types.rs b/api/src/types.rs index efdfb219a..52b90fb44 100644 --- a/api/src/types.rs +++ b/api/src/types.rs @@ -276,10 +276,11 @@ impl OutputPrintable { }; let out_id = core::OutputIdentifier::from_output(&output); - let spent = chain.is_unspent(&out_id).is_err(); - let block_height = match spent { - true => None, - false => Some(chain.get_header_for_output(&out_id)?.height), + let res = chain.is_unspent(&out_id); + let (spent, block_height) = if let Ok(output_pos) = res { + (false, Some(output_pos.height)) + } else { + (true, None) }; let proof = if include_proof { diff --git a/chain/src/chain.rs b/chain/src/chain.rs index 81c147109..879fcdb48 100644 --- a/chain/src/chain.rs +++ b/chain/src/chain.rs @@ -30,7 +30,8 @@ use crate::store; use crate::txhashset; use crate::txhashset::TxHashSet; use crate::types::{ - BlockStatus, ChainAdapter, NoStatus, Options, Tip, TxHashSetRoots, TxHashsetWriteStatus, + BlockStatus, ChainAdapter, NoStatus, Options, OutputMMRPosition, Tip, TxHashSetRoots, + TxHashsetWriteStatus, }; use crate::util::secp::pedersen::{Commitment, RangeProof}; use crate::util::RwLock; @@ -467,13 +468,9 @@ impl Chain { /// spent. This querying is done in a way that is consistent with the /// current chain state, specifically the current winning (valid, most /// work) fork. - pub fn is_unspent(&self, output_ref: &OutputIdentifier) -> Result { + pub fn is_unspent(&self, output_ref: &OutputIdentifier) -> Result { let txhashset = self.txhashset.read(); - let res = txhashset.is_unspent(output_ref); - match res { - Err(e) => Err(e), - Ok((h, _)) => Ok(h), - } + txhashset.is_unspent(output_ref) } /// Validate the tx against the current UTXO set. @@ -980,6 +977,8 @@ impl Chain { debug!("txhashset_write: replaced our txhashset with the new one"); + self.rebuild_height_for_pos()?; + // Check for any orphan blocks and process them based on the new chain state. self.check_orphans(header.height + 1); @@ -1067,26 +1066,30 @@ impl Chain { } } - // Take a write lock on the txhashet and start a new writeable db batch. - let mut txhashset = self.txhashset.write(); - let mut batch = self.store.batch()?; + { + // Take a write lock on the txhashet and start a new writeable db batch. + let mut txhashset = self.txhashset.write(); + let mut batch = self.store.batch()?; - // Compact the txhashset itself (rewriting the pruned backend files). - txhashset.compact(&mut batch)?; + // Compact the txhashset itself (rewriting the pruned backend files). + txhashset.compact(&mut batch)?; - // Rebuild our output_pos index in the db based on current UTXO set. - txhashset::extending(&mut txhashset, &mut batch, |extension| { - extension.rebuild_index()?; - Ok(()) - })?; + // Rebuild our output_pos index in the db based on current UTXO set. + txhashset::extending(&mut txhashset, &mut batch, |extension| { + extension.rebuild_index()?; + Ok(()) + })?; - // If we are not in archival mode remove historical blocks from the db. - if !self.archive_mode { - self.remove_historical_blocks(&txhashset, &mut batch)?; + // If we are not in archival mode remove historical blocks from the db. + if !self.archive_mode { + self.remove_historical_blocks(&txhashset, &mut batch)?; + } + + // Commit all the above db changes. + batch.commit()?; } - // Commit all the above db changes. - batch.commit()?; + self.rebuild_height_for_pos()?; Ok(()) } @@ -1216,6 +1219,57 @@ impl Chain { Ok(hash) } + /// Migrate the index 'commitment -> output_pos' to index 'commitment -> (output_pos, block_height)' + /// Note: should only be called in two cases: + /// - Node start-up. For database migration from the old version. + /// - After the txhashset 'rebuild_index' when state syncing or compact. + pub fn rebuild_height_for_pos(&self) -> Result<(), Error> { + let txhashset = self.txhashset.read(); + let mut outputs_pos = txhashset.get_all_output_pos()?; + let total_outputs = outputs_pos.len(); + if total_outputs == 0 { + debug!("rebuild_height_for_pos: nothing to be rebuilt"); + return Ok(()); + } else { + debug!( + "rebuild_height_for_pos: rebuilding {} output_pos's height...", + total_outputs + ); + } + outputs_pos.sort_by(|a, b| a.1.cmp(&b.1)); + + let max_height = { + let head = self.head()?; + head.height + }; + + let batch = self.store.batch()?; + // clear it before rebuilding + batch.clear_output_pos_height()?; + + let mut i = 0; + for search_height in 0..max_height { + let h = txhashset.get_header_by_height(search_height + 1)?; + while i < total_outputs { + let (commit, pos) = outputs_pos[i]; + if pos > h.output_mmr_size { + // Note: MMR position is 1-based and not 0-based, so here must be '>' instead of '>=' + break; + } + batch.save_output_pos_height(&commit, pos, h.height)?; + trace!("rebuild_height_for_pos: {:?}", (commit, pos, h.height)); + i += 1; + } + } + + // clear the output_pos since now it has been replaced by the new index + batch.clear_output_pos()?; + + batch.commit()?; + debug!("rebuild_height_for_pos: done"); + Ok(()) + } + /// Gets the block header in which a given output appears in the txhashset. pub fn get_header_for_output( &self, @@ -1223,32 +1277,8 @@ impl Chain { ) -> Result { let txhashset = self.txhashset.read(); - let (_, pos) = txhashset.is_unspent(output_ref)?; - - let mut min = 0; - let mut max = { - let head = self.head()?; - head.height - }; - - loop { - let search_height = max - (max - min) / 2; - let h = txhashset.get_header_by_height(search_height)?; - if search_height == 0 { - return Ok(h); - } - let h_prev = txhashset.get_header_by_height(search_height - 1)?; - if pos > h.output_mmr_size { - min = search_height; - } else if pos < h_prev.output_mmr_size { - max = search_height; - } else { - if pos == h_prev.output_mmr_size { - return Ok(h_prev); - } - return Ok(h); - } - } + let output_pos = txhashset.is_unspent(output_ref)?; + Ok(txhashset.get_header_by_height(output_pos.height)?) } /// Verifies the given block header is actually on the current chain. diff --git a/chain/src/store.rs b/chain/src/store.rs index 6a94488d0..08461db5b 100644 --- a/chain/src/store.rs +++ b/chain/src/store.rs @@ -34,6 +34,7 @@ const TAIL_PREFIX: u8 = 'T' as u8; const HEADER_HEAD_PREFIX: u8 = 'I' as u8; const SYNC_HEAD_PREFIX: u8 = 's' as u8; const COMMIT_POS_PREFIX: u8 = 'c' as u8; +const COMMIT_POS_HGT_PREFIX: u8 = 'p' as u8; const BLOCK_INPUT_BITMAP_PREFIX: u8 = 'B' as u8; const BLOCK_SUMS_PREFIX: u8 = 'M' as u8; @@ -111,11 +112,42 @@ impl ChainStore { ) } + /// Get all outputs PMMR pos. (only for migration purpose) + pub fn get_all_output_pos(&self) -> Result, Error> { + let mut outputs_pos = Vec::new(); + let key = to_key(COMMIT_POS_PREFIX, &mut "".to_string().into_bytes()); + for (k, pos) in self.db.iter::(&key)? { + outputs_pos.push((Commitment::from_vec(k[2..].to_vec()), pos)); + } + Ok(outputs_pos) + } + /// Get PMMR pos for the given output commitment. + /// Note: + /// - Original prefix 'COMMIT_POS_PREFIX' is not used anymore for normal case, refer to #2889 for detail. + /// - To be compatible with the old callers, let's keep this function name but replace with new prefix 'COMMIT_POS_HGT_PREFIX' pub fn get_output_pos(&self, commit: &Commitment) -> Result { + let res: Result, Error> = self.db.get_ser(&to_key( + COMMIT_POS_HGT_PREFIX, + &mut commit.as_ref().to_vec(), + )); + match res { + Ok(None) => Err(Error::NotFoundErr(format!( + "Output position for: {:?}", + commit + ))), + Ok(Some((pos, _height))) => Ok(pos), + Err(e) => Err(e), + } + } + + /// Get PMMR pos and block height for the given output commitment. + pub fn get_output_pos_height(&self, commit: &Commitment) -> Result<(u64, u64), Error> { option_to_not_found( - self.db - .get_ser(&to_key(COMMIT_POS_PREFIX, &mut commit.as_ref().to_vec())), + self.db.get_ser(&to_key( + COMMIT_POS_HGT_PREFIX, + &mut commit.as_ref().to_vec(), + )), &format!("Output position for: {:?}", commit), ) } @@ -253,16 +285,50 @@ impl<'a> Batch<'a> { ) } + /// Save output_pos and block height to index. + pub fn save_output_pos_height( + &self, + commit: &Commitment, + pos: u64, + height: u64, + ) -> Result<(), Error> { + self.db.put_ser( + &to_key(COMMIT_POS_HGT_PREFIX, &mut commit.as_ref().to_vec())[..], + &(pos, height), + ) + } + /// Get output_pos from index. + /// Note: + /// - Original prefix 'COMMIT_POS_PREFIX' is not used for normal case anymore, refer to #2889 for detail. + /// - To be compatible with the old callers, let's keep this function name but replace with new prefix 'COMMIT_POS_HGT_PREFIX' pub fn get_output_pos(&self, commit: &Commitment) -> Result { + let res: Result, Error> = self.db.get_ser(&to_key( + COMMIT_POS_HGT_PREFIX, + &mut commit.as_ref().to_vec(), + )); + match res { + Ok(None) => Err(Error::NotFoundErr(format!( + "Output position for: {:?}", + commit + ))), + Ok(Some((pos, _height))) => Ok(pos), + Err(e) => Err(e), + } + } + + /// Get output_pos and block height from index. + pub fn get_output_pos_height(&self, commit: &Commitment) -> Result<(u64, u64), Error> { option_to_not_found( - self.db - .get_ser(&to_key(COMMIT_POS_PREFIX, &mut commit.as_ref().to_vec())), + self.db.get_ser(&to_key( + COMMIT_POS_HGT_PREFIX, + &mut commit.as_ref().to_vec(), + )), &format!("Output position for commit: {:?}", commit), ) } - /// Clear all entries from the output_pos index (must be rebuilt after). + /// Clear all entries from the output_pos index. pub fn clear_output_pos(&self) -> Result<(), Error> { let key = to_key(COMMIT_POS_PREFIX, &mut "".to_string().into_bytes()); for (k, _) in self.db.iter::(&key)? { @@ -271,6 +337,15 @@ impl<'a> Batch<'a> { Ok(()) } + /// Clear all entries from the (output_pos,height) index (must be rebuilt after). + pub fn clear_output_pos_height(&self) -> Result<(), Error> { + let key = to_key(COMMIT_POS_HGT_PREFIX, &mut "".to_string().into_bytes()); + for (k, _) in self.db.iter::<(u64, u64)>(&key)? { + self.db.delete(&k)?; + } + Ok(()) + } + /// Get the previous header. pub fn get_previous_header(&self, header: &BlockHeader) -> Result { self.get_block_header(&header.prev_hash) diff --git a/chain/src/txhashset/txhashset.rs b/chain/src/txhashset/txhashset.rs index 01f4c9baa..95955ebff 100644 --- a/chain/src/txhashset/txhashset.rs +++ b/chain/src/txhashset/txhashset.rs @@ -27,7 +27,7 @@ use crate::core::ser::{PMMRIndexHashable, PMMRable}; use crate::error::{Error, ErrorKind}; use crate::store::{Batch, ChainStore}; use crate::txhashset::{RewindableKernelView, UTXOView}; -use crate::types::{Tip, TxHashSetRoots, TxHashsetWriteStatus}; +use crate::types::{OutputMMRPosition, Tip, TxHashSetRoots, TxHashsetWriteStatus}; use crate::util::secp::pedersen::{Commitment, RangeProof}; use crate::util::{file, secp_static, zip}; use croaring::Bitmap; @@ -172,14 +172,18 @@ impl TxHashSet { /// Check if an output is unspent. /// We look in the index to find the output MMR pos. /// Then we check the entry in the output MMR and confirm the hash matches. - pub fn is_unspent(&self, output_id: &OutputIdentifier) -> Result<(Hash, u64), Error> { - match self.commit_index.get_output_pos(&output_id.commit) { - Ok(pos) => { + pub fn is_unspent(&self, output_id: &OutputIdentifier) -> Result { + match self.commit_index.get_output_pos_height(&output_id.commit) { + Ok((pos, block_height)) => { let output_pmmr: ReadonlyPMMR<'_, Output, _> = ReadonlyPMMR::at(&self.output_pmmr_h.backend, self.output_pmmr_h.last_pos); if let Some(hash) = output_pmmr.get_hash(pos) { if hash == output_id.hash_with_index(pos - 1) { - Ok((hash, pos)) + Ok(OutputMMRPosition { + output_mmr_hash: hash, + position: pos, + height: block_height, + }) } else { Err(ErrorKind::TxHashSetErr(format!("txhashset hash mismatch")).into()) } @@ -234,6 +238,11 @@ impl TxHashSet { Ok(header) } + /// Get all outputs MMR pos + pub fn get_all_output_pos(&self) -> Result, Error> { + Ok(self.commit_index.get_all_output_pos()?) + } + /// returns outputs from the given insertion (leaf) index up to the /// specified limit. Also returns the last index actually populated pub fn outputs_by_insertion_index( @@ -869,8 +878,9 @@ impl<'a> Extension<'a> { for out in b.outputs() { let pos = self.apply_output(out)?; - // Update the output_pos index for the new output. - self.batch.save_output_pos(&out.commitment(), pos)?; + // Update the (output_pos,height) index for the new output. + self.batch + .save_output_pos_height(&out.commitment(), pos, b.header.height)?; } for input in b.inputs() { diff --git a/chain/src/types.rs b/chain/src/types.rs index 6ab6e6dac..0e54a22d5 100644 --- a/chain/src/types.rs +++ b/chain/src/types.rs @@ -217,6 +217,18 @@ pub struct TxHashSetRoots { pub kernel_root: Hash, } +/// A helper to hold the output pmmr position of the txhashset in order to keep them +/// readable. +#[derive(Debug)] +pub struct OutputMMRPosition { + /// The hash at the output position in the MMR. + pub output_mmr_hash: Hash, + /// MMR position + pub position: u64, + /// Block height + pub height: u64, +} + /// The tip of a fork. A handle to the fork ancestry from its leaf in the /// blockchain tree. References the max height and the latest and previous /// blocks diff --git a/servers/src/grin/server.rs b/servers/src/grin/server.rs index 93be47002..a5e61e511 100644 --- a/servers/src/grin/server.rs +++ b/servers/src/grin/server.rs @@ -189,6 +189,9 @@ impl Server { archive_mode, )?); + // launching the database migration if needed + shared_chain.rebuild_height_for_pos()?; + pool_adapter.set_chain(shared_chain.clone()); let net_adapter = Arc::new(NetToChainAdapter::new( diff --git a/store/src/lmdb.rs b/store/src/lmdb.rs index 62c6aca23..12b5fb0bb 100644 --- a/store/src/lmdb.rs +++ b/store/src/lmdb.rs @@ -396,7 +396,7 @@ where { fn deser_if_prefix_match(&self, key: &[u8], value: &[u8]) -> Option<(Vec, T)> { let plen = self.prefix.len(); - if plen == 0 || key[0..plen] == self.prefix[..] { + if plen == 0 || (key.len() >= plen && key[0..plen] == self.prefix[..]) { if let Ok(value) = ser::deserialize(&mut &value[..], self.version) { Some((key.to_vec(), value)) } else {