From e7bbda81a0a815994955ce8fc71f710cd17a11bb Mon Sep 17 00:00:00 2001 From: Antioch Peverell Date: Wed, 7 Oct 2020 11:47:25 +0100 Subject: [PATCH] migrate blocks in batches (rework db iterator impl) (#3450) * rework db iterator more robust block migration based on low level iterator * cleanup * cleanup * fixup --- chain/src/chain.rs | 41 +++++++--- chain/src/error.rs | 8 ++ chain/src/linked_list.rs | 4 +- chain/src/store.rs | 64 +++++++++------ chain/src/txhashset/txhashset.rs | 7 +- p2p/src/peers.rs | 17 ++-- p2p/src/store.rs | 20 +++-- servers/src/grin/seed.rs | 17 ++-- store/src/lmdb.rs | 129 ++++++++++++++++--------------- store/tests/lmdb.rs | 8 +- 10 files changed, 187 insertions(+), 128 deletions(-) diff --git a/chain/src/chain.rs b/chain/src/chain.rs index 530224f35..6fb9955bc 100644 --- a/chain/src/chain.rs +++ b/chain/src/chain.rs @@ -35,13 +35,14 @@ use crate::types::{ }; use crate::util::secp::pedersen::{Commitment, RangeProof}; use crate::{util::RwLock, ChainStore}; +use grin_core::ser; use grin_store::Error::NotFoundErr; -use std::collections::HashMap; use std::fs::{self, File}; use std::path::{Path, PathBuf}; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; use std::time::{Duration, Instant}; +use std::{collections::HashMap, io::Cursor}; /// Orphan pool size is limited by MAX_ORPHAN_SIZE pub const MAX_ORPHAN_SIZE: usize = 200; @@ -1187,10 +1188,9 @@ impl Chain { let tail = batch.get_block_header(&tail_hash)?; // Remove old blocks (including short lived fork blocks) which height < tail.height - // here b is a block - for (_, b) in batch.blocks_iter()? { - if b.header.height < tail.height { - let _ = batch.delete_block(&b.hash()); + for block in batch.blocks_iter()? { + if block.header.height < tail.height { + let _ = batch.delete_block(&block.hash()); count += 1; } } @@ -1407,13 +1407,32 @@ impl Chain { /// Migrate our local db from v2 to v3. /// "commit only" inputs. fn migrate_db_v2_v3(store: &ChainStore) -> Result<(), Error> { - let store_v2 = store.with_version(ProtocolVersion(2)); - let batch = store_v2.batch()?; - for (_, block) in batch.blocks_iter()? { - batch.migrate_block(&block, ProtocolVersion(3))?; + let mut keys_to_migrate = vec![]; + for (k, v) in store.batch()?.blocks_raw_iter()? { + // We want to migrate all blocks that cannot be read via v3 protocol version. + let block_v2: Result = + ser::deserialize(&mut Cursor::new(&v), ProtocolVersion(2)); + let block_v3: Result = + ser::deserialize(&mut Cursor::new(&v), ProtocolVersion(3)); + if let (Ok(_), Err(_)) = (block_v2, block_v3) { + keys_to_migrate.push(k); + } } - batch.commit()?; - Ok(()) + debug!( + "migrate_db_v2_v3: {} blocks to migrate", + keys_to_migrate.len() + ); + let mut count = 0; + keys_to_migrate.chunks(100).try_for_each(|keys| { + let batch = store.batch()?; + for key in keys { + batch.migrate_block(&key, ProtocolVersion(2), ProtocolVersion(3))?; + count += 1; + } + batch.commit()?; + debug!("migrate_db_v2_v3: successfully migrated {} blocks", count); + Ok(()) + }) } /// Gets the block header in which a given output appears in the txhashset. diff --git a/chain/src/error.rs b/chain/src/error.rs index 227e7a52b..d4d7dba52 100644 --- a/chain/src/error.rs +++ b/chain/src/error.rs @@ -265,6 +265,14 @@ impl From for Error { } } +impl From for Error { + fn from(error: ser::Error) -> Error { + Error { + inner: Context::new(ErrorKind::SerErr(error)), + } + } +} + impl From for Error { fn from(e: secp::Error) -> Error { Error { diff --git a/chain/src/linked_list.rs b/chain/src/linked_list.rs index cb9822e0a..5c46fa1d0 100644 --- a/chain/src/linked_list.rs +++ b/chain/src/linked_list.rs @@ -390,12 +390,12 @@ impl PruneableListIndex for MultiIndex { let mut list_count = 0; let mut entry_count = 0; let prefix = to_key(self.list_prefix, ""); - for (key, _) in batch.db.iter::>(&prefix)? { + for key in batch.db.iter(&prefix, |k, _| Ok(k.to_vec()))? { let _ = batch.delete(&key); list_count += 1; } let prefix = to_key(self.entry_prefix, ""); - for (key, _) in batch.db.iter::>(&prefix)? { + for key in batch.db.iter(&prefix, |k, _| Ok(k.to_vec()))? { let _ = batch.delete(&key); entry_count += 1; } diff --git a/chain/src/store.rs b/chain/src/store.rs index a28bcece4..74c520180 100644 --- a/chain/src/store.rs +++ b/chain/src/store.rs @@ -23,8 +23,9 @@ use crate::linked_list::MultiIndex; use crate::types::{CommitPos, Tip}; use crate::util::secp::pedersen::Commitment; use croaring::Bitmap; +use grin_core::ser; use grin_store as store; -use grin_store::{option_to_not_found, to_key, Error, SerIterator}; +use grin_store::{option_to_not_found, to_key, Error}; use std::convert::TryInto; use std::sync::Arc; @@ -57,17 +58,6 @@ impl ChainStore { Ok(ChainStore { db }) } - /// Create a new instance of the chain store based on this instance - /// but with the provided protocol version. This is used when migrating - /// data in the db to a different protocol version, reading using one version and - /// writing back to the db with a different version. - pub fn with_version(&self, version: ProtocolVersion) -> ChainStore { - let db_with_version = self.db.with_version(version); - ChainStore { - db: db_with_version, - } - } - /// The current chain head. pub fn head(&self) -> Result { option_to_not_found(self.db.get_ser(&[HEAD_PREFIX]), || "HEAD".to_owned()) @@ -224,11 +214,20 @@ impl<'a> Batch<'a> { Ok(()) } - /// Migrate a block stored in the db by serializing it using the provided protocol version. - /// Block may have been read using a previous protocol version but we do not actually care. - pub fn migrate_block(&self, b: &Block, version: ProtocolVersion) -> Result<(), Error> { - self.db - .put_ser_with_version(&to_key(BLOCK_PREFIX, b.hash())[..], b, version)?; + /// Migrate a block stored in the db reading from one protocol version and writing + /// with new protocol version. + pub fn migrate_block( + &self, + key: &[u8], + from_version: ProtocolVersion, + to_version: ProtocolVersion, + ) -> Result<(), Error> { + let block: Option = self.db.get_with(key, move |_, mut v| { + ser::deserialize(&mut v, from_version).map_err(From::from) + })?; + if let Some(block) = block { + self.db.put_ser_with_version(key, &block, to_version)?; + } Ok(()) } @@ -283,9 +282,14 @@ impl<'a> Batch<'a> { } /// Iterator over the output_pos index. - pub fn output_pos_iter(&self) -> Result, Error> { + pub fn output_pos_iter(&self) -> Result, CommitPos)>, Error> { let key = to_key(OUTPUT_POS_PREFIX, ""); - self.db.iter(&key) + let protocol_version = self.db.protocol_version(); + self.db.iter(&key, move |k, mut v| { + ser::deserialize(&mut v, protocol_version) + .map(|pos| (k.to_vec(), pos)) + .map_err(From::from) + }) } /// Get output_pos from index. @@ -371,10 +375,26 @@ impl<'a> Batch<'a> { }) } - /// An iterator to all block in db - pub fn blocks_iter(&self) -> Result, Error> { + /// Iterator over all full blocks in the db. + /// Uses default db serialization strategy via db protocol version. + pub fn blocks_iter(&self) -> Result, Error> { let key = to_key(BLOCK_PREFIX, ""); - self.db.iter(&key) + let protocol_version = self.db.protocol_version(); + self.db.iter(&key, move |_, mut v| { + ser::deserialize(&mut v, protocol_version).map_err(From::from) + }) + } + + /// Iterator over raw data for full blocks in the db. + /// Used during block migration (we need flexibility around deserialization). + pub fn blocks_raw_iter(&self) -> Result, Vec)>, Error> { + let key = to_key(BLOCK_PREFIX, ""); + self.db.iter(&key, |k, v| Ok((k.to_vec(), v.to_vec()))) + } + + /// Protocol version of our underlying db. + pub fn protocol_version(&self) -> ProtocolVersion { + self.db.protocol_version() } } diff --git a/chain/src/txhashset/txhashset.rs b/chain/src/txhashset/txhashset.rs index 17bbe0558..bc8089dff 100644 --- a/chain/src/txhashset/txhashset.rs +++ b/chain/src/txhashset/txhashset.rs @@ -513,12 +513,13 @@ impl TxHashSet { // Iterate over the current output_pos index, removing any entries that // do not point to to the expected output. let mut removed_count = 0; - for (key, (pos, _)) in batch.output_pos_iter()? { - if let Some(out) = output_pmmr.get_data(pos) { + for (key, pos) in batch.output_pos_iter()? { + if let Some(out) = output_pmmr.get_data(pos.pos) { if let Ok(pos_via_mmr) = batch.get_output_pos(&out.commitment()) { // If the pos matches and the index key matches the commitment // then keep the entry, other we want to clean it up. - if pos == pos_via_mmr && batch.is_match_output_pos_key(&key, &out.commitment()) + if pos.pos == pos_via_mmr + && batch.is_match_output_pos_key(&key, &out.commitment()) { continue; } diff --git a/p2p/src/peers.rs b/p2p/src/peers.rs index fed122c9d..e4efa6cca 100644 --- a/p2p/src/peers.rs +++ b/p2p/src/peers.rs @@ -369,15 +369,16 @@ impl Peers { } } - /// All peer information we have in storage + /// Iterator over all peers we know about (stored in our db). + pub fn peers_iter(&self) -> Result, Error> { + self.store.peers_iter().map_err(From::from) + } + + /// Convenience for reading all peers. pub fn all_peers(&self) -> Vec { - match self.store.all_peers() { - Ok(peers) => peers, - Err(e) => { - error!("all_peers failed: {:?}", e); - vec![] - } - } + self.peers_iter() + .map(|peers| peers.collect()) + .unwrap_or(vec![]) } /// Find peers in store (not necessarily connected) and return their data diff --git a/p2p/src/store.rs b/p2p/src/store.rs index a8dae3017..686ceeff2 100644 --- a/p2p/src/store.rs +++ b/p2p/src/store.rs @@ -152,21 +152,27 @@ impl PeerStore { cap: Capabilities, count: usize, ) -> Result, Error> { - let key = to_key(PEER_PREFIX, ""); let peers = self - .db - .iter::(&key)? - .map(|(_, v)| v) + .peers_iter()? .filter(|p| p.flags == state && p.capabilities.contains(cap)) .choose_multiple(&mut thread_rng(), count); Ok(peers) } + /// Iterator over all known peers. + pub fn peers_iter(&self) -> Result, Error> { + let key = to_key(PEER_PREFIX, ""); + let protocol_version = self.db.protocol_version(); + self.db.iter(&key, move |_, mut v| { + ser::deserialize(&mut v, protocol_version).map_err(From::from) + }) + } + /// List all known peers /// Used for /v1/peers/all api endpoint pub fn all_peers(&self) -> Result, Error> { - let key = to_key(PEER_PREFIX, ""); - Ok(self.db.iter::(&key)?.map(|(_, v)| v).collect()) + let peers: Vec = self.peers_iter()?.collect(); + Ok(peers) } /// Convenience method to load a peer data, update its status and save it @@ -194,7 +200,7 @@ impl PeerStore { { let mut to_remove = vec![]; - for x in self.all_peers()? { + for x in self.peers_iter()? { if predicate(&x) { to_remove.push(x) } diff --git a/servers/src/grin/seed.rs b/servers/src/grin/seed.rs index 8327df507..45632ec77 100644 --- a/servers/src/grin/seed.rs +++ b/servers/src/grin/seed.rs @@ -19,8 +19,7 @@ use chrono::prelude::{DateTime, Utc}; use chrono::{Duration, MIN_DATE}; -use rand::seq::SliceRandom; -use rand::thread_rng; +use rand::prelude::*; use std::collections::HashMap; use std::net::ToSocketAddrs; use std::sync::{mpsc, Arc}; @@ -145,14 +144,14 @@ fn monitor_peers( tx: mpsc::Sender, preferred_peers: &[PeerAddr], ) { - // regularly check if we need to acquire more peers and if so, gets + // regularly check if we need to acquire more peers and if so, gets // them from db - let total_count = peers.all_peers().len(); + let mut total_count = 0; let mut healthy_count = 0; let mut banned_count = 0; let mut defuncts = vec![]; - for x in peers.all_peers() { + for x in peers.all_peers().into_iter() { match x.flags { p2p::State::Banned => { let interval = Utc::now().timestamp() - x.last_banned; @@ -172,6 +171,7 @@ fn monitor_peers( p2p::State::Healthy => healthy_count += 1, p2p::State::Defunct => defuncts.push(x), } + total_count += 1; } debug!( @@ -223,11 +223,10 @@ fn monitor_peers( } } - // take a random defunct peer and mark it healthy: over a long period any + // take a random defunct peer and mark it healthy: over a long enough period any // peer will see another as defunct eventually, gives us a chance to retry - if !defuncts.is_empty() { - defuncts.shuffle(&mut thread_rng()); - let _ = peers.update_state(defuncts[0].addr, p2p::State::Healthy); + if let Some(peer) = defuncts.into_iter().choose(&mut thread_rng()) { + let _ = peers.update_state(peer.addr, p2p::State::Healthy); } // find some peers from our db diff --git a/store/src/lmdb.rs b/store/src/lmdb.rs index 7e548a7d3..cd865cb00 100644 --- a/store/src/lmdb.rs +++ b/store/src/lmdb.rs @@ -15,7 +15,6 @@ //! Storage of core types using LMDB. use std::fs; -use std::marker; use std::sync::Arc; use lmdb_zero as lmdb; @@ -45,13 +44,13 @@ pub enum Error { #[fail(display = "LMDB error: {} ", _0)] LmdbErr(lmdb::error::Error), /// Wraps a serialization error for Writeable or Readable - #[fail(display = "Serialization Error")] - SerErr(String), + #[fail(display = "Serialization Error: {}", _0)] + SerErr(ser::Error), /// File handling error - #[fail(display = "File handling Error")] + #[fail(display = "File handling Error: {}", _0)] FileErr(String), /// Other error - #[fail(display = "Other Error")] + #[fail(display = "Other Error: {}", _0)] OtherErr(String), } @@ -61,6 +60,12 @@ impl From for Error { } } +impl From for Error { + fn from(e: ser::Error) -> Error { + Error::SerErr(e) + } +} + /// unwraps the inner option by converting the none case to a not found error pub fn option_to_not_found(res: Result, Error>, field_name: F) -> Result where @@ -247,7 +252,7 @@ impl Store { /// Gets a value from the db, provided its key. /// Deserializes the retrieved data using the provided function. - pub fn get_with( + pub fn get_with( &self, key: &[u8], access: &lmdb::ConstAccessor<'_>, @@ -255,12 +260,12 @@ impl Store { deserialize: F, ) -> Result, Error> where - F: Fn(&[u8]) -> Result, + F: Fn(&[u8], &[u8]) -> Result, { let res: Option<&[u8]> = access.get(db, key).to_opt()?; match res { None => Ok(None), - Some(res) => deserialize(res).map(|x| Some(x)), + Some(res) => deserialize(key, res).map(Some), } } @@ -274,9 +279,8 @@ impl Store { let txn = lmdb::ReadTransaction::new(self.env.clone())?; let access = txn.access(); - self.get_with(key, &access, &db, |mut data| { - ser::deserialize(&mut data, self.protocol_version()) - .map_err(|e| Error::SerErr(format!("{}", e))) + self.get_with(key, &access, &db, |_, mut data| { + ser::deserialize(&mut data, self.protocol_version()).map_err(From::from) }) } @@ -293,23 +297,18 @@ impl Store { Ok(res.is_some()) } - /// Produces an iterator of (key, value) pairs, where values are `Readable` types - /// moving forward from the provided key. - pub fn iter(&self, from: &[u8]) -> Result, Error> { + /// Produces an iterator from the provided key prefix. + pub fn iter(&self, prefix: &[u8], deserialize: F) -> Result, Error> + where + F: Fn(&[u8], &[u8]) -> Result, + { let lock = self.db.read(); let db = lock .as_ref() .ok_or_else(|| Error::NotFoundErr("chain db is None".to_string()))?; let tx = Arc::new(lmdb::ReadTransaction::new(self.env.clone())?); let cursor = Arc::new(tx.cursor(db.clone())?); - Ok(SerIterator { - tx, - cursor, - seek: false, - prefix: from.to_vec(), - version: self.protocol_version(), - _marker: marker::PhantomData, - }) + Ok(PrefixIterator::new(tx, cursor, prefix, deserialize)) } /// Builds a new batch to be used with this store. @@ -364,15 +363,15 @@ impl<'a> Batch<'a> { let ser_value = ser::ser_vec(value, version); match ser_value { Ok(data) => self.put(key, &data), - Err(err) => Err(Error::SerErr(format!("{}", err))), + Err(err) => Err(err.into()), } } /// Low-level access for retrieving data by key. /// Takes a function for flexible deserialization. - pub fn get_with(&self, key: &[u8], deserialize: F) -> Result, Error> + pub fn get_with(&self, key: &[u8], deserialize: F) -> Result, Error> where - F: Fn(&[u8]) -> Result, + F: Fn(&[u8], &[u8]) -> Result, { let access = self.tx.access(); let lock = self.store.db.read(); @@ -395,18 +394,20 @@ impl<'a> Batch<'a> { Ok(res.is_some()) } - /// Produces an iterator of `Readable` types moving forward from the - /// provided key. - pub fn iter(&self, from: &[u8]) -> Result, Error> { - self.store.iter(from) + /// Produces an iterator from the provided key prefix. + pub fn iter(&self, prefix: &[u8], deserialize: F) -> Result, Error> + where + F: Fn(&[u8], &[u8]) -> Result, + { + self.store.iter(prefix, deserialize) } /// Gets a `Readable` value from the db by provided key and default deserialization strategy. pub fn get_ser(&self, key: &[u8]) -> Result, Error> { - self.get_with(key, |mut data| { + self.get_with(key, |_, mut data| { match ser::deserialize(&mut data, self.protocol_version()) { Ok(res) => Ok(res), - Err(e) => Err(Error::SerErr(format!("{}", e))), + Err(e) => Err(From::from(e)), } }) } @@ -437,57 +438,61 @@ impl<'a> Batch<'a> { } } -/// An iterator that produces Readable instances back. Wraps the lower level -/// DBIterator and deserializes the returned values. -pub struct SerIterator +/// An iterator based on key prefix. +/// Caller is responsible for deserialization of the data. +pub struct PrefixIterator where - T: ser::Readable, + F: Fn(&[u8], &[u8]) -> Result, { tx: Arc>, cursor: Arc>, seek: bool, prefix: Vec, - version: ProtocolVersion, - _marker: marker::PhantomData, + deserialize: F, } -impl Iterator for SerIterator +impl Iterator for PrefixIterator where - T: ser::Readable, + F: Fn(&[u8], &[u8]) -> Result, { - type Item = (Vec, T); + type Item = T; - fn next(&mut self) -> Option<(Vec, T)> { + fn next(&mut self) -> Option { let access = self.tx.access(); - let kv = if self.seek { - Arc::get_mut(&mut self.cursor).unwrap().next(&access) + let cursor = Arc::get_mut(&mut self.cursor).expect("failed to get cursor"); + let kv: Result<(&[u8], &[u8]), _> = if self.seek { + cursor.next(&access) } else { self.seek = true; - Arc::get_mut(&mut self.cursor) - .unwrap() - .seek_range_k(&access, &self.prefix[..]) + cursor.seek_range_k(&access, &self.prefix[..]) }; - match kv { - Ok((k, v)) => self.deser_if_prefix_match(k, v), - Err(_) => None, - } + kv.ok() + .filter(|(k, _)| k.starts_with(self.prefix.as_slice())) + .map(|(k, v)| match (self.deserialize)(k, v) { + Ok(v) => Some(v), + Err(_) => None, + }) + .flatten() } } -impl SerIterator +impl PrefixIterator where - T: ser::Readable, + F: Fn(&[u8], &[u8]) -> Result, { - fn deser_if_prefix_match(&self, key: &[u8], value: &[u8]) -> Option<(Vec, T)> { - let plen = self.prefix.len(); - if plen == 0 || (key.len() >= plen && key[0..plen] == self.prefix[..]) { - if let Ok(value) = ser::deserialize(&mut &value[..], self.version) { - Some((key.to_vec(), value)) - } else { - None - } - } else { - None + /// Initialize a new prefix iterator. + pub fn new( + tx: Arc>, + cursor: Arc>, + prefix: &[u8], + deserialize: F, + ) -> PrefixIterator { + PrefixIterator { + tx, + cursor, + seek: false, + prefix: prefix.to_vec(), + deserialize, } } } diff --git a/store/tests/lmdb.rs b/store/tests/lmdb.rs index 54c08887f..6dfebc16b 100644 --- a/store/tests/lmdb.rs +++ b/store/tests/lmdb.rs @@ -15,7 +15,7 @@ use grin_core as core; use grin_store as store; use grin_util as util; -use store::SerIterator; +use store::PrefixIterator; use crate::core::global; use crate::core::ser::{self, Readable, Reader, Writeable, Writer}; @@ -118,14 +118,14 @@ fn test_iter() -> Result<(), store::Error> { // assert_eq!(iter.next(), None); // Check we can not yet see the new entry via an iterator outside the uncommitted batch. - let mut iter: SerIterator> = store.iter(&[0])?; + let mut iter = store.iter(&[0], |_, v| Ok(v.to_vec()))?; assert_eq!(iter.next(), None); batch.commit()?; // Check we can see the new entry via an iterator after committing the batch. - let mut iter: SerIterator> = store.iter(&[0])?; - assert_eq!(iter.next(), Some((key.to_vec(), value.to_vec()))); + let mut iter = store.iter(&[0], |_, v| Ok(v.to_vec()))?; + assert_eq!(iter.next(), Some(value.to_vec())); assert_eq!(iter.next(), None); clean_output_dir(test_dir);