diff --git a/Cargo.lock b/Cargo.lock index fcdee46df..a8a618661 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -747,7 +747,6 @@ dependencies = [ "grin_store 1.1.0", "grin_util 1.1.0", "lazy_static 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)", - "lmdb-zero 0.4.4 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", "lru-cache 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", "rand 0.5.6 (registry+https://github.com/rust-lang/crates.io-index)", @@ -831,7 +830,6 @@ dependencies = [ "grin_pool 1.1.0", "grin_store 1.1.0", "grin_util 1.1.0", - "lmdb-zero 0.4.4 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", "net2 0.2.33 (registry+https://github.com/rust-lang/crates.io-index)", "num 0.1.42 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/chain/Cargo.toml b/chain/Cargo.toml index d5ee39635..5f36d7b0b 100644 --- a/chain/Cargo.toml +++ b/chain/Cargo.toml @@ -12,7 +12,6 @@ edition = "2018" [dependencies] bitflags = "1" byteorder = "1" -lmdb-zero = "0.4.4" failure = "0.1" failure_derive = "0.1" croaring = "0.3" diff --git a/chain/src/chain.rs b/chain/src/chain.rs index 93b3529e3..fd0148d09 100644 --- a/chain/src/chain.rs +++ b/chain/src/chain.rs @@ -24,7 +24,6 @@ use crate::core::core::{ use crate::core::global; use crate::core::pow; use crate::error::{Error, ErrorKind}; -use crate::lmdb; use crate::pipe; use crate::store; use crate::txhashset; @@ -160,7 +159,6 @@ impl Chain { /// based on the genesis block if necessary. pub fn init( db_root: String, - db_env: Arc<lmdb::Environment>, adapter: Arc<dyn ChainAdapter + Send + Sync>, genesis: Block, pow_verifier: fn(&BlockHeader) -> Result<(), pow::Error>, @@ -177,7 +175,7 @@ impl Chain { return Err(ErrorKind::Stopped.into()); } - let store = Arc::new(store::ChainStore::new(db_env)?); + let store = Arc::new(store::ChainStore::new(&db_root)?); // open the txhashset, creating a new one if necessary let mut txhashset = txhashset::TxHashSet::open(db_root.clone(), store.clone(), None)?; @@ -985,6 +983,7 @@ impl Chain { let mut current = self.get_header_by_height(head.height - horizon - 1)?; let batch = self.store.batch()?; + loop { // Go to the store directly so we can handle NotFoundErr robustly. match self.store.get_block(¤t.hash()) { diff --git a/chain/src/lib.rs b/chain/src/lib.rs index 559106513..cf8f8d077 100644 --- a/chain/src/lib.rs +++ b/chain/src/lib.rs @@ -23,8 +23,6 @@ #[macro_use] extern crate bitflags; -use lmdb_zero as lmdb; - #[macro_use] extern crate serde_derive; #[macro_use] diff --git a/chain/src/store.rs b/chain/src/store.rs index 1b0265316..1ce6a2f00 100644 --- a/chain/src/store.rs +++ b/chain/src/store.rs @@ -18,7 +18,6 @@ use crate::core::consensus::HeaderInfo; use crate::core::core::hash::{Hash, Hashed}; use crate::core::core::{Block, BlockHeader, BlockSums}; use crate::core::pow::Difficulty; -use crate::lmdb; use crate::types::Tip; use crate::util::secp::pedersen::Commitment; use croaring::Bitmap; @@ -45,8 +44,8 @@ pub struct ChainStore { impl ChainStore { /// Create new chain store - pub fn new(db_env: Arc<lmdb::Environment>) -> Result<ChainStore, Error> { - let db = store::Store::open(db_env, STORE_SUBPATH); + pub fn new(db_root: &str) -> Result<ChainStore, Error> { + let db = store::Store::new(db_root, Some(STORE_SUBPATH.clone()), None)?; Ok(ChainStore { db }) } } diff --git a/chain/tests/data_file_integrity.rs b/chain/tests/data_file_integrity.rs index 90da6c9ce..667b4a8e9 100644 --- a/chain/tests/data_file_integrity.rs +++ b/chain/tests/data_file_integrity.rs @@ -26,7 +26,6 @@ use chrono::Duration; use grin_chain as chain; use grin_core as core; use grin_keychain as keychain; -use grin_store as store; use grin_util as util; use std::fs; use std::sync::Arc; @@ -41,10 +40,8 @@ fn setup(dir_name: &str) -> Chain { global::set_mining_mode(ChainTypes::AutomatedTesting); let genesis_block = pow::mine_genesis_block().unwrap(); let verifier_cache = Arc::new(RwLock::new(LruVerifierCache::new())); - let db_env = Arc::new(store::new_env(dir_name.to_string())); chain::Chain::init( dir_name.to_string(), - db_env, Arc::new(NoopAdapter {}), genesis_block, pow::verify_size, @@ -57,10 +54,8 @@ fn setup(dir_name: &str) -> Chain { fn reload_chain(dir_name: &str) -> Chain { let verifier_cache = Arc::new(RwLock::new(LruVerifierCache::new())); - let db_env = Arc::new(store::new_env(dir_name.to_string())); chain::Chain::init( dir_name.to_string(), - db_env, Arc::new(NoopAdapter {}), genesis::genesis_dev(), pow::verify_size, diff --git a/chain/tests/mine_simple_chain.rs b/chain/tests/mine_simple_chain.rs index 997cbe129..3c545bbcb 100644 --- a/chain/tests/mine_simple_chain.rs +++ b/chain/tests/mine_simple_chain.rs @@ -28,7 +28,6 @@ use chrono::Duration; use grin_chain as chain; use grin_core as core; use grin_keychain as keychain; -use grin_store as store; use grin_util as util; use std::fs; use std::sync::Arc; @@ -41,10 +40,8 @@ fn setup(dir_name: &str, genesis: Block) -> Chain { util::init_test_logger(); clean_output_dir(dir_name); let verifier_cache = Arc::new(RwLock::new(LruVerifierCache::new())); - let db_env = Arc::new(store::new_env(dir_name.to_string())); chain::Chain::init( dir_name.to_string(), - db_env, Arc::new(NoopAdapter {}), genesis, pow::verify_size, @@ -532,11 +529,9 @@ where fn actual_diff_iter_output() { global::set_mining_mode(ChainTypes::AutomatedTesting); let genesis_block = pow::mine_genesis_block().unwrap(); - let db_env = Arc::new(store::new_env(".grin".to_string())); let verifier_cache = Arc::new(RwLock::new(LruVerifierCache::new())); let chain = chain::Chain::init( "../.grin".to_string(), - db_env, Arc::new(NoopAdapter {}), genesis_block, pow::verify_size, diff --git a/chain/tests/store_indices.rs b/chain/tests/store_indices.rs index a5686e278..adfab597a 100644 --- a/chain/tests/store_indices.rs +++ b/chain/tests/store_indices.rs @@ -23,7 +23,6 @@ use env_logger; use grin_chain as chain; use grin_core as core; use grin_keychain as keychain; -use grin_store as store; use std::fs; use std::sync::Arc; @@ -53,9 +52,8 @@ fn test_various_store_indices() { let keychain = ExtKeychain::from_random_seed(false).unwrap(); let key_id = ExtKeychainPath::new(1, 1, 0, 0, 0).to_identifier(); - let db_env = Arc::new(store::new_env(chain_dir.to_string())); - let chain_store = Arc::new(chain::store::ChainStore::new(db_env).unwrap()); + let chain_store = Arc::new(chain::store::ChainStore::new(chain_dir).unwrap()); global::set_mining_mode(ChainTypes::AutomatedTesting); let genesis = pow::mine_genesis_block().unwrap(); diff --git a/chain/tests/test_coinbase_maturity.rs b/chain/tests/test_coinbase_maturity.rs index ec42a841d..821f0840f 100644 --- a/chain/tests/test_coinbase_maturity.rs +++ b/chain/tests/test_coinbase_maturity.rs @@ -26,7 +26,6 @@ use env_logger; use grin_chain as chain; use grin_core as core; use grin_keychain as keychain; -use grin_store as store; use grin_util as util; use std::fs; use std::sync::Arc; @@ -45,10 +44,8 @@ fn test_coinbase_maturity() { let verifier_cache = Arc::new(RwLock::new(LruVerifierCache::new())); - let db_env = Arc::new(store::new_env(".grin".to_string())); let chain = chain::Chain::init( ".grin".to_string(), - db_env, Arc::new(NoopAdapter {}), genesis_block, pow::verify_size, diff --git a/chain/tests/test_txhashset.rs b/chain/tests/test_txhashset.rs index 9e53a30fd..6d81a18b8 100644 --- a/chain/tests/test_txhashset.rs +++ b/chain/tests/test_txhashset.rs @@ -15,7 +15,6 @@ use grin_chain as chain; use grin_core as core; -use grin_store as store; use grin_util as util; use std::collections::HashSet; @@ -38,8 +37,7 @@ fn clean_output_dir(dir_name: &str) { fn test_unexpected_zip() { let db_root = format!(".grin_txhashset_zip"); clean_output_dir(&db_root); - let db_env = Arc::new(store::new_env(db_root.clone())); - let chain_store = ChainStore::new(db_env).unwrap(); + let chain_store = ChainStore::new(&db_root).unwrap(); let store = Arc::new(chain_store); txhashset::TxHashSet::open(db_root.clone(), store.clone(), None).unwrap(); let head = BlockHeader::default(); diff --git a/p2p/Cargo.toml b/p2p/Cargo.toml index 377957796..7b70234c2 100644 --- a/p2p/Cargo.toml +++ b/p2p/Cargo.toml @@ -13,7 +13,6 @@ edition = "2018" bitflags = "1" bytes = "0.4" enum_primitive = "0.1" -lmdb-zero = "0.4.4" net2 = "0.2" num = "0.1" rand = "0.5" diff --git a/p2p/src/lib.rs b/p2p/src/lib.rs index 3ba530201..920b75132 100644 --- a/p2p/src/lib.rs +++ b/p2p/src/lib.rs @@ -25,7 +25,6 @@ extern crate bitflags; #[macro_use] extern crate enum_primitive; -use lmdb_zero as lmdb; #[macro_use] extern crate grin_core as core; diff --git a/p2p/src/serv.rs b/p2p/src/serv.rs index 8be990846..55e7ff8f1 100644 --- a/p2p/src/serv.rs +++ b/p2p/src/serv.rs @@ -18,8 +18,6 @@ use std::sync::Arc; use std::time::Duration; use std::{io, thread}; -use crate::lmdb; - use crate::core::core; use crate::core::core::hash::Hash; use crate::core::global; @@ -48,7 +46,7 @@ pub struct Server { impl Server { /// Creates a new idle p2p server with no peers pub fn new( - db_env: Arc<lmdb::Environment>, + db_root: &str, capab: Capabilities, config: P2PConfig, adapter: Arc<dyn ChainAdapter>, @@ -59,7 +57,7 @@ impl Server { config: config.clone(), capabilities: capab, handshake: Arc::new(Handshake::new(genesis, config.clone())), - peers: Arc::new(Peers::new(PeerStore::new(db_env)?, adapter, config)), + peers: Arc::new(Peers::new(PeerStore::new(db_root)?, adapter, config)), stop_state, }) } diff --git a/p2p/src/store.rs b/p2p/src/store.rs index 074cc066f..597df3f0d 100644 --- a/p2p/src/store.rs +++ b/p2p/src/store.rs @@ -17,9 +17,6 @@ use chrono::Utc; use num::FromPrimitive; use rand::{thread_rng, Rng}; -use std::sync::Arc; - -use crate::lmdb; use crate::core::ser::{self, Readable, Reader, Writeable, Writer}; use crate::types::{Capabilities, PeerAddr, ReasonForBan}; @@ -117,8 +114,8 @@ pub struct PeerStore { impl PeerStore { /// Instantiates a new peer store under the provided root path. - pub fn new(db_env: Arc<lmdb::Environment>) -> Result<PeerStore, Error> { - let db = grin_store::Store::open(db_env, STORE_SUBPATH); + pub fn new(db_root: &str) -> Result<PeerStore, Error> { + let db = grin_store::Store::new(db_root, Some(STORE_SUBPATH), None)?; Ok(PeerStore { db: db }) } diff --git a/p2p/tests/peer_handshake.rs b/p2p/tests/peer_handshake.rs index 57c335e4d..9bd9db806 100644 --- a/p2p/tests/peer_handshake.rs +++ b/p2p/tests/peer_handshake.rs @@ -15,7 +15,6 @@ use grin_core as core; use grin_p2p as p2p; -use grin_store as store; use grin_util as util; use grin_util::{Mutex, StopState}; @@ -50,10 +49,9 @@ fn peer_handshake() { ..p2p::P2PConfig::default() }; let net_adapter = Arc::new(p2p::DummyAdapter {}); - let db_env = Arc::new(store::new_env(".grin".to_string())); let server = Arc::new( p2p::Server::new( - db_env, + ".grin", p2p::Capabilities::UNKNOWN, p2p_config.clone(), net_adapter.clone(), diff --git a/pool/tests/common.rs b/pool/tests/common.rs index b4bd83554..d69bebc63 100644 --- a/pool/tests/common.rs +++ b/pool/tests/common.rs @@ -29,7 +29,6 @@ use grin_chain as chain; use grin_core as core; use grin_keychain as keychain; use grin_pool as pool; -use grin_store as store; use grin_util as util; use std::collections::HashSet; use std::fs; @@ -37,17 +36,16 @@ use std::sync::Arc; #[derive(Clone)] pub struct ChainAdapter { - pub store: Arc<ChainStore>, + pub store: Arc<RwLock<ChainStore>>, pub utxo: Arc<RwLock<HashSet<Commitment>>>, } impl ChainAdapter { pub fn init(db_root: String) -> Result<ChainAdapter, String> { let target_dir = format!("target/{}", db_root); - let db_env = Arc::new(store::new_env(target_dir.clone())); - let chain_store = - ChainStore::new(db_env).map_err(|e| format!("failed to init chain_store, {:?}", e))?; - let store = Arc::new(chain_store); + let chain_store = ChainStore::new(&target_dir) + .map_err(|e| format!("failed to init chain_store, {:?}", e))?; + let store = Arc::new(RwLock::new(chain_store)); let utxo = Arc::new(RwLock::new(HashSet::new())); Ok(ChainAdapter { store, utxo }) @@ -56,7 +54,8 @@ impl ChainAdapter { pub fn update_db_for_block(&self, block: &Block) { let header = &block.header; let tip = Tip::from_header(header); - let batch = self.store.batch().unwrap(); + let mut s = self.store.write(); + let batch = s.batch().unwrap(); batch.save_block_header(header).unwrap(); batch.save_head(&tip).unwrap(); @@ -102,20 +101,20 @@ impl ChainAdapter { impl BlockChain for ChainAdapter { fn chain_head(&self) -> Result<BlockHeader, PoolError> { - self.store - .head_header() + let s = self.store.read(); + s.head_header() .map_err(|_| PoolError::Other(format!("failed to get chain head"))) } fn get_block_header(&self, hash: &Hash) -> Result<BlockHeader, PoolError> { - self.store - .get_block_header(hash) + let s = self.store.read(); + s.get_block_header(hash) .map_err(|_| PoolError::Other(format!("failed to get block header"))) } fn get_block_sums(&self, hash: &Hash) -> Result<BlockSums, PoolError> { - self.store - .get_block_sums(hash) + let s = self.store.read(); + s.get_block_sums(hash) .map_err(|_| PoolError::Other(format!("failed to get block sums"))) } diff --git a/servers/src/grin/server.rs b/servers/src/grin/server.rs index 80ee209a8..5c6cb3a4a 100644 --- a/servers/src/grin/server.rs +++ b/servers/src/grin/server.rs @@ -43,7 +43,6 @@ use crate::mining::test_miner::Miner; use crate::p2p; use crate::p2p::types::PeerAddr; use crate::pool; -use crate::store; use crate::util::file::get_first_line; use crate::util::{Mutex, RwLock, StopState}; @@ -179,10 +178,8 @@ impl Server { info!("Starting server, genesis block: {}", genesis.hash()); - let db_env = Arc::new(store::new_env(config.db_root.clone())); let shared_chain = Arc::new(chain::Chain::init( config.db_root.clone(), - db_env, chain_adapter.clone(), genesis.clone(), pow::verify_size, @@ -202,13 +199,8 @@ impl Server { init_net_hooks(&config), )); - let peer_db_env = Arc::new(store::new_named_env( - config.db_root.clone(), - "peer".into(), - config.p2p_config.peer_max_count, - )); let p2p_server = Arc::new(p2p::Server::new( - peer_db_env, + &config.db_root, config.p2p_config.capabilities, config.p2p_config.clone(), net_adapter.clone(), diff --git a/servers/src/mining/stratumserver.rs b/servers/src/mining/stratumserver.rs index 26df94f59..b9019f323 100644 --- a/servers/src/mining/stratumserver.rs +++ b/servers/src/mining/stratumserver.rs @@ -743,7 +743,8 @@ impl WorkersList { } pub fn send_to(&self, worker_id: usize, msg: String) { - self.workers_list + let _ = self + .workers_list .read() .get(&worker_id) .unwrap() @@ -753,7 +754,7 @@ impl WorkersList { pub fn broadcast(&self, msg: String) { for worker in self.workers_list.read().values() { - worker.tx.unbounded_send(msg.clone()); + let _ = worker.tx.unbounded_send(msg.clone()); } } diff --git a/store/src/lib.rs b/store/src/lib.rs index eeec7ee38..6261183c1 100644 --- a/store/src/lib.rs +++ b/store/src/lib.rs @@ -27,11 +27,12 @@ use failure; extern crate failure_derive; #[macro_use] extern crate grin_core as core; +extern crate grin_util as util; //use grin_core as core; pub mod leaf_set; -mod lmdb; +pub mod lmdb; pub mod pmmr; pub mod prune_list; pub mod types; diff --git a/store/src/lmdb.rs b/store/src/lmdb.rs index 9cc5b32ef..6106bfb8d 100644 --- a/store/src/lmdb.rs +++ b/store/src/lmdb.rs @@ -23,6 +23,14 @@ use lmdb_zero::traits::CreateCursor; use lmdb_zero::LmdbResultExt; use crate::core::ser; +use crate::util::{RwLock, RwLockReadGuard}; + +/// number of bytes to grow the database by when needed +pub const ALLOC_CHUNK_SIZE: usize = 134_217_728; //128 MB +const RESIZE_PERCENT: f32 = 0.9; +/// Want to ensure that each resize gives us at least this % +/// of total space free +const RESIZE_MIN_TARGET_PERCENT: f32 = 0.65; /// Main error type for this lmdb #[derive(Clone, Eq, PartialEq, Debug, Fail)] @@ -54,77 +62,143 @@ pub fn option_to_not_found<T>(res: Result<Option<T>, Error>, field_name: &str) - } } -/// Create a new LMDB env under the provided directory. -/// By default creates an environment named "lmdb". -/// Be aware of transactional semantics in lmdb -/// (transactions are per environment, not per database). -pub fn new_env(path: String) -> lmdb::Environment { - new_named_env(path, "lmdb".into(), None) -} - -/// TODO - We probably need more flexibility here, 500GB probably too big for peers... -/// Create a new LMDB env under the provided directory with the provided name. -pub fn new_named_env(path: String, name: String, max_readers: Option<u32>) -> lmdb::Environment { - let full_path = [path, name].join("/"); - fs::create_dir_all(&full_path) - .expect("Unable to create directory 'db_root' to store chain_data"); - - let mut env_builder = lmdb::EnvBuilder::new().unwrap(); - env_builder.set_maxdbs(8).unwrap(); - // half a TB should give us plenty room, will be an issue on 32 bits - // (which we don't support anyway) - - #[cfg(not(target_os = "windows"))] - env_builder.set_mapsize(5_368_709_120).unwrap_or_else(|e| { - panic!("Unable to allocate LMDB space: {:?}", e); - }); - //TODO: This is temporary to support (beta) windows support - //Windows allocates the entire file at once, so this needs to - //be changed to allocate as little as possible and increase as needed - #[cfg(target_os = "windows")] - env_builder.set_mapsize(524_288_000).unwrap_or_else(|e| { - panic!("Unable to allocate LMDB space: {:?}", e); - }); - - if let Some(max_readers) = max_readers { - env_builder - .set_maxreaders(max_readers) - .expect("Unable set max_readers"); - } - unsafe { - env_builder - .open(&full_path, lmdb::open::NOTLS, 0o600) - .unwrap() - } -} - /// LMDB-backed store facilitating data access and serialization. All writes /// are done through a Batch abstraction providing atomicity. pub struct Store { env: Arc<lmdb::Environment>, - db: Arc<lmdb::Database<'static>>, + db: RwLock<Option<Arc<lmdb::Database<'static>>>>, + name: String, } impl Store { - /// Creates a new store with the provided name under the specified - /// environment - pub fn open(env: Arc<lmdb::Environment>, name: &str) -> Store { - let db = Arc::new( - lmdb::Database::open( - env.clone(), - Some(name), - &lmdb::DatabaseOptions::new(lmdb::db::CREATE), - ) - .unwrap(), + /// Create a new LMDB env under the provided directory. + /// By default creates an environment named "lmdb". + /// Be aware of transactional semantics in lmdb + /// (transactions are per environment, not per database). + pub fn new(path: &str, name: Option<&str>, max_readers: Option<u32>) -> Result<Store, Error> { + let name = match name { + Some(n) => n.to_owned(), + None => "lmdb".to_owned(), + }; + let full_path = [path.to_owned(), name.clone()].join("/"); + fs::create_dir_all(&full_path) + .expect("Unable to create directory 'db_root' to store chain_data"); + + let mut env_builder = lmdb::EnvBuilder::new().unwrap(); + env_builder.set_maxdbs(8)?; + + if let Some(max_readers) = max_readers { + env_builder.set_maxreaders(max_readers)?; + } + + let env = unsafe { env_builder.open(&full_path, lmdb::open::NOTLS, 0o600)? }; + + debug!( + "DB Mapsize for {} is {}", + full_path, + env.info().as_ref().unwrap().mapsize ); - Store { env, db } + let res = Store { + env: Arc::new(env), + db: RwLock::new(None), + name: name, + }; + + { + let mut w = res.db.write(); + *w = Some(Arc::new(lmdb::Database::open( + res.env.clone(), + Some(&res.name), + &lmdb::DatabaseOptions::new(lmdb::db::CREATE), + )?)); + } + Ok(res) + } + + /// Opens the database environment + pub fn open(&self) -> Result<(), Error> { + let mut w = self.db.write(); + *w = Some(Arc::new(lmdb::Database::open( + self.env.clone(), + Some(&self.name), + &lmdb::DatabaseOptions::new(lmdb::db::CREATE), + )?)); + Ok(()) + } + + /// Determines whether the environment needs a resize based on a simple percentage threshold + pub fn needs_resize(&self) -> Result<bool, Error> { + let env_info = self.env.info()?; + let stat = self.env.stat()?; + + let size_used = stat.psize as usize * env_info.last_pgno; + trace!("DB map size: {}", env_info.mapsize); + trace!("Space used: {}", size_used); + trace!("Space remaining: {}", env_info.mapsize - size_used); + let resize_percent = RESIZE_PERCENT; + trace!( + "Percent used: {:.*} Percent threshold: {:.*}", + 4, + size_used as f64 / env_info.mapsize as f64, + 4, + resize_percent + ); + + if size_used as f32 / env_info.mapsize as f32 > resize_percent + || env_info.mapsize < ALLOC_CHUNK_SIZE + { + trace!("Resize threshold met (percent-based)"); + Ok(true) + } else { + trace!("Resize threshold not met (percent-based)"); + Ok(false) + } + } + + /// Increments the database size by as many ALLOC_CHUNK_SIZES + /// to give a minimum threshold of free space + pub fn do_resize(&self) -> Result<(), Error> { + let env_info = self.env.info()?; + let stat = self.env.stat()?; + let size_used = stat.psize as usize * env_info.last_pgno; + + let new_mapsize = if env_info.mapsize < ALLOC_CHUNK_SIZE { + ALLOC_CHUNK_SIZE + } else { + let mut tot = env_info.mapsize; + while size_used as f32 / tot as f32 > RESIZE_MIN_TARGET_PERCENT { + tot += ALLOC_CHUNK_SIZE; + } + tot + }; + + // close + let mut w = self.db.write(); + *w = None; + + unsafe { + self.env.set_mapsize(new_mapsize)?; + } + + *w = Some(Arc::new(lmdb::Database::open( + self.env.clone(), + Some(&self.name), + &lmdb::DatabaseOptions::new(lmdb::db::CREATE), + )?)); + + info!( + "Resized database from {} to {}", + env_info.mapsize, new_mapsize + ); + Ok(()) } /// Gets a value from the db, provided its key pub fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Error> { + let db = self.db.read(); let txn = lmdb::ReadTransaction::new(self.env.clone())?; let access = txn.access(); - let res = access.get(&self.db, key); + let res = access.get(&db.as_ref().unwrap(), key); res.map(|res: &[u8]| res.to_vec()) .to_opt() .map_err(From::from) @@ -133,17 +207,19 @@ impl Store { /// Gets a `Readable` value from the db, provided its key. Encapsulates /// serialization. pub fn get_ser<T: ser::Readable>(&self, key: &[u8]) -> Result<Option<T>, Error> { + let db = self.db.read(); let txn = lmdb::ReadTransaction::new(self.env.clone())?; let access = txn.access(); - self.get_ser_access(key, &access) + self.get_ser_access(key, &access, db) } fn get_ser_access<T: ser::Readable>( &self, key: &[u8], access: &lmdb::ConstAccessor<'_>, + db: RwLockReadGuard<'_, Option<Arc<lmdb::Database<'static>>>>, ) -> Result<Option<T>, Error> { - let res: lmdb::error::Result<&[u8]> = access.get(&self.db, key); + let res: lmdb::error::Result<&[u8]> = access.get(&db.as_ref().unwrap(), key); match res.to_opt() { Ok(Some(mut res)) => match ser::deserialize(&mut res) { Ok(res) => Ok(Some(res)), @@ -156,17 +232,19 @@ impl Store { /// Whether the provided key exists pub fn exists(&self, key: &[u8]) -> Result<bool, Error> { + let db = self.db.read(); let txn = lmdb::ReadTransaction::new(self.env.clone())?; let access = txn.access(); - let res: lmdb::error::Result<&lmdb::Ignore> = access.get(&self.db, key); + let res: lmdb::error::Result<&lmdb::Ignore> = access.get(&db.as_ref().unwrap(), key); res.to_opt().map(|r| r.is_some()).map_err(From::from) } /// Produces an iterator of `Readable` types moving forward from the /// provided key. pub fn iter<T: ser::Readable>(&self, from: &[u8]) -> Result<SerIterator<T>, Error> { + let db = self.db.read(); let tx = Arc::new(lmdb::ReadTransaction::new(self.env.clone())?); - let cursor = Arc::new(tx.cursor(self.db.clone()).unwrap()); + let cursor = Arc::new(tx.cursor(db.as_ref().unwrap().clone()).unwrap()); Ok(SerIterator { tx, cursor, @@ -178,6 +256,10 @@ impl Store { /// Builds a new batch to be used with this store. pub fn batch(&self) -> Result<Batch<'_>, Error> { + // check if the db needs resizing before returning the batch + if self.needs_resize()? { + self.do_resize()?; + } let txn = lmdb::WriteTransaction::new(self.env.clone())?; Ok(Batch { store: self, @@ -195,9 +277,10 @@ pub struct Batch<'a> { impl<'a> Batch<'a> { /// Writes a single key/value pair to the db pub fn put(&self, key: &[u8], value: &[u8]) -> Result<(), Error> { + let db = self.store.db.read(); self.tx .access() - .put(&self.store.db, key, value, lmdb::put::Flags::empty())?; + .put(&db.as_ref().unwrap(), key, value, lmdb::put::Flags::empty())?; Ok(()) } @@ -231,12 +314,14 @@ impl<'a> Batch<'a> { /// content of the current batch into account. pub fn get_ser<T: ser::Readable>(&self, key: &[u8]) -> Result<Option<T>, Error> { let access = self.tx.access(); - self.store.get_ser_access(key, &access) + let db = self.store.db.read(); + self.store.get_ser_access(key, &access, db) } /// Deletes a key/value pair from the db pub fn delete(&self, key: &[u8]) -> Result<(), Error> { - self.tx.access().del_key(&self.store.db, key)?; + let db = self.store.db.read(); + self.tx.access().del_key(&db.as_ref().unwrap(), key)?; Ok(()) } diff --git a/store/tests/lmdb.rs b/store/tests/lmdb.rs new file mode 100644 index 000000000..ec1039ac5 --- /dev/null +++ b/store/tests/lmdb.rs @@ -0,0 +1,103 @@ +// Copyright 2018 The Grin Developers +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use grin_store as store; +use grin_util as util; + +use grin_core::ser::{self, Readable, Reader, Writeable, Writer}; + +use std::fs; + +const WRITE_CHUNK_SIZE: usize = 20; +const TEST_ALLOC_SIZE: usize = store::lmdb::ALLOC_CHUNK_SIZE / 8 / WRITE_CHUNK_SIZE; + +#[derive(Clone)] +struct PhatChunkStruct { + phatness: u64, +} + +impl PhatChunkStruct { + /// create + pub fn new() -> PhatChunkStruct { + PhatChunkStruct { phatness: 0 } + } +} + +impl Readable for PhatChunkStruct { + fn read(reader: &mut Reader) -> Result<PhatChunkStruct, ser::Error> { + let mut retval = PhatChunkStruct::new(); + for _ in 0..TEST_ALLOC_SIZE { + retval.phatness = reader.read_u64()?; + } + Ok(retval) + } +} + +impl Writeable for PhatChunkStruct { + fn write<W: Writer>(&self, writer: &mut W) -> Result<(), ser::Error> { + // write many times + for _ in 0..TEST_ALLOC_SIZE { + writer.write_u64(self.phatness)?; + } + Ok(()) + } +} + +fn clean_output_dir(test_dir: &str) { + let _ = fs::remove_dir_all(test_dir); +} + +fn setup(test_dir: &str) { + util::init_test_logger(); + clean_output_dir(test_dir); +} + +#[test] +fn lmdb_allocate() -> Result<(), store::Error> { + let test_dir = "test_output/lmdb_allocate"; + setup(test_dir); + // Allocate more than the initial chunk, ensuring + // the DB resizes underneath + { + let store = store::Store::new(test_dir, Some("test1"), None)?; + + for i in 0..WRITE_CHUNK_SIZE * 2 { + println!("Allocating chunk: {}", i); + let chunk = PhatChunkStruct::new(); + let mut key_val = format!("phat_chunk_set_1_{}", i).as_bytes().to_vec(); + let batch = store.batch()?; + let key = store::to_key(b'P', &mut key_val); + batch.put_ser(&key, &chunk)?; + batch.commit()?; + } + } + println!("***********************************"); + println!("***************NEXT*****************"); + println!("***********************************"); + // Open env again and keep adding + { + let store = store::Store::new(test_dir, Some("test1"), None)?; + for i in 0..WRITE_CHUNK_SIZE * 2 { + println!("Allocating chunk: {}", i); + let chunk = PhatChunkStruct::new(); + let mut key_val = format!("phat_chunk_set_2_{}", i).as_bytes().to_vec(); + let batch = store.batch()?; + let key = store::to_key(b'P', &mut key_val); + batch.put_ser(&key, &chunk)?; + batch.commit()?; + } + } + + Ok(()) +} diff --git a/util/src/lib.rs b/util/src/lib.rs index 08b7cf98d..93883e951 100644 --- a/util/src/lib.rs +++ b/util/src/lib.rs @@ -29,7 +29,7 @@ extern crate lazy_static; extern crate serde_derive; // Re-export so only has to be included once pub use parking_lot::Mutex; -pub use parking_lot::RwLock; +pub use parking_lot::{RwLock, RwLockReadGuard}; // Re-export so only has to be included once pub use secp256k1zkp as secp; diff --git a/util/src/read_write.rs b/util/src/read_write.rs index fc8d7c41e..15e3f3f72 100644 --- a/util/src/read_write.rs +++ b/util/src/read_write.rs @@ -89,7 +89,7 @@ pub fn write_all(stream: &mut dyn Write, mut buf: &[u8], timeout: Duration) -> i return Err(io::Error::new( io::ErrorKind::WriteZero, "failed to write whole buffer", - )) + )); } Ok(n) => buf = &buf[n..], Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}