rework block input bitmap logic to avoid batch locks

yeastplume 2018-06-22 11:44:50 +01:00
parent e7c380836b
commit 8dfc7f3b2f
4 changed files with 96 additions and 84 deletions

View file

@@ -441,17 +441,19 @@ impl Chain {
         let mut txhashset = self.txhashset.write().unwrap();
         let store = self.store.clone();
-        let roots = txhashset::extending_readonly(&mut txhashset, |extension| {
+        let (roots, sizes) = txhashset::extending_readonly(&mut txhashset, |extension| {
             if is_fork {
                 pipe::rewind_and_apply_fork(b, store, extension)?;
             }
             extension.apply_block(b)?;
-            Ok(extension.roots())
+            Ok((extension.roots(), extension.sizes()))
         })?;
         b.header.output_root = roots.output_root;
         b.header.range_proof_root = roots.rproof_root;
         b.header.kernel_root = roots.kernel_root;
+        b.header.output_mmr_size = sizes.0;
+        b.header.kernel_mmr_size = sizes.2;
         Ok(())
     }
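Note on the `sizes` tuple above: the diff only shows `sizes.0` feeding `output_mmr_size` and `sizes.2` feeding `kernel_mmr_size`, which suggests `Extension::sizes()` returns the unpruned sizes of the output, range proof and kernel MMRs in that order. A minimal sketch of that accessor, assuming it mirrors the `unpruned_size()` calls used for block markers later in this commit (the method itself is not part of this diff):

    /// Sketch only: plausible shape of Extension::sizes(), not shown in this diff.
    /// Returns (output MMR size, range proof MMR size, kernel MMR size).
    pub fn sizes(&self) -> (u64, u64, u64) {
        (
            self.output_pmmr.unpruned_size(),
            self.rproof_pmmr.unpruned_size(),
            self.kernel_pmmr.unpruned_size(),
        )
    }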

View file

@@ -122,7 +122,7 @@ pub fn process_block(b: &Block, ctx: &mut BlockContext) -> Result<Option<Tip>, E
         b.hash(),
         b.header.height,
     );
-    add_block(b, &mut batch)?;
+    add_block(b, ctx.store.clone(), &mut batch)?;
     let res = update_head(b, &ctx, &mut batch);
     if res.is_ok() {
         batch.commit()?;
@@ -367,13 +367,16 @@ fn validate_block_via_txhashset(b: &Block, ext: &mut txhashset::Extension) -> Re
 }

 /// Officially adds the block to our chain.
-fn add_block(b: &Block, batch: &store::Batch) -> Result<(), Error> {
+fn add_block(b: &Block, store: Arc<store::ChainStore>, batch: &mut store::Batch) -> Result<(), Error> {
     batch
         .save_block(b)
         .map_err(|e| Error::StoreErr(e, "pipe save block".to_owned()))?;
+    let bitmap = store.build_and_cache_block_input_bitmap(&b)?;
     batch
-        .save_block_input_bitmap(&b)
-        .map_err(|e| Error::StoreErr(e, "pipe save block input bitmap".to_owned()))?;
+        .save_block_input_bitmap(
+            &b.hash(),
+            &bitmap
+        )?;
     Ok(())
 }
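Condensed (non-diff) view of the reworked `add_block` above, to make the new split of responsibilities explicit: the `ChainStore` builds and LRU-caches the input bitmap on the read side, while the `Batch` only persists the serialized bitmap, so the db batch no longer owns a cache or takes extra locks:

    fn add_block(b: &Block, store: Arc<store::ChainStore>, batch: &mut store::Batch) -> Result<(), Error> {
        // save the full block via the batch
        batch
            .save_block(b)
            .map_err(|e| Error::StoreErr(e, "pipe save block".to_owned()))?;
        // build the input bitmap through the store's LruCache-backed helper
        let bitmap = store.build_and_cache_block_input_bitmap(&b)?;
        // persist only the resulting bitmap bytes through the batch
        batch.save_block_input_bitmap(&b.hash(), &bitmap)?;
        Ok(())
    }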

View file

@@ -47,6 +47,7 @@ const BLOCK_INPUT_BITMAP_PREFIX: u8 = 'B' as u8;
 pub struct ChainStore {
     db: store::Store,
     header_cache: Arc<RwLock<LruCache<Hash, BlockHeader>>>,
+    block_input_bitmap_cache: Arc<RwLock<LruCache<Hash, Vec<u8>>>>,
 }

 impl ChainStore {
@@ -56,6 +57,7 @@ impl ChainStore {
         Ok(ChainStore {
             db,
             header_cache: Arc::new(RwLock::new(LruCache::new(1_000))),
+            block_input_bitmap_cache: Arc::new(RwLock::new(LruCache::new(1_000))),
         })
     }
 }
@@ -158,12 +160,69 @@ impl ChainStore {
         )
     }

+    pub fn build_block_input_bitmap(&self, block: &Block) -> Result<Bitmap, Error> {
+        let bitmap = block
+            .inputs
+            .iter()
+            .filter_map(|x| self.get_output_pos(&x.commitment()).ok())
+            .map(|x| x as u32)
+            .collect();
+        Ok(bitmap)
+    }
+
+    pub fn build_and_cache_block_input_bitmap(&self, block: &Block) -> Result<Bitmap, Error> {
+        let bitmap = self.build_block_input_bitmap(block)?;
+        let mut cache = self.block_input_bitmap_cache.write().unwrap();
+        cache.insert(block.hash(), bitmap.serialize());
+        Ok(bitmap)
+    }
+
+    pub fn get_block_input_bitmap(&self, bh: &Hash) -> Result<(bool, Bitmap), Error> {
+        {
+            let mut cache = self.block_input_bitmap_cache.write().unwrap();
+
+            // cache hit - return the value from the cache
+            if let Some(bytes) = cache.get_mut(bh) {
+                return Ok((true, Bitmap::deserialize(&bytes)));
+            }
+        }
+
+        // cache miss - get it from db and cache it for next time
+        // if we found one in db
+        let res = self.get_block_input_bitmap_db(bh);
+        if let Ok((found, bitmap)) = res {
+            let mut cache = self.block_input_bitmap_cache.write().unwrap();
+            cache.insert(*bh, bitmap.serialize());
+            return Ok((found, bitmap));
+        }
+        res
+    }
+
+    // Get the block input bitmap from the db or build the bitmap from
+    // the full block from the db (if the block is found).
+    // (bool, Bitmap) : (false if bitmap was built and not found in db)
+    fn get_block_input_bitmap_db(&self, bh: &Hash) -> Result<(bool, Bitmap), Error> {
+        if let Ok(Some(bytes)) = self.db
+            .get(&to_key(BLOCK_INPUT_BITMAP_PREFIX, &mut bh.to_vec()))
+        {
+            Ok((true, Bitmap::deserialize(&bytes)))
+        } else {
+            match self.get_block(bh) {
+                Ok(block) => {
+                    let bitmap = self.build_and_cache_block_input_bitmap(&block)?;
+                    Ok((false, bitmap))
+                }
+                Err(e) => Err(e),
+            }
+        }
+    }
+
     /// Builds a new batch to be used with this store.
     pub fn batch(&self) -> Result<Batch, Error> {
         Ok(Batch {
             store: self,
             db: self.db.batch()?,
-            block_input_bitmap_cache: Arc::new(RwLock::new(LruCache::new(1_000))),
         })
     }
 }
@@ -173,7 +232,6 @@ impl ChainStore {
 pub struct Batch<'a> {
     store: &'a ChainStore,
     db: store::Batch<'a>,
-    block_input_bitmap_cache: Arc<RwLock<LruCache<Hash, Vec<u8>>>>,
 }

 #[allow(missing_docs)]
@@ -294,67 +352,9 @@ impl<'a> Batch<'a> {
         )
     }

-    fn build_block_input_bitmap(&self, block: &Block) -> Result<Bitmap, Error> {
-        let bitmap = block
-            .inputs
-            .iter()
-            .filter_map(|x| self.get_output_pos(&x.commitment()).ok())
-            .map(|x| x as u32)
-            .collect();
-        Ok(bitmap)
-    }
-
-    // Get the block input bitmap from the db or build the bitmap from
-    // the full block from the db (if the block is found).
-    fn get_block_input_bitmap_db(&self, bh: &Hash) -> Result<Bitmap, Error> {
-        if let Ok(Some(bytes)) = self.db
-            .get(&to_key(BLOCK_INPUT_BITMAP_PREFIX, &mut bh.to_vec()))
-        {
-            Ok(Bitmap::deserialize(&bytes))
-        } else {
-            match self.get_block(bh) {
-                Ok(block) => {
-                    let bitmap = self.save_block_input_bitmap(&block)?;
-                    Ok(bitmap)
-                }
-                Err(e) => Err(e),
-            }
-        }
-    }
-
-    pub fn get_block_input_bitmap(&self, bh: &Hash) -> Result<Bitmap, Error> {
-        {
-            let mut cache = self.block_input_bitmap_cache.write().unwrap();
-
-            // cache hit - return the value from the cache
-            if let Some(bytes) = cache.get_mut(bh) {
-                return Ok(Bitmap::deserialize(&bytes));
-            }
-        }
-
-        // cache miss - get it from db and cache it for next time
-        // if we found one in db
-        let res = self.get_block_input_bitmap_db(bh);
-        if let Ok(bitmap) = res {
-            let mut cache = self.block_input_bitmap_cache.write().unwrap();
-            cache.insert(*bh, bitmap.serialize());
-            return Ok(bitmap);
-        }
-        res
-    }
-
-    pub fn save_block_input_bitmap(&self, block: &Block) -> Result<Bitmap, Error> {
-        let hash = block.hash();
-        let bitmap = self.build_block_input_bitmap(block)?;
-        self.db.put(
-            &to_key(BLOCK_INPUT_BITMAP_PREFIX, &mut hash.to_vec())[..],
-            bitmap.serialize(),
-        )?;
-        {
-            let mut cache = self.block_input_bitmap_cache.write().unwrap();
-            cache.insert(hash, bitmap.serialize());
-        }
-        Ok(bitmap)
+    pub fn save_block_input_bitmap(&self, bh: &Hash, bm: &Bitmap) -> Result<(), Error> {
+        self.db
+            .put(&to_key(BLOCK_INPUT_BITMAP_PREFIX, &mut bh.to_vec())[..], bm.serialize())
     }

     pub fn delete_block_input_bitmap(&self, bh: &Hash) -> Result<(), Error> {
@@ -408,7 +408,6 @@ impl<'a> Batch<'a> {
         Ok(Batch {
             store: self.store,
             db: self.db.child()?,
-            block_input_bitmap_cache: Arc::new(RwLock::new(LruCache::new(1_000))),
         })
     }
 }
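Taken together, the store-side API above now carries a `(bool, Bitmap)` result: `true` means the bitmap came from the db (or the LRU cache), `false` means it had to be rebuilt from the full block because no stored bitmap was found. A hypothetical caller (not part of this diff) illustrating how that flag can be used to re-persist a rebuilt bitmap through a short-lived batch; `store` is a `ChainStore` and `block_hash` a block `Hash`:

    // Hypothetical usage sketch, assuming Batch::commit() consumes the batch.
    let (found, bitmap) = store.get_block_input_bitmap(&block_hash)?;
    if !found {
        // the bitmap was rebuilt from the full block, so write it back to the db
        let batch = store.batch()?;
        batch.save_block_input_bitmap(&block_hash, &bitmap)?;
        batch.commit()?;
    }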

View file

@@ -243,6 +243,9 @@ impl TxHashSet {
             input_pos_to_rewind(self.commit_index.clone(), &horizon_header, &head_header)?;
         let batch = self.commit_index.batch()?;
+        if !rewind_rm_pos.0 {
+            batch.save_block_input_bitmap(&head_header.hash(), &rewind_rm_pos.1)?;
+        }
         {
             let clean_output_index = |commit: &[u8]| {
                 // do we care if this fails?
@@ -252,14 +255,14 @@ impl TxHashSet {
             self.output_pmmr_h.backend.check_compact(
                 horizon_marker.output_pos,
                 &rewind_add_pos,
-                &rewind_rm_pos,
+                &rewind_rm_pos.1,
                 clean_output_index,
             )?;

             self.rproof_pmmr_h.backend.check_compact(
                 horizon_marker.output_pos,
                 &rewind_add_pos,
-                &rewind_rm_pos,
+                &rewind_rm_pos.1,
                 &prune_noop,
             )?;
         }
@@ -340,7 +343,7 @@ where
     match res {
         Err(e) => {
-            debug!(LOGGER, "Error returned, discarding txhashset extension.");
+            debug!(LOGGER, "Error returned, discarding txhashset extension: {:?}", e);
             trees.output_pmmr_h.backend.discard();
             trees.rproof_pmmr_h.backend.discard();
             trees.kernel_pmmr_h.backend.discard();
@@ -621,8 +624,6 @@ impl<'a> Extension<'a> {
             output_pos: self.output_pmmr.unpruned_size(),
             kernel_pos: self.kernel_pmmr.unpruned_size(),
         };
-        //TODO: This doesn't look right
-        self.batch.save_block_marker(&b.hash(), &marker)?;
         self.new_block_markers.insert(b.hash(), marker);
         Ok(())
     }
@@ -781,7 +782,10 @@ impl<'a> Extension<'a> {
         // Rewind our MMRs to the appropriate positions
         // based on the block_marker.
-        let marker = self.batch.get_block_marker(&hash)?;
+        let (output_pos, kernel_pos) = {
+            let marker = self.batch.get_block_marker(&hash)?;
+            (marker.output_pos, marker.kernel_pos)
+        };

         // We need to build bitmaps of added and removed output positions
         // so we can correctly rewind all operations applied to the output MMR
@@ -793,12 +797,15 @@ impl<'a> Extension<'a> {
             output_pos_to_rewind(self.commit_index.clone(), block_header, head_header)?;
         let rewind_rm_pos =
             input_pos_to_rewind(self.commit_index.clone(), block_header, head_header)?;
+        if !rewind_rm_pos.0 {
+            self.batch.save_block_input_bitmap(&head_header.hash(), &rewind_rm_pos.1)?;
+        }

         self.rewind_to_pos(
-            marker.output_pos,
-            marker.kernel_pos,
+            output_pos,
+            kernel_pos,
             &rewind_add_pos,
-            &rewind_rm_pos,
+            &rewind_rm_pos.1,
             rewind_utxo,
             rewind_kernel,
             rewind_rangeproof,
@@ -1132,9 +1139,10 @@ fn input_pos_to_rewind(
     commit_index: Arc<ChainStore>,
     block_header: &BlockHeader,
     head_header: &BlockHeader,
-) -> Result<Bitmap, Error> {
+) -> Result<(bool, Bitmap), Error> {
     let mut bitmap = Bitmap::create();
     let mut current = head_header.hash();
+    let mut found = false;
     loop {
         if current == block_header.hash() {
             break;
@@ -1145,10 +1153,10 @@ fn input_pos_to_rewind(
         // I/O should be minimized or eliminated here for most
        // rewind scenarios.
         let current_header = commit_index.get_block_header(&current)?;
-        let input_bitmap = commit_index.batch()?.get_block_input_bitmap(&current)?;
-        bitmap.or_inplace(&input_bitmap);
+        let input_bitmap = commit_index.get_block_input_bitmap(&current)?;
+        found = input_bitmap.0;
+        bitmap.or_inplace(&input_bitmap.1);
         current = current_header.previous;
     }
-    Ok(bitmap)
+    Ok((found, bitmap))
 }
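For reference, the rewind bitmap assembled above is just the union of the per-block input bitmaps walked from the head back to the rewind point. A tiny standalone illustration of that accumulation with the same croaring `Bitmap` calls used in this diff (the positions are made up for the example):

    use croaring::Bitmap;

    let mut acc = Bitmap::create();

    // pretend these are the spent-output positions recorded for two blocks
    let mut block_a = Bitmap::create();
    block_a.add(3);
    block_a.add(7);
    block_a.add(9);
    let mut block_b = Bitmap::create();
    block_b.add(7);
    block_b.add(12);

    // walking back from head to the rewind point ORs each block's bitmap in
    acc.or_inplace(&block_a);
    acc.or_inplace(&block_b);
    assert_eq!(acc.cardinality(), 4); // positions {3, 7, 9, 12}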