replace submissions HashMap with SwapStore in server.rs

This commit is contained in:
scilio 2022-08-28 01:39:58 -04:00
parent b393222ac8
commit f2241a611b
4 changed files with 151 additions and 78 deletions

View file

@ -1,7 +1,9 @@
use config::ServerConfig;
use node::HttpGrinNode;
use store::SwapStore;
use wallet::HttpWallet;
use crate::store::StoreError;
use clap::App;
use grin_core::global::ChainTypes;
use grin_util::{StopState, ZeroingString};
@ -143,6 +145,16 @@ fn real_main() -> Result<(), Box<dyn std::error::Error>> {
&server_config.node_api_secret(),
);
// Open SwapStore
let store = SwapStore::new(
config::get_grin_path(&chain_type) // todo: load from config
.join("db")
.to_str()
.ok_or(StoreError::OpenError(grin_store::lmdb::Error::FileErr(
"db_root path error".to_string(),
)))?,
)?;
let stop_state = Arc::new(StopState::new());
let stop_state_clone = stop_state.clone();
@ -153,7 +165,13 @@ fn real_main() -> Result<(), Box<dyn std::error::Error>> {
});
// Start the mwixnet JSON-RPC HTTP server
rpc::listen(server_config, Arc::new(wallet), Arc::new(node), stop_state)
rpc::listen(
server_config,
Arc::new(wallet),
Arc::new(node),
store,
stop_state,
)
}
async fn build_signals_fut() {

View file

@ -3,6 +3,7 @@ use crate::node::GrinNode;
use crate::onion::Onion;
use crate::secp::{self, ComSignature};
use crate::server::{Server, ServerImpl, SwapError};
use crate::store::SwapStore;
use crate::wallet::Wallet;
use grin_util::StopState;
@ -87,9 +88,10 @@ pub fn listen(
server_config: ServerConfig,
wallet: Arc<dyn Wallet>,
node: Arc<dyn GrinNode>,
store: SwapStore,
stop_state: Arc<StopState>,
) -> std::result::Result<(), Box<dyn std::error::Error>> {
let server = ServerImpl::new(server_config.clone(), wallet.clone(), node.clone());
let server = ServerImpl::new(server_config.clone(), wallet.clone(), node.clone(), store);
let server = Arc::new(Mutex::new(server));
let rpc_server = RPCServer {

View file

@ -1,33 +1,17 @@
use crate::config::ServerConfig;
use crate::node::{self, GrinNode};
use crate::onion::{Onion, OnionError};
use crate::secp::{ComSignature, Commitment, RangeProof, Secp256k1, SecretKey};
use crate::secp::{ComSignature, Commitment, Secp256k1, SecretKey};
use crate::store::{StoreError, SwapData, SwapStore};
use crate::wallet::{self, Wallet};
use grin_core::core::{Input, Output, OutputFeatures, TransactionBody};
use grin_core::global::DEFAULT_ACCEPT_FEE_BASE;
use itertools::Itertools;
use std::collections::HashMap;
use std::result::Result;
use std::sync::{Arc, Mutex};
use thiserror::Error;
#[derive(Clone, Debug, PartialEq)]
struct Submission {
/// The total excess for the output commitment
excess: SecretKey,
/// The derived output commitment after applying excess and fee
output_commit: Commitment,
/// The rangeproof, included only for the final hop (node N)
rangeproof: Option<RangeProof>,
/// Transaction input being spent
input: Input,
/// Transaction fee
fee: u64,
/// The remaining onion after peeling off our layer
onion: Onion,
}
/// Swap error types
#[derive(Clone, Error, Debug, PartialEq)]
pub enum SwapError {
@ -47,6 +31,8 @@ pub enum SwapError {
PeelOnionFailure(OnionError),
#[error("Fee too low (expected >= {minimum_fee:?}, actual {actual_fee:?})")]
FeeTooLow { minimum_fee: u64, actual_fee: u64 },
#[error("Error saving swap to data store: {0}")]
StoreError(StoreError),
#[error("{0}")]
UnknownError(String),
}
@ -69,7 +55,7 @@ pub struct ServerImpl {
server_config: ServerConfig,
wallet: Arc<dyn Wallet>,
node: Arc<dyn GrinNode>,
submissions: Arc<Mutex<HashMap<Commitment, Submission>>>,
store: Arc<Mutex<SwapStore>>,
}
impl ServerImpl {
@ -78,12 +64,13 @@ impl ServerImpl {
server_config: ServerConfig,
wallet: Arc<dyn Wallet>,
node: Arc<dyn GrinNode>,
store: SwapStore,
) -> Self {
ServerImpl {
server_config,
wallet,
node,
submissions: Arc::new(Mutex::new(HashMap::new())),
store: Arc::new(Mutex::new(store)),
}
}
@ -147,40 +134,37 @@ impl Server for ServerImpl {
return Err(SwapError::MissingRangeproof);
}
let mut locked = self.submissions.lock().unwrap();
if locked.contains_key(&onion.commit) {
return Err(SwapError::AlreadySwapped {
commit: onion.commit.clone(),
});
}
let locked = self.store.lock().unwrap();
locked.insert(
onion.commit,
Submission {
locked
.save_swap(&SwapData {
excess: peeled.0.excess,
output_commit: peeled.1.commit,
rangeproof: peeled.0.rangeproof,
input,
fee,
onion: peeled.1,
},
);
})
.map_err(|e| match e {
StoreError::AlreadyExists(_) => SwapError::AlreadySwapped {
commit: onion.commit.clone(),
},
_ => SwapError::StoreError(e),
})?;
Ok(())
}
fn execute_round(&self) -> Result<(), Box<dyn std::error::Error>> {
let mut locked_state = self.submissions.lock().unwrap();
let locked_store = self.store.lock().unwrap();
let next_block_height = self.node.get_chain_height()? + 1;
let spendable: Vec<Submission> = locked_state
.values()
.into_iter()
let spendable: Vec<SwapData> = locked_store
.swaps_iter()?
.unique_by(|s| s.output_commit)
.filter(|s| {
node::is_spendable(&self.node, &s.input.commit, next_block_height).unwrap_or(false)
})
.filter(|s| !node::is_unspent(&self.node, &s.output_commit).unwrap_or(true))
.cloned()
.collect();
if spendable.len() == 0 {
@ -219,7 +203,7 @@ impl Server for ServerImpl {
)?;
self.node.post_tx(&tx)?;
locked_state.clear();
// todo: Update swap statuses
Ok(())
}
@ -273,7 +257,8 @@ mod tests {
use crate::secp::{
self, ComSignature, Commitment, PublicKey, RangeProof, Secp256k1, SecretKey,
};
use crate::server::{Server, ServerImpl, Submission, SwapError};
use crate::server::{Server, ServerImpl, SwapError};
use crate::store::{SwapData, SwapStore};
use crate::types::Payload;
use crate::wallet::mock::MockWallet;
@ -294,9 +279,14 @@ mod tests {
}
fn new_server(
test_name: &str,
server_key: &SecretKey,
utxos: &Vec<&Commitment>,
) -> (ServerImpl, Arc<MockGrinNode>) {
global::set_local_chain_type(ChainTypes::AutomatedTesting);
let db_root = format!("./target/tmp/.{}", test_name);
let _ = std::fs::remove_dir_all(db_root.as_str());
let config = ServerConfig {
key: server_key.clone(),
interval_s: 1,
@ -315,8 +305,9 @@ mod tests {
mut_node.add_default_utxo(&utxo);
}
let node = Arc::new(mut_node);
let store = SwapStore::new(db_root.as_str()).unwrap();
let server = ServerImpl::new(config, wallet.clone(), node.clone());
let server = ServerImpl::new(config, wallet.clone(), node.clone(), store);
(server, node)
}
@ -370,14 +361,14 @@ mod tests {
let onion = test_util::create_onion(&input_commit, &vec![hop])?;
let comsig = ComSignature::sign(value, &blind, &onion.serialize()?)?;
let (server, node) = new_server(&server_key, &vec![&input_commit]);
let (server, node) = new_server("swap_lifecycle", &server_key, &vec![&input_commit]);
server.swap(&onion, &comsig)?;
// Make sure entry is added to server.
let output_commit = secp::add_excess(&input_commit, &hop_excess)?;
let output_commit = secp::sub_value(&output_commit, fee)?;
let expected = Submission {
let expected = SwapData {
excess: hop_excess.clone(),
output_commit: output_commit.clone(),
rangeproof: Some(proof),
@ -391,16 +382,19 @@ mod tests {
};
{
let submissions = server.submissions.lock().unwrap();
assert_eq!(1, submissions.len());
assert!(submissions.contains_key(&input_commit));
assert_eq!(&expected, submissions.get(&input_commit).unwrap());
let store = server.store.lock().unwrap();
assert_eq!(1, store.swaps_iter().unwrap().count());
assert!(store.swap_exists(&input_commit).unwrap());
assert_eq!(expected, store.get_swap(&input_commit).unwrap());
}
server.execute_round()?;
// Make sure entry is removed from server.submissions
assert!(server.submissions.lock().unwrap().is_empty());
// todo: Make sure entry is removed from server.submissions
// assert_eq!(
// 0,
// server.store.lock().unwrap().swaps_iter().unwrap().count()
// );
// check that the transaction was posted
let posted_txns = node.get_posted_txns();
@ -410,7 +404,6 @@ mod tests {
assert!(posted_txn.outputs_committed().contains(&output_commit));
// todo: check that outputs also contain the commitment generated by our wallet
global::set_local_chain_type(ChainTypes::AutomatedTesting);
posted_txn.validate(Weighting::AsTransaction)?;
Ok(())
@ -433,7 +426,8 @@ mod tests {
let onion = test_util::create_onion(&input_commit, &hops)?;
let comsig = ComSignature::sign(value, &blind, &onion.serialize()?)?;
let (server, _node) = new_server(&server_key, &vec![&input_commit]);
let (server, _node) =
new_server("swap_too_many_payloads", &server_key, &vec![&input_commit]);
let result = server.swap(&onion, &comsig);
assert_eq!(
Err(SwapError::InvalidPayloadLength {
@ -443,8 +437,11 @@ mod tests {
result
);
// Make sure no entry is added to server.submissions
assert!(server.submissions.lock().unwrap().is_empty());
// Make sure no entry is added to the store
assert_eq!(
0,
server.store.lock().unwrap().swaps_iter().unwrap().count()
);
Ok(())
}
@ -467,12 +464,19 @@ mod tests {
let wrong_blind = secp::random_secret();
let comsig = ComSignature::sign(value, &wrong_blind, &onion.serialize()?)?;
let (server, _node) = new_server(&server_key, &vec![&input_commit]);
let (server, _node) = new_server(
"swap_invalid_com_signature",
&server_key,
&vec![&input_commit],
);
let result = server.swap(&onion, &comsig);
assert_eq!(Err(SwapError::InvalidComSignature), result);
// Make sure no entry is added to server.submissions
assert!(server.submissions.lock().unwrap().is_empty());
// Make sure no entry is added to the store
assert_eq!(
0,
server.store.lock().unwrap().swaps_iter().unwrap().count()
);
Ok(())
}
@ -494,12 +498,16 @@ mod tests {
let onion = test_util::create_onion(&input_commit, &vec![hop])?;
let comsig = ComSignature::sign(value, &blind, &onion.serialize()?)?;
let (server, _node) = new_server(&server_key, &vec![&input_commit]);
let (server, _node) =
new_server("swap_invalid_rangeproof", &server_key, &vec![&input_commit]);
let result = server.swap(&onion, &comsig);
assert_eq!(Err(SwapError::InvalidRangeproof), result);
// Make sure no entry is added to server.submissions
assert!(server.submissions.lock().unwrap().is_empty());
// Make sure no entry is added to the store
assert_eq!(
0,
server.store.lock().unwrap().swaps_iter().unwrap().count()
);
Ok(())
}
@ -519,12 +527,16 @@ mod tests {
let onion = test_util::create_onion(&input_commit, &vec![hop])?;
let comsig = ComSignature::sign(value, &blind, &onion.serialize()?)?;
let (server, _node) = new_server(&server_key, &vec![&input_commit]);
let (server, _node) =
new_server("swap_missing_rangeproof", &server_key, &vec![&input_commit]);
let result = server.swap(&onion, &comsig);
assert_eq!(Err(SwapError::MissingRangeproof), result);
// Make sure no entry is added to server.submissions
assert!(server.submissions.lock().unwrap().is_empty());
// Make sure no entry is added to the store
assert_eq!(
0,
server.store.lock().unwrap().swaps_iter().unwrap().count()
);
Ok(())
}
@ -545,7 +557,7 @@ mod tests {
let onion = test_util::create_onion(&input_commit, &vec![hop])?;
let comsig = ComSignature::sign(value, &blind, &onion.serialize()?)?;
let (server, _node) = new_server(&server_key, &vec![]);
let (server, _node) = new_server("swap_utxo_missing", &server_key, &vec![]);
let result = server.swap(&onion, &comsig);
assert_eq!(
Err(SwapError::CoinNotFound {
@ -554,8 +566,11 @@ mod tests {
result
);
// Make sure no entry is added to server.submissions
assert!(server.submissions.lock().unwrap().is_empty());
// Make sure no entry is added to the store
assert_eq!(
0,
server.store.lock().unwrap().swaps_iter().unwrap().count()
);
Ok(())
}
@ -576,7 +591,7 @@ mod tests {
let onion = test_util::create_onion(&input_commit, &vec![hop])?;
let comsig = ComSignature::sign(value, &blind, &onion.serialize()?)?;
let (server, _node) = new_server(&server_key, &vec![&input_commit]);
let (server, _node) = new_server("swap_already_swapped", &server_key, &vec![&input_commit]);
server.swap(&onion, &comsig)?;
// Call swap a second time
@ -609,7 +624,8 @@ mod tests {
let onion = test_util::create_onion(&input_commit, &vec![hop])?;
let comsig = ComSignature::sign(value, &blind, &onion.serialize()?)?;
let (server, _node) = new_server(&server_key, &vec![&input_commit]);
let (server, _node) =
new_server("swap_peel_onion_failure", &server_key, &vec![&input_commit]);
let result = server.swap(&onion, &comsig);
assert!(result.is_err());
@ -634,7 +650,7 @@ mod tests {
let onion = test_util::create_onion(&input_commit, &vec![hop])?;
let comsig = ComSignature::sign(value, &blind, &onion.serialize()?)?;
let (server, _node) = new_server(&server_key, &vec![&input_commit]);
let (server, _node) = new_server("swap_fee_too_low", &server_key, &vec![&input_commit]);
let result = server.swap(&onion, &comsig);
assert_eq!(
Err(SwapError::FeeTooLow {

View file

@ -3,7 +3,9 @@ use crate::secp::{self, Commitment, RangeProof, SecretKey};
use crate::types::{read_optional, write_optional};
use grin_core::core::Input;
use grin_core::ser::{self, ProtocolVersion, Readable, Reader, Writeable, Writer};
use grin_core::ser::{
self, DeserializationMode, ProtocolVersion, Readable, Reader, Writeable, Writer,
};
use grin_store::{self as store, Store};
use grin_util::ToHex;
use thiserror::Error;
@ -78,6 +80,8 @@ pub struct SwapStore {
/// Store error types
#[derive(Clone, Error, Debug, PartialEq)]
pub enum StoreError {
#[error("Swap entry already exists for '{0:?}'")]
AlreadyExists(Commitment),
#[error("Error occurred while attempting to open db: {0}")]
OpenError(store::lmdb::Error),
#[error("Serialization error occurred: {0}")]
@ -96,7 +100,6 @@ impl From<ser::Error> for StoreError {
impl SwapStore {
/// Create new chain store
#[allow(dead_code)]
pub fn new(db_root: &str) -> Result<SwapStore, StoreError> {
let db = Store::new(db_root, Some(DB_NAME), Some(STORE_SUBPATH), None)
.map_err(StoreError::OpenError)?;
@ -109,10 +112,16 @@ impl SwapStore {
prefix: u8,
k: K,
value: &Vec<u8>,
) -> Result<(), store::lmdb::Error> {
) -> Result<bool, store::lmdb::Error> {
let batch = self.db.batch()?;
batch.put(&store::to_key(prefix, k)[..], &value[..])?;
batch.commit()
let key = store::to_key(prefix, k);
if batch.exists(&key[..])? {
Ok(false)
} else {
batch.put(&key[..], &value[..])?;
batch.commit()?;
Ok(true)
}
}
/// Reads a single value by key
@ -124,16 +133,44 @@ impl SwapStore {
}
/// Saves a swap to the database
#[allow(dead_code)]
pub fn save_swap(&self, s: &SwapData) -> Result<(), StoreError> {
let data = ser::ser_vec(&s, ProtocolVersion::local())?;
self.write(SWAP_PREFIX, &s.output_commit, &data)
.map_err(StoreError::WriteError)
let saved = self
.write(SWAP_PREFIX, &s.input.commit, &data)
.map_err(StoreError::WriteError)?;
if !saved {
Err(StoreError::AlreadyExists(s.input.commit.clone()))
} else {
Ok(())
}
}
/// Iterator over all swaps.
pub fn swaps_iter(&self) -> Result<impl Iterator<Item = SwapData>, StoreError> {
let key = store::to_key(SWAP_PREFIX, "");
let protocol_version = self.db.protocol_version();
self.db
.iter(&key[..], move |_, mut v| {
ser::deserialize(&mut v, protocol_version, DeserializationMode::default())
.map_err(From::from)
})
.map_err(|e| StoreError::ReadError(e))
}
/// Checks if a matching swap exists in the database
#[allow(dead_code)]
pub fn swap_exists(&self, input_commit: &Commitment) -> Result<bool, StoreError> {
let key = store::to_key(SWAP_PREFIX, input_commit);
self.db
.batch()
.map_err(StoreError::ReadError)?
.exists(&key[..])
.map_err(StoreError::ReadError)
}
/// Reads a swap from the database
#[allow(dead_code)]
pub fn get_swap(&self, commit: &Commitment) -> Result<SwapData, StoreError> {
self.read(SWAP_PREFIX, commit)
pub fn get_swap(&self, input_commit: &Commitment) -> Result<SwapData, StoreError> {
self.read(SWAP_PREFIX, input_commit)
}
}