From f2241a611bd92babefc256a98710b451931d4980 Mon Sep 17 00:00:00 2001 From: scilio Date: Sun, 28 Aug 2022 01:39:58 -0400 Subject: [PATCH] replace submissions HashMap with SwapStore in server.rs --- src/main.rs | 20 ++++++- src/rpc.rs | 4 +- src/server.rs | 148 ++++++++++++++++++++++++++++---------------------- src/store.rs | 57 +++++++++++++++---- 4 files changed, 151 insertions(+), 78 deletions(-) diff --git a/src/main.rs b/src/main.rs index 765adf5..26eb8ad 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,7 +1,9 @@ use config::ServerConfig; use node::HttpGrinNode; +use store::SwapStore; use wallet::HttpWallet; +use crate::store::StoreError; use clap::App; use grin_core::global::ChainTypes; use grin_util::{StopState, ZeroingString}; @@ -143,6 +145,16 @@ fn real_main() -> Result<(), Box> { &server_config.node_api_secret(), ); + // Open SwapStore + let store = SwapStore::new( + config::get_grin_path(&chain_type) // todo: load from config + .join("db") + .to_str() + .ok_or(StoreError::OpenError(grin_store::lmdb::Error::FileErr( + "db_root path error".to_string(), + )))?, + )?; + let stop_state = Arc::new(StopState::new()); let stop_state_clone = stop_state.clone(); @@ -153,7 +165,13 @@ fn real_main() -> Result<(), Box> { }); // Start the mwixnet JSON-RPC HTTP server - rpc::listen(server_config, Arc::new(wallet), Arc::new(node), stop_state) + rpc::listen( + server_config, + Arc::new(wallet), + Arc::new(node), + store, + stop_state, + ) } async fn build_signals_fut() { diff --git a/src/rpc.rs b/src/rpc.rs index 2d09bca..58bc426 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -3,6 +3,7 @@ use crate::node::GrinNode; use crate::onion::Onion; use crate::secp::{self, ComSignature}; use crate::server::{Server, ServerImpl, SwapError}; +use crate::store::SwapStore; use crate::wallet::Wallet; use grin_util::StopState; @@ -87,9 +88,10 @@ pub fn listen( server_config: ServerConfig, wallet: Arc, node: Arc, + store: SwapStore, stop_state: Arc, ) -> std::result::Result<(), Box> { - let server = ServerImpl::new(server_config.clone(), wallet.clone(), node.clone()); + let server = ServerImpl::new(server_config.clone(), wallet.clone(), node.clone(), store); let server = Arc::new(Mutex::new(server)); let rpc_server = RPCServer { diff --git a/src/server.rs b/src/server.rs index 99dc49d..a082443 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,33 +1,17 @@ use crate::config::ServerConfig; use crate::node::{self, GrinNode}; use crate::onion::{Onion, OnionError}; -use crate::secp::{ComSignature, Commitment, RangeProof, Secp256k1, SecretKey}; +use crate::secp::{ComSignature, Commitment, Secp256k1, SecretKey}; +use crate::store::{StoreError, SwapData, SwapStore}; use crate::wallet::{self, Wallet}; use grin_core::core::{Input, Output, OutputFeatures, TransactionBody}; use grin_core::global::DEFAULT_ACCEPT_FEE_BASE; use itertools::Itertools; -use std::collections::HashMap; use std::result::Result; use std::sync::{Arc, Mutex}; use thiserror::Error; -#[derive(Clone, Debug, PartialEq)] -struct Submission { - /// The total excess for the output commitment - excess: SecretKey, - /// The derived output commitment after applying excess and fee - output_commit: Commitment, - /// The rangeproof, included only for the final hop (node N) - rangeproof: Option, - /// Transaction input being spent - input: Input, - /// Transaction fee - fee: u64, - /// The remaining onion after peeling off our layer - onion: Onion, -} - /// Swap error types #[derive(Clone, Error, Debug, PartialEq)] pub enum SwapError { @@ -47,6 +31,8 @@ pub enum SwapError { PeelOnionFailure(OnionError), #[error("Fee too low (expected >= {minimum_fee:?}, actual {actual_fee:?})")] FeeTooLow { minimum_fee: u64, actual_fee: u64 }, + #[error("Error saving swap to data store: {0}")] + StoreError(StoreError), #[error("{0}")] UnknownError(String), } @@ -69,7 +55,7 @@ pub struct ServerImpl { server_config: ServerConfig, wallet: Arc, node: Arc, - submissions: Arc>>, + store: Arc>, } impl ServerImpl { @@ -78,12 +64,13 @@ impl ServerImpl { server_config: ServerConfig, wallet: Arc, node: Arc, + store: SwapStore, ) -> Self { ServerImpl { server_config, wallet, node, - submissions: Arc::new(Mutex::new(HashMap::new())), + store: Arc::new(Mutex::new(store)), } } @@ -147,40 +134,37 @@ impl Server for ServerImpl { return Err(SwapError::MissingRangeproof); } - let mut locked = self.submissions.lock().unwrap(); - if locked.contains_key(&onion.commit) { - return Err(SwapError::AlreadySwapped { - commit: onion.commit.clone(), - }); - } + let locked = self.store.lock().unwrap(); - locked.insert( - onion.commit, - Submission { + locked + .save_swap(&SwapData { excess: peeled.0.excess, output_commit: peeled.1.commit, rangeproof: peeled.0.rangeproof, input, fee, onion: peeled.1, - }, - ); + }) + .map_err(|e| match e { + StoreError::AlreadyExists(_) => SwapError::AlreadySwapped { + commit: onion.commit.clone(), + }, + _ => SwapError::StoreError(e), + })?; Ok(()) } fn execute_round(&self) -> Result<(), Box> { - let mut locked_state = self.submissions.lock().unwrap(); + let locked_store = self.store.lock().unwrap(); let next_block_height = self.node.get_chain_height()? + 1; - let spendable: Vec = locked_state - .values() - .into_iter() + let spendable: Vec = locked_store + .swaps_iter()? .unique_by(|s| s.output_commit) .filter(|s| { node::is_spendable(&self.node, &s.input.commit, next_block_height).unwrap_or(false) }) .filter(|s| !node::is_unspent(&self.node, &s.output_commit).unwrap_or(true)) - .cloned() .collect(); if spendable.len() == 0 { @@ -219,7 +203,7 @@ impl Server for ServerImpl { )?; self.node.post_tx(&tx)?; - locked_state.clear(); + // todo: Update swap statuses Ok(()) } @@ -273,7 +257,8 @@ mod tests { use crate::secp::{ self, ComSignature, Commitment, PublicKey, RangeProof, Secp256k1, SecretKey, }; - use crate::server::{Server, ServerImpl, Submission, SwapError}; + use crate::server::{Server, ServerImpl, SwapError}; + use crate::store::{SwapData, SwapStore}; use crate::types::Payload; use crate::wallet::mock::MockWallet; @@ -294,9 +279,14 @@ mod tests { } fn new_server( + test_name: &str, server_key: &SecretKey, utxos: &Vec<&Commitment>, ) -> (ServerImpl, Arc) { + global::set_local_chain_type(ChainTypes::AutomatedTesting); + let db_root = format!("./target/tmp/.{}", test_name); + let _ = std::fs::remove_dir_all(db_root.as_str()); + let config = ServerConfig { key: server_key.clone(), interval_s: 1, @@ -315,8 +305,9 @@ mod tests { mut_node.add_default_utxo(&utxo); } let node = Arc::new(mut_node); + let store = SwapStore::new(db_root.as_str()).unwrap(); - let server = ServerImpl::new(config, wallet.clone(), node.clone()); + let server = ServerImpl::new(config, wallet.clone(), node.clone(), store); (server, node) } @@ -370,14 +361,14 @@ mod tests { let onion = test_util::create_onion(&input_commit, &vec![hop])?; let comsig = ComSignature::sign(value, &blind, &onion.serialize()?)?; - let (server, node) = new_server(&server_key, &vec![&input_commit]); + let (server, node) = new_server("swap_lifecycle", &server_key, &vec![&input_commit]); server.swap(&onion, &comsig)?; // Make sure entry is added to server. let output_commit = secp::add_excess(&input_commit, &hop_excess)?; let output_commit = secp::sub_value(&output_commit, fee)?; - let expected = Submission { + let expected = SwapData { excess: hop_excess.clone(), output_commit: output_commit.clone(), rangeproof: Some(proof), @@ -391,16 +382,19 @@ mod tests { }; { - let submissions = server.submissions.lock().unwrap(); - assert_eq!(1, submissions.len()); - assert!(submissions.contains_key(&input_commit)); - assert_eq!(&expected, submissions.get(&input_commit).unwrap()); + let store = server.store.lock().unwrap(); + assert_eq!(1, store.swaps_iter().unwrap().count()); + assert!(store.swap_exists(&input_commit).unwrap()); + assert_eq!(expected, store.get_swap(&input_commit).unwrap()); } server.execute_round()?; - // Make sure entry is removed from server.submissions - assert!(server.submissions.lock().unwrap().is_empty()); + // todo: Make sure entry is removed from server.submissions + // assert_eq!( + // 0, + // server.store.lock().unwrap().swaps_iter().unwrap().count() + // ); // check that the transaction was posted let posted_txns = node.get_posted_txns(); @@ -410,7 +404,6 @@ mod tests { assert!(posted_txn.outputs_committed().contains(&output_commit)); // todo: check that outputs also contain the commitment generated by our wallet - global::set_local_chain_type(ChainTypes::AutomatedTesting); posted_txn.validate(Weighting::AsTransaction)?; Ok(()) @@ -433,7 +426,8 @@ mod tests { let onion = test_util::create_onion(&input_commit, &hops)?; let comsig = ComSignature::sign(value, &blind, &onion.serialize()?)?; - let (server, _node) = new_server(&server_key, &vec![&input_commit]); + let (server, _node) = + new_server("swap_too_many_payloads", &server_key, &vec![&input_commit]); let result = server.swap(&onion, &comsig); assert_eq!( Err(SwapError::InvalidPayloadLength { @@ -443,8 +437,11 @@ mod tests { result ); - // Make sure no entry is added to server.submissions - assert!(server.submissions.lock().unwrap().is_empty()); + // Make sure no entry is added to the store + assert_eq!( + 0, + server.store.lock().unwrap().swaps_iter().unwrap().count() + ); Ok(()) } @@ -467,12 +464,19 @@ mod tests { let wrong_blind = secp::random_secret(); let comsig = ComSignature::sign(value, &wrong_blind, &onion.serialize()?)?; - let (server, _node) = new_server(&server_key, &vec![&input_commit]); + let (server, _node) = new_server( + "swap_invalid_com_signature", + &server_key, + &vec![&input_commit], + ); let result = server.swap(&onion, &comsig); assert_eq!(Err(SwapError::InvalidComSignature), result); - // Make sure no entry is added to server.submissions - assert!(server.submissions.lock().unwrap().is_empty()); + // Make sure no entry is added to the store + assert_eq!( + 0, + server.store.lock().unwrap().swaps_iter().unwrap().count() + ); Ok(()) } @@ -494,12 +498,16 @@ mod tests { let onion = test_util::create_onion(&input_commit, &vec![hop])?; let comsig = ComSignature::sign(value, &blind, &onion.serialize()?)?; - let (server, _node) = new_server(&server_key, &vec![&input_commit]); + let (server, _node) = + new_server("swap_invalid_rangeproof", &server_key, &vec![&input_commit]); let result = server.swap(&onion, &comsig); assert_eq!(Err(SwapError::InvalidRangeproof), result); - // Make sure no entry is added to server.submissions - assert!(server.submissions.lock().unwrap().is_empty()); + // Make sure no entry is added to the store + assert_eq!( + 0, + server.store.lock().unwrap().swaps_iter().unwrap().count() + ); Ok(()) } @@ -519,12 +527,16 @@ mod tests { let onion = test_util::create_onion(&input_commit, &vec![hop])?; let comsig = ComSignature::sign(value, &blind, &onion.serialize()?)?; - let (server, _node) = new_server(&server_key, &vec![&input_commit]); + let (server, _node) = + new_server("swap_missing_rangeproof", &server_key, &vec![&input_commit]); let result = server.swap(&onion, &comsig); assert_eq!(Err(SwapError::MissingRangeproof), result); - // Make sure no entry is added to server.submissions - assert!(server.submissions.lock().unwrap().is_empty()); + // Make sure no entry is added to the store + assert_eq!( + 0, + server.store.lock().unwrap().swaps_iter().unwrap().count() + ); Ok(()) } @@ -545,7 +557,7 @@ mod tests { let onion = test_util::create_onion(&input_commit, &vec![hop])?; let comsig = ComSignature::sign(value, &blind, &onion.serialize()?)?; - let (server, _node) = new_server(&server_key, &vec![]); + let (server, _node) = new_server("swap_utxo_missing", &server_key, &vec![]); let result = server.swap(&onion, &comsig); assert_eq!( Err(SwapError::CoinNotFound { @@ -554,8 +566,11 @@ mod tests { result ); - // Make sure no entry is added to server.submissions - assert!(server.submissions.lock().unwrap().is_empty()); + // Make sure no entry is added to the store + assert_eq!( + 0, + server.store.lock().unwrap().swaps_iter().unwrap().count() + ); Ok(()) } @@ -576,7 +591,7 @@ mod tests { let onion = test_util::create_onion(&input_commit, &vec![hop])?; let comsig = ComSignature::sign(value, &blind, &onion.serialize()?)?; - let (server, _node) = new_server(&server_key, &vec![&input_commit]); + let (server, _node) = new_server("swap_already_swapped", &server_key, &vec![&input_commit]); server.swap(&onion, &comsig)?; // Call swap a second time @@ -609,7 +624,8 @@ mod tests { let onion = test_util::create_onion(&input_commit, &vec![hop])?; let comsig = ComSignature::sign(value, &blind, &onion.serialize()?)?; - let (server, _node) = new_server(&server_key, &vec![&input_commit]); + let (server, _node) = + new_server("swap_peel_onion_failure", &server_key, &vec![&input_commit]); let result = server.swap(&onion, &comsig); assert!(result.is_err()); @@ -634,7 +650,7 @@ mod tests { let onion = test_util::create_onion(&input_commit, &vec![hop])?; let comsig = ComSignature::sign(value, &blind, &onion.serialize()?)?; - let (server, _node) = new_server(&server_key, &vec![&input_commit]); + let (server, _node) = new_server("swap_fee_too_low", &server_key, &vec![&input_commit]); let result = server.swap(&onion, &comsig); assert_eq!( Err(SwapError::FeeTooLow { diff --git a/src/store.rs b/src/store.rs index 8f7cd2a..2c8b4c5 100644 --- a/src/store.rs +++ b/src/store.rs @@ -3,7 +3,9 @@ use crate::secp::{self, Commitment, RangeProof, SecretKey}; use crate::types::{read_optional, write_optional}; use grin_core::core::Input; -use grin_core::ser::{self, ProtocolVersion, Readable, Reader, Writeable, Writer}; +use grin_core::ser::{ + self, DeserializationMode, ProtocolVersion, Readable, Reader, Writeable, Writer, +}; use grin_store::{self as store, Store}; use grin_util::ToHex; use thiserror::Error; @@ -78,6 +80,8 @@ pub struct SwapStore { /// Store error types #[derive(Clone, Error, Debug, PartialEq)] pub enum StoreError { + #[error("Swap entry already exists for '{0:?}'")] + AlreadyExists(Commitment), #[error("Error occurred while attempting to open db: {0}")] OpenError(store::lmdb::Error), #[error("Serialization error occurred: {0}")] @@ -96,7 +100,6 @@ impl From for StoreError { impl SwapStore { /// Create new chain store - #[allow(dead_code)] pub fn new(db_root: &str) -> Result { let db = Store::new(db_root, Some(DB_NAME), Some(STORE_SUBPATH), None) .map_err(StoreError::OpenError)?; @@ -109,10 +112,16 @@ impl SwapStore { prefix: u8, k: K, value: &Vec, - ) -> Result<(), store::lmdb::Error> { + ) -> Result { let batch = self.db.batch()?; - batch.put(&store::to_key(prefix, k)[..], &value[..])?; - batch.commit() + let key = store::to_key(prefix, k); + if batch.exists(&key[..])? { + Ok(false) + } else { + batch.put(&key[..], &value[..])?; + batch.commit()?; + Ok(true) + } } /// Reads a single value by key @@ -124,16 +133,44 @@ impl SwapStore { } /// Saves a swap to the database - #[allow(dead_code)] pub fn save_swap(&self, s: &SwapData) -> Result<(), StoreError> { let data = ser::ser_vec(&s, ProtocolVersion::local())?; - self.write(SWAP_PREFIX, &s.output_commit, &data) - .map_err(StoreError::WriteError) + let saved = self + .write(SWAP_PREFIX, &s.input.commit, &data) + .map_err(StoreError::WriteError)?; + if !saved { + Err(StoreError::AlreadyExists(s.input.commit.clone())) + } else { + Ok(()) + } + } + + /// Iterator over all swaps. + pub fn swaps_iter(&self) -> Result, StoreError> { + let key = store::to_key(SWAP_PREFIX, ""); + let protocol_version = self.db.protocol_version(); + self.db + .iter(&key[..], move |_, mut v| { + ser::deserialize(&mut v, protocol_version, DeserializationMode::default()) + .map_err(From::from) + }) + .map_err(|e| StoreError::ReadError(e)) + } + + /// Checks if a matching swap exists in the database + #[allow(dead_code)] + pub fn swap_exists(&self, input_commit: &Commitment) -> Result { + let key = store::to_key(SWAP_PREFIX, input_commit); + self.db + .batch() + .map_err(StoreError::ReadError)? + .exists(&key[..]) + .map_err(StoreError::ReadError) } /// Reads a swap from the database #[allow(dead_code)] - pub fn get_swap(&self, commit: &Commitment) -> Result { - self.read(SWAP_PREFIX, commit) + pub fn get_swap(&self, input_commit: &Commitment) -> Result { + self.read(SWAP_PREFIX, input_commit) } }