DB v1->v2 migration (#3044)

* wip

* exhaustive match

* wip

* cleanup docs and move migration fns to chain itself
This commit is contained in:
Antioch Peverell 2019-09-19 21:00:14 +01:00 committed by GitHub
parent 02cee80229
commit 1c072f535c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 83 additions and 18 deletions

View file

@ -189,7 +189,7 @@ impl Chain {
)?; )?;
Chain::log_heads(&store)?; Chain::log_heads(&store)?;
Ok(Chain { let chain = Chain {
db_root, db_root,
store, store,
adapter, adapter,
@ -201,7 +201,18 @@ impl Chain {
verifier_cache, verifier_cache,
archive_mode, archive_mode,
genesis: genesis.header.clone(), genesis: genesis.header.clone(),
}) };
// DB migrations to be run prior to the chain being used.
{
// Migrate full blocks to protocol version v2.
chain.migrate_db_v1_v2()?;
// Rebuild height_for_pos index.
chain.rebuild_height_for_pos()?;
}
Ok(chain)
} }
/// Return our shared header MMR handle. /// Return our shared header MMR handle.
@ -1245,9 +1256,22 @@ impl Chain {
self.header_pmmr.read().get_header_hash_by_height(height) self.header_pmmr.read().get_header_hash_by_height(height)
} }
/// Migrate our local db from v1 to v2.
/// This covers blocks which themselves contain transactions.
/// Transaction kernels changed in v2 due to "variable size kernels".
fn migrate_db_v1_v2(&self) -> Result<(), Error> {
let store_v1 = self.store.with_version(ProtocolVersion(1));
let batch = store_v1.batch()?;
for (_, block) in batch.blocks_iter()? {
batch.migrate_block(&block, ProtocolVersion(2))?;
}
batch.commit()?;
Ok(())
}
/// Migrate the index 'commitment -> output_pos' to index 'commitment -> (output_pos, block_height)' /// Migrate the index 'commitment -> output_pos' to index 'commitment -> (output_pos, block_height)'
/// Note: should only be called when Node start-up, for database migration from the old version. /// Note: should only be called when Node start-up, for database migration from the old version.
pub fn rebuild_height_for_pos(&self) -> Result<(), Error> { fn rebuild_height_for_pos(&self) -> Result<(), Error> {
let header_pmmr = self.header_pmmr.read(); let header_pmmr = self.header_pmmr.read();
let txhashset = self.txhashset.read(); let txhashset = self.txhashset.read();
let mut outputs_pos = txhashset.get_all_output_pos()?; let mut outputs_pos = txhashset.get_all_output_pos()?;

View file

@ -18,6 +18,7 @@ use crate::core::consensus::HeaderInfo;
use crate::core::core::hash::{Hash, Hashed}; use crate::core::core::hash::{Hash, Hashed};
use crate::core::core::{Block, BlockHeader, BlockSums}; use crate::core::core::{Block, BlockHeader, BlockSums};
use crate::core::pow::Difficulty; use crate::core::pow::Difficulty;
use crate::core::ser::ProtocolVersion;
use crate::types::Tip; use crate::types::Tip;
use crate::util::secp::pedersen::Commitment; use crate::util::secp::pedersen::Commitment;
use croaring::Bitmap; use croaring::Bitmap;
@ -49,6 +50,17 @@ impl ChainStore {
let db = store::Store::new(db_root, None, Some(STORE_SUBPATH.clone()), None)?; let db = store::Store::new(db_root, None, Some(STORE_SUBPATH.clone()), None)?;
Ok(ChainStore { db }) Ok(ChainStore { db })
} }
/// Create a new instance of the chain store based on this instance
/// but with the provided protocol version. This is used when migrating
/// data in the db to a different protocol version, reading using one version and
/// writing back to the db with a different version.
pub fn with_version(&self, version: ProtocolVersion) -> ChainStore {
let db_with_version = self.db.with_version(version);
ChainStore {
db: db_with_version,
}
}
} }
impl ChainStore { impl ChainStore {
@ -258,6 +270,17 @@ impl<'a> Batch<'a> {
Ok(()) Ok(())
} }
/// Migrate a block stored in the db by serializing it using the provided protocol version.
/// Block may have been read using a previous protocol version but we do not actually care.
pub fn migrate_block(&self, b: &Block, version: ProtocolVersion) -> Result<(), Error> {
self.db.put_ser_with_version(
&to_key(BLOCK_PREFIX, &mut b.hash().to_vec())[..],
b,
version,
)?;
Ok(())
}
/// Delete a full block. Does not delete any record associated with a block /// Delete a full block. Does not delete any record associated with a block
/// header. /// header.
pub fn delete_block(&self, bh: &Hash) -> Result<(), Error> { pub fn delete_block(&self, bh: &Hash) -> Result<(), Error> {

View file

@ -192,9 +192,6 @@ impl Server {
archive_mode, archive_mode,
)?); )?);
// launching the database migration if needed
shared_chain.rebuild_height_for_pos()?;
pool_adapter.set_chain(shared_chain.clone()); pool_adapter.set_chain(shared_chain.clone());
let net_adapter = Arc::new(NetToChainAdapter::new( let net_adapter = Arc::new(NetToChainAdapter::new(

View file

@ -65,11 +65,13 @@ where
} }
} }
const DEFAULT_DB_VERSION: ProtocolVersion = ProtocolVersion(2);
/// LMDB-backed store facilitating data access and serialization. All writes /// LMDB-backed store facilitating data access and serialization. All writes
/// are done through a Batch abstraction providing atomicity. /// are done through a Batch abstraction providing atomicity.
pub struct Store { pub struct Store {
env: Arc<lmdb::Environment>, env: Arc<lmdb::Environment>,
db: RwLock<Option<Arc<lmdb::Database<'static>>>>, db: Arc<RwLock<Option<Arc<lmdb::Database<'static>>>>>,
name: String, name: String,
version: ProtocolVersion, version: ProtocolVersion,
} }
@ -113,9 +115,9 @@ impl Store {
); );
let res = Store { let res = Store {
env: Arc::new(env), env: Arc::new(env),
db: RwLock::new(None), db: Arc::new(RwLock::new(None)),
name: db_name, name: db_name,
version: ProtocolVersion(1), version: DEFAULT_DB_VERSION,
}; };
{ {
@ -129,6 +131,17 @@ impl Store {
Ok(res) Ok(res)
} }
/// Construct a new store using a specific protocol version.
/// Permits access to the db with legacy protocol versions for db migrations.
pub fn with_version(&self, version: ProtocolVersion) -> Store {
Store {
env: self.env.clone(),
db: self.db.clone(),
name: self.name.clone(),
version: version,
}
}
/// Opens the database environment /// Opens the database environment
pub fn open(&self) -> Result<(), Error> { pub fn open(&self) -> Result<(), Error> {
let mut w = self.db.write(); let mut w = self.db.write();
@ -275,11 +288,8 @@ impl Store {
if self.needs_resize()? { if self.needs_resize()? {
self.do_resize()?; self.do_resize()?;
} }
let txn = lmdb::WriteTransaction::new(self.env.clone())?; let tx = lmdb::WriteTransaction::new(self.env.clone())?;
Ok(Batch { Ok(Batch { store: self, tx })
store: self,
tx: txn,
})
} }
} }
@ -299,10 +309,21 @@ impl<'a> Batch<'a> {
Ok(()) Ok(())
} }
/// Writes a single key and its `Writeable` value to the db. Encapsulates /// Writes a single key and its `Writeable` value to the db.
/// serialization. /// Encapsulates serialization using the (default) version configured on the store instance.
pub fn put_ser<W: ser::Writeable>(&self, key: &[u8], value: &W) -> Result<(), Error> { pub fn put_ser<W: ser::Writeable>(&self, key: &[u8], value: &W) -> Result<(), Error> {
let ser_value = ser::ser_vec(value, self.store.version); self.put_ser_with_version(key, value, self.store.version)
}
/// Writes a single key and its `Writeable` value to the db.
/// Encapsulates serialization using the specified protocol version.
pub fn put_ser_with_version<W: ser::Writeable>(
&self,
key: &[u8],
value: &W,
version: ProtocolVersion,
) -> Result<(), Error> {
let ser_value = ser::ser_vec(value, version);
match ser_value { match ser_value {
Ok(data) => self.put(key, &data), Ok(data) => self.put(key, &data),
Err(err) => Err(Error::SerErr(format!("{}", err))), Err(err) => Err(Error::SerErr(format!("{}", err))),
@ -356,7 +377,7 @@ impl<'a> Batch<'a> {
} }
} }
/// An iterator thad produces Readable instances back. Wraps the lower level /// An iterator that produces Readable instances back. Wraps the lower level
/// DBIterator and deserializes the returned values. /// DBIterator and deserializes the returned values.
pub struct SerIterator<T> pub struct SerIterator<T>
where where