Refactor prune file replace (#3571)

* split prune file rewrite into two steps
only one needs a mut ref to self

* write both tmp files then replace
This commit is contained in:
Antioch Peverell 2021-02-23 11:40:26 +00:00 committed by GitHub
parent a3c9b478e2
commit 9c44a4d08f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 51 additions and 33 deletions

View file

@ -352,17 +352,17 @@ impl<T: PMMRable> PMMRBackend<T> {
// on the cutoff_pos provided. // on the cutoff_pos provided.
let (leaves_removed, pos_to_rm) = self.pos_to_rm(cutoff_pos, rewind_rm_pos); let (leaves_removed, pos_to_rm) = self.pos_to_rm(cutoff_pos, rewind_rm_pos);
// 1. Save compact copy of the hash file, skipping removed data. // Save compact copy of the hash file, skipping removed data.
{ {
let pos_to_rm = map_vec!(pos_to_rm, |pos| { let pos_to_rm = map_vec!(pos_to_rm, |pos| {
let shift = self.prune_list.get_shift(pos.into()); let shift = self.prune_list.get_shift(pos.into());
pos as u64 - shift pos as u64 - shift
}); });
self.hash_file.save_prune(&pos_to_rm)?; self.hash_file.write_tmp_pruned(&pos_to_rm)?;
} }
// 2. Save compact copy of the data file, skipping removed leaves. // Save compact copy of the data file, skipping removed leaves.
{ {
let leaf_pos_to_rm = pos_to_rm let leaf_pos_to_rm = pos_to_rm
.iter() .iter()
@ -376,10 +376,19 @@ impl<T: PMMRable> PMMRBackend<T> {
flat_pos - shift flat_pos - shift
}); });
self.data_file.save_prune(&pos_to_rm)?; self.data_file.write_tmp_pruned(&pos_to_rm)?;
} }
// 3. Update the prune list and write to disk. // Replace hash and data files with compact copies.
// Rebuild and initialize from the new files.
{
debug!("compact: about to replace hash and data files and rebuild...");
self.hash_file.replace_with_tmp()?;
self.data_file.replace_with_tmp()?;
debug!("compact: ...finished replacing and rebuilding");
}
// Update the prune list and write to disk.
{ {
for pos in leaves_removed.iter() { for pos in leaves_removed.iter() {
self.prune_list.add(pos.into()); self.prune_list.add(pos.into());
@ -387,11 +396,10 @@ impl<T: PMMRable> PMMRBackend<T> {
self.prune_list.flush()?; self.prune_list.flush()?;
} }
// 4. Write the leaf_set to disk. // Write the leaf_set to disk.
// Optimize the bitmap storage in the process. // Optimize the bitmap storage in the process.
self.leaf_set.flush()?; self.leaf_set.flush()?;
// 5. cleanup rewind files
self.clean_rewind_files()?; self.clean_rewind_files()?;
Ok(true) Ok(true)

View file

@ -154,10 +154,16 @@ where
} }
/// Write the file out to disk, pruning removed elements. /// Write the file out to disk, pruning removed elements.
pub fn save_prune(&mut self, prune_pos: &[u64]) -> io::Result<()> { pub fn write_tmp_pruned(&self, prune_pos: &[u64]) -> io::Result<()> {
// Need to convert from 1-index to 0-index (don't ask). // Need to convert from 1-index to 0-index (don't ask).
let prune_idx: Vec<_> = prune_pos.iter().map(|x| x - 1).collect(); let prune_idx: Vec<_> = prune_pos.iter().map(|x| x - 1).collect();
self.file.save_prune(prune_idx.as_slice()) self.file.write_tmp_pruned(prune_idx.as_slice())
}
/// Replace with file at tmp path.
/// Rebuild and initialize from new file.
pub fn replace_with_tmp(&mut self) -> io::Result<()> {
self.file.replace_with_tmp()
} }
} }
@ -485,39 +491,43 @@ where
Ok(file) Ok(file)
} }
fn tmp_path(&self) -> PathBuf {
self.path.with_extension("tmp")
}
/// Saves a copy of the current file content, skipping data at the provided /// Saves a copy of the current file content, skipping data at the provided
/// prune positions. prune_pos must be ordered. /// prune positions. prune_pos must be ordered.
pub fn save_prune(&mut self, prune_pos: &[u64]) -> io::Result<()> { pub fn write_tmp_pruned(&self, prune_pos: &[u64]) -> io::Result<()> {
let tmp_path = self.path.with_extension("tmp"); let reader = File::open(&self.path)?;
let mut buf_reader = BufReader::new(reader);
let mut streaming_reader = StreamingReader::new(&mut buf_reader, self.version);
// Scope the reader and writer to within the block so we can safely replace files later on. let mut buf_writer = BufWriter::new(File::create(&self.tmp_path())?);
{ let mut bin_writer = BinWriter::new(&mut buf_writer, self.version);
let reader = File::open(&self.path)?;
let mut buf_reader = BufReader::new(reader);
let mut streaming_reader = StreamingReader::new(&mut buf_reader, self.version);
let mut buf_writer = BufWriter::new(File::create(&tmp_path)?); let mut current_pos = 0;
let mut bin_writer = BinWriter::new(&mut buf_writer, self.version); let mut prune_pos = prune_pos;
while let Ok(elmt) = T::read(&mut streaming_reader) {
let mut current_pos = 0; if prune_pos.contains(&current_pos) {
let mut prune_pos = prune_pos; // Pruned pos, moving on.
while let Ok(elmt) = T::read(&mut streaming_reader) { prune_pos = &prune_pos[1..];
if prune_pos.contains(&current_pos) { } else {
// Pruned pos, moving on. // Not pruned, write to file.
prune_pos = &prune_pos[1..]; elmt.write(&mut bin_writer)
} else { .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
// Not pruned, write to file.
elmt.write(&mut bin_writer)
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
}
current_pos += 1;
} }
buf_writer.flush()?; current_pos += 1;
} }
buf_writer.flush()?;
Ok(())
}
/// Replace the underlying file with the file at tmp path.
/// Rebuild and initialize from the new file.
pub fn replace_with_tmp(&mut self) -> io::Result<()> {
// Replace the underlying file - // Replace the underlying file -
// pmmr_data.tmp -> pmmr_data.bin // pmmr_data.tmp -> pmmr_data.bin
self.replace(&tmp_path)?; self.replace(&self.tmp_path())?;
// Now rebuild our size file to reflect the pruned data file. // Now rebuild our size file to reflect the pruned data file.
// This will replace the underlying file internally. // This will replace the underlying file internally.