converted to async/.await to support integration testing framework

(cherry picked from commit 26c129fa787ffec44f229ad8047d5c375fb4d257)
This commit is contained in:
scilio 2023-12-18 19:12:05 -05:00
parent f4e259f802
commit 389581d759
27 changed files with 3893 additions and 1534 deletions

2915
Cargo.lock generated

File diff suppressed because it is too large Load diff

View file

@ -9,46 +9,57 @@ edition = "2021"
members = ["onion"]
[dependencies]
async-std = { version = "1", features = ["tokio02"] }
async-trait = "0.1.74"
blake2 = { package = "blake2-rfc", version = "0.2"}
byteorder = "1"
bytes = "0.5.6"
chacha20 = "0.8.1"
bytes = "1.5.0"
chacha20 = "0.9.1"
chrono = "0.4.31"
clap = { version = "2.33", features = ["yaml"] }
ctrlc = { version = "3.1", features = ["termination"] }
curve25519-dalek = "2.1"
dirs = "2.0"
ed25519-dalek = "1.0.1"
function_name = "0.3.0"
futures = "0.3"
hmac = { version = "0.12.0", features = ["std"]}
hyper = { version = "0.14", features = ["full"] }
hyper = "0.13"
hyper-proxy = "0.9.1"
itertools = { version = "0.10.3"}
jsonrpc-core = "18.0"
jsonrpc-derive = "18.0"
jsonrpc-http-server = "18.0"
hyper-socks2 = "0.5.0"
hyper-timeout = { version = "0.3", features = [] }
itertools = { version = "0.12.0" }
jsonrpc-core = "17.1"
jsonrpc-derive = "17.1"
jsonrpc-http-server = "17.1"
lazy_static = "1"
pbkdf2 = "0.8.0"
rand = "0.7.3"
remove_dir_all = "0.8.2"
ring = "0.16"
rpassword = "4.0"
serde = { version = "1", features= ["derive"]}
serde_derive = "1"
serde_json = "1"
sha2 = "0.10.0"
thiserror = "1.0.31"
tokio = { version = "1", features = ["full"] }
toml = "0.5"
thiserror = "1.0.30"
tokio = { version = "0.2", features = ["full"] }
toml = "0.8.8"
x25519-dalek = "0.6.0"
grin_onion = { path = "./onion" }
grin_secp256k1zkp = { version = "0.7.11", features = ["bullet-proof-sizing"]}
grin_util = "5"
grin_api = { git = "https://github.com/mimblewimble/grin", version = "5.2.0-alpha.1" }
grin_core = { git = "https://github.com/mimblewimble/grin", version = "5.2.0-alpha.1" }
grin_chain = { git = "https://github.com/mimblewimble/grin", version = "5.2.0-alpha.1" }
grin_keychain = { git = "https://github.com/mimblewimble/grin", version = "5.2.0-alpha.1" }
grin_servers = { git = "https://github.com/mimblewimble/grin", version = "5.2.0-alpha.1" }
grin_store = { git = "https://github.com/mimblewimble/grin", version = "5.2.0-alpha.1" }
grin_wallet_api = { git = "https://github.com/mimblewimble/grin-wallet", branch = "master" }
grin_wallet_impls = { git = "https://github.com/mimblewimble/grin-wallet", branch = "master" }
grin_wallet_libwallet = { git = "https://github.com/mimblewimble/grin-wallet", branch = "master" }
grin_wallet_util = { git = "https://github.com/mimblewimble/grin-wallet", branch = "master" }
grin_secp256k1zkp = { version = "0.7.12", features = ["bullet-proof-sizing"]}
grin_api = { git = "https://github.com/mimblewimble/grin", tag = "v5.2.0-beta.3" }
grin_core = { git = "https://github.com/mimblewimble/grin", tag = "v5.2.0-beta.3" }
grin_chain = { git = "https://github.com/mimblewimble/grin", tag = "v5.2.0-beta.3" }
grin_keychain = { git = "https://github.com/mimblewimble/grin", tag = "v5.2.0-beta.3" }
grin_p2p = { git = "https://github.com/mimblewimble/grin", tag = "v5.2.0-beta.3" }
grin_servers = { git = "https://github.com/mimblewimble/grin", tag = "v5.2.0-beta.3" }
grin_store = { git = "https://github.com/mimblewimble/grin", tag = "v5.2.0-beta.3" }
grin_util = { git = "https://github.com/mimblewimble/grin", tag = "v5.2.0-beta.3" }
grin_wallet_api = { git = "https://github.com/mimblewimble/grin-wallet", version = "5.2.0-beta.1" }
grin_wallet_config = { git = "https://github.com/mimblewimble/grin-wallet", version = "5.2.0-beta.1" }
grin_wallet_controller = { git = "https://github.com/mimblewimble/grin-wallet", version = "5.2.0-beta.1" }
grin_wallet_impls = { git = "https://github.com/mimblewimble/grin-wallet", version = "5.2.0-beta.1" }
grin_wallet_libwallet = { git = "https://github.com/mimblewimble/grin-wallet", version = "5.2.0-beta.1" }
grin_wallet_util = { git = "https://github.com/mimblewimble/grin-wallet", version = "5.2.0-beta.1" }
log = "0.4.20"

View file

@ -21,18 +21,18 @@ serde = { version = "1", features= ["derive"]}
serde_derive = "1"
serde_json = "1"
sha2 = "0.10.0"
thiserror = "1.0.31"
thiserror = "1"
toml = "0.5"
x25519-dalek = "0.6.0"
grin_secp256k1zkp = { version = "0.7.11", features = ["bullet-proof-sizing"]}
grin_util = "5"
grin_api = { git = "https://github.com/mimblewimble/grin", version = "5.2.0-alpha.1" }
grin_core = { git = "https://github.com/mimblewimble/grin", version = "5.2.0-alpha.1" }
grin_chain = { git = "https://github.com/mimblewimble/grin", version = "5.2.0-alpha.1" }
grin_keychain = { git = "https://github.com/mimblewimble/grin", version = "5.2.0-alpha.1" }
grin_servers = { git = "https://github.com/mimblewimble/grin", version = "5.2.0-alpha.1" }
grin_store = { git = "https://github.com/mimblewimble/grin", version = "5.2.0-alpha.1" }
grin_wallet_api = { git = "https://github.com/mimblewimble/grin-wallet", branch = "master" }
grin_wallet_impls = { git = "https://github.com/mimblewimble/grin-wallet", branch = "master" }
grin_wallet_libwallet = { git = "https://github.com/mimblewimble/grin-wallet", branch = "master" }
grin_wallet_util = { git = "https://github.com/mimblewimble/grin-wallet", branch = "master" }
grin_secp256k1zkp = { version = "0.7.12", features = ["bullet-proof-sizing"]}
grin_api = { git = "https://github.com/mimblewimble/grin", tag = "v5.2.0-beta.3" }
grin_core = { git = "https://github.com/mimblewimble/grin", tag = "v5.2.0-beta.3" }
grin_chain = { git = "https://github.com/mimblewimble/grin", tag = "v5.2.0-beta.3" }
grin_keychain = { git = "https://github.com/mimblewimble/grin", tag = "v5.2.0-beta.3" }
grin_servers = { git = "https://github.com/mimblewimble/grin", tag = "v5.2.0-beta.3" }
grin_store = { git = "https://github.com/mimblewimble/grin", tag = "v5.2.0-beta.3" }
grin_util = { git = "https://github.com/mimblewimble/grin", tag = "v5.2.0-beta.3" }
grin_wallet_api = { git = "https://github.com/mimblewimble/grin-wallet", version = "5.2.0-beta.1" }
grin_wallet_impls = { git = "https://github.com/mimblewimble/grin-wallet", version = "5.2.0-beta.1" }
grin_wallet_libwallet = { git = "https://github.com/mimblewimble/grin-wallet", version = "5.2.0-beta.1" }
grin_wallet_util = { git = "https://github.com/mimblewimble/grin-wallet", version = "5.2.0-beta.1" }

View file

