migrate blocks in batches (rework db iterator impl) (#3450)

* rework db iterator
more robust block migration based on low level iterator

* cleanup

* cleanup

* fixup
Antioch Peverell 2020-10-07 11:47:25 +01:00 committed by GitHub
parent eab26b3ae4
commit e7bbda81a0
10 changed files with 187 additions and 128 deletions
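
The heart of the change is the batching pattern: collect the keys that need migrating up front, then rewrite them in fixed-size chunks, committing each chunk as its own db batch so an interrupted migration loses at most one chunk of work. A minimal sketch of that pattern (illustrative only; `migrate_in_chunks` is a hypothetical helper, while `ChainStore`, `migrate_block`, and `ProtocolVersion` are the types touched by this diff):

```rust
// Hypothetical standalone helper mirroring the migration loop in this commit.
fn migrate_in_chunks(store: &ChainStore, keys: Vec<Vec<u8>>) -> Result<(), Error> {
    keys.chunks(100).try_for_each(|chunk| {
        // One write batch per chunk: commit incrementally instead of holding
        // a single huge transaction open for the whole migration.
        let batch = store.batch()?;
        for key in chunk {
            batch.migrate_block(key, ProtocolVersion(2), ProtocolVersion(3))?;
        }
        batch.commit()
    })
}
```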


@@ -35,13 +35,14 @@ use crate::types::{
};
use crate::util::secp::pedersen::{Commitment, RangeProof};
use crate::{util::RwLock, ChainStore};
use grin_core::ser;
use grin_store::Error::NotFoundErr;
use std::collections::HashMap;
use std::fs::{self, File};
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use std::{collections::HashMap, io::Cursor};
/// Orphan pool size is limited by MAX_ORPHAN_SIZE
pub const MAX_ORPHAN_SIZE: usize = 200;
@@ -1187,10 +1188,9 @@ impl Chain {
let tail = batch.get_block_header(&tail_hash)?;
// Remove old blocks (including short-lived fork blocks) whose height < tail.height
// here b is a block
for (_, b) in batch.blocks_iter()? {
if b.header.height < tail.height {
let _ = batch.delete_block(&b.hash());
for block in batch.blocks_iter()? {
if block.header.height < tail.height {
let _ = batch.delete_block(&block.hash());
count += 1;
}
}
@@ -1407,13 +1407,32 @@ impl Chain {
/// Migrate our local db from v2 to v3, rewriting blocks to use
/// "commit only" inputs.
fn migrate_db_v2_v3(store: &ChainStore) -> Result<(), Error> {
let store_v2 = store.with_version(ProtocolVersion(2));
let batch = store_v2.batch()?;
for (_, block) in batch.blocks_iter()? {
batch.migrate_block(&block, ProtocolVersion(3))?;
let mut keys_to_migrate = vec![];
for (k, v) in store.batch()?.blocks_raw_iter()? {
// We want to migrate all blocks that cannot be read via v3 protocol version.
let block_v2: Result<Block, _> =
ser::deserialize(&mut Cursor::new(&v), ProtocolVersion(2));
let block_v3: Result<Block, _> =
ser::deserialize(&mut Cursor::new(&v), ProtocolVersion(3));
if let (Ok(_), Err(_)) = (block_v2, block_v3) {
keys_to_migrate.push(k);
}
}
debug!(
"migrate_db_v2_v3: {} blocks to migrate",
keys_to_migrate.len()
);
let mut count = 0;
keys_to_migrate.chunks(100).try_for_each(|keys| {
let batch = store.batch()?;
for key in keys {
batch.migrate_block(&key, ProtocolVersion(2), ProtocolVersion(3))?;
count += 1;
}
batch.commit()?;
debug!("migrate_db_v2_v3: successfully migrated {} blocks", count);
Ok(())
})
}
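
The key-selection loop above is effectively a version probe on the raw bytes; as a hypothetical standalone helper it would read:

```rust
// Hypothetical helper equivalent to the selection loop above: a stored block
// needs migration iff its bytes deserialize under protocol v2 but not v3.
fn needs_migration(raw: &[u8]) -> bool {
    let v2: Result<Block, _> = ser::deserialize(&mut Cursor::new(raw), ProtocolVersion(2));
    let v3: Result<Block, _> = ser::deserialize(&mut Cursor::new(raw), ProtocolVersion(3));
    v2.is_ok() && v3.is_err()
}
```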
/// Gets the block header in which a given output appears in the txhashset.


@@ -265,6 +265,14 @@ impl From<io::Error> for Error {
}
}
impl From<ser::Error> for Error {
fn from(error: ser::Error) -> Error {
Error {
inner: Context::new(ErrorKind::SerErr(error)),
}
}
}
impl From<secp::Error> for Error {
fn from(e: secp::Error) -> Error {
Error {


@@ -390,12 +390,12 @@ impl<T: PosEntry> PruneableListIndex for MultiIndex<T> {
let mut list_count = 0;
let mut entry_count = 0;
let prefix = to_key(self.list_prefix, "");
for (key, _) in batch.db.iter::<ListWrapper<T>>(&prefix)? {
for key in batch.db.iter(&prefix, |k, _| Ok(k.to_vec()))? {
let _ = batch.delete(&key);
list_count += 1;
}
let prefix = to_key(self.entry_prefix, "");
for (key, _) in batch.db.iter::<ListEntry<T>>(&prefix)? {
for key in batch.db.iter(&prefix, |k, _| Ok(k.to_vec()))? {
let _ = batch.delete(&key);
entry_count += 1;
}
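
The closure passed to `iter` decides what each entry yields, so a clear-out like this never deserializes values at all. A hedged sketch of the same key-only pattern in isolation (assuming a `db` handle, a `batch`, and a `prefix` as in the surrounding code):

```rust
// Yield owned keys only, ignoring values, then delete each entry by key.
for key in db.iter(&prefix, |k, _| Ok(k.to_vec()))? {
    let _ = batch.delete(&key);
}
```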


@@ -23,8 +23,9 @@ use crate::linked_list::MultiIndex;
use crate::types::{CommitPos, Tip};
use crate::util::secp::pedersen::Commitment;
use croaring::Bitmap;
use grin_core::ser;
use grin_store as store;
use grin_store::{option_to_not_found, to_key, Error, SerIterator};
use grin_store::{option_to_not_found, to_key, Error};
use std::convert::TryInto;
use std::sync::Arc;
@@ -57,17 +58,6 @@ impl ChainStore {
Ok(ChainStore { db })
}
/// Create a new instance of the chain store based on this instance
/// but with the provided protocol version. This is used when migrating
/// data in the db to a different protocol version, reading using one version and
/// writing back to the db with a different version.
pub fn with_version(&self, version: ProtocolVersion) -> ChainStore {
let db_with_version = self.db.with_version(version);
ChainStore {
db: db_with_version,
}
}
/// The current chain head.
pub fn head(&self) -> Result<Tip, Error> {
option_to_not_found(self.db.get_ser(&[HEAD_PREFIX]), || "HEAD".to_owned())
@@ -224,11 +214,20 @@ impl<'a> Batch<'a> {
Ok(())
}
/// Migrate a block stored in the db by serializing it using the provided protocol version.
/// Block may have been read using a previous protocol version but we do not actually care.
pub fn migrate_block(&self, b: &Block, version: ProtocolVersion) -> Result<(), Error> {
self.db
.put_ser_with_version(&to_key(BLOCK_PREFIX, b.hash())[..], b, version)?;
/// Migrate a block stored in the db reading from one protocol version and writing
/// with new protocol version.
pub fn migrate_block(
&self,
key: &[u8],
from_version: ProtocolVersion,
to_version: ProtocolVersion,
) -> Result<(), Error> {
let block: Option<Block> = self.db.get_with(key, move |_, mut v| {
ser::deserialize(&mut v, from_version).map_err(From::from)
})?;
if let Some(block) = block {
self.db.put_ser_with_version(key, &block, to_version)?;
}
Ok(())
}
@@ -283,9 +282,14 @@ impl<'a> Batch<'a> {
}
/// Iterator over the output_pos index.
pub fn output_pos_iter(&self) -> Result<SerIterator<(u64, u64)>, Error> {
pub fn output_pos_iter(&self) -> Result<impl Iterator<Item = (Vec<u8>, CommitPos)>, Error> {
let key = to_key(OUTPUT_POS_PREFIX, "");
self.db.iter(&key)
let protocol_version = self.db.protocol_version();
self.db.iter(&key, move |k, mut v| {
ser::deserialize(&mut v, protocol_version)
.map(|pos| (k.to_vec(), pos))
.map_err(From::from)
})
}
/// Get output_pos from index.
@@ -371,10 +375,26 @@ impl<'a> Batch<'a> {
})
}
/// An iterator to all block in db
pub fn blocks_iter(&self) -> Result<SerIterator<Block>, Error> {
/// Iterator over all full blocks in the db.
/// Uses default db serialization strategy via db protocol version.
pub fn blocks_iter(&self) -> Result<impl Iterator<Item = Block>, Error> {
let key = to_key(BLOCK_PREFIX, "");
self.db.iter(&key)
let protocol_version = self.db.protocol_version();
self.db.iter(&key, move |_, mut v| {
ser::deserialize(&mut v, protocol_version).map_err(From::from)
})
}
/// Iterator over raw data for full blocks in the db.
/// Used during block migration (we need flexibility around deserialization).
pub fn blocks_raw_iter(&self) -> Result<impl Iterator<Item = (Vec<u8>, Vec<u8>)>, Error> {
let key = to_key(BLOCK_PREFIX, "");
self.db.iter(&key, |k, v| Ok((k.to_vec(), v.to_vec())))
}
/// Protocol version of our underlying db.
pub fn protocol_version(&self) -> ProtocolVersion {
self.db.protocol_version()
}
}
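
Together these give callers a typed view and a raw view over the same key prefix. A hedged usage sketch (assuming a `chain_store` in scope):

```rust
let batch = chain_store.batch()?;

// Typed: deserialization happens inside the iterator using the db's protocol
// version. Note iteration ends early at the first unreadable entry, which is
// exactly why the v2->v3 migration uses the raw form instead.
for block in batch.blocks_iter()? {
    debug!("block {} at height {}", block.hash(), block.header.height);
}

// Raw: the caller sees (key, value) byte vectors and chooses how, or whether,
// to deserialize each one.
for (key, value) in batch.blocks_raw_iter()? {
    debug!("{} bytes stored under key {:?}", value.len(), key);
}
```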


@@ -513,12 +513,13 @@ impl TxHashSet {
// Iterate over the current output_pos index, removing any entries that
// do not point to the expected output.
let mut removed_count = 0;
for (key, (pos, _)) in batch.output_pos_iter()? {
if let Some(out) = output_pmmr.get_data(pos) {
for (key, pos) in batch.output_pos_iter()? {
if let Some(out) = output_pmmr.get_data(pos.pos) {
if let Ok(pos_via_mmr) = batch.get_output_pos(&out.commitment()) {
// If the pos matches and the index key matches the commitment
// then keep the entry, otherwise we want to clean it up.
if pos == pos_via_mmr && batch.is_match_output_pos_key(&key, &out.commitment())
if pos.pos == pos_via_mmr
&& batch.is_match_output_pos_key(&key, &out.commitment())
{
continue;
}


@@ -369,15 +369,16 @@ impl Peers {
}
}
/// All peer information we have in storage
/// Iterator over all peers we know about (stored in our db).
pub fn peers_iter(&self) -> Result<impl Iterator<Item = PeerData>, Error> {
self.store.peers_iter().map_err(From::from)
}
/// Convenience for reading all peers.
pub fn all_peers(&self) -> Vec<PeerData> {
match self.store.all_peers() {
Ok(peers) => peers,
Err(e) => {
error!("all_peers failed: {:?}", e);
vec![]
}
}
self.peers_iter()
.map(|peers| peers.collect())
.unwrap_or(vec![])
}
/// Find peers in store (not necessarily connected) and return their data


@@ -152,21 +152,27 @@ impl PeerStore {
cap: Capabilities,
count: usize,
) -> Result<Vec<PeerData>, Error> {
let key = to_key(PEER_PREFIX, "");
let peers = self
.db
.iter::<PeerData>(&key)?
.map(|(_, v)| v)
.peers_iter()?
.filter(|p| p.flags == state && p.capabilities.contains(cap))
.choose_multiple(&mut thread_rng(), count);
Ok(peers)
}
/// Iterator over all known peers.
pub fn peers_iter(&self) -> Result<impl Iterator<Item = PeerData>, Error> {
let key = to_key(PEER_PREFIX, "");
let protocol_version = self.db.protocol_version();
self.db.iter(&key, move |_, mut v| {
ser::deserialize(&mut v, protocol_version).map_err(From::from)
})
}
/// List all known peers
/// Used for /v1/peers/all api endpoint
pub fn all_peers(&self) -> Result<Vec<PeerData>, Error> {
let key = to_key(PEER_PREFIX, "");
Ok(self.db.iter::<PeerData>(&key)?.map(|(_, v)| v).collect())
let peers: Vec<PeerData> = self.peers_iter()?.collect();
Ok(peers)
}
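
Because `peers_iter` composes lazily, callers can filter without materializing the full peer list first; a hedged sketch (assuming a `store: PeerStore` in scope):

```rust
// Count banned peers without collecting every PeerData into a Vec.
let banned = store
    .peers_iter()?
    .filter(|p| p.flags == State::Banned)
    .count();
```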
/// Convenience method to load a peer data, update its status and save it
@@ -194,7 +200,7 @@ impl PeerStore {
{
let mut to_remove = vec![];
for x in self.all_peers()? {
for x in self.peers_iter()? {
if predicate(&x) {
to_remove.push(x)
}


@@ -19,8 +19,7 @@
use chrono::prelude::{DateTime, Utc};
use chrono::{Duration, MIN_DATE};
use rand::seq::SliceRandom;
use rand::thread_rng;
use rand::prelude::*;
use std::collections::HashMap;
use std::net::ToSocketAddrs;
use std::sync::{mpsc, Arc};
@@ -147,12 +146,12 @@ fn monitor_peers(
) {
// regularly check if we need to acquire more peers and, if so, get
// them from the db
let total_count = peers.all_peers().len();
let mut total_count = 0;
let mut healthy_count = 0;
let mut banned_count = 0;
let mut defuncts = vec![];
for x in peers.all_peers() {
for x in peers.all_peers().into_iter() {
match x.flags {
p2p::State::Banned => {
let interval = Utc::now().timestamp() - x.last_banned;
@@ -172,6 +171,7 @@ fn monitor_peers(
p2p::State::Healthy => healthy_count += 1,
p2p::State::Defunct => defuncts.push(x),
}
total_count += 1;
}
debug!(
@@ -223,11 +223,10 @@ fn monitor_peers(
}
}
// take a random defunct peer and mark it healthy: over a long period any
// take a random defunct peer and mark it healthy: over a long enough period any
// peer will see another as defunct eventually, giving us a chance to retry
if !defuncts.is_empty() {
defuncts.shuffle(&mut thread_rng());
let _ = peers.update_state(defuncts[0].addr, p2p::State::Healthy);
if let Some(peer) = defuncts.into_iter().choose(&mut thread_rng()) {
let _ = peers.update_state(peer.addr, p2p::State::Healthy);
}
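
The replacement relies on `IteratorRandom::choose` from `rand::prelude`, which picks a uniformly random element in a single pass instead of shuffling the whole vector and indexing into it. A minimal self-contained sketch:

```rust
use rand::prelude::*; // brings IteratorRandom::choose and thread_rng into scope

fn main() {
    let defuncts = vec!["peer-a", "peer-b", "peer-c"];
    // Consume the vec and pick one element at random, if any exist.
    if let Some(peer) = defuncts.into_iter().choose(&mut thread_rng()) {
        println!("marking {} healthy", peer);
    }
}
```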
// find some peers from our db


@@ -15,7 +15,6 @@
//! Storage of core types using LMDB.
use std::fs;
use std::marker;
use std::sync::Arc;
use lmdb_zero as lmdb;
@@ -45,13 +44,13 @@ pub enum Error {
#[fail(display = "LMDB error: {} ", _0)]
LmdbErr(lmdb::error::Error),
/// Wraps a serialization error for Writeable or Readable
#[fail(display = "Serialization Error")]
SerErr(String),
#[fail(display = "Serialization Error: {}", _0)]
SerErr(ser::Error),
/// File handling error
#[fail(display = "File handling Error")]
#[fail(display = "File handling Error: {}", _0)]
FileErr(String),
/// Other error
#[fail(display = "Other Error")]
#[fail(display = "Other Error: {}", _0)]
OtherErr(String),
}
@@ -61,6 +60,12 @@ impl From<lmdb::error::Error> for Error {
}
}
impl From<ser::Error> for Error {
fn from(e: ser::Error) -> Error {
Error::SerErr(e)
}
}
/// unwraps the inner option by converting the none case to a not found error
pub fn option_to_not_found<T, F>(res: Result<Option<T>, Error>, field_name: F) -> Result<T, Error>
where
@@ -247,7 +252,7 @@ impl Store {
/// Gets a value from the db, provided its key.
/// Deserializes the retrieved data using the provided function.
pub fn get_with<T, F>(
pub fn get_with<F, T>(
&self,
key: &[u8],
access: &lmdb::ConstAccessor<'_>,
@@ -255,12 +260,12 @@
deserialize: F,
) -> Result<Option<T>, Error>
where
F: Fn(&[u8]) -> Result<T, Error>,
F: Fn(&[u8], &[u8]) -> Result<T, Error>,
{
let res: Option<&[u8]> = access.get(db, key).to_opt()?;
match res {
None => Ok(None),
Some(res) => deserialize(res).map(|x| Some(x)),
Some(res) => deserialize(key, res).map(Some),
}
}
@@ -274,9 +279,8 @@ impl Store {
let txn = lmdb::ReadTransaction::new(self.env.clone())?;
let access = txn.access();
self.get_with(key, &access, &db, |mut data| {
ser::deserialize(&mut data, self.protocol_version())
.map_err(|e| Error::SerErr(format!("{}", e)))
self.get_with(key, &access, &db, |_, mut data| {
ser::deserialize(&mut data, self.protocol_version()).map_err(From::from)
})
}
@@ -293,23 +297,18 @@ impl Store {
Ok(res.is_some())
}
/// Produces an iterator of (key, value) pairs, where values are `Readable` types
/// moving forward from the provided key.
pub fn iter<T: ser::Readable>(&self, from: &[u8]) -> Result<SerIterator<T>, Error> {
/// Produces an iterator from the provided key prefix.
pub fn iter<F, T>(&self, prefix: &[u8], deserialize: F) -> Result<PrefixIterator<F, T>, Error>
where
F: Fn(&[u8], &[u8]) -> Result<T, Error>,
{
let lock = self.db.read();
let db = lock
.as_ref()
.ok_or_else(|| Error::NotFoundErr("chain db is None".to_string()))?;
let tx = Arc::new(lmdb::ReadTransaction::new(self.env.clone())?);
let cursor = Arc::new(tx.cursor(db.clone())?);
Ok(SerIterator {
tx,
cursor,
seek: false,
prefix: from.to_vec(),
version: self.protocol_version(),
_marker: marker::PhantomData,
})
Ok(PrefixIterator::new(tx, cursor, prefix, deserialize))
}
/// Builds a new batch to be used with this store.
@@ -364,15 +363,15 @@ impl<'a> Batch<'a> {
let ser_value = ser::ser_vec(value, version);
match ser_value {
Ok(data) => self.put(key, &data),
Err(err) => Err(Error::SerErr(format!("{}", err))),
Err(err) => Err(err.into()),
}
}
/// Low-level access for retrieving data by key.
/// Takes a function for flexible deserialization.
pub fn get_with<T, F>(&self, key: &[u8], deserialize: F) -> Result<Option<T>, Error>
pub fn get_with<F, T>(&self, key: &[u8], deserialize: F) -> Result<Option<T>, Error>
where
F: Fn(&[u8]) -> Result<T, Error>,
F: Fn(&[u8], &[u8]) -> Result<T, Error>,
{
let access = self.tx.access();
let lock = self.store.db.read();
@@ -395,18 +394,20 @@ impl<'a> Batch<'a> {
Ok(res.is_some())
}
/// Produces an iterator of `Readable` types moving forward from the
/// provided key.
pub fn iter<T: ser::Readable>(&self, from: &[u8]) -> Result<SerIterator<T>, Error> {
self.store.iter(from)
/// Produces an iterator from the provided key prefix.
pub fn iter<F, T>(&self, prefix: &[u8], deserialize: F) -> Result<PrefixIterator<F, T>, Error>
where
F: Fn(&[u8], &[u8]) -> Result<T, Error>,
{
self.store.iter(prefix, deserialize)
}
/// Gets a `Readable` value from the db by provided key and default deserialization strategy.
pub fn get_ser<T: ser::Readable>(&self, key: &[u8]) -> Result<Option<T>, Error> {
self.get_with(key, |mut data| {
self.get_with(key, |_, mut data| {
match ser::deserialize(&mut data, self.protocol_version()) {
Ok(res) => Ok(res),
Err(e) => Err(Error::SerErr(format!("{}", e))),
Err(e) => Err(From::from(e)),
}
})
}
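
With the closure now receiving both key and value, callers of `get_with` control deserialization entirely; a hedged sketch (assuming a `batch` in scope):

```rust
// Fetch the raw bytes stored under a key; the closure ignores the key and
// copies the value out of the LMDB-owned slice.
let raw: Option<Vec<u8>> = batch.get_with(b"some-key", |_, v| Ok(v.to_vec()))?;
```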
@@ -437,57 +438,61 @@ impl<'a> Batch<'a> {
}
}
/// An iterator that produces Readable instances back. Wraps the lower level
/// DBIterator and deserializes the returned values.
pub struct SerIterator<T>
/// An iterator based on key prefix.
/// Caller is responsible for deserialization of the data.
pub struct PrefixIterator<F, T>
where
T: ser::Readable,
F: Fn(&[u8], &[u8]) -> Result<T, Error>,
{
tx: Arc<lmdb::ReadTransaction<'static>>,
cursor: Arc<lmdb::Cursor<'static, 'static>>,
seek: bool,
prefix: Vec<u8>,
version: ProtocolVersion,
_marker: marker::PhantomData<T>,
deserialize: F,
}
impl<T> Iterator for SerIterator<T>
impl<F, T> Iterator for PrefixIterator<F, T>
where
T: ser::Readable,
F: Fn(&[u8], &[u8]) -> Result<T, Error>,
{
type Item = (Vec<u8>, T);
type Item = T;
fn next(&mut self) -> Option<(Vec<u8>, T)> {
fn next(&mut self) -> Option<Self::Item> {
let access = self.tx.access();
let kv = if self.seek {
Arc::get_mut(&mut self.cursor).unwrap().next(&access)
let cursor = Arc::get_mut(&mut self.cursor).expect("failed to get cursor");
let kv: Result<(&[u8], &[u8]), _> = if self.seek {
cursor.next(&access)
} else {
self.seek = true;
Arc::get_mut(&mut self.cursor)
.unwrap()
.seek_range_k(&access, &self.prefix[..])
cursor.seek_range_k(&access, &self.prefix[..])
};
match kv {
Ok((k, v)) => self.deser_if_prefix_match(k, v),
kv.ok()
.filter(|(k, _)| k.starts_with(self.prefix.as_slice()))
.map(|(k, v)| match (self.deserialize)(k, v) {
Ok(v) => Some(v),
Err(_) => None,
}
})
.flatten()
}
}
impl<T> SerIterator<T>
impl<F, T> PrefixIterator<F, T>
where
T: ser::Readable,
F: Fn(&[u8], &[u8]) -> Result<T, Error>,
{
fn deser_if_prefix_match(&self, key: &[u8], value: &[u8]) -> Option<(Vec<u8>, T)> {
let plen = self.prefix.len();
if plen == 0 || (key.len() >= plen && key[0..plen] == self.prefix[..]) {
if let Ok(value) = ser::deserialize(&mut &value[..], self.version) {
Some((key.to_vec(), value))
} else {
None
}
} else {
None
/// Initialize a new prefix iterator.
pub fn new(
tx: Arc<lmdb::ReadTransaction<'static>>,
cursor: Arc<lmdb::Cursor<'static, 'static>>,
prefix: &[u8],
deserialize: F,
) -> PrefixIterator<F, T> {
PrefixIterator {
tx,
cursor,
seek: false,
prefix: prefix.to_vec(),
deserialize,
}
}
}
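
End to end, the new iterator is driven through `Store::iter`; a hedged usage sketch (the test below exercises the same path):

```rust
// Walk every entry under a one-byte prefix, yielding (key, value-length)
// pairs. This closure is infallible, so the scan covers the whole prefix.
let iter = store.iter(&[0], |k, v| Ok((k.to_vec(), v.len())))?;
for (key, len) in iter {
    println!("{:?} -> {} bytes", key, len);
}
```

Note that `next()` ends iteration at the first entry whose closure returns an error (matching the old `SerIterator` behavior), so a closure that can fail will silently truncate a scan rather than skip the bad entry.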


@@ -15,7 +15,7 @@
use grin_core as core;
use grin_store as store;
use grin_util as util;
use store::SerIterator;
use store::PrefixIterator;
use crate::core::global;
use crate::core::ser::{self, Readable, Reader, Writeable, Writer};
@@ -118,14 +118,14 @@ fn test_iter() -> Result<(), store::Error> {
// assert_eq!(iter.next(), None);
// Check we can not yet see the new entry via an iterator outside the uncommitted batch.
let mut iter: SerIterator<Vec<u8>> = store.iter(&[0])?;
let mut iter = store.iter(&[0], |_, v| Ok(v.to_vec()))?;
assert_eq!(iter.next(), None);
batch.commit()?;
// Check we can see the new entry via an iterator after committing the batch.
let mut iter: SerIterator<Vec<u8>> = store.iter(&[0])?;
assert_eq!(iter.next(), Some((key.to_vec(), value.to_vec())));
let mut iter = store.iter(&[0], |_, v| Ok(v.to_vec()))?;
assert_eq!(iter.next(), Some(value.to_vec()));
assert_eq!(iter.next(), None);
clean_output_dir(test_dir);