mirror of
https://github.com/mimblewimble/grin.git
synced 2025-04-19 00:41:15 +03:00
Dynamic LMDB mapsize allocation [1.1.0] (#2605)
* dynamically resize lmdb * rustfmt * explicitly close db before resizing * rustfmt * test fix * rustfmt * pool tests * chain fix * merge * move RwLock into Store, ensure resize gives a min threshold * rustfmt * move locks based on testing * rustfmt
This commit is contained in:
parent
d560a36dd6
commit
beaae28d70
23 changed files with 283 additions and 135 deletions
2
Cargo.lock
generated
2
Cargo.lock
generated
|
@ -747,7 +747,6 @@ dependencies = [
|
|||
"grin_store 1.1.0",
|
||||
"grin_util 1.1.0",
|
||||
"lazy_static 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"lmdb-zero 0.4.4 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"lru-cache 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"rand 0.5.6 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
|
@ -831,7 +830,6 @@ dependencies = [
|
|||
"grin_pool 1.1.0",
|
||||
"grin_store 1.1.0",
|
||||
"grin_util 1.1.0",
|
||||
"lmdb-zero 0.4.4 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"net2 0.2.33 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"num 0.1.42 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
|
|
|
@ -12,7 +12,6 @@ edition = "2018"
|
|||
[dependencies]
|
||||
bitflags = "1"
|
||||
byteorder = "1"
|
||||
lmdb-zero = "0.4.4"
|
||||
failure = "0.1"
|
||||
failure_derive = "0.1"
|
||||
croaring = "0.3"
|
||||
|
|
|
@ -24,7 +24,6 @@ use crate::core::core::{
|
|||
use crate::core::global;
|
||||
use crate::core::pow;
|
||||
use crate::error::{Error, ErrorKind};
|
||||
use crate::lmdb;
|
||||
use crate::pipe;
|
||||
use crate::store;
|
||||
use crate::txhashset;
|
||||
|
@ -160,7 +159,6 @@ impl Chain {
|
|||
/// based on the genesis block if necessary.
|
||||
pub fn init(
|
||||
db_root: String,
|
||||
db_env: Arc<lmdb::Environment>,
|
||||
adapter: Arc<dyn ChainAdapter + Send + Sync>,
|
||||
genesis: Block,
|
||||
pow_verifier: fn(&BlockHeader) -> Result<(), pow::Error>,
|
||||
|
@ -177,7 +175,7 @@ impl Chain {
|
|||
return Err(ErrorKind::Stopped.into());
|
||||
}
|
||||
|
||||
let store = Arc::new(store::ChainStore::new(db_env)?);
|
||||
let store = Arc::new(store::ChainStore::new(&db_root)?);
|
||||
|
||||
// open the txhashset, creating a new one if necessary
|
||||
let mut txhashset = txhashset::TxHashSet::open(db_root.clone(), store.clone(), None)?;
|
||||
|
@ -985,6 +983,7 @@ impl Chain {
|
|||
let mut current = self.get_header_by_height(head.height - horizon - 1)?;
|
||||
|
||||
let batch = self.store.batch()?;
|
||||
|
||||
loop {
|
||||
// Go to the store directly so we can handle NotFoundErr robustly.
|
||||
match self.store.get_block(¤t.hash()) {
|
||||
|
|
|
@ -23,8 +23,6 @@
|
|||
#[macro_use]
|
||||
extern crate bitflags;
|
||||
|
||||
use lmdb_zero as lmdb;
|
||||
|
||||
#[macro_use]
|
||||
extern crate serde_derive;
|
||||
#[macro_use]
|
||||
|
|
|
@ -18,7 +18,6 @@ use crate::core::consensus::HeaderInfo;
|
|||
use crate::core::core::hash::{Hash, Hashed};
|
||||
use crate::core::core::{Block, BlockHeader, BlockSums};
|
||||
use crate::core::pow::Difficulty;
|
||||
use crate::lmdb;
|
||||
use crate::types::Tip;
|
||||
use crate::util::secp::pedersen::Commitment;
|
||||
use croaring::Bitmap;
|
||||
|
@ -45,8 +44,8 @@ pub struct ChainStore {
|
|||
|
||||
impl ChainStore {
|
||||
/// Create new chain store
|
||||
pub fn new(db_env: Arc<lmdb::Environment>) -> Result<ChainStore, Error> {
|
||||
let db = store::Store::open(db_env, STORE_SUBPATH);
|
||||
pub fn new(db_root: &str) -> Result<ChainStore, Error> {
|
||||
let db = store::Store::new(db_root, Some(STORE_SUBPATH.clone()), None)?;
|
||||
Ok(ChainStore { db })
|
||||
}
|
||||
}
|
||||
|
|
|
@ -26,7 +26,6 @@ use chrono::Duration;
|
|||
use grin_chain as chain;
|
||||
use grin_core as core;
|
||||
use grin_keychain as keychain;
|
||||
use grin_store as store;
|
||||
use grin_util as util;
|
||||
use std::fs;
|
||||
use std::sync::Arc;
|
||||
|
@ -41,10 +40,8 @@ fn setup(dir_name: &str) -> Chain {
|
|||
global::set_mining_mode(ChainTypes::AutomatedTesting);
|
||||
let genesis_block = pow::mine_genesis_block().unwrap();
|
||||
let verifier_cache = Arc::new(RwLock::new(LruVerifierCache::new()));
|
||||
let db_env = Arc::new(store::new_env(dir_name.to_string()));
|
||||
chain::Chain::init(
|
||||
dir_name.to_string(),
|
||||
db_env,
|
||||
Arc::new(NoopAdapter {}),
|
||||
genesis_block,
|
||||
pow::verify_size,
|
||||
|
@ -57,10 +54,8 @@ fn setup(dir_name: &str) -> Chain {
|
|||
|
||||
fn reload_chain(dir_name: &str) -> Chain {
|
||||
let verifier_cache = Arc::new(RwLock::new(LruVerifierCache::new()));
|
||||
let db_env = Arc::new(store::new_env(dir_name.to_string()));
|
||||
chain::Chain::init(
|
||||
dir_name.to_string(),
|
||||
db_env,
|
||||
Arc::new(NoopAdapter {}),
|
||||
genesis::genesis_dev(),
|
||||
pow::verify_size,
|
||||
|
|
|
@ -28,7 +28,6 @@ use chrono::Duration;
|
|||
use grin_chain as chain;
|
||||
use grin_core as core;
|
||||
use grin_keychain as keychain;
|
||||
use grin_store as store;
|
||||
use grin_util as util;
|
||||
use std::fs;
|
||||
use std::sync::Arc;
|
||||
|
@ -41,10 +40,8 @@ fn setup(dir_name: &str, genesis: Block) -> Chain {
|
|||
util::init_test_logger();
|
||||
clean_output_dir(dir_name);
|
||||
let verifier_cache = Arc::new(RwLock::new(LruVerifierCache::new()));
|
||||
let db_env = Arc::new(store::new_env(dir_name.to_string()));
|
||||
chain::Chain::init(
|
||||
dir_name.to_string(),
|
||||
db_env,
|
||||
Arc::new(NoopAdapter {}),
|
||||
genesis,
|
||||
pow::verify_size,
|
||||
|
@ -532,11 +529,9 @@ where
|
|||
fn actual_diff_iter_output() {
|
||||
global::set_mining_mode(ChainTypes::AutomatedTesting);
|
||||
let genesis_block = pow::mine_genesis_block().unwrap();
|
||||
let db_env = Arc::new(store::new_env(".grin".to_string()));
|
||||
let verifier_cache = Arc::new(RwLock::new(LruVerifierCache::new()));
|
||||
let chain = chain::Chain::init(
|
||||
"../.grin".to_string(),
|
||||
db_env,
|
||||
Arc::new(NoopAdapter {}),
|
||||
genesis_block,
|
||||
pow::verify_size,
|
||||
|
|
|
@ -23,7 +23,6 @@ use env_logger;
|
|||
use grin_chain as chain;
|
||||
use grin_core as core;
|
||||
use grin_keychain as keychain;
|
||||
use grin_store as store;
|
||||
use std::fs;
|
||||
use std::sync::Arc;
|
||||
|
||||
|
@ -53,9 +52,8 @@ fn test_various_store_indices() {
|
|||
|
||||
let keychain = ExtKeychain::from_random_seed(false).unwrap();
|
||||
let key_id = ExtKeychainPath::new(1, 1, 0, 0, 0).to_identifier();
|
||||
let db_env = Arc::new(store::new_env(chain_dir.to_string()));
|
||||
|
||||
let chain_store = Arc::new(chain::store::ChainStore::new(db_env).unwrap());
|
||||
let chain_store = Arc::new(chain::store::ChainStore::new(chain_dir).unwrap());
|
||||
|
||||
global::set_mining_mode(ChainTypes::AutomatedTesting);
|
||||
let genesis = pow::mine_genesis_block().unwrap();
|
||||
|
|
|
@ -26,7 +26,6 @@ use env_logger;
|
|||
use grin_chain as chain;
|
||||
use grin_core as core;
|
||||
use grin_keychain as keychain;
|
||||
use grin_store as store;
|
||||
use grin_util as util;
|
||||
use std::fs;
|
||||
use std::sync::Arc;
|
||||
|
@ -45,10 +44,8 @@ fn test_coinbase_maturity() {
|
|||
|
||||
let verifier_cache = Arc::new(RwLock::new(LruVerifierCache::new()));
|
||||
|
||||
let db_env = Arc::new(store::new_env(".grin".to_string()));
|
||||
let chain = chain::Chain::init(
|
||||
".grin".to_string(),
|
||||
db_env,
|
||||
Arc::new(NoopAdapter {}),
|
||||
genesis_block,
|
||||
pow::verify_size,
|
||||
|
|
|
@ -15,7 +15,6 @@
|
|||
use grin_chain as chain;
|
||||
use grin_core as core;
|
||||
|
||||
use grin_store as store;
|
||||
use grin_util as util;
|
||||
|
||||
use std::collections::HashSet;
|
||||
|
@ -38,8 +37,7 @@ fn clean_output_dir(dir_name: &str) {
|
|||
fn test_unexpected_zip() {
|
||||
let db_root = format!(".grin_txhashset_zip");
|
||||
clean_output_dir(&db_root);
|
||||
let db_env = Arc::new(store::new_env(db_root.clone()));
|
||||
let chain_store = ChainStore::new(db_env).unwrap();
|
||||
let chain_store = ChainStore::new(&db_root).unwrap();
|
||||
let store = Arc::new(chain_store);
|
||||
txhashset::TxHashSet::open(db_root.clone(), store.clone(), None).unwrap();
|
||||
let head = BlockHeader::default();
|
||||
|
|
|
@ -13,7 +13,6 @@ edition = "2018"
|
|||
bitflags = "1"
|
||||
bytes = "0.4"
|
||||
enum_primitive = "0.1"
|
||||
lmdb-zero = "0.4.4"
|
||||
net2 = "0.2"
|
||||
num = "0.1"
|
||||
rand = "0.5"
|
||||
|
|
|
@ -25,7 +25,6 @@ extern crate bitflags;
|
|||
|
||||
#[macro_use]
|
||||
extern crate enum_primitive;
|
||||
use lmdb_zero as lmdb;
|
||||
|
||||
#[macro_use]
|
||||
extern crate grin_core as core;
|
||||
|
|
|
@ -18,8 +18,6 @@ use std::sync::Arc;
|
|||
use std::time::Duration;
|
||||
use std::{io, thread};
|
||||
|
||||
use crate::lmdb;
|
||||
|
||||
use crate::core::core;
|
||||
use crate::core::core::hash::Hash;
|
||||
use crate::core::global;
|
||||
|
@ -48,7 +46,7 @@ pub struct Server {
|
|||
impl Server {
|
||||
/// Creates a new idle p2p server with no peers
|
||||
pub fn new(
|
||||
db_env: Arc<lmdb::Environment>,
|
||||
db_root: &str,
|
||||
capab: Capabilities,
|
||||
config: P2PConfig,
|
||||
adapter: Arc<dyn ChainAdapter>,
|
||||
|
@ -59,7 +57,7 @@ impl Server {
|
|||
config: config.clone(),
|
||||
capabilities: capab,
|
||||
handshake: Arc::new(Handshake::new(genesis, config.clone())),
|
||||
peers: Arc::new(Peers::new(PeerStore::new(db_env)?, adapter, config)),
|
||||
peers: Arc::new(Peers::new(PeerStore::new(db_root)?, adapter, config)),
|
||||
stop_state,
|
||||
})
|
||||
}
|
||||
|
|
|
@ -17,9 +17,6 @@
|
|||
use chrono::Utc;
|
||||
use num::FromPrimitive;
|
||||
use rand::{thread_rng, Rng};
|
||||
use std::sync::Arc;
|
||||
|
||||
use crate::lmdb;
|
||||
|
||||
use crate::core::ser::{self, Readable, Reader, Writeable, Writer};
|
||||
use crate::types::{Capabilities, PeerAddr, ReasonForBan};
|
||||
|
@ -117,8 +114,8 @@ pub struct PeerStore {
|
|||
|
||||
impl PeerStore {
|
||||
/// Instantiates a new peer store under the provided root path.
|
||||
pub fn new(db_env: Arc<lmdb::Environment>) -> Result<PeerStore, Error> {
|
||||
let db = grin_store::Store::open(db_env, STORE_SUBPATH);
|
||||
pub fn new(db_root: &str) -> Result<PeerStore, Error> {
|
||||
let db = grin_store::Store::new(db_root, Some(STORE_SUBPATH), None)?;
|
||||
Ok(PeerStore { db: db })
|
||||
}
|
||||
|
||||
|
|
|
@ -15,7 +15,6 @@
|
|||
use grin_core as core;
|
||||
use grin_p2p as p2p;
|
||||
|
||||
use grin_store as store;
|
||||
use grin_util as util;
|
||||
use grin_util::{Mutex, StopState};
|
||||
|
||||
|
@ -50,10 +49,9 @@ fn peer_handshake() {
|
|||
..p2p::P2PConfig::default()
|
||||
};
|
||||
let net_adapter = Arc::new(p2p::DummyAdapter {});
|
||||
let db_env = Arc::new(store::new_env(".grin".to_string()));
|
||||
let server = Arc::new(
|
||||
p2p::Server::new(
|
||||
db_env,
|
||||
".grin",
|
||||
p2p::Capabilities::UNKNOWN,
|
||||
p2p_config.clone(),
|
||||
net_adapter.clone(),
|
||||
|
|
|
@ -29,7 +29,6 @@ use grin_chain as chain;
|
|||
use grin_core as core;
|
||||
use grin_keychain as keychain;
|
||||
use grin_pool as pool;
|
||||
use grin_store as store;
|
||||
use grin_util as util;
|
||||
use std::collections::HashSet;
|
||||
use std::fs;
|
||||
|
@ -37,17 +36,16 @@ use std::sync::Arc;
|
|||
|
||||
#[derive(Clone)]
|
||||
pub struct ChainAdapter {
|
||||
pub store: Arc<ChainStore>,
|
||||
pub store: Arc<RwLock<ChainStore>>,
|
||||
pub utxo: Arc<RwLock<HashSet<Commitment>>>,
|
||||
}
|
||||
|
||||
impl ChainAdapter {
|
||||
pub fn init(db_root: String) -> Result<ChainAdapter, String> {
|
||||
let target_dir = format!("target/{}", db_root);
|
||||
let db_env = Arc::new(store::new_env(target_dir.clone()));
|
||||
let chain_store =
|
||||
ChainStore::new(db_env).map_err(|e| format!("failed to init chain_store, {:?}", e))?;
|
||||
let store = Arc::new(chain_store);
|
||||
let chain_store = ChainStore::new(&target_dir)
|
||||
.map_err(|e| format!("failed to init chain_store, {:?}", e))?;
|
||||
let store = Arc::new(RwLock::new(chain_store));
|
||||
let utxo = Arc::new(RwLock::new(HashSet::new()));
|
||||
|
||||
Ok(ChainAdapter { store, utxo })
|
||||
|
@ -56,7 +54,8 @@ impl ChainAdapter {
|
|||
pub fn update_db_for_block(&self, block: &Block) {
|
||||
let header = &block.header;
|
||||
let tip = Tip::from_header(header);
|
||||
let batch = self.store.batch().unwrap();
|
||||
let mut s = self.store.write();
|
||||
let batch = s.batch().unwrap();
|
||||
|
||||
batch.save_block_header(header).unwrap();
|
||||
batch.save_head(&tip).unwrap();
|
||||
|
@ -102,20 +101,20 @@ impl ChainAdapter {
|
|||
|
||||
impl BlockChain for ChainAdapter {
|
||||
fn chain_head(&self) -> Result<BlockHeader, PoolError> {
|
||||
self.store
|
||||
.head_header()
|
||||
let s = self.store.read();
|
||||
s.head_header()
|
||||
.map_err(|_| PoolError::Other(format!("failed to get chain head")))
|
||||
}
|
||||
|
||||
fn get_block_header(&self, hash: &Hash) -> Result<BlockHeader, PoolError> {
|
||||
self.store
|
||||
.get_block_header(hash)
|
||||
let s = self.store.read();
|
||||
s.get_block_header(hash)
|
||||
.map_err(|_| PoolError::Other(format!("failed to get block header")))
|
||||
}
|
||||
|
||||
fn get_block_sums(&self, hash: &Hash) -> Result<BlockSums, PoolError> {
|
||||
self.store
|
||||
.get_block_sums(hash)
|
||||
let s = self.store.read();
|
||||
s.get_block_sums(hash)
|
||||
.map_err(|_| PoolError::Other(format!("failed to get block sums")))
|
||||
}
|
||||
|
||||
|
|
|
@ -43,7 +43,6 @@ use crate::mining::test_miner::Miner;
|
|||
use crate::p2p;
|
||||
use crate::p2p::types::PeerAddr;
|
||||
use crate::pool;
|
||||
use crate::store;
|
||||
use crate::util::file::get_first_line;
|
||||
use crate::util::{Mutex, RwLock, StopState};
|
||||
|
||||
|
@ -179,10 +178,8 @@ impl Server {
|
|||
|
||||
info!("Starting server, genesis block: {}", genesis.hash());
|
||||
|
||||
let db_env = Arc::new(store::new_env(config.db_root.clone()));
|
||||
let shared_chain = Arc::new(chain::Chain::init(
|
||||
config.db_root.clone(),
|
||||
db_env,
|
||||
chain_adapter.clone(),
|
||||
genesis.clone(),
|
||||
pow::verify_size,
|
||||
|
@ -202,13 +199,8 @@ impl Server {
|
|||
init_net_hooks(&config),
|
||||
));
|
||||
|
||||
let peer_db_env = Arc::new(store::new_named_env(
|
||||
config.db_root.clone(),
|
||||
"peer".into(),
|
||||
config.p2p_config.peer_max_count,
|
||||
));
|
||||
let p2p_server = Arc::new(p2p::Server::new(
|
||||
peer_db_env,
|
||||
&config.db_root,
|
||||
config.p2p_config.capabilities,
|
||||
config.p2p_config.clone(),
|
||||
net_adapter.clone(),
|
||||
|
|
|
@ -743,7 +743,8 @@ impl WorkersList {
|
|||
}
|
||||
|
||||
pub fn send_to(&self, worker_id: usize, msg: String) {
|
||||
self.workers_list
|
||||
let _ = self
|
||||
.workers_list
|
||||
.read()
|
||||
.get(&worker_id)
|
||||
.unwrap()
|
||||
|
@ -753,7 +754,7 @@ impl WorkersList {
|
|||
|
||||
pub fn broadcast(&self, msg: String) {
|
||||
for worker in self.workers_list.read().values() {
|
||||
worker.tx.unbounded_send(msg.clone());
|
||||
let _ = worker.tx.unbounded_send(msg.clone());
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -27,11 +27,12 @@ use failure;
|
|||
extern crate failure_derive;
|
||||
#[macro_use]
|
||||
extern crate grin_core as core;
|
||||
extern crate grin_util as util;
|
||||
|
||||
//use grin_core as core;
|
||||
|
||||
pub mod leaf_set;
|
||||
mod lmdb;
|
||||
pub mod lmdb;
|
||||
pub mod pmmr;
|
||||
pub mod prune_list;
|
||||
pub mod types;
|
||||
|
|
|
@ -23,6 +23,14 @@ use lmdb_zero::traits::CreateCursor;
|
|||
use lmdb_zero::LmdbResultExt;
|
||||
|
||||
use crate::core::ser;
|
||||
use crate::util::{RwLock, RwLockReadGuard};
|
||||
|
||||
/// number of bytes to grow the database by when needed
|
||||
pub const ALLOC_CHUNK_SIZE: usize = 134_217_728; //128 MB
|
||||
const RESIZE_PERCENT: f32 = 0.9;
|
||||
/// Want to ensure that each resize gives us at least this %
|
||||
/// of total space free
|
||||
const RESIZE_MIN_TARGET_PERCENT: f32 = 0.65;
|
||||
|
||||
/// Main error type for this lmdb
|
||||
#[derive(Clone, Eq, PartialEq, Debug, Fail)]
|
||||
|
@ -54,77 +62,143 @@ pub fn option_to_not_found<T>(res: Result<Option<T>, Error>, field_name: &str) -
|
|||
}
|
||||
}
|
||||
|
||||
/// Create a new LMDB env under the provided directory.
|
||||
/// By default creates an environment named "lmdb".
|
||||
/// Be aware of transactional semantics in lmdb
|
||||
/// (transactions are per environment, not per database).
|
||||
pub fn new_env(path: String) -> lmdb::Environment {
|
||||
new_named_env(path, "lmdb".into(), None)
|
||||
}
|
||||
|
||||
/// TODO - We probably need more flexibility here, 500GB probably too big for peers...
|
||||
/// Create a new LMDB env under the provided directory with the provided name.
|
||||
pub fn new_named_env(path: String, name: String, max_readers: Option<u32>) -> lmdb::Environment {
|
||||
let full_path = [path, name].join("/");
|
||||
fs::create_dir_all(&full_path)
|
||||
.expect("Unable to create directory 'db_root' to store chain_data");
|
||||
|
||||
let mut env_builder = lmdb::EnvBuilder::new().unwrap();
|
||||
env_builder.set_maxdbs(8).unwrap();
|
||||
// half a TB should give us plenty room, will be an issue on 32 bits
|
||||
// (which we don't support anyway)
|
||||
|
||||
#[cfg(not(target_os = "windows"))]
|
||||
env_builder.set_mapsize(5_368_709_120).unwrap_or_else(|e| {
|
||||
panic!("Unable to allocate LMDB space: {:?}", e);
|
||||
});
|
||||
//TODO: This is temporary to support (beta) windows support
|
||||
//Windows allocates the entire file at once, so this needs to
|
||||
//be changed to allocate as little as possible and increase as needed
|
||||
#[cfg(target_os = "windows")]
|
||||
env_builder.set_mapsize(524_288_000).unwrap_or_else(|e| {
|
||||
panic!("Unable to allocate LMDB space: {:?}", e);
|
||||
});
|
||||
|
||||
if let Some(max_readers) = max_readers {
|
||||
env_builder
|
||||
.set_maxreaders(max_readers)
|
||||
.expect("Unable set max_readers");
|
||||
}
|
||||
unsafe {
|
||||
env_builder
|
||||
.open(&full_path, lmdb::open::NOTLS, 0o600)
|
||||
.unwrap()
|
||||
}
|
||||
}
|
||||
|
||||
/// LMDB-backed store facilitating data access and serialization. All writes
|
||||
/// are done through a Batch abstraction providing atomicity.
|
||||
pub struct Store {
|
||||
env: Arc<lmdb::Environment>,
|
||||
db: Arc<lmdb::Database<'static>>,
|
||||
db: RwLock<Option<Arc<lmdb::Database<'static>>>>,
|
||||
name: String,
|
||||
}
|
||||
|
||||
impl Store {
|
||||
/// Creates a new store with the provided name under the specified
|
||||
/// environment
|
||||
pub fn open(env: Arc<lmdb::Environment>, name: &str) -> Store {
|
||||
let db = Arc::new(
|
||||
lmdb::Database::open(
|
||||
env.clone(),
|
||||
Some(name),
|
||||
&lmdb::DatabaseOptions::new(lmdb::db::CREATE),
|
||||
)
|
||||
.unwrap(),
|
||||
/// Create a new LMDB env under the provided directory.
|
||||
/// By default creates an environment named "lmdb".
|
||||
/// Be aware of transactional semantics in lmdb
|
||||
/// (transactions are per environment, not per database).
|
||||
pub fn new(path: &str, name: Option<&str>, max_readers: Option<u32>) -> Result<Store, Error> {
|
||||
let name = match name {
|
||||
Some(n) => n.to_owned(),
|
||||
None => "lmdb".to_owned(),
|
||||
};
|
||||
let full_path = [path.to_owned(), name.clone()].join("/");
|
||||
fs::create_dir_all(&full_path)
|
||||
.expect("Unable to create directory 'db_root' to store chain_data");
|
||||
|
||||
let mut env_builder = lmdb::EnvBuilder::new().unwrap();
|
||||
env_builder.set_maxdbs(8)?;
|
||||
|
||||
if let Some(max_readers) = max_readers {
|
||||
env_builder.set_maxreaders(max_readers)?;
|
||||
}
|
||||
|
||||
let env = unsafe { env_builder.open(&full_path, lmdb::open::NOTLS, 0o600)? };
|
||||
|
||||
debug!(
|
||||
"DB Mapsize for {} is {}",
|
||||
full_path,
|
||||
env.info().as_ref().unwrap().mapsize
|
||||
);
|
||||
Store { env, db }
|
||||
let res = Store {
|
||||
env: Arc::new(env),
|
||||
db: RwLock::new(None),
|
||||
name: name,
|
||||
};
|
||||
|
||||
{
|
||||
let mut w = res.db.write();
|
||||
*w = Some(Arc::new(lmdb::Database::open(
|
||||
res.env.clone(),
|
||||
Some(&res.name),
|
||||
&lmdb::DatabaseOptions::new(lmdb::db::CREATE),
|
||||
)?));
|
||||
}
|
||||
Ok(res)
|
||||
}
|
||||
|
||||
/// Opens the database environment
|
||||
pub fn open(&self) -> Result<(), Error> {
|
||||
let mut w = self.db.write();
|
||||
*w = Some(Arc::new(lmdb::Database::open(
|
||||
self.env.clone(),
|
||||
Some(&self.name),
|
||||
&lmdb::DatabaseOptions::new(lmdb::db::CREATE),
|
||||
)?));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Determines whether the environment needs a resize based on a simple percentage threshold
|
||||
pub fn needs_resize(&self) -> Result<bool, Error> {
|
||||
let env_info = self.env.info()?;
|
||||
let stat = self.env.stat()?;
|
||||
|
||||
let size_used = stat.psize as usize * env_info.last_pgno;
|
||||
trace!("DB map size: {}", env_info.mapsize);
|
||||
trace!("Space used: {}", size_used);
|
||||
trace!("Space remaining: {}", env_info.mapsize - size_used);
|
||||
let resize_percent = RESIZE_PERCENT;
|
||||
trace!(
|
||||
"Percent used: {:.*} Percent threshold: {:.*}",
|
||||
4,
|
||||
size_used as f64 / env_info.mapsize as f64,
|
||||
4,
|
||||
resize_percent
|
||||
);
|
||||
|
||||
if size_used as f32 / env_info.mapsize as f32 > resize_percent
|
||||
|| env_info.mapsize < ALLOC_CHUNK_SIZE
|
||||
{
|
||||
trace!("Resize threshold met (percent-based)");
|
||||
Ok(true)
|
||||
} else {
|
||||
trace!("Resize threshold not met (percent-based)");
|
||||
Ok(false)
|
||||
}
|
||||
}
|
||||
|
||||
/// Increments the database size by as many ALLOC_CHUNK_SIZES
|
||||
/// to give a minimum threshold of free space
|
||||
pub fn do_resize(&self) -> Result<(), Error> {
|
||||
let env_info = self.env.info()?;
|
||||
let stat = self.env.stat()?;
|
||||
let size_used = stat.psize as usize * env_info.last_pgno;
|
||||
|
||||
let new_mapsize = if env_info.mapsize < ALLOC_CHUNK_SIZE {
|
||||
ALLOC_CHUNK_SIZE
|
||||
} else {
|
||||
let mut tot = env_info.mapsize;
|
||||
while size_used as f32 / tot as f32 > RESIZE_MIN_TARGET_PERCENT {
|
||||
tot += ALLOC_CHUNK_SIZE;
|
||||
}
|
||||
tot
|
||||
};
|
||||
|
||||
// close
|
||||
let mut w = self.db.write();
|
||||
*w = None;
|
||||
|
||||
unsafe {
|
||||
self.env.set_mapsize(new_mapsize)?;
|
||||
}
|
||||
|
||||
*w = Some(Arc::new(lmdb::Database::open(
|
||||
self.env.clone(),
|
||||
Some(&self.name),
|
||||
&lmdb::DatabaseOptions::new(lmdb::db::CREATE),
|
||||
)?));
|
||||
|
||||
info!(
|
||||
"Resized database from {} to {}",
|
||||
env_info.mapsize, new_mapsize
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Gets a value from the db, provided its key
|
||||
pub fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Error> {
|
||||
let db = self.db.read();
|
||||
let txn = lmdb::ReadTransaction::new(self.env.clone())?;
|
||||
let access = txn.access();
|
||||
let res = access.get(&self.db, key);
|
||||
let res = access.get(&db.as_ref().unwrap(), key);
|
||||
res.map(|res: &[u8]| res.to_vec())
|
||||
.to_opt()
|
||||
.map_err(From::from)
|
||||
|
@ -133,17 +207,19 @@ impl Store {
|
|||
/// Gets a `Readable` value from the db, provided its key. Encapsulates
|
||||
/// serialization.
|
||||
pub fn get_ser<T: ser::Readable>(&self, key: &[u8]) -> Result<Option<T>, Error> {
|
||||
let db = self.db.read();
|
||||
let txn = lmdb::ReadTransaction::new(self.env.clone())?;
|
||||
let access = txn.access();
|
||||
self.get_ser_access(key, &access)
|
||||
self.get_ser_access(key, &access, db)
|
||||
}
|
||||
|
||||
fn get_ser_access<T: ser::Readable>(
|
||||
&self,
|
||||
key: &[u8],
|
||||
access: &lmdb::ConstAccessor<'_>,
|
||||
db: RwLockReadGuard<'_, Option<Arc<lmdb::Database<'static>>>>,
|
||||
) -> Result<Option<T>, Error> {
|
||||
let res: lmdb::error::Result<&[u8]> = access.get(&self.db, key);
|
||||
let res: lmdb::error::Result<&[u8]> = access.get(&db.as_ref().unwrap(), key);
|
||||
match res.to_opt() {
|
||||
Ok(Some(mut res)) => match ser::deserialize(&mut res) {
|
||||
Ok(res) => Ok(Some(res)),
|
||||
|
@ -156,17 +232,19 @@ impl Store {
|
|||
|
||||
/// Whether the provided key exists
|
||||
pub fn exists(&self, key: &[u8]) -> Result<bool, Error> {
|
||||
let db = self.db.read();
|
||||
let txn = lmdb::ReadTransaction::new(self.env.clone())?;
|
||||
let access = txn.access();
|
||||
let res: lmdb::error::Result<&lmdb::Ignore> = access.get(&self.db, key);
|
||||
let res: lmdb::error::Result<&lmdb::Ignore> = access.get(&db.as_ref().unwrap(), key);
|
||||
res.to_opt().map(|r| r.is_some()).map_err(From::from)
|
||||
}
|
||||
|
||||
/// Produces an iterator of `Readable` types moving forward from the
|
||||
/// provided key.
|
||||
pub fn iter<T: ser::Readable>(&self, from: &[u8]) -> Result<SerIterator<T>, Error> {
|
||||
let db = self.db.read();
|
||||
let tx = Arc::new(lmdb::ReadTransaction::new(self.env.clone())?);
|
||||
let cursor = Arc::new(tx.cursor(self.db.clone()).unwrap());
|
||||
let cursor = Arc::new(tx.cursor(db.as_ref().unwrap().clone()).unwrap());
|
||||
Ok(SerIterator {
|
||||
tx,
|
||||
cursor,
|
||||
|
@ -178,6 +256,10 @@ impl Store {
|
|||
|
||||
/// Builds a new batch to be used with this store.
|
||||
pub fn batch(&self) -> Result<Batch<'_>, Error> {
|
||||
// check if the db needs resizing before returning the batch
|
||||
if self.needs_resize()? {
|
||||
self.do_resize()?;
|
||||
}
|
||||
let txn = lmdb::WriteTransaction::new(self.env.clone())?;
|
||||
Ok(Batch {
|
||||
store: self,
|
||||
|
@ -195,9 +277,10 @@ pub struct Batch<'a> {
|
|||
impl<'a> Batch<'a> {
|
||||
/// Writes a single key/value pair to the db
|
||||
pub fn put(&self, key: &[u8], value: &[u8]) -> Result<(), Error> {
|
||||
let db = self.store.db.read();
|
||||
self.tx
|
||||
.access()
|
||||
.put(&self.store.db, key, value, lmdb::put::Flags::empty())?;
|
||||
.put(&db.as_ref().unwrap(), key, value, lmdb::put::Flags::empty())?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
@ -231,12 +314,14 @@ impl<'a> Batch<'a> {
|
|||
/// content of the current batch into account.
|
||||
pub fn get_ser<T: ser::Readable>(&self, key: &[u8]) -> Result<Option<T>, Error> {
|
||||
let access = self.tx.access();
|
||||
self.store.get_ser_access(key, &access)
|
||||
let db = self.store.db.read();
|
||||
self.store.get_ser_access(key, &access, db)
|
||||
}
|
||||
|
||||
/// Deletes a key/value pair from the db
|
||||
pub fn delete(&self, key: &[u8]) -> Result<(), Error> {
|
||||
self.tx.access().del_key(&self.store.db, key)?;
|
||||
let db = self.store.db.read();
|
||||
self.tx.access().del_key(&db.as_ref().unwrap(), key)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
|
103
store/tests/lmdb.rs
Normal file
103
store/tests/lmdb.rs
Normal file
|
@ -0,0 +1,103 @@
|
|||
// Copyright 2018 The Grin Developers
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use grin_store as store;
|
||||
use grin_util as util;
|
||||
|
||||
use grin_core::ser::{self, Readable, Reader, Writeable, Writer};
|
||||
|
||||
use std::fs;
|
||||
|
||||
const WRITE_CHUNK_SIZE: usize = 20;
|
||||
const TEST_ALLOC_SIZE: usize = store::lmdb::ALLOC_CHUNK_SIZE / 8 / WRITE_CHUNK_SIZE;
|
||||
|
||||
#[derive(Clone)]
|
||||
struct PhatChunkStruct {
|
||||
phatness: u64,
|
||||
}
|
||||
|
||||
impl PhatChunkStruct {
|
||||
/// create
|
||||
pub fn new() -> PhatChunkStruct {
|
||||
PhatChunkStruct { phatness: 0 }
|
||||
}
|
||||
}
|
||||
|
||||
impl Readable for PhatChunkStruct {
|
||||
fn read(reader: &mut Reader) -> Result<PhatChunkStruct, ser::Error> {
|
||||
let mut retval = PhatChunkStruct::new();
|
||||
for _ in 0..TEST_ALLOC_SIZE {
|
||||
retval.phatness = reader.read_u64()?;
|
||||
}
|
||||
Ok(retval)
|
||||
}
|
||||
}
|
||||
|
||||
impl Writeable for PhatChunkStruct {
|
||||
fn write<W: Writer>(&self, writer: &mut W) -> Result<(), ser::Error> {
|
||||
// write many times
|
||||
for _ in 0..TEST_ALLOC_SIZE {
|
||||
writer.write_u64(self.phatness)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
fn clean_output_dir(test_dir: &str) {
|
||||
let _ = fs::remove_dir_all(test_dir);
|
||||
}
|
||||
|
||||
fn setup(test_dir: &str) {
|
||||
util::init_test_logger();
|
||||
clean_output_dir(test_dir);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn lmdb_allocate() -> Result<(), store::Error> {
|
||||
let test_dir = "test_output/lmdb_allocate";
|
||||
setup(test_dir);
|
||||
// Allocate more than the initial chunk, ensuring
|
||||
// the DB resizes underneath
|
||||
{
|
||||
let store = store::Store::new(test_dir, Some("test1"), None)?;
|
||||
|
||||
for i in 0..WRITE_CHUNK_SIZE * 2 {
|
||||
println!("Allocating chunk: {}", i);
|
||||
let chunk = PhatChunkStruct::new();
|
||||
let mut key_val = format!("phat_chunk_set_1_{}", i).as_bytes().to_vec();
|
||||
let batch = store.batch()?;
|
||||
let key = store::to_key(b'P', &mut key_val);
|
||||
batch.put_ser(&key, &chunk)?;
|
||||
batch.commit()?;
|
||||
}
|
||||
}
|
||||
println!("***********************************");
|
||||
println!("***************NEXT*****************");
|
||||
println!("***********************************");
|
||||
// Open env again and keep adding
|
||||
{
|
||||
let store = store::Store::new(test_dir, Some("test1"), None)?;
|
||||
for i in 0..WRITE_CHUNK_SIZE * 2 {
|
||||
println!("Allocating chunk: {}", i);
|
||||
let chunk = PhatChunkStruct::new();
|
||||
let mut key_val = format!("phat_chunk_set_2_{}", i).as_bytes().to_vec();
|
||||
let batch = store.batch()?;
|
||||
let key = store::to_key(b'P', &mut key_val);
|
||||
batch.put_ser(&key, &chunk)?;
|
||||
batch.commit()?;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
|
@ -29,7 +29,7 @@ extern crate lazy_static;
|
|||
extern crate serde_derive;
|
||||
// Re-export so only has to be included once
|
||||
pub use parking_lot::Mutex;
|
||||
pub use parking_lot::RwLock;
|
||||
pub use parking_lot::{RwLock, RwLockReadGuard};
|
||||
|
||||
// Re-export so only has to be included once
|
||||
pub use secp256k1zkp as secp;
|
||||
|
|
|
@ -89,7 +89,7 @@ pub fn write_all(stream: &mut dyn Write, mut buf: &[u8], timeout: Duration) -> i
|
|||
return Err(io::Error::new(
|
||||
io::ErrorKind::WriteZero,
|
||||
"failed to write whole buffer",
|
||||
))
|
||||
));
|
||||
}
|
||||
Ok(n) => buf = &buf[n..],
|
||||
Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
|
||||
|
|
Loading…
Add table
Reference in a new issue