From 426f4e9d6ba743dc29afdc0b5ec7cc69d3a66242 Mon Sep 17 00:00:00 2001 From: Ignotus Peverell Date: Sat, 4 Feb 2017 11:10:07 -0800 Subject: [PATCH] Added support for batched writes. Updated to rocksdb 0.6.0. --- store/Cargo.toml | 2 +- store/src/lib.rs | 63 ++++++++++++++++++++++++++++++++++++++++++------ 2 files changed, 57 insertions(+), 8 deletions(-) diff --git a/store/Cargo.toml b/store/Cargo.toml index d1a9b8bd3..86cf23c97 100644 --- a/store/Cargo.toml +++ b/store/Cargo.toml @@ -4,7 +4,7 @@ version = "0.1.0" authors = ["Ignotus Peverell "] [dependencies] -rocksdb = { git = "https://github.com/ethcore/rust-rocksdb" } +rocksdb = "^0.6.0" tiny-keccak = "1.1" grin_core = { path = "../core" } diff --git a/store/src/lib.rs b/store/src/lib.rs index d81b166f1..df515f1b9 100644 --- a/store/src/lib.rs +++ b/store/src/lib.rs @@ -23,25 +23,35 @@ extern crate grin_core as core; extern crate rocksdb; +use std::fmt; use std::sync::RwLock; use core::ser; -use rocksdb::{DB, Options, Writable, DBCompactionStyle}; +use rocksdb::{DB, WriteBatch, DBCompactionStyle}; /// Main error type for this crate. #[derive(Debug)] pub enum Error { /// Wraps an error originating from RocksDB (which unfortunately returns /// string errors). - RocksDbErr(String), + RocksDbErr(rocksdb::Error), /// Wraps a serialization error for Writeable or Readable SerErr(ser::Error), } -impl From for Error { - fn from(s: String) -> Error { - Error::RocksDbErr(s) +impl fmt::Display for Error { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self { + &Error::RocksDbErr(ref e) => write!(f, "RocksDb Error: {}", e.to_string()), + &Error::SerErr(ref e) => write!(f, "Serialization Error: {}", e.to_string()), + } + } +} + +impl From for Error { + fn from(e: rocksdb::Error) -> Error { + Error::RocksDbErr(e) } } @@ -56,9 +66,9 @@ unsafe impl Send for Store {} impl Store { /// Opens a new RocksDB at the specified location. pub fn open(path: &str) -> Result { - let mut opts = Options::new(); + let mut opts = rocksdb::Options::default(); opts.create_if_missing(true); - opts.set_compaction_style(DBCompactionStyle::DBUniversalCompaction); + opts.set_compaction_style(DBCompactionStyle::Universal); opts.set_max_open_files(256); opts.set_use_fsync(false); let db = try!(DB::open(&opts, &path)); @@ -116,4 +126,43 @@ impl Store { let db = self.rdb.write().unwrap(); db.delete(key).map_err(Error::RocksDbErr) } + + /// Builds a new batch to be used with this store. + pub fn batch(&self) -> Batch { + Batch { + store: self, + batch: WriteBatch::default(), + } + } + + fn write(&self, batch: WriteBatch) -> Result<(), Error> { + let db = self.rdb.write().unwrap(); + db.write(batch).map_err(Error::RocksDbErr) + } +} + +/// Batch to write multiple Writeables to RocksDb in an atomic manner. +pub struct Batch<'a> { + store: &'a Store, + batch: WriteBatch, +} + +impl<'a> Batch<'a> { + /// Writes a single key and its `Writeable` value to the batch. The write + /// function must be called to "commit" the batch to storage. + pub fn put_ser(mut self, key: &[u8], value: &ser::Writeable) -> Result, Error> { + let ser_value = ser::ser_vec(value); + match ser_value { + Ok(data) => { + self.batch.put(key, &data[..])?; + Ok(self) + } + Err(err) => Err(Error::SerErr(err)), + } + } + + /// Writes the batch to RocksDb. + pub fn write(self) -> Result<(), Error> { + self.store.write(self.batch) + } }