use hash_file abstraction for pmmr backend (#1925)

* use HashFile (wraps an AppendOnlyFile) in the PMMR backend

* rustfmt

* cleanup

* rustfmt
Antioch Peverell, 2018-11-05 12:01:24 +00:00, committed by GitHub
parent 92e41fa571
commit d3a8613e43
4 changed files with 75 additions and 94 deletions
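
For orientation before the hunks: HashFile already existed for the header MMR backend and wraps the byte-oriented AppendOnlyFile, exposing positions and sizes counted in whole hashes. A minimal sketch of the surface this commit relies on, with illustrative bodies pieced together from the hunks below (not the verbatim source):

    use std::io;

    // Sketch only: `AppendOnlyFile`, `Hash`, and `ser` are the real types/modules
    // from the diff below; the method bodies here are inferred, not copied.
    pub struct HashFile {
        file: AppendOnlyFile, // underlying byte-oriented append-only file
    }

    impl HashFile {
        /// Sync'd size, counted in hashes rather than bytes.
        pub fn size(&self) -> u64 {
            self.file.size() / Hash::LEN as u64
        }

        /// Size including unsync'd appends, also counted in hashes.
        pub fn size_unsync(&self) -> u64 {
            self.file.size_unsync() / Hash::LEN as u64
        }

        /// Append a single hash, serialized to Hash::LEN bytes.
        pub fn append(&mut self, hash: &Hash) -> io::Result<()> {
            let mut bytes = ser::ser_vec(hash)
                .map_err(|e| io::Error::new(io::ErrorKind::Other, format!("{:?}", e)))?;
            self.file.append(&mut bytes);
            Ok(())
        }

        /// Read one hash by its 1-based position, converting to a byte offset.
        pub fn read(&self, position: u64) -> Option<Hash> {
            let data = self.file.read((position as usize - 1) * Hash::LEN, Hash::LEN);
            ser::deserialize(&mut &data[..]).ok()
        }
    }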


@@ -64,7 +64,7 @@ impl HashOnlyMMRHandle {
         let path = Path::new(root_dir).join(sub_dir).join(file_name);
         fs::create_dir_all(path.clone())?;
         let backend = HashOnlyMMRBackend::new(path.to_str().unwrap())?;
-        let last_pos = backend.unpruned_size()?;
+        let last_pos = backend.unpruned_size();
         Ok(HashOnlyMMRHandle { backend, last_pos })
     }
 }
@@ -85,7 +85,7 @@ impl<T: PMMRable> PMMRHandle<T> {
         let path = Path::new(root_dir).join(sub_dir).join(file_name);
         fs::create_dir_all(path.clone())?;
         let backend = PMMRBackend::new(path.to_str().unwrap().to_string(), prunable, header)?;
-        let last_pos = backend.unpruned_size()?;
+        let last_pos = backend.unpruned_size();
         Ok(PMMRHandle { backend, last_pos })
     }
 }


@@ -20,7 +20,7 @@ use croaring::Bitmap;
 use core::core::hash::{Hash, Hashed};
 use core::core::pmmr::{self, family, Backend, HashOnlyBackend};
 use core::core::BlockHeader;
-use core::ser::{self, FixedLength, PMMRable};
+use core::ser::{self, PMMRable};
 use leaf_set::LeafSet;
 use prune_list::PruneList;
 use types::{prune_noop, AppendOnlyFile, HashFile};
@@ -52,7 +52,7 @@ pub const PMMR_FILES: [&str; 4] = [
 pub struct PMMRBackend<T: PMMRable> {
     data_dir: String,
     prunable: bool,
-    hash_file: AppendOnlyFile,
+    hash_file: HashFile,
     data_file: AppendOnlyFile,
     leaf_set: LeafSet,
     prune_list: PruneList,
@@ -65,14 +65,15 @@ impl<T: PMMRable> Backend<T> for PMMRBackend<T> {
     #[allow(unused_variables)]
     fn append(&mut self, data: T, hashes: Vec<Hash>) -> Result<(), String> {
         if self.prunable {
-            let record_len = Hash::LEN as u64;
             let shift = self.prune_list.get_total_shift();
-            let position = (self.hash_file.size_unsync() / record_len) + shift + 1;
+            let position = self.hash_file.size_unsync() + shift + 1;
             self.leaf_set.add(position);
         }
         self.data_file.append(&mut ser::ser_vec(&data).unwrap());
         for h in &hashes {
-            self.hash_file.append(&mut ser::ser_vec(h).unwrap());
+            self.hash_file
+                .append(h)
+                .map_err(|e| format!("Failed to append hash to file. {}", e))?;
         }
         Ok(())
     }
@@ -81,27 +82,8 @@ impl<T: PMMRable> Backend<T> for PMMRBackend<T> {
         if self.is_compacted(position) {
             return None;
         }
         let shift = self.prune_list.get_shift(position);
-
-        // Read PMMR
-        // The MMR starts at 1, our binary backend starts at 0
-        let pos = position - 1;
-
-        // Must be on disk, doing a read at the correct position
-        let hash_record_len = Hash::LEN;
-        let file_offset = ((pos - shift) as usize) * hash_record_len;
-        let data = self.hash_file.read(file_offset, hash_record_len);
-        match ser::deserialize(&mut &data[..]) {
-            Ok(h) => Some(h),
-            Err(e) => {
-                error!(
-                    "Corrupted storage, could not read an entry from hash store: {:?}",
-                    e
-                );
-                None
-            }
-        }
+        self.hash_file.read(position - shift)
     }

     fn get_data_from_file(&self, position: u64) -> Option<T> {
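
All of the removed bookkeeping (1-based to 0-based conversion, multiplication by `Hash::LEN`, deserialization, corruption logging) now lives behind `HashFile::read`, which takes the shifted position in hash units. The offsets line up exactly as before; for example, again assuming 32-byte hashes:

    // Reading MMR position 10 with a prune shift of 3:
    //   old: file_offset = ((10 - 1 - 3) as usize) * Hash::LEN  // 6 * 32 = 192 bytes
    //   new: self.hash_file.read(10 - 3)                        // hash #7, 1-based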
@@ -112,9 +94,8 @@ impl<T: PMMRable> Backend<T> for PMMRBackend<T> {
         let pos = pmmr::n_leaves(position) - 1;

         // Must be on disk, doing a read at the correct position
-        let record_len = T::LEN;
-        let file_offset = ((pos - shift) as usize) * record_len;
-        let data = self.data_file.read(file_offset, record_len);
+        let file_offset = ((pos - shift) as usize) * T::LEN;
+        let data = self.data_file.read(file_offset, T::LEN);
         match ser::deserialize(&mut &data[..]) {
             Ok(h) => Some(h),
             Err(e) => {
@@ -158,15 +139,14 @@ impl<T: PMMRable> Backend<T> for PMMRBackend<T> {
         // Rewind the hash file accounting for pruned/compacted pos
         let shift = self.prune_list.get_shift(position);
-        let record_len = Hash::LEN as u64;
-        let file_pos = (position - shift) * record_len;
-        self.hash_file.rewind(file_pos);
+        self.hash_file
+            .rewind(position - shift)
+            .map_err(|e| format!("Failed to rewind hash file. {}", e))?;

         // Rewind the data file accounting for pruned/compacted pos
         let leaf_shift = self.prune_list.get_leaf_shift(position);
         let flatfile_pos = pmmr::n_leaves(position);
-        let record_len = T::LEN as u64;
-        let file_pos = (flatfile_pos - leaf_shift) * record_len;
+        let file_pos = (flatfile_pos - leaf_shift) * T::LEN as u64;
         self.data_file.rewind(file_pos);

         Ok(())
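
The hash-file rewind is likewise expressed in hashes, while the data file keeps its byte-offset rewind because `AppendOnlyFile` stays byte-oriented. A plausible body for `HashFile::rewind`, which this diff calls but does not show (an assumption, not the real source):

    impl HashFile {
        /// Truncate the file back to the first `position` hashes (assumed sketch).
        pub fn rewind(&mut self, position: u64) -> io::Result<()> {
            self.file.rewind(position * Hash::LEN as u64);
            Ok(())
        }
    }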
@@ -194,9 +174,9 @@ impl<T: PMMRable> Backend<T> for PMMRBackend<T> {
     fn dump_stats(&self) {
         debug!(
             "pmmr backend: unpruned: {}, hashes: {}, data: {}, leaf_set: {}, prune_list: {}",
-            self.unpruned_size().unwrap_or(0),
-            self.hash_size().unwrap_or(0),
-            self.data_size().unwrap_or(0),
+            self.unpruned_size(),
+            self.hash_size(),
+            self.data_size(),
             self.leaf_set.len(),
             self.prune_list.len(),
         );
@@ -211,7 +191,7 @@ impl<T: PMMRable> PMMRBackend<T> {
         prunable: bool,
         header: Option<&BlockHeader>,
     ) -> io::Result<PMMRBackend<T>> {
-        let hash_file = AppendOnlyFile::open(&format!("{}/{}", data_dir, PMMR_HASH_FILE))?;
+        let hash_file = HashFile::open(&format!("{}/{}", data_dir, PMMR_HASH_FILE))?;
         let data_file = AppendOnlyFile::open(&format!("{}/{}", data_dir, PMMR_DATA_FILE))?;

         let leaf_set_path = format!("{}/{}", data_dir, PMMR_LEAF_FILE);
@@ -251,36 +231,29 @@ impl<T: PMMRable> PMMRBackend<T> {
     /// Number of elements in the PMMR stored by this backend. Only produces the
     /// fully sync'd size.
-    pub fn unpruned_size(&self) -> io::Result<u64> {
+    pub fn unpruned_size(&self) -> u64 {
         let total_shift = self.prune_list.get_total_shift();
-        let record_len = Hash::LEN as u64;
-        let sz = self.hash_file.size()?;
-        Ok(sz / record_len + total_shift)
+        let sz = self.hash_file.size();
+        sz + total_shift
     }

     /// Number of elements in the underlying stored data. Extremely dependent on
     /// pruning and compaction.
-    pub fn data_size(&self) -> io::Result<u64> {
-        let record_len = T::LEN as u64;
-        self.data_file.size().map(|sz| sz / record_len)
+    pub fn data_size(&self) -> u64 {
+        self.data_file.size() / T::LEN as u64
     }

     /// Size of the underlying hashed data. Extremely dependent on pruning
     /// and compaction.
-    pub fn hash_size(&self) -> io::Result<u64> {
-        self.hash_file.size().map(|sz| sz / Hash::LEN as u64)
+    pub fn hash_size(&self) -> u64 {
+        self.hash_file.size()
     }

     /// Syncs all files to disk. A call to sync is required to ensure all the
     /// data has been successfully written to disk.
     pub fn sync(&mut self) -> io::Result<()> {
-        if let Err(e) = self.hash_file.flush() {
-            return Err(io::Error::new(
-                io::ErrorKind::Interrupted,
-                format!("Could not write to log hash storage, disk full? {:?}", e),
-            ));
-        }
+        self.hash_file.flush()?;
+
         if let Err(e) = self.data_file.flush() {
             return Err(io::Error::new(
                 io::ErrorKind::Interrupted,
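
With the unit conversions pushed into `HashFile`, all three size accessors become infallible and element-denominated, which is what lets `dump_stats` above and the tests below drop their `unwrap_or`/`unwrap` calls. Illustrative call sites:

    // All counts are elements, never bytes:
    let unpruned = backend.unpruned_size(); // hashes on disk + compacted shift
    let hashes = backend.hash_size();       // hashes currently on disk
    let leaves = backend.data_size();       // data records currently on disk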
@@ -339,25 +312,17 @@ impl<T: PMMRable> PMMRBackend<T> {
         // 1. Save compact copy of the hash file, skipping removed data.
         {
-            let record_len = Hash::LEN as u64;
-
             let off_to_rm = map_vec!(pos_to_rm, |pos| {
                 let shift = self.prune_list.get_shift(pos.into());
-                ((pos as u64) - 1 - shift) * record_len
+                pos as u64 - 1 - shift
             });

-            self.hash_file.save_prune(
-                tmp_prune_file_hash.clone(),
-                &off_to_rm,
-                record_len,
-                &prune_noop,
-            )?;
+            self.hash_file
+                .save_prune(tmp_prune_file_hash.clone(), &off_to_rm, &prune_noop)?;
         }

         // 2. Save compact copy of the data file, skipping removed leaves.
         {
-            let record_len = T::LEN as u64;
-
             let leaf_pos_to_rm = pos_to_rm
                 .iter()
                 .filter(|&x| pmmr::is_leaf(x.into()))
@@ -367,13 +332,13 @@ impl<T: PMMRable> PMMRBackend<T> {
             let off_to_rm = map_vec!(leaf_pos_to_rm, |&pos| {
                 let flat_pos = pmmr::n_leaves(pos);
                 let shift = self.prune_list.get_leaf_shift(pos);
-                (flat_pos - 1 - shift) * record_len
+                (flat_pos - 1 - shift) * T::LEN as u64
             });

             self.data_file.save_prune(
                 tmp_prune_file_data.clone(),
                 &off_to_rm,
-                record_len,
+                T::LEN as u64,
                 prune_cb,
             )?;
         }
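
Note the asymmetry between steps 1 and 2: offsets handed to `HashFile::save_prune` are now plain hash indices (the byte conversion happens inside `save_prune`, shown in the next file), while data-file offsets remain in bytes. A worked example, assuming 32-byte hashes:

    // Pruning MMR position 5 when one earlier position is already
    // compacted away (shift = 1):
    //   hash file, new: 5 - 1 - 1 = 3            // index, in hashes
    //   hash file, old: (5 - 1 - 1) * 32 = 96    // the byte offset it replaces
    //   data file:      (flat_pos - 1 - shift) * T::LEN as u64  // still bytes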
@@ -391,7 +356,7 @@ impl<T: PMMRable> PMMRBackend<T> {
             tmp_prune_file_hash.clone(),
             format!("{}/{}", self.data_dir, PMMR_HASH_FILE),
         )?;
-        self.hash_file = AppendOnlyFile::open(&format!("{}/{}", self.data_dir, PMMR_HASH_FILE))?;
+        self.hash_file = HashFile::open(&format!("{}/{}", self.data_dir, PMMR_HASH_FILE))?;

         // 5. Rename the compact copy of the data file and reopen it.
         fs::rename(
@@ -477,9 +442,8 @@ impl HashOnlyMMRBackend {
     }

     /// The unpruned size of this MMR backend.
-    pub fn unpruned_size(&self) -> io::Result<u64> {
-        let sz = self.hash_file.size()?;
-        Ok(sz / Hash::LEN as u64)
+    pub fn unpruned_size(&self) -> u64 {
+        self.hash_file.size()
     }

     /// Discard any pending changes to this MMR backend.


@@ -88,9 +88,27 @@ impl HashFile {
         self.file.discard()
     }

-    /// Size of the hash file in bytes.
-    pub fn size(&self) -> io::Result<u64> {
-        self.file.size()
+    /// Size of the hash file in number of hashes (not bytes).
+    pub fn size(&self) -> u64 {
+        self.file.size() / Hash::LEN as u64
+    }
+
+    /// Size of the unsync'd hash file, in hashes (not bytes).
+    pub fn size_unsync(&self) -> u64 {
+        self.file.size_unsync() / Hash::LEN as u64
+    }
+
+    /// Rewrite the hash file out to disk, pruning removed hashes.
+    pub fn save_prune<T>(&self, target: String, prune_offs: &[u64], prune_cb: T) -> io::Result<()>
+    where
+        T: Fn(&[u8]),
+    {
+        let prune_offs = prune_offs
+            .iter()
+            .map(|x| x * Hash::LEN as u64)
+            .collect::<Vec<_>>();
+        self.file
+            .save_prune(target, prune_offs.as_slice(), Hash::LEN as u64, prune_cb)
     }
 }
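
A minimal round trip through the resulting API; illustrative only, with `path` and `some_hash` as placeholders and error handling trimmed:

    let mut hash_file = HashFile::open(path)?;
    hash_file.append(&some_hash)?;  // occupies position 1
    hash_file.flush()?;             // size() only reflects sync'd data
    assert_eq!(hash_file.size(), 1);                // one hash, not 32 bytes
    assert_eq!(hash_file.read(1), Some(some_hash)); // read by 1-based position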
@@ -127,13 +145,12 @@ impl AppendOnlyFile {
             buffer: vec![],
             buffer_start_bak: 0,
         };

-        // if we have a non-empty file then mmap it.
-        if let Ok(sz) = aof.size() {
-            if sz > 0 {
-                aof.buffer_start = sz as usize;
-                aof.mmap = Some(unsafe { memmap::Mmap::map(&aof.file)? });
-            }
-        }
+        // If we have a non-empty file then mmap it.
+        let sz = aof.size();
+        if sz > 0 {
+            aof.buffer_start = sz as usize;
+            aof.mmap = Some(unsafe { memmap::Mmap::map(&aof.file)? });
+        }

         Ok(aof)
     }
@@ -306,8 +323,8 @@ impl AppendOnlyFile {
     }

     /// Current size of the file in bytes.
-    pub fn size(&self) -> io::Result<u64> {
-        fs::metadata(&self.path).map(|md| md.len())
+    pub fn size(&self) -> u64 {
+        fs::metadata(&self.path).map(|md| md.len()).unwrap_or(0)
     }

     /// Current size of the (unsynced) file in bytes.
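
One behavioral nuance: `size()` now folds any metadata error into 0 instead of surfacing an `io::Result`, which is exactly what allowed `open()` above to flatten its `if let Ok(sz)` nesting:

    // A missing or unreadable file now simply reads as empty:
    let sz = aof.size(); // 0 on error, byte length otherwise
    if sz > 0 {
        // mmap only when there is something to map
    }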


@@ -248,7 +248,7 @@ fn pmmr_reload() {
             .unwrap();
         backend.sync().unwrap();

-        assert_eq!(backend.unpruned_size().unwrap(), mmr_size);
+        assert_eq!(backend.unpruned_size(), mmr_size);

         // prune some more to get rm log data
         {
@@ -256,14 +256,14 @@ fn pmmr_reload() {
             pmmr.prune(5).unwrap();
         }
         backend.sync().unwrap();
-        assert_eq!(backend.unpruned_size().unwrap(), mmr_size);
+        assert_eq!(backend.unpruned_size(), mmr_size);
     }

     // create a new backend referencing the data files
     // and check everything still works as expected
     {
         let mut backend = store::pmmr::PMMRBackend::new(data_dir.to_string(), true, None).unwrap();
-        assert_eq!(backend.unpruned_size().unwrap(), mmr_size);
+        assert_eq!(backend.unpruned_size(), mmr_size);
         {
             let pmmr: PMMR<TestElem, _> = PMMR::at(&mut backend, mmr_size);
             assert_eq!(root, pmmr.root());
@@ -397,7 +397,7 @@ fn pmmr_rewind() {
     assert_eq!(backend.get_data(9), Some(elems[5]));
     assert_eq!(backend.get_hash(9), Some(elems[5].hash_with_index(8)));

-    assert_eq!(backend.data_size().unwrap(), 2);
+    assert_eq!(backend.data_size(), 2);

     {
         let mut pmmr: PMMR<TestElem, _> = PMMR::at(&mut backend, 10);
@@ -419,7 +419,7 @@ fn pmmr_rewind() {
     // check we have no data in the backend after
     // pruning, compacting and rewinding
-    assert_eq!(backend.data_size().unwrap(), 0);
+    assert_eq!(backend.data_size(), 0);

     teardown(data_dir);
 }
@@ -510,8 +510,8 @@ fn pmmr_compact_horizon() {
         // 0010012001001230
         // 9 leaves
-        assert_eq!(backend.data_size().unwrap(), 19);
-        assert_eq!(backend.hash_size().unwrap(), 35);
+        assert_eq!(backend.data_size(), 19);
+        assert_eq!(backend.hash_size(), 35);

         let pos_1_hash = backend.get_hash(1).unwrap();
         let pos_2_hash = backend.get_hash(2).unwrap();
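
These two assertions are mutually consistent, assuming the 19 data records at this point are the leaves of a not-yet-compacted MMR: a complete MMR over n leaves holds 2n - count_ones(n) nodes, so 19 leaves give 2*19 - 3 = 35 hashes. A quick check:

    let n: u64 = 19; // leaves (data records)
    assert_eq!(2 * n - u64::from(n.count_ones()), 35); // total MMR positions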
@@ -589,8 +589,8 @@ fn pmmr_compact_horizon() {
         let backend =
             store::pmmr::PMMRBackend::<TestElem>::new(data_dir.to_string(), true, None).unwrap();

-        assert_eq!(backend.data_size().unwrap(), 19);
-        assert_eq!(backend.hash_size().unwrap(), 35);
+        assert_eq!(backend.data_size(), 19);
+        assert_eq!(backend.hash_size(), 35);

         // check we can read a hash by pos correctly from recreated backend
         assert_eq!(backend.get_hash(7), Some(pos_7_hash));
@@ -625,8 +625,8 @@ fn pmmr_compact_horizon() {
         // 0010012001001230
-        assert_eq!(backend.data_size().unwrap(), 13);
-        assert_eq!(backend.hash_size().unwrap(), 27);
+        assert_eq!(backend.data_size(), 13);
+        assert_eq!(backend.hash_size(), 27);

         // check we can read a hash by pos correctly from recreated backend
         // get_hash() and get_from_file() should return the same value