Clean up how we deal with fixed size vs variable size data files (#2757)

Introduce a SizeInfo enum to replace the two separate options (the `elmt_size` and `size_file` Option fields).
This commit is contained in:
Antioch Peverell 2019-04-24 21:43:06 +01:00 committed by Ignotus Peverell
parent e72409d804
commit de21f0d62d
2 changed files with 53 additions and 70 deletions

View file

@ -21,7 +21,7 @@ use crate::core::core::BlockHeader;
use crate::core::ser::{FixedLength, PMMRable}; use crate::core::ser::{FixedLength, PMMRable};
use crate::leaf_set::LeafSet; use crate::leaf_set::LeafSet;
use crate::prune_list::PruneList; use crate::prune_list::PruneList;
use crate::types::DataFile; use crate::types::{AppendOnlyFile, DataFile, SizeEntry, SizeInfo};
use croaring::Bitmap; use croaring::Bitmap;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
@ -201,21 +201,22 @@ impl<T: PMMRable> PMMRBackend<T> {
) -> io::Result<PMMRBackend<T>> { ) -> io::Result<PMMRBackend<T>> {
let data_dir = data_dir.as_ref(); let data_dir = data_dir.as_ref();
// We either have a fixed size *or* a path to a file for tracking sizes. // Are we dealing with "fixed size" data elements or "variable size" data elements
let (elmt_size, size_path) = if fixed_size { // maintained in an associated size file?
(Some(T::E::LEN as u16), None) let size_info = if fixed_size {
SizeInfo::FixedSize(T::E::LEN as u16)
} else { } else {
(None, Some(data_dir.join(PMMR_SIZE_FILE))) SizeInfo::VariableSize(Box::new(AppendOnlyFile::open(
data_dir.join(PMMR_SIZE_FILE),
SizeInfo::FixedSize(SizeEntry::LEN as u16),
)?))
}; };
// Hash file is always "fixed size" and we use 32 bytes per hash. // Hash file is always "fixed size" and we use 32 bytes per hash.
let hash_file = let hash_size_info = SizeInfo::FixedSize(Hash::LEN as u16);
DataFile::open(&data_dir.join(PMMR_HASH_FILE), None, Some(Hash::LEN as u16))?;
let data_file = DataFile::open( let hash_file = DataFile::open(&data_dir.join(PMMR_HASH_FILE), hash_size_info)?;
&data_dir.join(PMMR_DATA_FILE), let data_file = DataFile::open(&data_dir.join(PMMR_DATA_FILE), size_info)?;
size_path.as_ref(),
elmt_size,
)?;
let leaf_set_path = data_dir.join(PMMR_LEAF_FILE); let leaf_set_path = data_dir.join(PMMR_LEAF_FILE);

View file

@ -59,7 +59,15 @@ impl Writeable for SizeEntry {
} }
} }
/// Data file (MMR) wrapper around an append only file. /// Are we dealing with "fixed size" data or "variable size" data in a data file?
pub enum SizeInfo {
/// Fixed size data.
FixedSize(u16),
/// Variable size data.
VariableSize(Box<AppendOnlyFile<SizeEntry>>),
}
/// Data file (MMR) wrapper around an append-only file.
pub struct DataFile<T> { pub struct DataFile<T> {
file: AppendOnlyFile<T>, file: AppendOnlyFile<T>,
} }
@ -69,21 +77,13 @@ where
T: Readable + Writeable + Debug, T: Readable + Writeable + Debug,
{ {
/// Open (or create) a file at the provided path on disk. /// Open (or create) a file at the provided path on disk.
pub fn open<P>(path: P, size_path: Option<P>, elmt_size: Option<u16>) -> io::Result<DataFile<T>> pub fn open<P>(path: P, size_info: SizeInfo) -> io::Result<DataFile<T>>
where where
P: AsRef<Path> + Debug, P: AsRef<Path> + Debug,
{ {
let size_file = if let Some(size_path) = size_path { Ok(DataFile {
Some(AppendOnlyFile::open( file: AppendOnlyFile::open(path, size_info)?,
size_path, })
None,
Some(SizeEntry::LEN as u16),
)?)
} else {
None
};
let file = AppendOnlyFile::open(path, size_file, elmt_size)?;
Ok(DataFile { file })
} }
/// Append an element to the file. /// Append an element to the file.
@ -168,11 +168,7 @@ where
pub struct AppendOnlyFile<T> { pub struct AppendOnlyFile<T> {
path: PathBuf, path: PathBuf,
file: Option<File>, file: Option<File>,
size_info: SizeInfo,
// We either have a fixed_size or an associated "size" file.
elmt_size: Option<u16>,
size_file: Option<Box<AppendOnlyFile<SizeEntry>>>,
mmap: Option<memmap::Mmap>, mmap: Option<memmap::Mmap>,
// Buffer of unsync'd bytes. These bytes will be appended to the file when flushed. // Buffer of unsync'd bytes. These bytes will be appended to the file when flushed.
@ -187,20 +183,15 @@ where
T: Debug + Readable + Writeable, T: Debug + Readable + Writeable,
{ {
/// Open a file (existing or not) as append-only, backed by a mmap. /// Open a file (existing or not) as append-only, backed by a mmap.
pub fn open<P>( pub fn open<P>(path: P, size_info: SizeInfo) -> io::Result<AppendOnlyFile<T>>
path: P,
size_file: Option<AppendOnlyFile<SizeEntry>>,
elmt_size: Option<u16>,
) -> io::Result<AppendOnlyFile<T>>
where where
P: AsRef<Path> + Debug, P: AsRef<Path> + Debug,
{ {
let mut aof = AppendOnlyFile { let mut aof = AppendOnlyFile {
file: None, file: None,
path: path.as_ref().to_path_buf(), path: path.as_ref().to_path_buf(),
elmt_size, size_info,
mmap: None, mmap: None,
size_file: size_file.map(|x| Box::new(x)),
buffer: vec![], buffer: vec![],
buffer_start_pos: 0, buffer_start_pos: 0,
buffer_start_pos_bak: 0, buffer_start_pos_bak: 0,
@ -212,7 +203,7 @@ where
// This will occur during "fast sync" as we do not sync the size_file // This will occur during "fast sync" as we do not sync the size_file
// and must build it locally. // and must build it locally.
// And we can *only* do this after init() the data file (so we know sizes). // And we can *only* do this after init() the data file (so we know sizes).
if let Some(ref mut size_file) = &mut aof.size_file { if let SizeInfo::VariableSize(ref mut size_file) = &mut aof.size_info {
if size_file.size()? == 0 { if size_file.size()? == 0 {
aof.rebuild_size_file()?; aof.rebuild_size_file()?;
@ -228,7 +219,7 @@ where
/// (Re)init an underlying file and its associated memmap. /// (Re)init an underlying file and its associated memmap.
/// Taking care to initialize the mmap_offset_cache for each element. /// Taking care to initialize the mmap_offset_cache for each element.
pub fn init(&mut self) -> io::Result<()> { pub fn init(&mut self) -> io::Result<()> {
if let Some(ref mut size_file) = self.size_file { if let SizeInfo::VariableSize(ref mut size_file) = self.size_info {
size_file.init()?; size_file.init()?;
} }
@ -252,22 +243,18 @@ where
} }
fn size_in_elmts(&self) -> io::Result<u64> { fn size_in_elmts(&self) -> io::Result<u64> {
if let Some(elmt_size) = self.elmt_size { match self.size_info {
Ok(self.size()? / elmt_size as u64) SizeInfo::FixedSize(elmt_size) => Ok(self.size()? / elmt_size as u64),
} else if let Some(ref size_file) = &self.size_file { SizeInfo::VariableSize(ref size_file) => size_file.size_in_elmts(),
size_file.size_in_elmts()
} else {
Ok(0)
} }
} }
fn size_unsync_in_elmts(&self) -> io::Result<u64> { fn size_unsync_in_elmts(&self) -> io::Result<u64> {
if let Some(elmt_size) = self.elmt_size { match self.size_info {
Ok(self.buffer_start_pos + (self.buffer.len() as u64 / elmt_size as u64)) SizeInfo::FixedSize(elmt_size) => {
} else if let Some(ref size_file) = &self.size_file { Ok(self.buffer_start_pos + (self.buffer.len() as u64 / elmt_size as u64))
size_file.size_unsync_in_elmts() }
} else { SizeInfo::VariableSize(ref size_file) => size_file.size_unsync_in_elmts(),
Err(io::Error::new(io::ErrorKind::Other, "size file missing"))
} }
} }
@ -281,7 +268,7 @@ where
/// Append data to the file. Until the append-only file is synced, data is /// Append data to the file. Until the append-only file is synced, data is
/// only written to memory. /// only written to memory.
pub fn append(&mut self, bytes: &mut [u8]) -> io::Result<()> { pub fn append(&mut self, bytes: &mut [u8]) -> io::Result<()> {
if let Some(ref mut size_file) = &mut self.size_file { if let SizeInfo::VariableSize(ref mut size_file) = &mut self.size_info {
let next_pos = size_file.size_unsync_in_elmts()?; let next_pos = size_file.size_unsync_in_elmts()?;
let offset = if next_pos == 0 { let offset = if next_pos == 0 {
0 0
@ -303,18 +290,13 @@ where
// If pos is in the buffer then caller needs to remember to account for this // If pos is in the buffer then caller needs to remember to account for this
// when reading from the buffer. // when reading from the buffer.
fn offset_and_size(&self, pos: u64) -> io::Result<(u64, u16)> { fn offset_and_size(&self, pos: u64) -> io::Result<(u64, u16)> {
if let Some(size) = self.elmt_size { match self.size_info {
// Calculating offset and size is simple if we have fixed size elements. SizeInfo::FixedSize(elmt_size) => Ok((pos * elmt_size as u64, elmt_size)),
Ok((pos * size as u64, size)) SizeInfo::VariableSize(ref size_file) => {
} else if let Some(ref size_file) = &self.size_file { // Otherwise we need to calculate offset and size from entries in the size_file.
// Otherwise we need to calculate offset and size from entries in the size_file. let entry = size_file.read_as_elmt(pos)?;
let entry = size_file.read_as_elmt(pos)?; Ok((entry.offset, entry.size))
Ok((entry.offset, entry.size)) }
} else {
Err(io::Error::new(
io::ErrorKind::Other,
"variable size, missing size file",
))
} }
} }
@ -322,7 +304,7 @@ where
/// We simply "rewind" the buffer_start_pos to the specified position. /// We simply "rewind" the buffer_start_pos to the specified position.
/// Note: We do not currently support rewinding within the buffer itself. /// Note: We do not currently support rewinding within the buffer itself.
pub fn rewind(&mut self, pos: u64) { pub fn rewind(&mut self, pos: u64) {
if let Some(ref mut size_file) = &mut self.size_file { if let SizeInfo::VariableSize(ref mut size_file) = &mut self.size_info {
size_file.rewind(pos); size_file.rewind(pos);
} }
@ -335,7 +317,7 @@ where
/// Syncs all writes (fsync), reallocating the memory map to make the newly /// Syncs all writes (fsync), reallocating the memory map to make the newly
/// written data accessible. /// written data accessible.
pub fn flush(&mut self) -> io::Result<()> { pub fn flush(&mut self) -> io::Result<()> {
if let Some(ref mut size_file) = &mut self.size_file { if let SizeInfo::VariableSize(ref mut size_file) = &mut self.size_info {
// Flush the associated size_file if we have one. // Flush the associated size_file if we have one.
size_file.flush()? size_file.flush()?
} }
@ -398,7 +380,7 @@ where
} }
// Discarding the data file will discard the associated size file if we have one. // Discarding the data file will discard the associated size file if we have one.
if let Some(ref mut size_file) = &mut self.size_file { if let SizeInfo::VariableSize(ref mut size_file) = &mut self.size_info {
size_file.discard(); size_file.discard();
} }
@ -492,7 +474,7 @@ where
// Now rebuild our size file to reflect the pruned data file. // Now rebuild our size file to reflect the pruned data file.
// This will replace the underlying file internally. // This will replace the underlying file internally.
if let Some(_) = &self.size_file { if let SizeInfo::VariableSize(_) = &self.size_info {
self.rebuild_size_file()?; self.rebuild_size_file()?;
} }
@ -503,7 +485,7 @@ where
} }
fn rebuild_size_file(&mut self) -> io::Result<()> { fn rebuild_size_file(&mut self) -> io::Result<()> {
if let Some(ref mut size_file) = &mut self.size_file { if let SizeInfo::VariableSize(ref mut size_file) = &mut self.size_info {
// Note: Reading from data file and writing sizes to the associated (tmp) size_file. // Note: Reading from data file and writing sizes to the associated (tmp) size_file.
let tmp_path = size_file.path.with_extension("tmp"); let tmp_path = size_file.path.with_extension("tmp");
@ -563,7 +545,7 @@ where
self.file = None; self.file = None;
// Remember to release the size_file as well if we have one. // Remember to release the size_file as well if we have one.
if let Some(ref mut size_file) = self.size_file { if let SizeInfo::VariableSize(ref mut size_file) = &mut self.size_info {
size_file.release(); size_file.release();
} }
} }