@ -162,7 +162,7 @@ pub fn sign(sk: &SecretKey, message: &[u8]) -> Result<DalekSignature, DalekError
#[cfg(test)]
mod tests {
use super::*;
use crate::crypto::dalek::test_util::rand_keypair;
use crate::test_util::rand_keypair;
use grin_core::ser::{self, ProtocolVersion};
use grin_util::ToHex;
use rand::Rng;

View file

@ -330,7 +330,7 @@ impl From<ser::Error> for OnionError {
pub mod tests {
use super::*;
use crate::crypto::secp::random_secret;
use crate::{new_hop, Hop};
use crate::{create_onion, new_hop, Hop};
use grin_core::core::FeeFields;
@ -376,7 +376,7 @@ pub mod tests {
hops.push(hop);
}
let mut onion_packet = test_util::create_onion(&commitment, &hops).unwrap();
let mut onion_packet = create_onion(&commitment, &hops).unwrap();
let mut payload = Payload {
next_ephemeral_pk: onion_packet.ephemeral_pubkey.clone(),

View file

@ -19,10 +19,10 @@ use grin_core::ser::{self, Readable, Reader, Writeable, Writer};
/// # Example
///
/// ```
/// let mut writer = vec![];
/// let mut buf = vec![];
/// let optional_value: Option<u32> = Some(10);
/// write_optional(&mut writer, &optional_value);
/// assert_eq!(buf, &[1, 0, 0, 0, 10]);
/// grin_onion::util::write_optional(&mut grin_core::ser::BinWriter::default(&mut buf), &optional_value).unwrap();
/// assert_eq!(&buf, &[1, 0, 0, 0, 10]);
/// ```
pub fn write_optional<O: Writeable, W: Writer>(
writer: &mut W,
@ -58,8 +58,8 @@ pub fn write_optional<O: Writeable, W: Writer>(
///
/// ```
/// let mut buf: &[u8] = &[1, 0, 0, 0, 10];
/// let mut reader = BinReader::new(&mut buf, ProtocolVersion::local(), DeserializationMode::default());
/// let optional_value: Option<u32> = read_optional(&mut reader).unwrap();
/// let mut reader = grin_core::ser::BinReader::new(&mut buf, grin_core::ser::ProtocolVersion::local(), grin_core::ser::DeserializationMode::default());
/// let optional_value: Option<u32> = grin_onion::util::read_optional(&mut reader).unwrap();
/// assert_eq!(optional_value, Some(10));
/// ```
pub fn read_optional<O: Readable, R: Reader>(reader: &mut R) -> Result<Option<O>, ser::Error> {
@ -87,7 +87,7 @@ pub fn read_optional<O: Readable, R: Reader>(reader: &mut R) -> Result<Option<O>
///
/// ```
/// let v = vec![0, 1, 2, 3, 4, 5];
/// let a = vec_to_array::<4>(&v).unwrap();
/// let a = grin_onion::util::vec_to_array::<4>(&v).unwrap();
/// assert_eq!(a, [0, 1, 2, 3]);
/// ```
pub fn vec_to_array<const S: usize>(vec: &Vec<u8>) -> Result<[u8; S], ser::Error> {

View file

@ -1,34 +1,28 @@
use config::ServerConfig;
use node::HttpGrinNode;
use store::SwapStore;
use wallet::HttpWallet;
use mwixnet::config::{self, ServerConfig};
use mwixnet::node::HttpGrinNode;
use mwixnet::servers;
use mwixnet::store::SwapStore;
use mwixnet::tor;
use mwixnet::wallet::HttpWallet;
use crate::client::{MixClient, MixClientImpl};
use crate::node::GrinNode;
use crate::store::StoreError;
use clap::App;
use grin_core::global;
use grin_core::global::ChainTypes;
use grin_onion::crypto;
use grin_onion::crypto::dalek::DalekPublicKey;
use grin_util::{StopState, ZeroingString};
use mwixnet::client::{MixClient, MixClientImpl};
use mwixnet::node::GrinNode;
use mwixnet::store::StoreError;
use rpassword;
use std::path::PathBuf;
use std::sync::Arc;
use tokio::runtime::Runtime;
use std::thread::{sleep, spawn};
use std::time::Duration;
#[macro_use]
extern crate clap;
mod client;
mod config;
mod node;
mod servers;
mod store;
mod tor;
mod tx;
mod wallet;
const DEFAULT_INTERVAL: u32 = 12 * 60 * 60;
fn main() -> Result<(), Box<dyn std::error::Error>> {
@ -37,7 +31,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
}
fn real_main() -> Result<(), Box<dyn std::error::Error>> {
let yml = load_yaml!("../mwixnet.yml");
let yml = load_yaml!("mwixnet.yml");
let args = App::from_yaml(yml).get_matches();
let chain_type = if args.is_present("testnet") {
ChainTypes::Testnet
@ -168,18 +162,22 @@ fn real_main() -> Result<(), Box<dyn std::error::Error>> {
);
// Node API health check
if let Err(e) = node.get_chain_height() {
let mut rt = tokio::runtime::Builder::new()
.threaded_scheduler()
.enable_all()
.build()?;
if let Err(e) = rt.block_on(node.async_get_chain_height()) {
eprintln!("Node communication failure. Is node listening?");
return Err(e.into());
};
// Open wallet
let wallet_pass = prompt_wallet_password(&args.value_of("wallet_pass"));
let wallet = HttpWallet::open_wallet(
let wallet = rt.block_on(HttpWallet::async_open_wallet(
&server_config.wallet_owner_url,
&server_config.wallet_owner_api_secret(),
&wallet_pass,
);
));
let wallet = match wallet {
Ok(w) => w,
Err(e) => {
@ -188,12 +186,14 @@ fn real_main() -> Result<(), Box<dyn std::error::Error>> {
}
};
let mut tor_process = tor::init_tor_listener(&server_config)?;
let mut tor_process = tor::init_tor_listener(
&config::get_grin_path(&chain_type).to_str().unwrap(),
&server_config,
)?;
let stop_state = Arc::new(StopState::new());
let stop_state_clone = stop_state.clone();
let rt = Runtime::new()?;
rt.spawn(async move {
futures::executor::block_on(build_signals_fut());
let _ = tor_process.kill();
@ -213,13 +213,26 @@ fn real_main() -> Result<(), Box<dyn std::error::Error>> {
server_config.server_pubkey().to_hex()
);
servers::mix_rpc::listen(
let (_, http_server) = servers::mix_rpc::listen(
rt.handle(),
server_config,
next_mixer,
Arc::new(wallet),
Arc::new(node),
stop_state,
)
)?;
let close_handle = http_server.close_handle();
let round_handle = spawn(move || loop {
if stop_state.is_stopped() {
close_handle.close();
break;
}
sleep(Duration::from_millis(100));
});
http_server.wait();
round_handle.join().unwrap();
} else {
println!(
"Starting SWAP server with public key {:?}",
@ -237,15 +250,40 @@ fn real_main() -> Result<(), Box<dyn std::error::Error>> {
)?;
// Start the mwixnet JSON-RPC HTTP 'swap' server
servers::swap_rpc::listen(
server_config,
let (swap_server, http_server) = servers::swap_rpc::listen(
rt.handle(),
&server_config,
next_mixer,
Arc::new(wallet),
Arc::new(node),
store,
stop_state,
)
)?;
let close_handle = http_server.close_handle();
let round_handle = spawn(move || {
let mut secs = 0;
loop {
if stop_state.is_stopped() {
close_handle.close();
break;
}
sleep(Duration::from_secs(1));
secs = (secs + 1) % server_config.interval_s;
if secs == 0 {
let server = swap_server.clone();
rt.spawn(async move { server.lock().await.execute_round().await });
//let _ = swap_server.lock().unwrap().execute_round();
}
}
});
http_server.wait();
round_handle.join().unwrap();
}
Ok(())
}
#[cfg(unix)]

View file

@ -1,17 +1,17 @@
use crate::config::ServerConfig;
use crate::crypto::dalek;
use crate::servers::mix_rpc::MixReq;
use crate::tx::TxComponents;
use crate::{tor, DalekPublicKey};
use crate::servers::mix_rpc::{MixReq, MixResp};
use crate::tor;
use grin_onion::crypto::dalek::{self, DalekPublicKey};
use grin_onion::onion::Onion;
use grin_api::client;
use grin_api::json_rpc::build_request;
use async_trait::async_trait;
use grin_api::json_rpc::{build_request, Response};
use grin_core::ser;
use grin_core::ser::ProtocolVersion;
use grin_wallet_util::OnionV3Address;
use hyper::client::HttpConnector;
use hyper_proxy::{Intercept, Proxy, ProxyConnector};
use hyper::header::{ACCEPT, CONTENT_TYPE, USER_AGENT};
use hyper_socks2::SocksConnector;
use serde_json;
use thiserror::Error;
@ -24,16 +24,19 @@ pub enum ClientError {
API(grin_api::Error),
#[error("Dalek Error: {0:?}")]
Dalek(dalek::DalekError),
#[error("Error parsing response: {0:?}")]
ResponseParse(serde_json::Error),
#[error("Error decoding JSON response: {0:?}")]
DecodeResponseError(serde_json::Error),
#[error("Error in JSON-RPC response: {0:?}")]
ResponseError(grin_api::json_rpc::RpcError),
#[error("Custom client error: {0:?}")]
Custom(String),
}
/// A client for consuming a mix API
#[async_trait]
pub trait MixClient: Send + Sync {
/// Swaps the outputs provided and returns the final swapped outputs and kernels.
fn mix_outputs(&self, onions: &Vec<Onion>) -> Result<(Vec<usize>, TxComponents), ClientError>;
async fn mix_outputs(&self, onions: &Vec<Onion>) -> Result<MixResp, ClientError>;
}
pub struct MixClientImpl {
@ -47,63 +50,104 @@ impl MixClientImpl {
MixClientImpl { config, addr }
}
fn send_json_request<D: serde::de::DeserializeOwned>(
async fn async_send_json_request<D: serde::de::DeserializeOwned>(
&self,
addr: &OnionV3Address,
method: &str,
params: &serde_json::Value,
) -> Result<D, ClientError> {
let _tor = tor::init_tor_sender(&self.config).map_err(ClientError::Tor)?;
let proxy = {
let proxy_uri = format!("http://{:?}", self.config.socks_proxy_addr)
.parse()
.unwrap();
let proxy = Proxy::new(Intercept::All, proxy_uri);
//proxy.set_authorization(Authorization::basic("John Doe", "Agent1234"));
let connector = HttpConnector::new();
let proxy_connector = ProxyConnector::from_proxy(connector, proxy).unwrap();
let proxy_uri = format!(
"socks5://{}:{}",
self.config.socks_proxy_addr.ip(),
self.config.socks_proxy_addr.port()
)
.parse()
.unwrap();
let mut connector = HttpConnector::new();
connector.enforce_http(false);
let proxy_connector = SocksConnector {
proxy_addr: proxy_uri,
auth: None,
connector,
};
proxy_connector
};
let url = format!("{}/v1", addr.to_http_str());
let mut req = client::create_post_request(&url, None, &build_request(method, params))
.map_err(ClientError::API)?;
let uri = url.parse().unwrap();
if let Some(headers) = proxy.http_headers(&uri) {
req.headers_mut().extend(headers.clone().into_iter());
let body =
hyper::body::Body::from(serde_json::to_string(&build_request(method, params)).unwrap());
let req = hyper::Request::builder()
.method(hyper::Method::POST)
.uri(url)
.header(USER_AGENT, "grin-client")
.header(ACCEPT, "application/json")
.header(CONTENT_TYPE, "application/json")
.body(body)
.map_err(|e| {
ClientError::API(grin_api::Error::RequestError(format!(
"Cannot make request: {}",
e
)))
})?;
let client = hyper::Client::builder().build::<_, hyper::Body>(proxy);
let res = client.request(req).await.unwrap();
let body_bytes = hyper::body::to_bytes(res.into_body()).await.unwrap();
let res = String::from_utf8(body_bytes.to_vec()).unwrap();
let response: Response =
serde_json::from_str(&res).map_err(ClientError::DecodeResponseError)?;
if let Some(ref e) = response.error {
return Err(ClientError::ResponseError(e.clone()));
}
let res = client::send_request(req).map_err(ClientError::API)?;
let result = match response.result.clone() {
Some(r) => serde_json::from_value(r).map_err(ClientError::DecodeResponseError),
None => serde_json::from_value(serde_json::Value::Null)
.map_err(ClientError::DecodeResponseError),
}?;
serde_json::from_str(&res).map_err(ClientError::ResponseParse)
Ok(result)
}
}
#[async_trait]
impl MixClient for MixClientImpl {
fn mix_outputs(&self, onions: &Vec<Onion>) -> Result<(Vec<usize>, TxComponents), ClientError> {
async fn mix_outputs(&self, onions: &Vec<Onion>) -> Result<MixResp, ClientError> {
let serialized = ser::ser_vec(&onions, ProtocolVersion::local()).unwrap();
let sig =
dalek::sign(&self.config.key, serialized.as_slice()).map_err(ClientError::Dalek)?;
// println!(
// "Created sig ({:?}) with public key ({}) for server ({})",
// &sig,
// DalekPublicKey::from_secret(&self.config.key).to_hex(),
// self.config.next_server.as_ref().unwrap().to_hex()
// );
let mix = MixReq::new(onions.clone(), sig);
let params = serde_json::json!(mix);
let params = serde_json::json!([mix]);
self.send_json_request::<(Vec<usize>, TxComponents)>(&self.addr, "mix", &params)
self.async_send_json_request::<MixResp>(&self.addr, "mix", &params)
.await
}
}
#[cfg(test)]
pub mod mock {
use super::{ClientError, MixClient};
use crate::tx::TxComponents;
use grin_onion::onion::Onion;
use crate::servers::mix_rpc::MixResp;
use async_trait::async_trait;
use std::collections::HashMap;
pub struct MockMixClient {
results: HashMap<Vec<Onion>, (Vec<usize>, TxComponents)>,
results: HashMap<Vec<Onion>, MixResp>,
}
impl MockMixClient {
@ -113,16 +157,14 @@ pub mod mock {
}
}
pub fn set_response(&mut self, onions: &Vec<Onion>, r: (Vec<usize>, TxComponents)) {
pub fn set_response(&mut self, onions: &Vec<Onion>, r: MixResp) {
self.results.insert(onions.clone(), r);
}
}
#[async_trait]
impl MixClient for MockMixClient {
fn mix_outputs(
&self,
onions: &Vec<Onion>,
) -> Result<(Vec<usize>, TxComponents), ClientError> {
async fn mix_outputs(&self, onions: &Vec<Onion>) -> Result<MixResp, ClientError> {
self.results
.get(onions)
.map(|r| Ok(r.clone()))
@ -134,13 +176,13 @@ pub mod mock {
#[cfg(test)]
pub mod test_util {
use super::{ClientError, MixClient};
use crate::crypto::dalek;
use crate::crypto::secp::SecretKey;
use crate::servers::mix::MixServer;
use crate::tx::TxComponents;
use crate::DalekPublicKey;
use crate::servers::mix_rpc::MixResp;
use async_trait::async_trait;
use grin_core::ser;
use grin_core::ser::ProtocolVersion;
use grin_onion::crypto::dalek::{self, DalekPublicKey};
use grin_onion::crypto::secp::SecretKey;
use grin_onion::onion::Onion;
use std::sync::Arc;
@ -152,11 +194,9 @@ pub mod test_util {
pub mix_server: Arc<dyn MixServer>,
}
#[async_trait]
impl MixClient for DirectMixClient {
fn mix_outputs(
&self,
onions: &Vec<Onion>,
) -> Result<(Vec<usize>, TxComponents), ClientError> {
async fn mix_outputs(&self, onions: &Vec<Onion>) -> Result<MixResp, ClientError> {
let serialized = ser::ser_vec(&onions, ProtocolVersion::local()).unwrap();
let sig = dalek::sign(&self.key, serialized.as_slice()).map_err(ClientError::Dalek)?;
@ -165,7 +205,7 @@ pub mod test_util {
serialized.as_slice(),
)
.unwrap();
Ok(self.mix_server.mix_outputs(&onions, &sig).unwrap())
Ok(self.mix_server.mix_outputs(&onions, &sig).await.unwrap())
}
}
}

View file

@ -1,5 +1,5 @@
use crate::crypto::dalek::DalekPublicKey;
use crate::crypto::secp::SecretKey;
use grin_onion::crypto::dalek::DalekPublicKey;
use grin_onion::crypto::secp::SecretKey;
use core::num::NonZeroU32;
use grin_core::global::ChainTypes;
@ -39,10 +39,10 @@ pub struct ServerConfig {
/// path to file containing secret for the grin wallet's owner api
pub wallet_owner_secret_path: Option<String>,
/// public key of the previous mix/swap server (e.g. N_1 if this is N_2)
#[serde(with = "crate::crypto::dalek::option_dalek_pubkey_serde", default)]
#[serde(with = "grin_onion::crypto::dalek::option_dalek_pubkey_serde", default)]
pub prev_server: Option<DalekPublicKey>,
/// public key of the next mix server
#[serde(with = "crate::crypto::dalek::option_dalek_pubkey_serde", default)]
#[serde(with = "grin_onion::crypto::dalek::option_dalek_pubkey_serde", default)]
pub next_server: Option<DalekPublicKey>,
}
@ -186,9 +186,9 @@ struct RawConfig {
grin_node_secret_path: Option<String>,
wallet_owner_url: SocketAddr,
wallet_owner_secret_path: Option<String>,
#[serde(with = "crate::crypto::dalek::option_dalek_pubkey_serde", default)]
#[serde(with = "grin_onion::crypto::dalek::option_dalek_pubkey_serde", default)]
prev_server: Option<DalekPublicKey>,
#[serde(with = "crate::crypto::dalek::option_dalek_pubkey_serde", default)]
#[serde(with = "grin_onion::crypto::dalek::option_dalek_pubkey_serde", default)]
next_server: Option<DalekPublicKey>,
}
@ -289,7 +289,8 @@ pub fn wallet_owner_url(_chain_type: &ChainTypes) -> SocketAddr {
#[cfg(test)]
pub mod test_util {
use crate::{DalekPublicKey, ServerConfig};
use crate::config::ServerConfig;
use grin_onion::crypto::dalek::DalekPublicKey;
use secp256k1zkp::SecretKey;
use std::net::TcpListener;
@ -317,7 +318,7 @@ pub mod test_util {
#[cfg(test)]
mod tests {
use super::*;
use crate::crypto::secp;
use grin_onion::crypto::secp;
#[test]
fn server_key_encrypt() {

21
src/lib.rs Normal file
View file

@ -0,0 +1,21 @@
#[macro_use]
extern crate log;
pub mod client;
pub mod config;
pub mod node;
pub mod servers;
pub mod store;
pub mod tor;
pub mod tx;
pub mod wallet;
pub use client::MixClient;
pub use config::ServerConfig;
pub use node::{GrinNode, HttpGrinNode, NodeError};
pub use servers::mix::{MixError, MixServer};
pub use servers::mix_rpc::listen as mix_listen;
pub use servers::swap::{SwapError, SwapServer};
pub use servers::swap_rpc::listen as swap_listen;
pub use store::{StoreError, SwapStore};
pub use wallet::{HttpWallet, Wallet, WalletError};

View file

@ -1,26 +1,42 @@
use crate::crypto::secp::Commitment;
use grin_onion::crypto::secp::Commitment;
use grin_api::client;
use grin_api::json_rpc::{build_request, Request, Response};
use grin_api::{client, LocatedTxKernel};
use grin_api::{OutputPrintable, OutputType, Tip};
use grin_core::consensus::COINBASE_MATURITY;
use grin_core::core::{Input, OutputFeatures, Transaction};
use grin_util::ToHex;
use async_trait::async_trait;
use serde_json::json;
use std::net::SocketAddr;
use std::sync::Arc;
use thiserror::Error;
#[async_trait]
pub trait GrinNode: Send + Sync {
/// Retrieves the unspent output with a matching commitment
fn get_utxo(&self, output_commit: &Commitment) -> Result<Option<OutputPrintable>, NodeError>;
async fn async_get_utxo(
&self,
output_commit: &Commitment,
) -> Result<Option<OutputPrintable>, NodeError>;
/// Gets the height of the chain tip
fn get_chain_height(&self) -> Result<u64, NodeError>;
async fn async_get_chain_height(&self) -> Result<u64, NodeError>;
/// Posts a transaction to the grin node
fn post_tx(&self, tx: &Transaction) -> Result<(), NodeError>;
async fn async_post_tx(&self, tx: &Transaction) -> Result<(), NodeError>;
/// Returns a LocatedTxKernel based on the kernel excess.
/// The min_height and max_height parameters are both optional.
/// If not supplied, min_height will be set to 0 and max_height will be set to the head of the chain.
/// The method will start at the block height max_height and traverse the kernel MMR backwards, until either the kernel is found or min_height is reached.
async fn async_get_kernel(
&self,
excess: &Commitment,
min_height: Option<u64>,
max_height: Option<u64>,
) -> Result<Option<LocatedTxKernel>, NodeError>;
}
/// Error types for interacting with nodes
@ -35,18 +51,21 @@ pub enum NodeError {
}
/// Checks if a commitment is in the UTXO set
pub fn is_unspent(node: &Arc<dyn GrinNode>, commit: &Commitment) -> Result<bool, NodeError> {
let utxo = node.get_utxo(&commit)?;
pub async fn async_is_unspent(
node: &Arc<dyn GrinNode>,
commit: &Commitment,
) -> Result<bool, NodeError> {
let utxo = node.async_get_utxo(&commit).await?;
Ok(utxo.is_some())
}
/// Checks whether a commitment is spendable at the block height provided
pub fn is_spendable(
pub async fn async_is_spendable(
node: &Arc<dyn GrinNode>,
commit: &Commitment,
next_block_height: u64,
) -> Result<bool, NodeError> {
let output = node.get_utxo(&commit)?;
let output = node.async_get_utxo(&commit).await?;
if let Some(out) = output {
let is_coinbase = match out.output_type {
OutputType::Coinbase => true,
@ -70,11 +89,11 @@ pub fn is_spendable(
}
/// Builds an input for an unspent output commitment
pub fn build_input(
pub async fn async_build_input(
node: &Arc<dyn GrinNode>,
output_commit: &Commitment,
) -> Result<Option<Input>, NodeError> {
let output = node.get_utxo(&output_commit)?;
let output = node.async_get_utxo(&output_commit).await?;
if let Some(out) = output {
let features = match out.output_type {
@ -106,16 +125,20 @@ impl HttpGrinNode {
}
}
fn send_json_request<D: serde::de::DeserializeOwned>(
async fn async_send_request<D: serde::de::DeserializeOwned>(
&self,
method: &str,
params: &serde_json::Value,
) -> Result<D, NodeError> {
let url = format!("http://{}{}", self.node_url, ENDPOINT);
let req = build_request(method, params);
let res =
client::post::<Request, Response>(url.as_str(), self.node_api_secret.clone(), &req)
.map_err(NodeError::ApiCommError)?;
let res = client::post_async::<Request, Response>(
url.as_str(),
&req,
self.node_api_secret.clone(),
)
.await
.map_err(NodeError::ApiCommError)?;
let parsed = res
.clone()
.into_result()
@ -124,8 +147,12 @@ impl HttpGrinNode {
}
}
#[async_trait]
impl GrinNode for HttpGrinNode {
fn get_utxo(&self, output_commit: &Commitment) -> Result<Option<OutputPrintable>, NodeError> {
async fn async_get_utxo(
&self,
output_commit: &Commitment,
) -> Result<Option<OutputPrintable>, NodeError> {
let commits: Vec<String> = vec![output_commit.to_hex()];
let start_height: Option<u64> = None;
let end_height: Option<u64> = None;
@ -139,7 +166,9 @@ impl GrinNode for HttpGrinNode {
include_proof,
include_merkle_proof
]);
let outputs = self.send_json_request::<Vec<OutputPrintable>>("get_outputs", &params)?;
let outputs = self
.async_send_request::<Vec<OutputPrintable>>("get_outputs", &params)
.await?;
if outputs.is_empty() {
return Ok(None);
}
@ -147,29 +176,54 @@ impl GrinNode for HttpGrinNode {
Ok(Some(outputs[0].clone()))
}
fn get_chain_height(&self) -> Result<u64, NodeError> {
async fn async_get_chain_height(&self) -> Result<u64, NodeError> {
let params = json!([]);
let tip_json = self.send_json_request::<serde_json::Value>("get_tip", &params)?;
let tip_json = self
.async_send_request::<serde_json::Value>("get_tip", &params)
.await?;
let tip =
serde_json::from_value::<Tip>(tip_json).map_err(NodeError::DecodeResponseError)?;
Ok(tip.height)
}
fn post_tx(&self, tx: &Transaction) -> Result<(), NodeError> {
async fn async_post_tx(&self, tx: &Transaction) -> Result<(), NodeError> {
let params = json!([tx, true]);
self.send_json_request::<serde_json::Value>("push_transaction", &params)?;
self.async_send_request::<serde_json::Value>("push_transaction", &params)
.await?;
Ok(())
}
async fn async_get_kernel(
&self,
excess: &Commitment,
min_height: Option<u64>,
max_height: Option<u64>,
) -> Result<Option<LocatedTxKernel>, NodeError> {
let params = json!([excess.0.as_ref().to_hex(), min_height, max_height]);
let value = self
.async_send_request::<serde_json::Value>("get_kernel", &params)
.await?;
let contents = format!("{:?}", value);
if contents.contains("NotFound") {
return Ok(None);
}
let located_kernel = serde_json::from_value::<LocatedTxKernel>(value)
.map_err(NodeError::DecodeResponseError)?;
Ok(Some(located_kernel))
}
}
#[cfg(test)]
pub mod mock {
use super::{GrinNode, NodeError};
use crate::crypto::secp::Commitment;
use grin_api::{OutputPrintable, OutputType};
use async_trait::async_trait;
use grin_api::{LocatedTxKernel, OutputPrintable, OutputType};
use grin_core::core::Transaction;
use grin_onion::crypto::secp::Commitment;
use std::collections::HashMap;
use std::sync::RwLock;
@ -178,6 +232,7 @@ pub mod mock {
pub struct MockGrinNode {
utxos: HashMap<Commitment, OutputPrintable>,
txns_posted: RwLock<Vec<Transaction>>,
kernels: HashMap<Commitment, LocatedTxKernel>,
}
impl MockGrinNode {
@ -185,6 +240,7 @@ pub mod mock {
MockGrinNode {
utxos: HashMap::new(),
txns_posted: RwLock::new(Vec::new()),
kernels: HashMap::new(),
}
}
@ -192,6 +248,7 @@ pub mod mock {
let mut node = MockGrinNode {
utxos: HashMap::new(),
txns_posted: RwLock::new(Vec::new()),
kernels: HashMap::new(),
};
for utxo in utxos {
node.add_default_utxo(utxo);
@ -222,10 +279,16 @@ pub mod mock {
let read = self.txns_posted.read().unwrap();
read.clone()
}
pub fn add_kernel(&mut self, kernel: &LocatedTxKernel) {
self.kernels
.insert(kernel.tx_kernel.excess.clone(), kernel.clone());
}
}
#[async_trait]
impl GrinNode for MockGrinNode {
fn get_utxo(
async fn async_get_utxo(
&self,
output_commit: &Commitment,
) -> Result<Option<OutputPrintable>, NodeError> {
@ -236,14 +299,27 @@ pub mod mock {
Ok(None)
}
fn get_chain_height(&self) -> Result<u64, NodeError> {
async fn async_get_chain_height(&self) -> Result<u64, NodeError> {
Ok(100)
}
fn post_tx(&self, tx: &Transaction) -> Result<(), NodeError> {
async fn async_post_tx(&self, tx: &Transaction) -> Result<(), NodeError> {
let mut write = self.txns_posted.write().unwrap();
write.push(tx.clone());
Ok(())
}
async fn async_get_kernel(
&self,
excess: &Commitment,
_min_height: Option<u64>,
_max_height: Option<u64>,
) -> Result<Option<LocatedTxKernel>, NodeError> {
if let Some(kernel) = self.kernels.get(&excess) {
return Ok(Some(kernel.clone()));
}
Ok(None)
}
}
}

View file

@ -1,9 +1,12 @@
use crate::client::MixClient;
use crate::config::ServerConfig;
use crate::tx::TxComponents;
use crate::node::{self, GrinNode};
use crate::tx::{self, TxComponents};
use crate::wallet::Wallet;
use crate::{node, tx, GrinNode};
use crate::servers::mix_rpc::MixResp;
use async_trait::async_trait;
use futures::stream::{self, StreamExt};
use grin_core::core::{Output, OutputFeatures, TransactionBody};
use grin_core::global::DEFAULT_ACCEPT_FEE_BASE;
use grin_core::ser;
@ -47,13 +50,14 @@ pub enum MixError {
}
/// An internal MWixnet server - a "Mixer"
#[async_trait]
pub trait MixServer: Send + Sync {
/// Swaps the outputs provided and returns the final swapped outputs and kernels.
fn mix_outputs(
async fn mix_outputs(
&self,
onions: &Vec<Onion>,
sig: &DalekSignature,
) -> Result<(Vec<usize>, TxComponents), MixError>;
) -> Result<MixResp, MixError>;
}
/// The standard MWixnet "Mixer" implementation
@ -130,15 +134,19 @@ impl MixServerImpl {
Ok(peeled)
}
fn build_final_outputs(
async fn async_build_final_outputs(
&self,
peeled: &Vec<(usize, PeeledOnion)>,
) -> Result<(Vec<usize>, TxComponents), MixError> {
) -> Result<MixResp, MixError> {
// Filter out commitments that already exist in the UTXO set
let filtered: Vec<&(usize, PeeledOnion)> = peeled
.iter()
.filter(|(_, p)| !node::is_unspent(&self.node, &p.onion.commit).unwrap_or(true))
.collect();
let filtered: Vec<&(usize, PeeledOnion)> = stream::iter(peeled.iter())
.filter(|(_, p)| async {
!node::async_is_unspent(&self.node, &p.onion.commit)
.await
.unwrap_or(true)
})
.collect()
.await;
// Build plain outputs for each mix entry
let outputs: Vec<Output> = filtered
@ -158,7 +166,7 @@ impl MixServerImpl {
.map(|(_, p)| p.payload.excess.clone())
.collect();
let components = tx::assemble_components(
let components = tx::async_assemble_components(
&self.wallet,
&TxComponents {
offset: ZERO_KEY,
@ -169,17 +177,21 @@ impl MixServerImpl {
self.get_fee_base(),
fees_paid,
)
.await
.map_err(MixError::TxError)?;
let indices = filtered.iter().map(|(i, _)| *i).collect();
Ok((indices, components))
Ok(MixResp {
indices,
components,
})
}
fn call_next_mixer(
async fn call_next_mixer(
&self,
peeled: &Vec<(usize, PeeledOnion)>,
) -> Result<(Vec<usize>, TxComponents), MixError> {
) -> Result<MixResp, MixError> {
// Sort by commitment
let mut onions_with_index = peeled.clone();
onions_with_index
@ -191,15 +203,16 @@ impl MixServerImpl {
// Call next server
let onions = peeled.iter().map(|(_, p)| p.onion.clone()).collect();
let (mixed_indices, mixed_components) = self
let mixed = self
.mix_client
.as_ref()
.unwrap()
.mix_outputs(&onions)
.await
.map_err(MixError::Client)?;
// Remove filtered entries
let kept_next_indices = HashSet::<_>::from_iter(mixed_indices.clone());
let kept_next_indices = HashSet::<_>::from_iter(mixed.indices.clone());
let filtered_onions: Vec<&(usize, PeeledOnion)> = onions_with_index
.iter()
.filter(|(i, _)| {
@ -221,25 +234,30 @@ impl MixServerImpl {
let indices = kept_next_indices.into_iter().sorted().collect();
let components = tx::assemble_components(
let components = tx::async_assemble_components(
&self.wallet,
&mixed_components,
&mixed.components,
&excesses,
self.get_fee_base(),
fees_paid,
)
.await
.map_err(MixError::TxError)?;
Ok((indices, components))
Ok(MixResp {
indices,
components,
})
}
}
#[async_trait]
impl MixServer for MixServerImpl {
fn mix_outputs(
async fn mix_outputs(
&self,
onions: &Vec<Onion>,
sig: &DalekSignature,
) -> Result<(Vec<usize>, TxComponents), MixError> {
) -> Result<MixResp, MixError> {
// Verify Signature
let serialized = ser::ser_vec(&onions, ProtocolVersion::local()).unwrap();
sig.verify(
@ -252,10 +270,10 @@ impl MixServer for MixServerImpl {
let mut peeled: Vec<(usize, PeeledOnion)> = onions
.iter()
.enumerate()
.filter_map(|(i, o)| {
if let Some(p) = self.peel_onion(&o).ok() {
Some((i, p))
} else {
.filter_map(|(i, o)| match self.peel_onion(&o) {
Ok(p) => Some((i, p)),
Err(e) => {
println!("Error peeling onion: {:?}", e);
None
}
})
@ -271,9 +289,9 @@ impl MixServer for MixServerImpl {
}
if self.server_config.next_server.is_some() {
self.call_next_mixer(&peeled)
self.call_next_mixer(&peeled).await
} else {
self.build_final_outputs(&peeled)
self.async_build_final_outputs(&peeled).await
}
}
}
@ -281,11 +299,13 @@ impl MixServer for MixServerImpl {
#[cfg(test)]
mod test_util {
use crate::client::test_util::DirectMixClient;
use crate::client::MixClient;
use crate::config;
use crate::node::mock::MockGrinNode;
use crate::wallet::mock::MockWallet;
use crate::{config, DalekPublicKey, MixClient};
use crate::servers::mix::MixServerImpl;
use crate::wallet::mock::MockWallet;
use grin_onion::crypto::dalek::DalekPublicKey;
use secp256k1zkp::SecretKey;
use std::sync::Arc;
@ -320,10 +340,11 @@ mod test_util {
#[cfg(test)]
mod tests {
use crate::client::MixClient;
use crate::node::mock::MockGrinNode;
use crate::{DalekPublicKey, MixClient};
use ::function_name::named;
use grin_onion::crypto::dalek::DalekPublicKey;
use grin_onion::crypto::secp::{self, Commitment};
use grin_onion::test_util as onion_test_util;
use grin_onion::{create_onion, new_hop, Hop};
@ -373,9 +394,9 @@ mod tests {
/// * Swap Server - Simulated by test
/// * Mixer 1 - Internal MixServerImpl directly called by test
/// * Mixer 2 - Final MixServerImpl called by Mixer 1
#[test]
#[tokio::test]
#[named]
fn mix_lifecycle() -> Result<(), Box<dyn std::error::Error>> {
async fn mix_lifecycle() -> Result<(), Box<dyn std::error::Error>> {
init_test!();
// Setup Input(s)
@ -426,13 +447,15 @@ mod tests {
// Simulate the swap server peeling the onion and then calling mix1
let mix1_onion = onion.peel_layer(&swap_vars.sk)?;
let (mixed_indices, mixed_components) =
mixer1_client.mix_outputs(&vec![mix1_onion.onion.clone()])?;
let mixed = mixer1_client
.mix_outputs(&vec![mix1_onion.onion.clone()])
.await?;
// Verify 3 outputs are returned: mixed output, mixer1's output, and mixer2's output
assert_eq!(mixed_indices, vec![0 as usize]);
assert_eq!(mixed_components.outputs.len(), 3);
let output_commits: HashSet<Commitment> = mixed_components
assert_eq!(mixed.indices, vec![0 as usize]);
assert_eq!(mixed.components.outputs.len(), 3);
let output_commits: HashSet<Commitment> = mixed
.components
.outputs
.iter()
.map(|o| o.identifier.commit.clone())

View file

@ -1,19 +1,19 @@
use crate::client::MixClient;
use crate::config::ServerConfig;
use crate::crypto::dalek::{self, DalekSignature};
use crate::node::GrinNode;
use crate::servers::mix::{MixError, MixServer, MixServerImpl};
use crate::wallet::Wallet;
use crate::tx::TxComponents;
use futures::FutureExt;
use grin_onion::crypto::dalek::{self, DalekSignature};
use grin_onion::onion::Onion;
use grin_util::StopState;
use jsonrpc_core::BoxFuture;
use jsonrpc_derive::rpc;
use jsonrpc_http_server::jsonrpc_core::{self as jsonrpc, IoHandler};
use jsonrpc_http_server::{DomainsValidation, ServerBuilder};
use serde::{Deserialize, Serialize};
use std::sync::{Arc, Mutex};
use std::thread::{sleep, spawn};
use std::time::Duration;
use std::sync::Arc;
#[derive(Serialize, Deserialize)]
pub struct MixReq {
@ -22,6 +22,12 @@ pub struct MixReq {
sig: DalekSignature,
}
#[derive(Clone, Serialize, Deserialize)]
pub struct MixResp {
pub indices: Vec<usize>,
pub components: TxComponents,
}
impl MixReq {
pub fn new(onions: Vec<Onion>, sig: DalekSignature) -> Self {
MixReq { onions, sig }
@ -31,22 +37,23 @@ impl MixReq {
#[rpc(server)]
pub trait MixAPI {
#[rpc(name = "mix")]
fn mix(&self, mix: MixReq) -> jsonrpc::Result<jsonrpc::Value>;
fn mix(&self, mix: MixReq) -> BoxFuture<jsonrpc::Result<MixResp>>;
}
#[derive(Clone)]
struct RPCMixServer {
server_config: ServerConfig,
server: Arc<Mutex<dyn MixServer>>,
server: Arc<tokio::sync::Mutex<dyn MixServer>>,
}
impl RPCMixServer {
/// Spin up an instance of the JSON-RPC HTTP server.
fn start_http(&self) -> jsonrpc_http_server::Server {
fn start_http(&self, runtime_handle: tokio::runtime::Handle) -> jsonrpc_http_server::Server {
let mut io = IoHandler::new();
io.extend_with(RPCMixServer::to_delegate(self.clone()));
ServerBuilder::new(io)
.event_loop_executor(runtime_handle)
.cors(DomainsValidation::Disabled)
.request_middleware(|request: hyper::Request<hyper::Body>| {
if request.uri() == "/v1" {
@ -67,50 +74,48 @@ impl From<MixError> for jsonrpc::Error {
}
impl MixAPI for RPCMixServer {
fn mix(&self, mix: MixReq) -> jsonrpc::Result<jsonrpc::Value> {
self.server
.lock()
.unwrap()
.mix_outputs(&mix.onions, &mix.sig)?;
Ok(jsonrpc::Value::String("success".into()))
fn mix(&self, mix: MixReq) -> BoxFuture<jsonrpc::Result<MixResp>> {
let server = self.server.clone();
async move {
let response = server
.lock()
.await
.mix_outputs(&mix.onions, &mix.sig)
.await?;
Ok(response)
}
.boxed()
}
}
/// Spin up the JSON-RPC web server
pub fn listen(
rt_handle: &tokio::runtime::Handle,
server_config: ServerConfig,
next_server: Option<Arc<dyn MixClient>>,
wallet: Arc<dyn Wallet>,
node: Arc<dyn GrinNode>,
stop_state: Arc<StopState>,
) -> Result<(), Box<dyn std::error::Error>> {
) -> Result<
(
Arc<tokio::sync::Mutex<dyn MixServer>>,
jsonrpc_http_server::Server,
),
Box<dyn std::error::Error>,
> {
let server = MixServerImpl::new(
server_config.clone(),
next_server,
wallet.clone(),
node.clone(),
);
let server = Arc::new(Mutex::new(server));
let server = Arc::new(tokio::sync::Mutex::new(server));
let rpc_server = RPCMixServer {
server_config: server_config.clone(),
server: server.clone(),
};
let http_server = rpc_server.start_http();
let http_server = rpc_server.start_http(rt_handle.clone());
let close_handle = http_server.close_handle();
let round_handle = spawn(move || loop {
if stop_state.is_stopped() {
close_handle.close();
break;
}
sleep(Duration::from_millis(100));
});
http_server.wait();
round_handle.join().unwrap();
Ok(())
Ok((server, http_server))
}

View file

@ -1,21 +1,21 @@
use crate::client::MixClient;
use crate::config::ServerConfig;
use crate::crypto::comsig::ComSignature;
use crate::crypto::secp::{Commitment, Secp256k1, SecretKey};
use crate::node::{self, GrinNode};
use crate::store::{StoreError, SwapData, SwapStatus, SwapStore};
use crate::tx;
use crate::wallet::Wallet;
use grin_core::core::hash::Hashed;
use async_trait::async_trait;
use grin_core::core::{Input, Output, OutputFeatures, Transaction, TransactionBody};
use grin_core::global::DEFAULT_ACCEPT_FEE_BASE;
use grin_onion::crypto::comsig::ComSignature;
use grin_onion::crypto::secp::{Commitment, Secp256k1, SecretKey};
use grin_onion::onion::{Onion, OnionError};
use itertools::Itertools;
use secp256k1zkp::key::ZERO_KEY;
use std::collections::HashSet;
use std::result::Result;
use std::sync::{Arc, Mutex};
use std::sync::Arc;
use thiserror::Error;
/// Swap error types
@ -39,20 +39,43 @@ pub enum SwapError {
FeeTooLow { minimum_fee: u64, actual_fee: u64 },
#[error("Error saving swap to data store: {0}")]
StoreError(StoreError),
#[error("Error building transaction: {0}")]
TxError(String),
#[error("Node communication error: {0}")]
NodeError(String),
#[error("Client communication error: {0:?}")]
ClientError(String),
#[error("{0}")]
UnknownError(String),
}
impl From<StoreError> for SwapError {
fn from(e: StoreError) -> SwapError {
SwapError::StoreError(e)
}
}
impl From<tx::TxError> for SwapError {
fn from(e: tx::TxError) -> SwapError {
SwapError::TxError(e.to_string())
}
}
impl From<node::NodeError> for SwapError {
fn from(e: node::NodeError) -> SwapError {
SwapError::NodeError(e.to_string())
}
}
/// A public MWixnet server - the "Swap Server"
#[async_trait]
pub trait SwapServer: Send + Sync {
/// Submit a new output to be swapped.
fn swap(&self, onion: &Onion, comsig: &ComSignature) -> Result<(), SwapError>;
async fn swap(&self, onion: &Onion, comsig: &ComSignature) -> Result<(), SwapError>;
/// Iterate through all saved submissions, filter out any inputs that are no longer spendable,
/// and assemble the coinswap transaction, posting the transaction to the configured node.
fn execute_round(&self) -> Result<Option<Transaction>, Box<dyn std::error::Error>>;
async fn execute_round(&self) -> Result<Option<Arc<Transaction>>, SwapError>;
}
/// The standard MWixnet server implementation
@ -62,7 +85,7 @@ pub struct SwapServerImpl {
next_server: Option<Arc<dyn MixClient>>,
wallet: Arc<dyn Wallet>,
node: Arc<dyn GrinNode>,
store: Arc<Mutex<SwapStore>>,
store: Arc<tokio::sync::Mutex<SwapStore>>,
}
impl SwapServerImpl {
@ -79,7 +102,7 @@ impl SwapServerImpl {
next_server,
wallet,
node,
store: Arc::new(Mutex::new(store)),
store: Arc::new(tokio::sync::Mutex::new(store)),
}
}
@ -93,10 +116,29 @@ impl SwapServerImpl {
fn get_minimum_swap_fee(&self) -> u64 {
TransactionBody::weight_by_iok(1, 1, 1) * self.get_fee_base()
}
async fn async_is_spendable(&self, next_block_height: u64, swap: &SwapData) -> bool {
if let SwapStatus::Unprocessed = swap.status {
if node::async_is_spendable(&self.node, &swap.input.commit, next_block_height)
.await
.unwrap_or(false)
{
if !node::async_is_unspent(&self.node, &swap.output_commit)
.await
.unwrap_or(true)
{
return true;
}
}
}
false
}
}
#[async_trait]
impl SwapServer for SwapServerImpl {
fn swap(&self, onion: &Onion, comsig: &ComSignature) -> Result<(), SwapError> {
async fn swap(&self, onion: &Onion, comsig: &ComSignature) -> Result<(), SwapError> {
// Verify that more than 1 payload exists when there's a next server,
// or that exactly 1 payload exists when this is the final server
if self.server_config.next_server.is_some() && onion.enc_payloads.len() <= 1
@ -114,7 +156,8 @@ impl SwapServer for SwapServerImpl {
.map_err(|_| SwapError::InvalidComSignature)?;
// Verify that commitment is unspent
let input = node::build_input(&self.node, &onion.commit)
let input = node::async_build_input(&self.node, &onion.commit)
.await
.map_err(|e| SwapError::UnknownError(e.to_string()))?;
let input = input.ok_or(SwapError::CoinNotFound {
commit: onion.commit.clone(),
@ -144,7 +187,7 @@ impl SwapServer for SwapServerImpl {
return Err(SwapError::MissingRangeproof);
}
let locked = self.store.lock().unwrap();
let locked = self.store.lock().await;
locked
.save_swap(
@ -168,23 +211,22 @@ impl SwapServer for SwapServerImpl {
Ok(())
}
fn execute_round(&self) -> Result<Option<Transaction>, Box<dyn std::error::Error>> {
let locked_store = self.store.lock().unwrap();
let next_block_height = self.node.get_chain_height()? + 1;
async fn execute_round(&self) -> Result<Option<Arc<Transaction>>, SwapError> {
let next_block_height = self.node.async_get_chain_height().await? + 1;
let spendable: Vec<SwapData> = locked_store
let locked_store = self.store.lock().await;
let swaps: Vec<SwapData> = locked_store
.swaps_iter()?
.unique_by(|s| s.output_commit)
.filter(|s| match s.status {
SwapStatus::Unprocessed => true,
_ => false,
})
.filter(|s| {
node::is_spendable(&self.node, &s.input.commit, next_block_height).unwrap_or(false)
})
.filter(|s| !node::is_unspent(&self.node, &s.output_commit).unwrap_or(true))
.sorted_by(|a, b| a.output_commit.partial_cmp(&b.output_commit).unwrap())
.collect();
let mut spendable: Vec<SwapData> = vec![];
for swap in &swaps {
if self.async_is_spendable(next_block_height, &swap).await {
spendable.push(swap.clone());
}
}
spendable.sort_by(|a, b| a.output_commit.partial_cmp(&b.output_commit).unwrap());
if spendable.len() == 0 {
return Ok(None);
@ -193,12 +235,13 @@ impl SwapServer for SwapServerImpl {
let (filtered, failed, offset, outputs, kernels) = if let Some(client) = &self.next_server {
// Call next mix server
let onions = spendable.iter().map(|s| s.onion.clone()).collect();
let (indices, mixed) = client
let mixed = client
.mix_outputs(&onions)
.await
.map_err(|e| SwapError::ClientError(e.to_string()))?;
// Filter out failed entries
let kept_indices = HashSet::<_>::from_iter(indices.clone());
let kept_indices = HashSet::<_>::from_iter(mixed.indices.clone());
let filtered = spendable
.iter()
.enumerate()
@ -213,7 +256,13 @@ impl SwapServer for SwapServerImpl {
.map(|(_, j)| j.clone())
.collect();
(filtered, failed, mixed.offset, mixed.outputs, mixed.kernels)
(
filtered,
failed,
mixed.components.offset,
mixed.components.outputs,
mixed.components.kernels,
)
} else {
// Build plain outputs for each swap entry
let outputs: Vec<Output> = spendable
@ -234,7 +283,7 @@ impl SwapServer for SwapServerImpl {
let inputs: Vec<Input> = filtered.iter().map(|s| s.input).collect();
let output_excesses: Vec<SecretKey> = filtered.iter().map(|s| s.excess.clone()).collect();
let tx = tx::assemble_tx(
let tx = tx::async_assemble_tx(
&self.wallet,
&inputs,
&outputs,
@ -243,14 +292,15 @@ impl SwapServer for SwapServerImpl {
fees_paid,
&offset,
&output_excesses,
)?;
)
.await?;
self.node.post_tx(&tx)?;
self.node.async_post_tx(&tx).await?;
// Update status to in process
let kernel_hash = tx.kernels().first().unwrap().hash();
let kernel_commit = tx.kernels().first().unwrap().excess;
for mut swap in filtered {
swap.status = SwapStatus::InProcess { kernel_hash };
swap.status = SwapStatus::InProcess { kernel_commit };
locked_store.save_swap(&swap, true)?;
}
@ -260,16 +310,17 @@ impl SwapServer for SwapServerImpl {
locked_store.save_swap(&swap, true)?;
}
Ok(Some(tx))
Ok(Some(Arc::new(tx)))
}
}
#[cfg(test)]
pub mod mock {
use super::{SwapError, SwapServer};
use crate::crypto::comsig::ComSignature;
use async_trait::async_trait;
use grin_core::core::Transaction;
use grin_onion::crypto::comsig::ComSignature;
use grin_onion::onion::Onion;
use std::collections::HashMap;
@ -289,8 +340,9 @@ pub mod mock {
}
}
#[async_trait]
impl SwapServer for MockSwapServer {
fn swap(&self, onion: &Onion, _comsig: &ComSignature) -> Result<(), SwapError> {
async fn swap(&self, onion: &Onion, _comsig: &ComSignature) -> Result<(), SwapError> {
if let Some(e) = self.errors.get(&onion) {
return Err(e.clone());
}
@ -298,7 +350,7 @@ pub mod mock {
Ok(())
}
fn execute_round(&self) -> Result<Option<Transaction>, Box<dyn std::error::Error>> {
async fn execute_round(&self) -> Result<Option<std::sync::Arc<Transaction>>, SwapError> {
Ok(None)
}
}
@ -306,11 +358,15 @@ pub mod mock {
#[cfg(test)]
pub mod test_util {
use crate::crypto::dalek::DalekPublicKey;
use crate::crypto::secp::SecretKey;
use crate::client::MixClient;
use crate::config;
use crate::node::GrinNode;
use crate::servers::swap::SwapServerImpl;
use crate::store::SwapStore;
use crate::wallet::mock::MockWallet;
use crate::{config, GrinNode, MixClient, SwapStore};
use grin_onion::crypto::dalek::DalekPublicKey;
use grin_onion::crypto::secp::SecretKey;
use std::sync::Arc;
pub fn new_swapper(
@ -339,14 +395,15 @@ pub mod test_util {
#[cfg(test)]
mod tests {
use crate::client::{self, MixClient};
use crate::node::mock::MockGrinNode;
use crate::servers::mix_rpc::MixResp;
use crate::servers::swap::{SwapError, SwapServer};
use crate::store::{SwapData, SwapStatus};
use crate::tx;
use crate::tx::TxComponents;
use crate::{client, tx, MixClient};
use ::function_name::named;
use grin_core::core::hash::Hashed;
use grin_core::core::{Committed, Input, Output, OutputFeatures, Transaction, Weighting};
use grin_onion::crypto::comsig::ComSignature;
use grin_onion::crypto::secp;
@ -380,9 +437,9 @@ mod tests {
}
/// Standalone swap server to demonstrate request validation and onion unwrapping.
#[test]
#[tokio::test]
#[named]
fn swap_standalone() -> Result<(), Box<dyn std::error::Error>> {
async fn swap_standalone() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let test_dir = init_test!();
let value: u64 = 200_000_000;
@ -400,7 +457,7 @@ mod tests {
let node: Arc<MockGrinNode> = Arc::new(MockGrinNode::new_with_utxos(&vec![&input_commit]));
let (server, _) = super::test_util::new_swapper(&test_dir, &server_key, None, node.clone());
server.swap(&onion, &comsig)?;
server.swap(&onion, &comsig).await?;
// Make sure entry is added to server.
let expected = SwapData {
@ -418,21 +475,21 @@ mod tests {
};
{
let store = server.store.lock().unwrap();
let store = server.store.lock().await;
assert_eq!(1, store.swaps_iter().unwrap().count());
assert!(store.swap_exists(&input_commit).unwrap());
assert_eq!(expected, store.get_swap(&input_commit).unwrap());
}
let tx = server.execute_round()?;
let tx = server.execute_round().await?;
assert!(tx.is_some());
{
// check that status was updated
let store = server.store.lock().unwrap();
let store = server.store.lock().await;
assert!(match store.get_swap(&input_commit)?.status {
SwapStatus::InProcess { kernel_hash } =>
kernel_hash == tx.unwrap().kernels().first().unwrap().hash(),
SwapStatus::InProcess { kernel_commit } =>
kernel_commit == tx.unwrap().kernels().first().unwrap().excess,
_ => false,
});
}
@ -451,9 +508,9 @@ mod tests {
}
/// Multi-server test to verify proper MixClient communication.
#[test]
#[tokio::test]
#[named]
fn swap_multiserver() -> Result<(), Box<dyn std::error::Error>> {
async fn swap_multiserver() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let test_dir = init_test!();
// Setup input
@ -498,7 +555,10 @@ mod tests {
};
mock_mixer.set_response(
&vec![mixer_onion.clone()],
(vec![0 as usize], mixer_response),
MixResp {
indices: vec![0 as usize],
components: mixer_response,
},
);
let mixer: Arc<dyn MixClient> = Arc::new(mock_mixer);
@ -508,9 +568,9 @@ mod tests {
Some((&mixer_pk, &mixer)),
node.clone(),
);
swapper.swap(&onion, &comsig)?;
swapper.swap(&onion, &comsig).await?;
let tx = swapper.execute_round()?;
let tx = swapper.execute_round().await?;
assert!(tx.is_some());
// check that the transaction was posted
@ -527,9 +587,9 @@ mod tests {
}
/// Returns InvalidPayloadLength when too many payloads are provided.
#[test]
#[tokio::test]
#[named]
fn swap_too_many_payloads() -> Result<(), Box<dyn std::error::Error>> {
async fn swap_too_many_payloads() -> Result<(), Box<dyn std::error::Error>> {
let test_dir = init_test!();
let value: u64 = 200_000_000;
@ -549,22 +609,19 @@ mod tests {
let node: Arc<MockGrinNode> = Arc::new(MockGrinNode::new_with_utxos(&vec![&input_commit]));
let (server, _) = super::test_util::new_swapper(&test_dir, &server_key, None, node.clone());
let result = server.swap(&onion, &comsig);
let result = server.swap(&onion, &comsig).await;
assert_eq!(Err(SwapError::InvalidPayloadLength), result);
// Make sure no entry is added to the store
assert_eq!(
0,
server.store.lock().unwrap().swaps_iter().unwrap().count()
);
assert_eq!(0, server.store.lock().await.swaps_iter().unwrap().count());
Ok(())
}
/// Returns InvalidComSignature when ComSignature fails to verify.
#[test]
#[tokio::test]
#[named]
fn swap_invalid_com_signature() -> Result<(), Box<dyn std::error::Error>> {
async fn swap_invalid_com_signature() -> Result<(), Box<dyn std::error::Error>> {
let test_dir = init_test!();
let value: u64 = 200_000_000;
@ -585,22 +642,19 @@ mod tests {
let node: Arc<MockGrinNode> = Arc::new(MockGrinNode::new_with_utxos(&vec![&input_commit]));
let (server, _) = super::test_util::new_swapper(&test_dir, &server_key, None, node.clone());
let result = server.swap(&onion, &comsig);
let result = server.swap(&onion, &comsig).await;
assert_eq!(Err(SwapError::InvalidComSignature), result);
// Make sure no entry is added to the store
assert_eq!(
0,
server.store.lock().unwrap().swaps_iter().unwrap().count()
);
assert_eq!(0, server.store.lock().await.swaps_iter().unwrap().count());
Ok(())
}
/// Returns InvalidRangeProof when the rangeproof fails to verify for the commitment.
#[test]
#[tokio::test]
#[named]
fn swap_invalid_rangeproof() -> Result<(), Box<dyn std::error::Error>> {
async fn swap_invalid_rangeproof() -> Result<(), Box<dyn std::error::Error>> {
let test_dir = init_test!();
let value: u64 = 200_000_000;
@ -620,22 +674,19 @@ mod tests {
let node: Arc<MockGrinNode> = Arc::new(MockGrinNode::new_with_utxos(&vec![&input_commit]));
let (server, _) = super::test_util::new_swapper(&test_dir, &server_key, None, node.clone());
let result = server.swap(&onion, &comsig);
let result = server.swap(&onion, &comsig).await;
assert_eq!(Err(SwapError::InvalidRangeproof), result);
// Make sure no entry is added to the store
assert_eq!(
0,
server.store.lock().unwrap().swaps_iter().unwrap().count()
);
assert_eq!(0, server.store.lock().await.swaps_iter().unwrap().count());
Ok(())
}
/// Returns MissingRangeproof when no rangeproof is provided.
#[test]
#[tokio::test]
#[named]
fn swap_missing_rangeproof() -> Result<(), Box<dyn std::error::Error>> {
async fn swap_missing_rangeproof() -> Result<(), Box<dyn std::error::Error>> {
let test_dir = init_test!();
let value: u64 = 200_000_000;
@ -652,22 +703,19 @@ mod tests {
let node: Arc<MockGrinNode> = Arc::new(MockGrinNode::new_with_utxos(&vec![&input_commit]));
let (server, _) = super::test_util::new_swapper(&test_dir, &server_key, None, node.clone());
let result = server.swap(&onion, &comsig);
let result = server.swap(&onion, &comsig).await;
assert_eq!(Err(SwapError::MissingRangeproof), result);
// Make sure no entry is added to the store
assert_eq!(
0,
server.store.lock().unwrap().swaps_iter().unwrap().count()
);
assert_eq!(0, server.store.lock().await.swaps_iter().unwrap().count());
Ok(())
}
/// Returns CoinNotFound when there's no matching output in the UTXO set.
#[test]
#[tokio::test]
#[named]
fn swap_utxo_missing() -> Result<(), Box<dyn std::error::Error>> {
async fn swap_utxo_missing() -> Result<(), Box<dyn std::error::Error>> {
let test_dir = init_test!();
let value: u64 = 200_000_000;
@ -686,7 +734,7 @@ mod tests {
let node: Arc<MockGrinNode> = Arc::new(MockGrinNode::new());
let (server, _) = super::test_util::new_swapper(&test_dir, &server_key, None, node.clone());
let result = server.swap(&onion, &comsig);
let result = server.swap(&onion, &comsig).await;
assert_eq!(
Err(SwapError::CoinNotFound {
commit: input_commit.clone()
@ -695,18 +743,15 @@ mod tests {
);
// Make sure no entry is added to the store
assert_eq!(
0,
server.store.lock().unwrap().swaps_iter().unwrap().count()
);
assert_eq!(0, server.store.lock().await.swaps_iter().unwrap().count());
Ok(())
}
/// Returns AlreadySwapped when trying to swap the same commitment multiple times.
#[test]
#[tokio::test]
#[named]
fn swap_already_swapped() -> Result<(), Box<dyn std::error::Error>> {
async fn swap_already_swapped() -> Result<(), Box<dyn std::error::Error>> {
let test_dir = init_test!();
let value: u64 = 200_000_000;
@ -725,10 +770,10 @@ mod tests {
let node: Arc<MockGrinNode> = Arc::new(MockGrinNode::new_with_utxos(&vec![&input_commit]));
let (server, _) = super::test_util::new_swapper(&test_dir, &server_key, None, node.clone());
server.swap(&onion, &comsig)?;
server.swap(&onion, &comsig).await?;
// Call swap a second time
let result = server.swap(&onion, &comsig);
let result = server.swap(&onion, &comsig).await;
assert_eq!(
Err(SwapError::AlreadySwapped {
commit: input_commit.clone()
@ -740,9 +785,9 @@ mod tests {
}
/// Returns PeelOnionFailure when a failure occurs trying to decrypt the onion payload.
#[test]
#[tokio::test]
#[named]
fn swap_peel_onion_failure() -> Result<(), Box<dyn std::error::Error>> {
async fn swap_peel_onion_failure() -> Result<(), Box<dyn std::error::Error>> {
let test_dir = init_test!();
let value: u64 = 200_000_000;
@ -763,7 +808,7 @@ mod tests {
let node: Arc<MockGrinNode> = Arc::new(MockGrinNode::new_with_utxos(&vec![&input_commit]));
let (server, _) = super::test_util::new_swapper(&test_dir, &server_key, None, node.clone());
let result = server.swap(&onion, &comsig);
let result = server.swap(&onion, &comsig).await;
assert!(result.is_err());
assert_error_type!(result, SwapError::PeelOnionFailure(_));
@ -772,9 +817,9 @@ mod tests {
}
/// Returns FeeTooLow when the minimum fee is not met.
#[test]
#[tokio::test]
#[named]
fn swap_fee_too_low() -> Result<(), Box<dyn std::error::Error>> {
async fn swap_fee_too_low() -> Result<(), Box<dyn std::error::Error>> {
let test_dir = init_test!();
let value: u64 = 200_000_000;
@ -793,7 +838,7 @@ mod tests {
let node: Arc<MockGrinNode> = Arc::new(MockGrinNode::new_with_utxos(&vec![&input_commit]));
let (server, _) = super::test_util::new_swapper(&test_dir, &server_key, None, node.clone());
let result = server.swap(&onion, &comsig);
let result = server.swap(&onion, &comsig).await;
assert_eq!(
Err(SwapError::FeeTooLow {
minimum_fee: 12_500_000,

View file

@ -5,17 +5,15 @@ use crate::servers::swap::{SwapError, SwapServer, SwapServerImpl};
use crate::store::SwapStore;
use crate::wallet::Wallet;
use futures::FutureExt;
use grin_onion::crypto::comsig::{self, ComSignature};
use grin_onion::onion::Onion;
use grin_util::StopState;
use jsonrpc_core::Value;
use jsonrpc_derive::rpc;
use jsonrpc_http_server::jsonrpc_core::*;
use jsonrpc_http_server::*;
use jsonrpc_http_server::{DomainsValidation, ServerBuilder};
use serde::{Deserialize, Serialize};
use std::sync::{Arc, Mutex};
use std::thread::{sleep, spawn};
use std::time::Duration;
use std::sync::Arc;
#[derive(Serialize, Deserialize)]
pub struct SwapReq {
@ -27,22 +25,23 @@ pub struct SwapReq {
#[rpc(server)]
pub trait SwapAPI {
#[rpc(name = "swap")]
fn swap(&self, swap: SwapReq) -> jsonrpc_core::Result<Value>;
fn swap(&self, swap: SwapReq) -> BoxFuture<jsonrpc_core::Result<Value>>;
}
#[derive(Clone)]
struct RPCSwapServer {
server_config: ServerConfig,
server: Arc<Mutex<dyn SwapServer>>,
server: Arc<tokio::sync::Mutex<dyn SwapServer>>,
}
impl RPCSwapServer {
/// Spin up an instance of the JSON-RPC HTTP server.
fn start_http(&self) -> jsonrpc_http_server::Server {
fn start_http(&self, runtime_handle: tokio::runtime::Handle) -> jsonrpc_http_server::Server {
let mut io = IoHandler::new();
io.extend_with(RPCSwapServer::to_delegate(self.clone()));
ServerBuilder::new(io)
.event_loop_executor(runtime_handle)
.cors(DomainsValidation::Disabled)
.request_middleware(|request: hyper::Request<hyper::Body>| {
if request.uri() == "/v1" {
@ -70,24 +69,31 @@ impl From<SwapError> for Error {
}
impl SwapAPI for RPCSwapServer {
fn swap(&self, swap: SwapReq) -> jsonrpc_core::Result<Value> {
self.server
.lock()
.unwrap()
.swap(&swap.onion, &swap.comsig)?;
Ok(Value::String("success".into()))
fn swap(&self, swap: SwapReq) -> BoxFuture<jsonrpc_core::Result<Value>> {
let server = self.server.clone();
async move {
server.lock().await.swap(&swap.onion, &swap.comsig).await?;
Ok(Value::String("success".into()))
}
.boxed()
}
}
/// Spin up the JSON-RPC web server
pub fn listen(
server_config: ServerConfig,
rt_handle: &tokio::runtime::Handle,
server_config: &ServerConfig,
next_server: Option<Arc<dyn MixClient>>,
wallet: Arc<dyn Wallet>,
node: Arc<dyn GrinNode>,
store: SwapStore,
stop_state: Arc<StopState>,
) -> std::result::Result<(), Box<dyn std::error::Error>> {
) -> std::result::Result<
(
Arc<tokio::sync::Mutex<dyn SwapServer>>,
jsonrpc_http_server::Server,
),
Box<dyn std::error::Error>,
> {
let server = SwapServerImpl::new(
server_config.clone(),
next_server,
@ -95,54 +101,33 @@ pub fn listen(
node.clone(),
store,
);
let server = Arc::new(Mutex::new(server));
let server = Arc::new(tokio::sync::Mutex::new(server));
let rpc_server = RPCSwapServer {
server_config: server_config.clone(),
server: server.clone(),
};
let http_server = rpc_server.start_http();
let http_server = rpc_server.start_http(rt_handle.clone());
let close_handle = http_server.close_handle();
let round_handle = spawn(move || {
let mut secs = 0;
loop {
if stop_state.is_stopped() {
close_handle.close();
break;
}
sleep(Duration::from_secs(1));
secs = (secs + 1) % server_config.interval_s;
if secs == 0 {
let _ = server.lock().unwrap().execute_round();
}
}
});
http_server.wait();
round_handle.join().unwrap();
Ok(())
Ok((server, http_server))
}
#[cfg(test)]
mod tests {
use crate::config::ServerConfig;
use crate::crypto::comsig::ComSignature;
use crate::crypto::secp;
use crate::servers::swap::mock::MockSwapServer;
use crate::servers::swap::{SwapError, SwapServer};
use crate::servers::swap_rpc::{RPCSwapServer, SwapReq};
use grin_onion::create_onion;
use grin_onion::crypto::comsig::ComSignature;
use grin_onion::crypto::secp;
use std::net::TcpListener;
use std::sync::{Arc, Mutex};
use std::sync::Arc;
use hyper::{Body, Client, Request, Response};
use tokio::runtime::Runtime;
use tokio::sync::Mutex;
async fn body_to_string(req: Response<Body>) -> String {
let body_bytes = hyper::body::to_bytes(req.into_body()).await.unwrap();
@ -150,10 +135,11 @@ mod tests {
}
/// Spin up a temporary web service, query the API, then cleanup and return response
fn make_request(
server: Arc<Mutex<dyn SwapServer>>,
async fn async_make_request(
server: Arc<tokio::sync::Mutex<dyn SwapServer>>,
req: String,
) -> Result<String, Box<dyn std::error::Error>> {
runtime_handle: &tokio::runtime::Handle,
) -> Result<String, Box<dyn std::error::Error + Send + Sync>> {
let server_config = ServerConfig {
key: secp::random_secret(),
interval_s: 1,
@ -173,28 +159,21 @@ mod tests {
};
// Start the JSON-RPC server
let http_server = rpc_server.start_http();
let http_server = rpc_server.start_http(runtime_handle.clone());
let uri = format!("http://{}/v1", server_config.addr);
let threaded_rt = Runtime::new()?;
let do_request = async move {
let request = Request::post(uri)
.header("Content-Type", "application/json")
.body(Body::from(req))
.unwrap();
let request = Request::post(uri)
.header("Content-Type", "application/json")
.body(Body::from(req))
.unwrap();
Client::new().request(request).await
};
let response = Client::new().request(request).await?;
let response = threaded_rt.block_on(do_request)?;
let response_str: String = threaded_rt.block_on(body_to_string(response));
// Wait for shutdown
threaded_rt.shutdown_background();
let response_str: String = body_to_string(response).await;
// Execute one round
server.lock().unwrap().execute_round()?;
server.lock().await.execute_round().await?;
// Stop the server
http_server.close();
@ -206,7 +185,11 @@ mod tests {
/// Demonstrates a successful swap response
#[test]
fn swap_success() -> Result<(), Box<dyn std::error::Error>> {
fn swap_success() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let mut rt = tokio::runtime::Builder::new()
.threaded_scheduler()
.enable_all()
.build()?;
let commitment = secp::commit(1234, &secp::random_secret())?;
let onion = create_onion(&commitment, &vec![])?;
let comsig = ComSignature::sign(1234, &secp::random_secret(), &onion.serialize()?)?;
@ -221,7 +204,8 @@ mod tests {
"{{\"jsonrpc\": \"2.0\", \"method\": \"swap\", \"params\": [{}], \"id\": \"1\"}}",
serde_json::json!(swap)
);
let response = make_request(server, req)?;
let rt_handle = rt.handle().clone();
let response = rt.block_on(async_make_request(server, req, &rt_handle))?;
let expected = "{\"jsonrpc\":\"2.0\",\"result\":\"success\",\"id\":\"1\"}\n";
assert_eq!(response, expected);
@ -229,7 +213,11 @@ mod tests {
}
#[test]
fn swap_bad_request() -> Result<(), Box<dyn std::error::Error>> {
fn swap_bad_request() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let mut rt = tokio::runtime::Builder::new()
.threaded_scheduler()
.enable_all()
.build()?;
let server: Arc<Mutex<dyn SwapServer>> = Arc::new(Mutex::new(MockSwapServer::new()));
let params = "{ \"param\": \"Not a valid Swap request\" }";
@ -237,7 +225,8 @@ mod tests {
"{{\"jsonrpc\": \"2.0\", \"method\": \"swap\", \"params\": [{}], \"id\": \"1\"}}",
params
);
let response = make_request(server, req)?;
let rt_handle = rt.handle().clone();
let response = rt.block_on(async_make_request(server, req, &rt_handle))?;
let expected = "{\"jsonrpc\":\"2.0\",\"error\":{\"code\":-32602,\"message\":\"Invalid params: missing field `onion`.\"},\"id\":\"1\"}\n";
assert_eq!(response, expected);
Ok(())
@ -245,7 +234,12 @@ mod tests {
/// Returns "Commitment not found" when there's no matching output in the UTXO set.
#[test]
fn swap_utxo_missing() -> Result<(), Box<dyn std::error::Error>> {
fn swap_utxo_missing() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let mut rt = tokio::runtime::Builder::new()
.threaded_scheduler()
.enable_all()
.build()?;
let commitment = secp::commit(1234, &secp::random_secret())?;
let onion = create_onion(&commitment, &vec![])?;
let comsig = ComSignature::sign(1234, &secp::random_secret(), &onion.serialize()?)?;
@ -267,7 +261,8 @@ mod tests {
"{{\"jsonrpc\": \"2.0\", \"method\": \"swap\", \"params\": [{}], \"id\": \"1\"}}",
serde_json::json!(swap)
);
let response = make_request(server, req)?;
let rt_handle = rt.handle().clone();
let response = rt.block_on(async_make_request(server, req, &rt_handle))?;
let expected = format!(
"{{\"jsonrpc\":\"2.0\",\"error\":{{\"code\":-32602,\"message\":\"Output {:?} does not exist, or is already spent.\"}},\"id\":\"1\"}}\n",
commitment

View file

@ -21,8 +21,13 @@ const SWAP_PREFIX: u8 = b'S';
#[derive(Clone, Debug, PartialEq)]
pub enum SwapStatus {
Unprocessed,
InProcess { kernel_hash: Hash },
Completed { kernel_hash: Hash, block_hash: Hash },
InProcess {
kernel_commit: Commitment,
},
Completed {
kernel_commit: Commitment,
block_hash: Hash,
},
Failed,
}
@ -32,16 +37,16 @@ impl Writeable for SwapStatus {
SwapStatus::Unprocessed => {
writer.write_u8(0)?;
}
SwapStatus::InProcess { kernel_hash } => {
SwapStatus::InProcess { kernel_commit } => {
writer.write_u8(1)?;
kernel_hash.write(writer)?;
kernel_commit.write(writer)?;
}
SwapStatus::Completed {
kernel_hash,
kernel_commit,
block_hash,
} => {
writer.write_u8(2)?;
kernel_hash.write(writer)?;
kernel_commit.write(writer)?;
block_hash.write(writer)?;
}
SwapStatus::Failed => {
@ -58,14 +63,14 @@ impl Readable for SwapStatus {
let status = match reader.read_u8()? {
0 => SwapStatus::Unprocessed,
1 => {
let kernel_hash = Hash::read(reader)?;
SwapStatus::InProcess { kernel_hash }
let kernel_commit = Commitment::read(reader)?;
SwapStatus::InProcess { kernel_commit }
}
2 => {
let kernel_hash = Hash::read(reader)?;
let kernel_commit = Commitment::read(reader)?;
let block_hash = Hash::read(reader)?;
SwapStatus::Completed {
kernel_hash,
kernel_commit,
block_hash,
}
}
@ -244,8 +249,7 @@ impl SwapStore {
#[cfg(test)]
mod tests {
use crate::store::{SwapData, SwapStatus, SwapStore};
use crate::StoreError;
use crate::store::{StoreError, SwapData, SwapStatus, SwapStore};
use grin_core::core::{Input, OutputFeatures};
use grin_core::global::{self, ChainTypes};
use grin_onion::crypto::secp;
@ -278,11 +282,11 @@ mod tests {
SwapStatus::Unprocessed
} else if s == 1 {
SwapStatus::InProcess {
kernel_hash: onion_test_util::rand_hash(),
kernel_commit: onion_test_util::rand_commit(),
}
} else {
SwapStatus::Completed {
kernel_hash: onion_test_util::rand_hash(),
kernel_commit: onion_test_util::rand_commit(),
block_hash: onion_test_util::rand_hash(),
}
};
@ -330,7 +334,7 @@ mod tests {
assert!(store.swap_exists(&swap.input.commit)?);
swap.status = SwapStatus::InProcess {
kernel_hash: onion_test_util::rand_hash(),
kernel_commit: onion_test_util::rand_commit(),
};
let result = store.save_swap(&swap, false);
assert_eq!(

View file

@ -1,6 +1,5 @@
use crate::config::{self, ServerConfig};
use crate::config::ServerConfig;
use grin_core::global;
use grin_wallet_impls::tor::config as tor_config;
use grin_wallet_impls::tor::process::TorProcess;
use std::collections::HashMap;
@ -10,72 +9,96 @@ use thiserror::Error;
#[derive(Error, Debug)]
pub enum TorError {
#[error("Error generating config: {0:?}")]
ConfigError(String),
ConfigError(grin_wallet_impls::Error),
#[error("Error starting process: {0:?}")]
ProcessError(grin_wallet_impls::tor::process::Error),
}
pub fn init_tor_listener(server_config: &ServerConfig) -> Result<TorProcess, TorError> {
println!("Initializing tor listener");
pub fn init_tor_listener(
data_dir: &str,
server_config: &ServerConfig,
) -> Result<TorProcess, TorError> {
warn!("Initializing tor listener");
let mut tor_dir = config::get_grin_path(&global::get_chain_type());
tor_dir.push("tor/listener");
let tor_dir = format!("{}/tor/listener", &data_dir);
trace!(
"Dir: {}, Proxy: {}",
&tor_dir,
server_config.socks_proxy_addr.to_string()
);
let mut torrc_dir = tor_dir.clone();
torrc_dir.push("torrc");
// create data directory if it doesn't exist
std::fs::create_dir_all(&format!("{}/data", tor_dir)).unwrap();
tor_config::output_tor_listener_config(
tor_dir.to_str().unwrap(),
let service_dir = tor_config::output_onion_service_config(tor_dir.as_str(), &server_config.key)
.map_err(|e| TorError::ConfigError(e))?;
let service_dirs = vec![service_dir.to_string()];
tor_config::output_torrc(
tor_dir.as_str(),
server_config.addr.to_string().as_str(),
&vec![server_config.key.clone()],
&server_config.socks_proxy_addr.to_string(),
&service_dirs,
HashMap::new(),
HashMap::new(),
)
.map_err(|e| TorError::ConfigError(e.to_string()))?;
.map_err(|e| TorError::ConfigError(e))?;
// Start TOR process
let mut process = TorProcess::new();
process
.torrc_path(torrc_dir.to_str().unwrap())
.working_dir(tor_dir.to_str().unwrap())
.timeout(20)
.completion_percent(100)
.launch()
.map_err(TorError::ProcessError)?;
.torrc_path("./torrc")
.working_dir(tor_dir.as_str())
.timeout(30)
.completion_percent(100);
println!(
let mut attempts = 0;
let max_attempts = 3;
let mut result;
loop {
attempts += 1;
info!("Launching TorProcess... Attempt {}", attempts);
result = process.launch();
if result.is_ok() || attempts >= max_attempts {
break;
}
}
result.map_err(TorError::ProcessError)?;
warn!(
"Server listening at http://{}.onion",
server_config.onion_address().to_ov3_str()
);
Ok(process)
}
pub fn init_tor_sender(server_config: &ServerConfig) -> Result<TorProcess, TorError> {
println!(
pub fn init_tor_sender(
data_dir: &str,
server_config: &ServerConfig,
) -> Result<TorProcess, TorError> {
warn!(
"Starting TOR Process for send at {:?}",
server_config.socks_proxy_addr
);
let mut tor_dir = config::get_grin_path(&global::get_chain_type());
tor_dir.push("tor/sender");
let mut torrc_dir = tor_dir.clone();
torrc_dir.push("torrc");
let tor_dir = format!("{}/tor/sender", data_dir);
tor_config::output_tor_sender_config(
tor_dir.to_str().unwrap(),
tor_dir.as_str(),
&server_config.socks_proxy_addr.to_string(),
HashMap::new(),
HashMap::new(),
)
.map_err(|e| TorError::ConfigError(e.to_string()))?;
.map_err(|e| TorError::ConfigError(e))?;
// Start TOR process
let mut tor_process = TorProcess::new();
tor_process
.torrc_path(torrc_dir.to_str().unwrap())
.working_dir(tor_dir.to_str().unwrap())
.timeout(20)
.torrc_path("./torrc")
.working_dir(tor_dir.as_str())
.timeout(40)
.completion_percent(100)
.launch()
.map_err(TorError::ProcessError)?;

View file

@ -1,10 +1,10 @@
use crate::crypto::secp;
use crate::wallet::Wallet;
use grin_core::core::{
FeeFields, Input, Inputs, KernelFeatures, Output, Transaction, TransactionBody, TxKernel,
};
use grin_keychain::BlindingFactor;
use grin_onion::crypto::secp;
use secp256k1zkp::{ContextFlag, Secp256k1, SecretKey};
use serde::{Deserialize, Serialize};
use std::sync::Arc;
@ -43,7 +43,7 @@ pub struct TxComponents {
}
/// Builds and verifies the finalized swap 'Transaction' using the provided components.
pub fn assemble_tx(
pub async fn async_assemble_tx(
wallet: &Arc<dyn Wallet>,
inputs: &Vec<Input>,
outputs: &Vec<Output>,
@ -57,7 +57,7 @@ pub fn assemble_tx(
let min_kernel_fee =
TransactionBody::weight_by_iok(inputs.len() as u64, outputs.len() as u64, 1) * fee_base;
let components = add_kernel_and_collect_fees(
let components = async_add_kernel_and_collect_fees(
&wallet,
&outputs,
&kernels,
@ -66,7 +66,8 @@ pub fn assemble_tx(
fees_paid,
&prev_offset,
&output_excesses,
)?;
)
.await?;
// assemble the transaction
let tx = Transaction::new(
@ -79,7 +80,7 @@ pub fn assemble_tx(
}
/// Adds a kernel and output to a collection of transaction components to consume fees and offset excesses.
pub fn assemble_components(
pub async fn async_assemble_components(
wallet: &Arc<dyn Wallet>,
components: &TxComponents,
output_excesses: &Vec<SecretKey>,
@ -89,7 +90,7 @@ pub fn assemble_components(
// calculate minimum fee required for the kernel
let min_kernel_fee = TransactionBody::weight_by_iok(0, 0, 1) * fee_base;
add_kernel_and_collect_fees(
async_add_kernel_and_collect_fees(
&wallet,
&components.outputs,
&components.kernels,
@ -99,9 +100,10 @@ pub fn assemble_components(
&components.offset,
&output_excesses,
)
.await
}
fn add_kernel_and_collect_fees(
async fn async_add_kernel_and_collect_fees(
wallet: &Arc<dyn Wallet>,
outputs: &Vec<Output>,
kernels: &Vec<TxKernel>,
@ -128,7 +130,10 @@ fn add_kernel_and_collect_fees(
let amount = fees_paid - (min_kernel_fee + fee_to_collect);
kernel_fee -= amount;
let wallet_output = wallet.build_output(amount).map_err(TxError::WalletError)?;
let wallet_output = wallet
.async_build_output(amount)
.await
.map_err(TxError::WalletError)?;
txn_outputs.push(wallet_output.1);
let output_excess = SecretKey::from_slice(&secp, &wallet_output.0.as_ref())
@ -202,11 +207,12 @@ fn add_kernel_and_collect_fees(
///
/// ```rust
/// use secp256k1zkp::key::SecretKey;
/// use crate::crypto::secp;
/// use secp256k1zkp::rand::thread_rng;
/// use grin_onion::crypto::secp;
///
/// let secret_key = SecretKey::new(&mut secp::rand::thread_rng());
/// let secret_key = secp::random_secret();
/// let fee = 10; // 10 nanogrin
/// let kernel = build_kernel(&secret_key, fee);
/// let kernel = mwixnet::tx::build_kernel(&secret_key, fee);
/// ```
pub fn build_kernel(excess: &SecretKey, fee: u64) -> Result<TxKernel, TxError> {
let mut kernel = TxKernel::with_features(KernelFeatures::Plain {

View file

@ -1,10 +1,10 @@
use crate::crypto::secp;
use async_trait::async_trait;
use grin_api::client;
use grin_api::json_rpc::{build_request, Request, Response};
use grin_api::json_rpc::{build_request, Request, Response, RpcError};
use grin_core::core::Output;
use grin_core::libtx::secp_ser;
use grin_keychain::BlindingFactor;
use grin_onion::crypto::secp;
use grin_util::{ToHex, ZeroingString};
use grin_wallet_api::{EncryptedRequest, EncryptedResponse, JsonId, Token};
use secp256k1zkp::{PublicKey, Secp256k1, SecretKey};
@ -13,9 +13,13 @@ use serde_json::json;
use std::net::SocketAddr;
use thiserror::Error;
#[async_trait]
pub trait Wallet: Send + Sync {
/// Builds an output for the wallet with the provided amount.
fn build_output(&self, amount: u64) -> Result<(BlindingFactor, Output), WalletError>;
async fn async_build_output(
&self,
amount: u64,
) -> Result<(BlindingFactor, Output), WalletError>;
}
/// Error types for interacting with wallets
@ -31,6 +35,8 @@ pub enum WalletError {
ApiCommError(grin_api::Error),
#[error("Error decoding JSON-RPC response: {0:?}")]
ResponseParseError(grin_api::json_rpc::Error),
#[error("Unsucessful response returned: {0:?}")]
ResponseRpcError(Option<RpcError>),
}
/// HTTP (JSONRPC) implementation of the 'Wallet' trait.
@ -55,26 +61,27 @@ pub struct ECDHPubkey {
impl HttpWallet {
/// Calls the 'open_wallet' using the RPC API.
pub fn open_wallet(
pub async fn async_open_wallet(
wallet_owner_url: &SocketAddr,
wallet_owner_secret: &Option<String>,
wallet_pass: &ZeroingString,
) -> Result<HttpWallet, WalletError> {
println!("Opening wallet at {}", wallet_owner_url);
let shared_key = HttpWallet::init_secure_api(&wallet_owner_url, &wallet_owner_secret)?;
info!("Opening wallet at {}", wallet_owner_url);
let shared_key =
HttpWallet::async_init_secure_api(&wallet_owner_url, &wallet_owner_secret).await?;
let open_wallet_params = json!({
"name": null,
"password": wallet_pass.to_string()
});
let token: Token = HttpWallet::send_enc_request(
let token: Token = HttpWallet::async_send_enc_request(
&wallet_owner_url,
&wallet_owner_secret,
"open_wallet",
&open_wallet_params,
&shared_key,
)?;
println!("Connected to wallet");
)
.await?;
info!("Connected to wallet");
Ok(HttpWallet {
wallet_owner_url: wallet_owner_url.clone(),
@ -84,7 +91,7 @@ impl HttpWallet {
})
}
fn init_secure_api(
async fn async_init_secure_api(
wallet_owner_url: &SocketAddr,
wallet_owner_secret: &Option<String>,
) -> Result<SecretKey, WalletError> {
@ -95,12 +102,13 @@ impl HttpWallet {
"ecdh_pubkey": ephemeral_pk.serialize_vec(&secp, true).to_hex()
});
let response_pk: ECDHPubkey = HttpWallet::send_json_request(
let response_pk: ECDHPubkey = HttpWallet::async_send_json_request(
&wallet_owner_url,
&wallet_owner_secret,
"init_secure_api",
&init_params,
)?;
)
.await?;
let shared_key = {
let mut shared_pubkey = response_pk.ecdh_pubkey.clone();
@ -113,7 +121,22 @@ impl HttpWallet {
Ok(shared_key)
}
fn send_enc_request<D: serde::de::DeserializeOwned>(
pub async fn async_perform_request<D: serde::de::DeserializeOwned>(
&self,
method: &str,
params: &serde_json::Value,
) -> Result<D, WalletError> {
HttpWallet::async_send_enc_request(
&self.wallet_owner_url,
&self.wallet_owner_secret,
method,
params,
&self.shared_key,
)
.await
}
async fn async_send_enc_request<D: serde::de::DeserializeOwned>(
wallet_owner_url: &SocketAddr,
wallet_owner_secret: &Option<String>,
method: &str,
@ -129,23 +152,30 @@ impl HttpWallet {
});
let enc_req = EncryptedRequest::from_json(&JsonId::IntId(1), &req, &shared_key)
.map_err(WalletError::EncryptRequestError)?;
let res = client::post::<EncryptedRequest, EncryptedResponse>(
let res = client::post_async::<EncryptedRequest, EncryptedResponse>(
url.as_str(),
wallet_owner_secret.clone(),
&enc_req,
wallet_owner_secret.clone(),
)
.await
.map_err(WalletError::ApiCommError)?;
let decrypted = res
.decrypt(&shared_key)
.map_err(WalletError::DecryptResponseError)?;
let response: Response =
serde_json::from_value(decrypted).map_err(WalletError::DecodeResponseError)?;
let ok = response.result.unwrap().get("Ok").unwrap().clone();
let parsed = serde_json::from_value(ok).map_err(WalletError::DecodeResponseError)?;
let result = response
.result
.ok_or(WalletError::ResponseRpcError(response.error.clone()))?;
let ok = result
.get("Ok")
.ok_or(WalletError::ResponseRpcError(response.error.clone()))?;
let parsed =
serde_json::from_value(ok.clone()).map_err(WalletError::DecodeResponseError)?;
Ok(parsed)
}
fn send_json_request<D: serde::de::DeserializeOwned>(
async fn async_send_json_request<D: serde::de::DeserializeOwned>(
wallet_owner_url: &SocketAddr,
wallet_owner_secret: &Option<String>,
method: &str,
@ -153,15 +183,23 @@ impl HttpWallet {
) -> Result<D, WalletError> {
let url = format!("http://{}{}", wallet_owner_url, ENDPOINT);
let req = build_request(method, params);
let res =
client::post::<Request, Response>(url.as_str(), wallet_owner_secret.clone(), &req)
.map_err(WalletError::ApiCommError)?;
let res = client::post_async::<Request, Response>(
url.as_str(),
&req,
wallet_owner_secret.clone(),
)
.await
.map_err(WalletError::ApiCommError)?;
let parsed = res
.clone()
.into_result()
.map_err(WalletError::ResponseParseError)?;
Ok(parsed)
}
pub fn get_token(&self) -> Token {
self.token.clone()
}
}
#[derive(Clone, Debug, Serialize, Deserialize)]
@ -174,21 +212,26 @@ pub struct OutputWithBlind {
output: Output,
}
#[async_trait]
impl Wallet for HttpWallet {
/// Builds an 'Output' for the wallet using the 'build_output' RPC API.
fn build_output(&self, amount: u64) -> Result<(BlindingFactor, Output), WalletError> {
async fn async_build_output(
&self,
amount: u64,
) -> Result<(BlindingFactor, Output), WalletError> {
let req_json = json!({
"token": self.token.keychain_mask.clone().unwrap().0,
"token": self.token,
"features": "Plain",
"amount": amount
});
let output: OutputWithBlind = HttpWallet::send_enc_request(
let output: OutputWithBlind = HttpWallet::async_send_enc_request(
&self.wallet_owner_url,
&self.wallet_owner_secret,
"build_output",
&req_json,
&self.shared_key,
)?;
)
.await?;
Ok((output.blind, output.output))
}
}
@ -196,11 +239,12 @@ impl Wallet for HttpWallet {
#[cfg(test)]
pub mod mock {
use super::{Wallet, WalletError};
use crate::crypto::secp;
use std::borrow::BorrowMut;
use async_trait::async_trait;
use grin_core::core::{Output, OutputFeatures};
use grin_keychain::BlindingFactor;
use grin_onion::crypto::secp;
use secp256k1zkp::pedersen::Commitment;
use secp256k1zkp::Secp256k1;
use std::sync::{Arc, Mutex};
@ -225,9 +269,13 @@ pub mod mock {
}
}
#[async_trait]
impl Wallet for MockWallet {
/// Builds an 'Output' for the wallet using the 'build_output' RPC API.
fn build_output(&self, amount: u64) -> Result<(BlindingFactor, Output), WalletError> {
async fn async_build_output(
&self,
amount: u64,
) -> Result<(BlindingFactor, Output), WalletError> {
let secp = Secp256k1::new();
let blind = secp::random_secret();
let commit = secp::commit(amount, &blind).unwrap();

255
tests/common/miner.rs Normal file
View file

@ -0,0 +1,255 @@
// Copyright 2021 The Grin Developers
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Mining service, gets a block to mine, and based on mining configuration
//! chooses a version of the cuckoo miner to mine the block and produce a valid
//! header with its proof-of-work. Any valid mined blocks are submitted to the
//! network.
use crate::common::types::BlockFees;
use crate::common::wallet::IntegrationGrinWallet;
use chrono::prelude::Utc;
use chrono::{DateTime, NaiveDateTime};
use grin_chain::Chain;
use grin_core::core::hash::{Hash, Hashed};
use grin_core::core::{Block, BlockHeader, Transaction};
use grin_core::{consensus, global};
use grin_keychain::Identifier;
use grin_util::Mutex;
use rand::{thread_rng, Rng};
use std::sync::Arc;
use std::time::Duration;
pub struct Miner {
chain: Arc<Chain>,
}
impl Miner {
// Creates a new Miner. Needs references to the chain state and its
/// storage.
pub fn new(chain: Arc<Chain>) -> Miner {
Miner { chain }
}
pub async fn async_mine_empty_blocks(
&self,
wallet: &Arc<Mutex<IntegrationGrinWallet>>,
num_blocks: usize,
) {
for _ in 0..num_blocks {
self.async_mine_next_block(wallet, &vec![]).await;
}
}
/// Builds a new block on top of the existing chain.
pub async fn async_mine_next_block(
&self,
wallet: &Arc<Mutex<IntegrationGrinWallet>>,
txs: &Vec<Transaction>,
) {
info!("Starting test miner loop.");
// iteration, we keep the returned derivation to provide it back when
// nothing has changed. We only want to create a new key_id for each new block.
let mut key_id = None;
loop {
// get the latest chain state and build a block on top of it
let head = self.chain.head_header().unwrap();
let mut latest_hash = self.chain.head().unwrap().last_block_h;
let (mut b, block_fees) = self.async_get_block(wallet, txs, key_id.clone()).await;
let sol = self.inner_mining_loop(&mut b, &head, &mut latest_hash);
// we found a solution, push our block through the chain processing pipeline
if sol {
info!(
"Found valid proof of work, adding block {} (prev_root {}).",
b.hash(),
b.header.prev_root,
);
let res = self.chain.process_block(b, grin_chain::Options::MINE);
if let Err(e) = res {
error!("Error validating mined block: {:?}", e);
} else {
return;
}
key_id = None;
} else {
key_id = block_fees.key_id();
}
}
}
/// The inner part of mining loop for the internal miner
/// kept around mostly for automated testing purposes
fn inner_mining_loop(&self, b: &mut Block, head: &BlockHeader, latest_hash: &mut Hash) -> bool {
while head.hash() == *latest_hash {
let mut ctx = global::create_pow_context::<u32>(
head.height,
global::min_edge_bits(),
global::proofsize(),
10,
)
.unwrap();
ctx.set_header_nonce(b.header.pre_pow(), None, true)
.unwrap();
if let Ok(proofs) = ctx.find_cycles() {
b.header.pow.proof = proofs[0].clone();
let proof_diff = b.header.pow.to_difficulty(b.header.height);
if proof_diff >= (b.header.total_difficulty() - head.total_difficulty()) {
return true;
}
}
b.header.pow.nonce += 1;
*latest_hash = self.chain.head().unwrap().last_block_h;
}
false
}
// Ensure a block suitable for mining is built and returned
// If a wallet listener URL is not provided the reward will be "burnt"
// Warning: This call does not return until/unless a new block can be built
async fn async_get_block(
&self,
wallet: &Arc<Mutex<IntegrationGrinWallet>>,
txs: &Vec<Transaction>,
key_id: Option<Identifier>,
) -> (Block, BlockFees) {
let wallet_retry_interval = 5;
// get the latest chain state and build a block on top of it
let mut result = self.async_build_block(wallet, txs, key_id.clone()).await;
while let Err(e) = result {
println!("Error: {:?}", &e);
let mut new_key_id = key_id.to_owned();
match e {
grin_servers::common::types::Error::Chain(c) => match c {
grin_chain::Error::DuplicateCommitment(_) => {
debug!(
"Duplicate commit for potential coinbase detected. Trying next derivation."
);
// use the next available key to generate a different coinbase commitment
new_key_id = None;
}
_ => {
error!("Chain Error: {}", c);
}
},
grin_servers::common::types::Error::WalletComm(_) => {
error!(
"Error building new block: Can't connect to wallet listener; will retry"
);
async_std::task::sleep(Duration::from_secs(wallet_retry_interval)).await;
}
ae => {
warn!("Error building new block: {:?}. Retrying.", ae);
}
}
// only wait if we are still using the same key: a different coinbase commitment is unlikely
// to have duplication
if new_key_id.is_some() {
async_std::task::sleep(Duration::from_millis(100)).await;
}
result = self.async_build_block(wallet, txs, new_key_id).await;
}
return result.unwrap();
}
/// Builds a new block with the chain head as previous and eligible
/// transactions from the pool.
async fn async_build_block(
&self,
wallet: &Arc<Mutex<IntegrationGrinWallet>>,
txs: &Vec<Transaction>,
key_id: Option<Identifier>,
) -> Result<(Block, BlockFees), grin_servers::common::types::Error> {
let head = self.chain.head_header()?;
// prepare the block header timestamp
let mut now_sec = Utc::now().timestamp();
let head_sec = head.timestamp.timestamp();
if now_sec <= head_sec {
now_sec = head_sec + 1;
}
// Determine the difficulty our block should be at.
// Note: do not keep the difficulty_iter in scope (it has an active batch).
let difficulty = consensus::next_difficulty(head.height + 1, self.chain.difficulty_iter()?);
// build the coinbase and the block itself
let fees = txs.iter().map(|tx| tx.fee()).sum();
let height = head.height + 1;
let block_fees = BlockFees {
fees,
key_id,
height,
};
let res = wallet.lock().async_create_coinbase(&block_fees).await?;
let output = res.output;
let kernel = res.kernel;
let block_fees = BlockFees {
key_id: res.key_id,
..block_fees
};
let mut b = Block::from_reward(&head, &txs, output, kernel, difficulty.difficulty)?;
// making sure we're not spending time mining a useless block
b.validate(&head.total_kernel_offset)?;
b.header.pow.nonce = thread_rng().gen();
b.header.pow.secondary_scaling = difficulty.secondary_scaling;
b.header.timestamp = DateTime::<Utc>::from_naive_utc_and_offset(
NaiveDateTime::from_timestamp_opt(now_sec, 0).unwrap(),
Utc,
);
debug!(
"Built new block with {} inputs and {} outputs, block difficulty: {}, cumulative difficulty {}",
b.inputs().len(),
b.outputs().len(),
difficulty.difficulty,
b.header.total_difficulty().to_num(),
);
// Now set txhashset roots and sizes on the header of the block being built.
match self.chain.set_txhashset_roots(&mut b) {
Ok(_) => Ok((b, block_fees)),
Err(e) => {
match e {
// If this is a duplicate commitment then likely trying to use
// a key that hass already been derived but not in the wallet
// for some reason, allow caller to retry.
grin_chain::Error::DuplicateCommitment(e) => {
Err(grin_servers::common::types::Error::Chain(
grin_chain::Error::DuplicateCommitment(e),
))
}
// Some other issue, possibly duplicate kernel
_ => {
error!("Error setting txhashset root to build a block: {:?}", e);
Err(grin_servers::common::types::Error::Chain(
grin_chain::Error::Other(format!("{:?}", e)),
))
}
}
}
}
}
}

5
tests/common/mod.rs Normal file
View file

@ -0,0 +1,5 @@
pub mod miner;
pub mod node;
pub mod server;
pub mod types;
pub mod wallet;

122
tests/common/node.rs Normal file
View file

@ -0,0 +1,122 @@
extern crate grin_wallet_api as apiwallet;
extern crate grin_wallet_config as wallet_config;
extern crate grin_wallet_controller as wallet_controller;
extern crate grin_wallet_impls as wallet;
extern crate grin_wallet_libwallet as libwallet;
use futures::channel::oneshot;
use grin_core as core;
use grin_p2p as p2p;
use grin_servers as servers;
use grin_util::logger::LogEntry;
use grin_util::{Mutex, StopState};
use std::default::Default;
use std::net::SocketAddr;
use mwixnet::{GrinNode, HttpGrinNode};
use std::sync::{mpsc, Arc};
use std::thread;
#[allow(dead_code)]
pub struct IntegrationGrinNode {
server_config: servers::ServerConfig,
stop_state: Arc<StopState>,
server: Option<Arc<servers::Server>>,
}
impl IntegrationGrinNode {
pub fn start(&mut self) -> Arc<servers::Server> {
let stop_state_thread = self.stop_state.clone();
let server_config_thread = self.server_config.clone();
// Create a channel to communicate between threads
let (tx, rx) = mpsc::channel();
// Start the node in a new thread
thread::spawn(move || {
let api_chan: &'static mut (oneshot::Sender<()>, oneshot::Receiver<()>) =
Box::leak(Box::new(oneshot::channel::<()>()));
servers::Server::start(
server_config_thread.clone(),
None,
move |serv: servers::Server, _: Option<mpsc::Receiver<LogEntry>>| {
// Signal that the callback has been called
tx.send(serv).unwrap();
// Do other necessary stuff here
},
Some(stop_state_thread.clone()),
api_chan,
)
.unwrap();
});
// Wait for the signal from the node-running thread
let server = Arc::new(rx.recv().unwrap());
self.server = Some(server.clone());
server
}
pub fn stop(&self) {
self.stop_state.stop();
}
pub fn api_address(&self) -> SocketAddr {
self.server_config.api_http_addr.parse().unwrap()
}
pub fn to_client(&self) -> Arc<dyn GrinNode> {
Arc::new(HttpGrinNode::new(&self.api_address(), &None))
}
}
#[allow(dead_code)]
pub struct GrinNodeManager {
// base directory for the server instance
working_dir: String,
nodes: Vec<Arc<Mutex<IntegrationGrinNode>>>,
}
impl GrinNodeManager {
pub fn new(test_dir: &str) -> GrinNodeManager {
GrinNodeManager {
working_dir: String::from(test_dir),
nodes: vec![],
}
}
pub fn new_node(&mut self) -> Arc<Mutex<IntegrationGrinNode>> {
let server_config = servers::ServerConfig {
api_http_addr: format!("127.0.0.1:{}", 20000 + self.nodes.len()),
api_secret_path: None,
db_root: format!("{}/nodes/{}", self.working_dir, self.nodes.len()),
p2p_config: p2p::P2PConfig {
port: 13414,
seeding_type: p2p::Seeding::None,
..p2p::P2PConfig::default()
},
chain_type: core::global::ChainTypes::AutomatedTesting,
skip_sync_wait: Some(true),
stratum_mining_config: None,
..Default::default()
};
let node = Arc::new(Mutex::new(IntegrationGrinNode {
server_config,
stop_state: Arc::new(StopState::new()),
server: None,
}));
self.nodes.push(node.clone());
node
}
pub fn stop_all(&self) {
for node in &self.nodes {
node.lock().stop();
}
}
}

244
tests/common/server.rs Normal file
View file

@ -0,0 +1,244 @@
use crate::common::node::IntegrationGrinNode;
use crate::common::wallet::{GrinWalletManager, IntegrationGrinWallet};
use grin_core::core::Transaction;
use grin_onion::crypto::comsig::ComSignature;
use grin_onion::crypto::dalek::DalekPublicKey;
use grin_onion::onion::Onion;
use grin_wallet_impls::tor::process::TorProcess;
use mwixnet::client::MixClientImpl;
use mwixnet::{tor, SwapError, SwapServer, SwapStore};
use secp256k1zkp::SecretKey;
use std::iter;
use std::net::TcpListener;
use std::sync::Arc;
use x25519_dalek::{PublicKey as xPublicKey, StaticSecret};
pub struct IntegrationSwapServer {
server_key: SecretKey,
tor_process: TorProcess,
swap_server: Arc<tokio::sync::Mutex<dyn SwapServer>>,
rpc_server: jsonrpc_http_server::Server,
_wallet: Arc<grin_util::Mutex<IntegrationGrinWallet>>,
}
impl IntegrationSwapServer {
pub async fn async_swap(&self, onion: &Onion, comsig: &ComSignature) -> Result<(), SwapError> {
self.swap_server.lock().await.swap(&onion, &comsig).await
}
pub async fn async_execute_round(&self) -> Result<Option<Arc<Transaction>>, SwapError> {
self.swap_server.lock().await.execute_round().await
}
}
pub struct IntegrationMixServer {
server_key: SecretKey,
tor_process: TorProcess,
rpc_server: jsonrpc_http_server::Server,
_wallet: Arc<grin_util::Mutex<IntegrationGrinWallet>>,
}
async fn async_new_swap_server(
data_dir: &str,
rt_handle: &tokio::runtime::Handle,
wallets: &mut GrinWalletManager,
server_key: &SecretKey,
node: &Arc<grin_util::Mutex<IntegrationGrinNode>>,
next_server: Option<&IntegrationMixServer>,
) -> IntegrationSwapServer {
let wallet = wallets.async_new_wallet(&node.lock().api_address()).await;
let server_config = mwixnet::ServerConfig {
key: server_key.clone(),
interval_s: 15,
addr: TcpListener::bind("127.0.0.1:0")
.unwrap()
.local_addr()
.unwrap(),
socks_proxy_addr: TcpListener::bind("127.0.0.1:0")
.unwrap()
.local_addr()
.unwrap(),
grin_node_url: node.lock().api_address(),
grin_node_secret_path: None,
wallet_owner_url: wallet.lock().owner_address(),
wallet_owner_secret_path: None,
prev_server: None,
next_server: match next_server {
Some(s) => Some(DalekPublicKey::from_secret(&s.server_key)),
None => None,
},
};
// Open SwapStore
let store = SwapStore::new(format!("{}/db", data_dir).as_str()).unwrap();
let tor_process = tor::init_tor_listener(&data_dir, &server_config).unwrap();
let (swap_server, rpc_server) = mwixnet::swap_listen(
rt_handle,
&server_config,
match next_server {
Some(s) => Some(Arc::new(MixClientImpl::new(
server_config.clone(),
DalekPublicKey::from_secret(&s.server_key),
))),
None => None,
},
wallet.lock().get_client(),
node.lock().to_client(),
store,
)
.unwrap();
IntegrationSwapServer {
server_key: server_key.clone(),
tor_process,
swap_server,
rpc_server,
_wallet: wallet,
}
}
async fn async_new_mix_server(
data_dir: &str,
rt_handle: &tokio::runtime::Handle,
wallets: &mut GrinWalletManager,
server_key: &SecretKey,
node: &Arc<grin_util::Mutex<IntegrationGrinNode>>,
prev_server: DalekPublicKey,
next_server: Option<&IntegrationMixServer>,
) -> IntegrationMixServer {
let wallet = wallets.async_new_wallet(&node.lock().api_address()).await;
let server_config = mwixnet::ServerConfig {
key: server_key.clone(),
interval_s: 15,
addr: TcpListener::bind("127.0.0.1:0")
.unwrap()
.local_addr()
.unwrap(),
socks_proxy_addr: TcpListener::bind("127.0.0.1:0")
.unwrap()
.local_addr()
.unwrap(),
grin_node_url: node.lock().api_address(),
grin_node_secret_path: None,
wallet_owner_url: wallet.lock().owner_address(),
wallet_owner_secret_path: None,
prev_server: Some(prev_server),
next_server: match next_server {
Some(s) => Some(DalekPublicKey::from_secret(&s.server_key)),
None => None,
},
};
let tor_process = tor::init_tor_listener(&data_dir, &server_config).unwrap();
let (_, rpc_server) = mwixnet::mix_listen(
rt_handle,
server_config.clone(),
match next_server {
Some(s) => Some(Arc::new(MixClientImpl::new(
server_config.clone(),
DalekPublicKey::from_secret(&s.server_key),
))),
None => None,
},
wallet.lock().get_client(),
node.lock().to_client(),
)
.unwrap();
IntegrationMixServer {
server_key: server_key.clone(),
tor_process,
rpc_server,
_wallet: wallet,
}
}
pub struct Servers {
pub swapper: IntegrationSwapServer,
pub mixers: Vec<IntegrationMixServer>,
}
impl Servers {
pub async fn async_setup(
test_dir: &str,
rt_handle: &tokio::runtime::Handle,
wallets: &mut GrinWalletManager,
node: &Arc<grin_util::Mutex<IntegrationGrinNode>>,
num_mixers: usize,
) -> Servers {
// Pre-generate all server keys
let server_keys: Vec<SecretKey> =
iter::repeat_with(|| grin_onion::crypto::secp::random_secret())
.take(num_mixers + 1)
.collect();
// Build mixers in reverse order
let mut mixers = Vec::new();
for i in (0..num_mixers).rev() {
let mix_server = async_new_mix_server(
format!("{}/mixers/{}", test_dir, i).as_str(),
rt_handle,
wallets,
&server_keys[i + 1],
&node,
DalekPublicKey::from_secret(&server_keys[i]),
mixers.last(),
)
.await;
println!(
"Mixer {}: server_key={}, prev_server={}, next_server={}",
i,
DalekPublicKey::from_secret(&server_keys[i + 1]).to_hex(),
DalekPublicKey::from_secret(&server_keys[i]).to_hex(),
match mixers.last() {
Some(s) => DalekPublicKey::from_secret(&s.server_key).to_hex(),
None => "NONE".to_string(),
},
);
mixers.push(mix_server);
}
mixers.reverse();
let swapper = async_new_swap_server(
format!("{}/swapper", test_dir).as_str(),
rt_handle,
wallets,
&server_keys[0],
&node,
mixers.first(),
)
.await;
println!(
"Swapper: server_key={}",
DalekPublicKey::from_secret(&server_keys[0]).to_hex()
);
Servers { swapper, mixers }
}
pub fn get_pub_keys(&self) -> Vec<xPublicKey> {
let mut pub_keys = vec![xPublicKey::from(&StaticSecret::from(
self.swapper.server_key.0.clone(),
))];
for mixer in &self.mixers {
pub_keys.push(xPublicKey::from(&StaticSecret::from(
mixer.server_key.0.clone(),
)))
}
pub_keys
}
pub fn stop_all(&mut self) {
self.swapper.rpc_server.close_handle().close();
self.swapper.tor_process.kill().unwrap();
self.mixers.iter_mut().for_each(|mixer| {
mixer.rpc_server.close_handle().close();
mixer.tor_process.kill().unwrap();
});
}
}

24
tests/common/types.rs Normal file
View file

@ -0,0 +1,24 @@
use grin_core::libtx::secp_ser;
use grin_keychain::Identifier;
use serde_derive::{Deserialize, Serialize};
/// Fees in block to use for coinbase amount calculation
/// (Duplicated from Grin wallet project)
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct BlockFees {
/// fees
#[serde(with = "secp_ser::string_or_u64")]
pub fees: u64,
/// height
#[serde(with = "secp_ser::string_or_u64")]
pub height: u64,
/// key id
pub key_id: Option<Identifier>,
}
impl BlockFees {
/// return key id
pub fn key_id(&self) -> Option<Identifier> {
self.key_id.clone()
}
}

431
tests/common/wallet.rs Normal file
View file

@ -0,0 +1,431 @@
use crate::common::types::BlockFees;
use grin_api::client;
use grin_api::json_rpc::Response;
use grin_core::core::{FeeFields, Output, OutputFeatures, Transaction, TxKernel};
use grin_core::global::ChainTypes;
use grin_core::libtx::tx_fee;
use grin_keychain::{BlindingFactor, ExtKeychain, Identifier, Keychain, SwitchCommitmentType};
use grin_onion::crypto::comsig::ComSignature;
use grin_onion::onion::Onion;
use grin_onion::Hop;
use grin_util::{Mutex, ToHex, ZeroingString};
use grin_wallet_api::Owner;
use grin_wallet_config::WalletConfig;
use grin_wallet_controller::controller;
use grin_wallet_impls::{DefaultLCProvider, DefaultWalletImpl, HTTPNodeClient};
use grin_wallet_libwallet::{InitTxArgs, Slate, VersionedSlate, WalletInfo, WalletInst};
use log::error;
use mwixnet::wallet::HttpWallet;
use secp256k1zkp::pedersen::Commitment;
use secp256k1zkp::{Secp256k1, SecretKey};
use serde_derive::{Deserialize, Serialize};
use serde_json::json;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::sync::Arc;
use std::thread;
use x25519_dalek::PublicKey as xPublicKey;
/// Response to build a coinbase output.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct CbData {
/// Output
pub output: Output,
/// Kernel
pub kernel: TxKernel,
/// Key Id
pub key_id: Option<Identifier>,
}
pub struct IntegrationGrinWallet {
wallet: Arc<
Mutex<
Box<
dyn WalletInst<
'static,
DefaultLCProvider<'static, HTTPNodeClient, ExtKeychain>,
HTTPNodeClient,
ExtKeychain,
>,
>,
>,
>,
api_listen_port: u16,
owner_api: Arc<
Owner<DefaultLCProvider<'static, HTTPNodeClient, ExtKeychain>, HTTPNodeClient, ExtKeychain>,
>,
http_client: Arc<HttpWallet>,
}
impl IntegrationGrinWallet {
pub async fn async_new_wallet(
wallet_dir: String,
api_listen_port: u16,
node_api: String,
) -> IntegrationGrinWallet {
let node_client = HTTPNodeClient::new(&node_api, None).unwrap();
let mut wallet = Box::new(
DefaultWalletImpl::<'static, HTTPNodeClient>::new(node_client.clone()).unwrap(),
)
as Box<
dyn WalletInst<
'static,
DefaultLCProvider<HTTPNodeClient, ExtKeychain>,
HTTPNodeClient,
ExtKeychain,
>,
>;
// Wallet LifeCycle Provider provides all functions init wallet and work with seeds, etc...
let lc = wallet.lc_provider().unwrap();
let mut wallet_config = WalletConfig::default();
wallet_config.check_node_api_http_addr = node_api.clone();
wallet_config.owner_api_listen_port = Some(api_listen_port);
wallet_config.api_secret_path = None;
wallet_config.data_file_dir = wallet_dir.clone();
// The top level wallet directory should be set manually (in the reference implementation,
// this is provided in the WalletConfig)
let _ = lc.set_top_level_directory(&wallet_config.data_file_dir);
lc.create_config(
&ChainTypes::AutomatedTesting,
"grin-wallet.toml",
Some(wallet_config.clone()),
None,
None,
)
.unwrap();
lc.create_wallet(None, None, 12, ZeroingString::from("pass"), false)
.unwrap();
// Start owner API
let km = Arc::new(Mutex::new(None));
let wallet = Arc::new(Mutex::new(wallet));
let owner_api = Arc::new(Owner::new(wallet.clone(), None));
let address_str = format!("127.0.0.1:{}", api_listen_port);
let owner_addr: SocketAddr = address_str.parse().unwrap();
let thr_wallet = wallet.clone();
let _thread_handle = thread::spawn(move || {
controller::owner_listener(
thr_wallet,
km,
address_str.as_str(),
None,
None,
Some(true),
None,
false,
)
.unwrap()
});
let http_client = Arc::new(
HttpWallet::async_open_wallet(&owner_addr, &None, &ZeroingString::from("pass"))
.await
.unwrap(),
);
IntegrationGrinWallet {
wallet,
api_listen_port,
owner_api,
http_client,
}
}
pub async fn async_retrieve_summary_info(&self) -> Result<WalletInfo, mwixnet::WalletError> {
let params = json!({
"token": self.http_client.clone().get_token(),
"refresh_from_node": true,
"minimum_confirmations": 1
});
let (_, wallet_info): (bool, WalletInfo) = self
.http_client
.clone()
.async_perform_request("retrieve_summary_info", &params)
.await?;
Ok(wallet_info)
}
pub async fn async_send(
&self,
receiving_wallet: &IntegrationGrinWallet,
amount: u64,
) -> Result<Transaction, mwixnet::WalletError> {
let slate = self.async_init_send_tx(amount).await.unwrap();
let slate = receiving_wallet.async_receive_tx(&slate).await.unwrap();
let slate = self.async_finalize_tx(&slate).await.unwrap();
let tx = Slate::from(slate).tx_or_err().unwrap().clone();
Ok(tx)
}
async fn async_init_send_tx(
&self,
amount: u64,
) -> Result<VersionedSlate, mwixnet::WalletError> {
let args = InitTxArgs {
src_acct_name: None,
amount,
minimum_confirmations: 0,
max_outputs: 10,
num_change_outputs: 1,
selection_strategy_is_use_all: false,
..Default::default()
};
let params = json!({
"token": self.http_client.clone().get_token(),
"args": args
});
let slate: VersionedSlate = self
.http_client
.clone()
.async_perform_request("init_send_tx", &params)
.await?;
let params = json!({
"token": self.http_client.clone().get_token(),
"slate": &slate
});
self.http_client
.clone()
.async_perform_request("tx_lock_outputs", &params)
.await?;
Ok(slate)
}
pub async fn async_receive_tx(
&self,
slate: &VersionedSlate,
) -> Result<VersionedSlate, grin_servers::common::types::Error> {
let req_body = json!({
"jsonrpc": "2.0",
"method": "receive_tx",
"id": 1,
"params": [slate, null, null]
});
let res: Response = client::post_async(self.foreign_api().as_str(), &req_body, None)
.await
.map_err(|e| {
let report = format!("Failed to receive tx. Is the wallet listening? {}", e);
error!("{}", report);
grin_servers::common::types::Error::WalletComm(report)
})?;
let parsed: VersionedSlate = res.clone().into_result().map_err(|e| {
let report = format!("Error parsing result: {}", e);
error!("{}", report);
grin_servers::common::types::Error::WalletComm(report)
})?;
Ok(parsed)
}
async fn async_finalize_tx(
&self,
slate: &VersionedSlate,
) -> Result<VersionedSlate, mwixnet::WalletError> {
let params = json!({
"token": self.http_client.clone().get_token(),
"slate": slate
});
self.http_client
.clone()
.async_perform_request("finalize_tx", &params)
.await
}
async fn async_post_tx(
&self,
finalized_slate: &VersionedSlate,
fluff: bool,
) -> Result<VersionedSlate, mwixnet::WalletError> {
let params = json!({
"token": self.http_client.clone().get_token(),
"slate": finalized_slate,
"fluff": fluff
});
self.http_client
.clone()
.async_perform_request("post_tx", &params)
.await
}
/// Call the wallet API to create a coinbase output for the given block_fees.
/// Will retry based on default "retry forever with backoff" behavior.
pub async fn async_create_coinbase(
&self,
block_fees: &BlockFees,
) -> Result<CbData, grin_servers::common::types::Error> {
let req_body = json!({
"jsonrpc": "2.0",
"method": "build_coinbase",
"id": 1,
"params": {
"block_fees": block_fees
}
});
let res: Response = client::post_async(self.foreign_api().as_str(), &req_body, None)
.await
.map_err(|e| {
let report = format!("Failed to get coinbase. Is the wallet listening? {}", e);
error!("{}", report);
grin_servers::common::types::Error::WalletComm(report)
})?;
let parsed: CbData = res.clone().into_result().map_err(|e| {
let report = format!("Error parsing result: {}", e);
error!("{}", report);
grin_servers::common::types::Error::WalletComm(report)
})?;
Ok(parsed)
}
pub fn build_onion(
&self,
commitment: &Commitment,
server_pubkeys: &Vec<xPublicKey>,
) -> Result<(Onion, ComSignature), grin_wallet_libwallet::Error> {
let keychain = self
.wallet
.lock()
.lc_provider()?
.wallet_inst()?
.keychain(self.keychain_mask().as_ref())?;
let (_, outputs) =
self.owner_api
.retrieve_outputs(self.keychain_mask().as_ref(), false, false, None)?;
let mut output = None;
for o in &outputs {
if o.commit == *commitment {
output = Some(o.output.clone());
break;
}
}
if output.is_none() {
return Err(grin_wallet_libwallet::Error::GenericError(String::from(
"output not found",
)));
}
let amount = output.clone().unwrap().value;
let input_blind = keychain.derive_key(
amount,
&output.clone().unwrap().key_id,
SwitchCommitmentType::Regular,
)?;
let fee = tx_fee(1, 1, 1);
let new_amount = amount - (fee * server_pubkeys.len() as u64);
let new_output = self.owner_api.build_output(
self.keychain_mask().as_ref(),
OutputFeatures::Plain,
new_amount,
)?;
let secp = Secp256k1::new();
let mut blind_sum = new_output
.blind
.split(&BlindingFactor::from_secret_key(input_blind.clone()), &secp)?;
let hops = server_pubkeys
.iter()
.enumerate()
.map(|(i, &p)| {
if (i + 1) == server_pubkeys.len() {
Hop {
server_pubkey: p.clone(),
excess: blind_sum.secret_key(&secp).unwrap(),
fee: FeeFields::from(fee as u32),
rangeproof: Some(new_output.output.proof.clone()),
}
} else {
let hop_excess = BlindingFactor::rand(&secp);
blind_sum = blind_sum.split(&hop_excess, &secp).unwrap();
Hop {
server_pubkey: p.clone(),
excess: hop_excess.secret_key(&secp).unwrap(),
fee: FeeFields::from(fee as u32),
rangeproof: None,
}
}
})
.collect();
let onion = grin_onion::create_onion(&commitment, &hops).unwrap();
let comsig = ComSignature::sign(amount, &input_blind, &onion.serialize().unwrap()).unwrap();
Ok((onion, comsig))
}
pub fn owner_api(
&self,
) -> Arc<
Owner<DefaultLCProvider<'static, HTTPNodeClient, ExtKeychain>, HTTPNodeClient, ExtKeychain>,
> {
self.owner_api.clone()
}
pub fn foreign_api(&self) -> String {
format!("http://127.0.0.1:{}/v2/foreign", self.api_listen_port)
}
pub fn owner_address(&self) -> SocketAddr {
SocketAddr::new(
IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
self.api_listen_port,
)
}
pub fn keychain_mask(&self) -> Option<SecretKey> {
self.http_client.as_ref().get_token().keychain_mask.clone()
}
pub fn get_client(&self) -> Arc<HttpWallet> {
self.http_client.clone()
}
}
#[allow(dead_code)]
pub struct GrinWalletManager {
// base directory for the server instance
working_dir: String,
wallets: Vec<Arc<Mutex<IntegrationGrinWallet>>>,
}
impl GrinWalletManager {
pub fn new(test_dir: &str) -> GrinWalletManager {
GrinWalletManager {
working_dir: String::from(test_dir),
wallets: vec![],
}
}
pub async fn async_new_wallet(
&mut self,
node_api_addr: &SocketAddr,
) -> Arc<Mutex<IntegrationGrinWallet>> {
let wallet_dir = format!("{}/wallets/{}", self.working_dir, self.wallets.len());
let wallet = Arc::new(Mutex::new(
IntegrationGrinWallet::async_new_wallet(
wallet_dir,
21000 + self.wallets.len() as u16,
format!("http://{}", node_api_addr),
)
.await,
));
self.wallets.push(wallet.clone());
wallet
}
}

135
tests/e2e.rs Normal file
View file

@ -0,0 +1,135 @@
use crate::common::miner::Miner;
use crate::common::node::GrinNodeManager;
use crate::common::server::Servers;
use crate::common::wallet::GrinWalletManager;
use function_name::named;
use grin_core::global;
use grin_util::logger::LoggingConfig;
use log::Level;
use std::ops::Deref;
mod common;
#[macro_use]
extern crate log;
/// Just removes all results from previous runs
fn clean_all_output(test_dir: &str) {
if let Err(e) = remove_dir_all::remove_dir_all(test_dir) {
println!("can't remove output from previous test :{}, may be ok", e);
}
}
fn setup_test(test_name: &str) -> (GrinNodeManager, GrinWalletManager, String) {
let test_dir = format!("./target/tmp/.{}", test_name);
clean_all_output(test_dir.as_str());
let mut logger = LoggingConfig::default();
logger.log_to_file = false;
logger.stdout_log_level = Level::Error;
grin_util::init_logger(Some(logger), None);
global::set_local_chain_type(global::ChainTypes::AutomatedTesting);
global::set_local_accept_fee_base(50_000_000);
global::init_global_chain_type(global::ChainTypes::AutomatedTesting);
let nodes = GrinNodeManager::new(test_dir.as_str());
let wallets = GrinWalletManager::new(test_dir.as_str());
(nodes, wallets, test_dir)
}
#[test]
#[named]
fn integration_test() -> Result<(), Box<dyn std::error::Error>> {
let (mut nodes, mut wallets, test_dir) = setup_test(function_name!());
let mut rt = tokio::runtime::Builder::new()
.threaded_scheduler()
.enable_all()
.build()?;
// Create node
let node1 = nodes.new_node();
let node1_url = node1.lock().api_address();
let node1_server = node1.lock().start();
// Setup swap & mix servers and their wallets
let rt_handle = rt.handle().clone();
let mut servers = rt.block_on(Servers::async_setup(
test_dir.as_str(),
&rt_handle,
&mut wallets,
&node1,
2usize,
));
rt.block_on(async {
// Setup wallet to use with miner
let mining_wallet = wallets.async_new_wallet(&node1_url).await;
// Mine enough blocks to have spendable coins
let miner = Miner::new(node1_server.chain.clone());
miner
.async_mine_empty_blocks(&mining_wallet, 5 + global::coinbase_maturity() as usize)
.await;
// Setup wallets for swap users
let user1_wallet = wallets.async_new_wallet(&node1_url).await;
let user2_wallet = wallets.async_new_wallet(&node1_url).await;
// Send from mining_wallet to user1_wallet
let tx1 = mining_wallet
.lock()
.async_send(user1_wallet.lock().deref(), 10_000_000_000)
.await
.unwrap();
let tx2 = mining_wallet
.lock()
.async_send(user2_wallet.lock().deref(), 20_000_000_000)
.await
.unwrap();
miner
.async_mine_next_block(&mining_wallet, &vec![tx1, tx2])
.await;
let user1_km = user1_wallet.lock().keychain_mask();
let (_, outputs) = user1_wallet
.lock()
.owner_api()
.retrieve_outputs(user1_km.as_ref(), false, false, None)
.unwrap();
assert_eq!(outputs.len(), 1);
for output in &outputs {
let (onion, comsig) = user1_wallet
.lock()
.build_onion(&output.commit, &servers.get_pub_keys())
.unwrap();
servers.swapper.async_swap(&onion, &comsig).await.unwrap();
}
let mining_wallet_info = mining_wallet
.lock()
.async_retrieve_summary_info()
.await
.unwrap();
println!("Mining wallet: {:?}", mining_wallet_info);
let user1_wallet_info = user1_wallet
.lock()
.async_retrieve_summary_info()
.await
.unwrap();
println!("User1 wallet: {:?}", user1_wallet_info);
let user2_wallet_info = user2_wallet
.lock()
.async_retrieve_summary_info()
.await
.unwrap();
println!("User2 wallet: {:?}", user2_wallet_info);
let _tx = servers.swapper.async_execute_round().await.unwrap();
});
servers.stop_all();
nodes.stop_all();
Ok(())
}