Batch cache (#1706)

* leverage the cache for reads via the batch
add additional block_sums cache

* rustfmt
This commit is contained in:
Antioch Peverell 2018-10-09 22:48:21 +01:00 committed by GitHub
parent 4a1e3c27bb
commit d10f373911
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -47,6 +47,7 @@ pub struct ChainStore {
db: store::Store,
header_cache: Arc<RwLock<LruCache<Hash, BlockHeader>>>,
block_input_bitmap_cache: Arc<RwLock<LruCache<Hash, Vec<u8>>>>,
block_sums_cache: Arc<RwLock<LruCache<Hash, BlockSums>>>,
}
impl ChainStore {
@ -57,6 +58,7 @@ impl ChainStore {
db,
header_cache: Arc::new(RwLock::new(LruCache::new(1_000))),
block_input_bitmap_cache: Arc::new(RwLock::new(LruCache::new(1_000))),
block_sums_cache: Arc::new(RwLock::new(LruCache::new(1_000))),
})
}
}
@ -92,12 +94,31 @@ impl ChainStore {
self.db.exists(&to_key(BLOCK_PREFIX, &mut h.to_vec()))
}
pub fn get_block_sums(&self, bh: &Hash) -> Result<BlockSums, Error> {
option_to_not_found(
self.db
.get_ser(&to_key(BLOCK_SUMS_PREFIX, &mut bh.to_vec())),
&format!("Block sums for block: {}", bh),
)
pub fn get_block_sums(&self, h: &Hash) -> Result<BlockSums, Error> {
{
let mut block_sums_cache = self.block_sums_cache.write().unwrap();
// cache hit - return the value from the cache
if let Some(block_sums) = block_sums_cache.get_mut(h) {
return Ok(block_sums.clone());
}
}
let block_sums: Result<BlockSums, Error> = option_to_not_found(
self.db.get_ser(&to_key(BLOCK_SUMS_PREFIX, &mut h.to_vec())),
&format!("Block sums for block: {}", h),
);
// cache miss - so adding to the cache for next time
if let Ok(block_sums) = block_sums {
{
let mut block_sums_cache = self.block_sums_cache.write().unwrap();
block_sums_cache.insert(*h, block_sums.clone());
}
Ok(block_sums)
} else {
block_sums
}
}
pub fn get_block_header(&self, h: &Hash) -> Result<BlockHeader, Error> {
@ -176,8 +197,10 @@ impl ChainStore {
/// Builds a new batch to be used with this store.
pub fn batch(&self) -> Result<Batch, Error> {
Ok(Batch {
store: self,
db: self.db.batch()?,
header_cache: self.header_cache.clone(),
block_input_bitmap_cache: self.block_input_bitmap_cache.clone(),
block_sums_cache: self.block_sums_cache.clone(),
})
}
}
@ -185,8 +208,10 @@ impl ChainStore {
/// An atomic batch in which all changes can be committed all at once or
/// discarded on error.
pub struct Batch<'a> {
store: &'a ChainStore,
db: store::Batch<'a>,
header_cache: Arc<RwLock<LruCache<Hash, BlockHeader>>>,
block_sums_cache: Arc<RwLock<LruCache<Hash, BlockSums>>>,
block_input_bitmap_cache: Arc<RwLock<LruCache<Hash, Vec<u8>>>>,
}
#[allow(missing_docs)]
@ -257,14 +282,12 @@ impl<'a> Batch<'a> {
self.db.exists(&to_key(BLOCK_PREFIX, &mut h.to_vec()))
}
/// Save the block and its header
/// Save the block and its header, caching the header.
pub fn save_block(&self, b: &Block) -> Result<(), Error> {
self.save_block_header(&b.header)?;
self.db
.put_ser(&to_key(BLOCK_PREFIX, &mut b.hash().to_vec())[..], b)?;
self.db.put_ser(
&to_key(BLOCK_HEADER_PREFIX, &mut b.hash().to_vec())[..],
&b.header,
)
Ok(())
}
/// Delete a full block. Does not delete any record associated with a block
@ -283,10 +306,16 @@ impl<'a> Batch<'a> {
Ok(())
}
pub fn save_block_header(&self, bh: &BlockHeader) -> Result<(), Error> {
let hash = bh.hash();
pub fn save_block_header(&self, header: &BlockHeader) -> Result<(), Error> {
let hash = header.hash();
{
let mut header_cache = self.header_cache.write().unwrap();
header_cache.insert(hash, header.clone());
}
self.db
.put_ser(&to_key(BLOCK_HEADER_PREFIX, &mut hash.to_vec())[..], bh)?;
.put_ser(&to_key(BLOCK_HEADER_PREFIX, &mut hash.to_vec())[..], header)?;
Ok(())
}
@ -320,11 +349,31 @@ impl<'a> Batch<'a> {
}
pub fn get_block_header(&self, h: &Hash) -> Result<BlockHeader, Error> {
option_to_not_found(
{
let mut header_cache = self.header_cache.write().unwrap();
// cache hit - return the value from the cache
if let Some(header) = header_cache.get_mut(h) {
return Ok(header.clone());
}
}
let header: Result<BlockHeader, Error> = option_to_not_found(
self.db
.get_ser(&to_key(BLOCK_HEADER_PREFIX, &mut h.to_vec())),
&format!("Block header for block: {}", h),
)
&format!("BLOCK HEADER: {}", h),
);
// cache miss - so adding to the cache for next time
if let Ok(header) = header {
{
let mut header_cache = self.header_cache.write().unwrap();
header_cache.insert(*h, header.clone());
}
Ok(header)
} else {
header
}
}
fn save_block_input_bitmap(&self, bh: &Hash, bm: &Bitmap) -> Result<(), Error> {
@ -339,17 +388,41 @@ impl<'a> Batch<'a> {
.delete(&to_key(BLOCK_INPUT_BITMAP_PREFIX, &mut bh.to_vec()))
}
pub fn save_block_sums(&self, bh: &Hash, sums: &BlockSums) -> Result<(), Error> {
pub fn save_block_sums(&self, h: &Hash, sums: &BlockSums) -> Result<(), Error> {
{
let mut block_sums_cache = self.block_sums_cache.write().unwrap();
block_sums_cache.insert(*h, sums.clone());
}
self.db
.put_ser(&to_key(BLOCK_SUMS_PREFIX, &mut bh.to_vec())[..], &sums)
.put_ser(&to_key(BLOCK_SUMS_PREFIX, &mut h.to_vec())[..], &sums)
}
pub fn get_block_sums(&self, bh: &Hash) -> Result<BlockSums, Error> {
option_to_not_found(
self.db
.get_ser(&to_key(BLOCK_SUMS_PREFIX, &mut bh.to_vec())),
&format!("Block sums for block: {}", bh),
)
pub fn get_block_sums(&self, h: &Hash) -> Result<BlockSums, Error> {
{
let mut block_sums_cache = self.block_sums_cache.write().unwrap();
// cache hit - return the value from the cache
if let Some(block_sums) = block_sums_cache.get_mut(h) {
return Ok(block_sums.clone());
}
}
let block_sums: Result<BlockSums, Error> = option_to_not_found(
self.db.get_ser(&to_key(BLOCK_SUMS_PREFIX, &mut h.to_vec())),
&format!("Block sums for block: {}", h),
);
// cache miss - so adding to the cache for next time
if let Ok(block_sums) = block_sums {
{
let mut block_sums_cache = self.block_sums_cache.write().unwrap();
block_sums_cache.insert(*h, block_sums.clone());
}
Ok(block_sums)
} else {
block_sums
}
}
fn delete_block_sums(&self, bh: &Hash) -> Result<(), Error> {
@ -405,7 +478,7 @@ impl<'a> Batch<'a> {
self.save_header_height(&header)?;
if header.height > 0 {
let mut prev_header = self.store.get_block_header(&header.previous)?;
let mut prev_header = self.get_block_header(&header.previous)?;
while prev_header.height > 0 {
if !force {
if let Ok(_) = self.is_on_current_chain(&prev_header) {
@ -414,7 +487,7 @@ impl<'a> Batch<'a> {
}
self.save_header_height(&prev_header)?;
prev_header = self.store.get_block_header(&prev_header.previous)?;
prev_header = self.get_block_header(&prev_header.previous)?;
}
}
Ok(())
@ -438,7 +511,7 @@ impl<'a> Batch<'a> {
self.save_block_input_bitmap(&block.hash(), &bitmap)?;
// Finally cache it locally for use later.
let mut cache = self.store.block_input_bitmap_cache.write().unwrap();
let mut cache = self.block_input_bitmap_cache.write().unwrap();
cache.insert(block.hash(), bitmap.serialize());
Ok(bitmap)
@ -446,7 +519,7 @@ impl<'a> Batch<'a> {
pub fn get_block_input_bitmap(&self, bh: &Hash) -> Result<Bitmap, Error> {
{
let mut cache = self.store.block_input_bitmap_cache.write().unwrap();
let mut cache = self.block_input_bitmap_cache.write().unwrap();
// cache hit - return the value from the cache
if let Some(bytes) = cache.get_mut(bh) {
@ -488,8 +561,10 @@ impl<'a> Batch<'a> {
/// commit, abandoned otherwise.
pub fn child(&mut self) -> Result<Batch, Error> {
Ok(Batch {
store: self.store,
db: self.db.child()?,
header_cache: self.header_cache.clone(),
block_sums_cache: self.block_sums_cache.clone(),
block_input_bitmap_cache: self.block_input_bitmap_cache.clone(),
})
}
}