Various minor store crate cleanups (#1991)

* Various minor store crate cleanups
* Fix optional IteratingReader counter
* Remove unused vec write functions, with changes to ser they required
This commit is contained in:
Ignotus Peverell 2018-11-29 16:04:02 -08:00 committed by GitHub
parent 16aa64bfab
commit 35df4ad11e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 24 additions and 93 deletions

6
Cargo.lock generated
View file

@ -910,7 +910,7 @@ dependencies = [
"libc 0.2.43 (registry+https://github.com/rust-lang/crates.io-index)", "libc 0.2.43 (registry+https://github.com/rust-lang/crates.io-index)",
"lmdb-zero 0.4.4 (registry+https://github.com/rust-lang/crates.io-index)", "lmdb-zero 0.4.4 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)",
"memmap 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)", "memmap 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)",
"rand 0.5.5 (registry+https://github.com/rust-lang/crates.io-index)", "rand 0.5.5 (registry+https://github.com/rust-lang/crates.io-index)",
"serde 1.0.80 (registry+https://github.com/rust-lang/crates.io-index)", "serde 1.0.80 (registry+https://github.com/rust-lang/crates.io-index)",
"serde_derive 1.0.80 (registry+https://github.com/rust-lang/crates.io-index)", "serde_derive 1.0.80 (registry+https://github.com/rust-lang/crates.io-index)",
@ -1327,7 +1327,7 @@ dependencies = [
[[package]] [[package]]
name = "memmap" name = "memmap"
version = "0.6.2" version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [ dependencies = [
"libc 0.2.43 (registry+https://github.com/rust-lang/crates.io-index)", "libc 0.2.43 (registry+https://github.com/rust-lang/crates.io-index)",
@ -2951,7 +2951,7 @@ dependencies = [
"checksum matches 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "7ffc5c5338469d4d3ea17d269fa8ea3512ad247247c30bd2df69e68309ed0a08" "checksum matches 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "7ffc5c5338469d4d3ea17d269fa8ea3512ad247247c30bd2df69e68309ed0a08"
"checksum memchr 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)" = "148fab2e51b4f1cfc66da2a7c32981d1d3c083a803978268bb11fe4b86925e7a" "checksum memchr 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)" = "148fab2e51b4f1cfc66da2a7c32981d1d3c083a803978268bb11fe4b86925e7a"
"checksum memchr 2.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "0a3eb002f0535929f1199681417029ebea04aadc0c7a4224b46be99c7f5d6a16" "checksum memchr 2.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "0a3eb002f0535929f1199681417029ebea04aadc0c7a4224b46be99c7f5d6a16"
"checksum memmap 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)" = "e2ffa2c986de11a9df78620c01eeaaf27d94d3ff02bf81bfcca953102dd0c6ff" "checksum memmap 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "6585fd95e7bb50d6cc31e20d4cf9afb4e2ba16c5846fc76793f11218da9c475b"
"checksum memoffset 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "0f9dc261e2b62d7a622bf416ea3c5245cdd5d9a7fcc428c0d06804dfce1775b3" "checksum memoffset 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "0f9dc261e2b62d7a622bf416ea3c5245cdd5d9a7fcc428c0d06804dfce1775b3"
"checksum mime 0.3.12 (registry+https://github.com/rust-lang/crates.io-index)" = "0a907b83e7b9e987032439a387e187119cddafc92d5c2aaeb1d92580a793f630" "checksum mime 0.3.12 (registry+https://github.com/rust-lang/crates.io-index)" = "0a907b83e7b9e987032439a387e187119cddafc92d5c2aaeb1d92580a793f630"
"checksum mime_guess 2.0.0-alpha.6 (registry+https://github.com/rust-lang/crates.io-index)" = "30de2e4613efcba1ec63d8133f344076952090c122992a903359be5a4f99c3ed" "checksum mime_guess 2.0.0-alpha.6 (registry+https://github.com/rust-lang/crates.io-index)" = "30de2e4613efcba1ec63d8133f344076952090c122992a903359be5a4f99c3ed"

View file

@ -17,7 +17,7 @@ libc = "0.2"
failure = "0.1" failure = "0.1"
failure_derive = "0.1" failure_derive = "0.1"
lmdb-zero = "0.4.4" lmdb-zero = "0.4.4"
memmap = "0.6.2" memmap = "0.7"
serde = "1" serde = "1"
serde_derive = "1" serde_derive = "1"
log = "0.4" log = "0.4"

View file

@ -140,9 +140,7 @@ impl<T: PMMRable> Backend<T> for PMMRBackend<T> {
// Rewind the hash file accounting for pruned/compacted pos // Rewind the hash file accounting for pruned/compacted pos
let shift = self.prune_list.get_shift(position); let shift = self.prune_list.get_shift(position);
self.hash_file self.hash_file.rewind(position - shift);
.rewind(position - shift)
.map_err(|e| format!("Failed to rewind hash file. {}", e))?;
// Rewind the data file accounting for pruned/compacted pos // Rewind the data file accounting for pruned/compacted pos
let leaf_shift = self.prune_list.get_leaf_shift(position); let leaf_shift = self.prune_list.get_leaf_shift(position);
@ -253,19 +251,16 @@ impl<T: PMMRable> PMMRBackend<T> {
/// Syncs all files to disk. A call to sync is required to ensure all the /// Syncs all files to disk. A call to sync is required to ensure all the
/// data has been successfully written to disk. /// data has been successfully written to disk.
pub fn sync(&mut self) -> io::Result<()> { pub fn sync(&mut self) -> io::Result<()> {
self.hash_file.flush()?; self.hash_file
.flush()
if let Err(e) = self.data_file.flush() { .and(self.data_file.flush())
return Err(io::Error::new( .and(self.leaf_set.flush())
.map_err(|e| {
io::Error::new(
io::ErrorKind::Interrupted, io::ErrorKind::Interrupted,
format!("Could not write to log data storage, disk full? {:?}", e), format!("Could not write to state storage, disk full? {:?}", e),
)); )
} })
// Flush the leaf_set to disk.
self.leaf_set.flush()?;
Ok(())
} }
/// Discard the current, non synced state of the backend. /// Discard the current, non synced state of the backend.
@ -275,23 +270,15 @@ impl<T: PMMRable> PMMRBackend<T> {
self.data_file.discard(); self.data_file.discard();
} }
/// Return the data file path
pub fn data_file_path(&self) -> String {
self.get_data_file_path()
}
/// Takes the leaf_set at a given cutoff_pos and generates an updated /// Takes the leaf_set at a given cutoff_pos and generates an updated
/// prune_list. Saves the updated prune_list to disk /// prune_list. Saves the updated prune_list to disk, compacts the hash
/// Compacts the hash and data files based on the prune_list and saves both /// and data files based on the prune_list and saves both to disk.
/// to disk.
/// ///
/// A cutoff position limits compaction on recent data. /// A cutoff position limits compaction on recent data.
/// This will be the last position of a particular block /// This will be the last position of a particular block to keep things
/// to keep things aligned. /// aligned. The block_marker in the db/index for the particular block
/// The block_marker in the db/index for the particular block /// will have a suitable output_pos. This is used to enforce a horizon
/// will have a suitable output_pos. /// after which the local node should have all the data to allow rewinding.
/// This is used to enforce a horizon after which the local node
/// should have all the data to allow rewinding.
pub fn check_compact<P>( pub fn check_compact<P>(
&mut self, &mut self,
cutoff_pos: u64, cutoff_pos: u64,

View file

@ -14,10 +14,8 @@
//! Common storage-related types //! Common storage-related types
use memmap; use memmap;
use std::cmp;
use std::fs::{self, File, OpenOptions}; use std::fs::{self, File, OpenOptions};
use std::io::{self, BufRead, BufReader, BufWriter, ErrorKind, Read, Write}; use std::io::{self, BufWriter, ErrorKind, Read, Write};
use std::path::Path;
use core::core::hash::Hash; use core::core::hash::Hash;
use core::ser::{self, FixedLength}; use core::ser::{self, FixedLength};
@ -67,9 +65,8 @@ impl HashFile {
} }
/// Rewind the backend file to the specified position. /// Rewind the backend file to the specified position.
pub fn rewind(&mut self, position: u64) -> io::Result<()> { pub fn rewind(&mut self, position: u64) {
self.file.rewind(position * Hash::LEN as u64); self.file.rewind(position * Hash::LEN as u64)
Ok(())
} }
/// Flush unsynced changes to the hash file to disk. /// Flush unsynced changes to the hash file to disk.
@ -320,56 +317,3 @@ impl AppendOnlyFile {
self.path.clone() self.path.clone()
} }
} }
/// Read an ordered vector of scalars from a file.
pub fn read_ordered_vec<T>(path: &str, elmt_len: usize) -> io::Result<Vec<T>>
where
T: ser::Readable + cmp::Ord,
{
let file_path = Path::new(&path);
let mut ovec = Vec::with_capacity(1000);
if file_path.exists() {
let mut file = BufReader::with_capacity(elmt_len * 1000, File::open(&path)?);
loop {
// need a block to end mutable borrow before consume
let buf_len = {
let buf = file.fill_buf()?;
if buf.len() == 0 {
break;
}
let elmts_res: Result<Vec<T>, ser::Error> = ser::deserialize(&mut &buf[..]);
match elmts_res {
Ok(elmts) => for elmt in elmts {
if let Err(idx) = ovec.binary_search(&elmt) {
ovec.insert(idx, elmt);
}
},
Err(_) => {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
format!("Corrupted storage, could not read file at {}", path),
));
}
}
buf.len()
};
file.consume(buf_len);
}
}
Ok(ovec)
}
/// Writes an ordered vector to a file
pub fn write_vec<T>(path: &str, v: &Vec<T>) -> io::Result<()>
where
T: ser::Writeable,
{
let mut file_path = File::create(&path)?;
ser::serialize(&mut file_path, v).map_err(|_| {
io::Error::new(
io::ErrorKind::InvalidInput,
format!("Failed to serialize data when writing to {}", path),
)
})?;
Ok(())
}