replace TorProcess with rust-native arti client and upgrade to tokio1

This commit is contained in:
scilio 2024-04-16 11:16:53 -04:00
parent df2cb31258
commit 75f2bdf2ba
21 changed files with 5663 additions and 2432 deletions

3373
Cargo.lock generated

File diff suppressed because it is too large Load diff

View file

@ -9,7 +9,9 @@ edition = "2021"
members = ["onion"]
[dependencies]
async-std = { version = "1", features = ["tokio02"] }
arti-client = { version = "0.17.0", default-features = false, features = ["async-std", "rustls", "onion-service-client", "onion-service-service"] }
arti-hyper = "0.17.0"
async-std = { version = "1", features = ["tokio1"] }
async-trait = "0.1.74"
blake2 = { package = "blake2-rfc", version = "0.2" }
byteorder = "1"
@ -18,20 +20,19 @@ chacha20 = "0.9.1"
chrono = "0.4.31"
clap = { version = "2.33", features = ["yaml"] }
ctrlc = { version = "3.1", features = ["termination"] }
curve25519-dalek = "2.1"
curve25519-dalek = "4.1.2"
dirs = "2.0"
ed25519-dalek = "1.0.1"
ed25519-dalek = "2.1.1"
function_name = "0.3.0"
futures = "0.3"
fs-mistrust = "0.7.9"
hmac = { version = "0.12.0", features = ["std"] }
hyper = "0.13"
hyper-proxy = "0.9.1"
hyper-socks2 = "0.5.0"
hyper-timeout = { version = "0.3", features = [] }
hyper = "0.14.28"
hyper-tls = "0.6.0"
itertools = { version = "0.12.0" }
jsonrpc-core = "17.1"
jsonrpc-derive = "17.1"
jsonrpc-http-server = "17.1"
jsonrpc-core = "18.0.0"
jsonrpc-derive = "18.0.0"
jsonrpc-http-server = "18.0.0"
lazy_static = "1"
pbkdf2 = "0.8.0"
rand = "0.7.3"
@ -43,8 +44,16 @@ serde_derive = "1"
serde_json = "1"
sha2 = "0.10.0"
thiserror = "1.0.30"
tokio = { version = "0.2", features = ["full"] }
tls-api = "0.9.0"
tls-api-native-tls = "0.9.0"
tokio = { version = "1.37.0", features = ["full"] }
toml = "0.8.8"
tor-hscrypto = "0.17.0"
tor-hsrproxy = "0.17.0"
tor-hsservice = "0.17.0"
tor-llcrypto = "0.17.0"
tor-keymgr = "0.17.0"
tor-rtcompat = "0.17.0"
x25519-dalek = "0.6.0"
grin_onion = { path = "./onion" }
grin_secp256k1zkp = { version = "0.7.12", features = ["bullet-proof-sizing"] }

View file

@ -1,15 +1,15 @@
pub mod crypto;
pub mod onion;
pub mod util;
use crate::crypto::secp::{random_secret, Commitment, SecretKey};
use crate::onion::{new_stream_cipher, Onion, OnionError, Payload, RawBytes};
use chacha20::cipher::StreamCipher;
use grin_core::core::FeeFields;
use secp256k1zkp::pedersen::RangeProof;
use x25519_dalek::PublicKey as xPublicKey;
use x25519_dalek::{SharedSecret, StaticSecret};
use x25519_dalek::PublicKey as xPublicKey;
use crate::crypto::secp::{Commitment, random_secret, SecretKey};
use crate::onion::{new_stream_cipher, Onion, OnionError, Payload, RawBytes};
pub mod crypto;
pub mod onion;
pub mod util;
#[derive(Clone)]
pub struct Hop {
@ -28,7 +28,7 @@ pub fn new_hop(
Hop {
server_pubkey: xPublicKey::from(&StaticSecret::from(server_key.0.clone())),
excess: hop_excess.clone(),
fee: FeeFields::from(fee as u32),
fee: FeeFields::from(fee),
rangeproof: proof,
}
}
@ -71,7 +71,7 @@ pub fn create_onion(commitment: &Commitment, hops: &Vec<Hop>) -> Result<Onion, O
for i in (0..shared_secrets.len()).rev() {
let mut cipher = new_stream_cipher(&shared_secrets[i])?;
for j in i..shared_secrets.len() {
cipher.apply_keystream(&mut enc_payloads[j]);
cipher.apply_keystream(enc_payloads[j].as_mut_slice());
}
}
@ -84,14 +84,15 @@ pub fn create_onion(commitment: &Commitment, hops: &Vec<Hop>) -> Result<Onion, O
}
pub mod test_util {
use super::*;
use grin_core::core::hash::Hash;
use grin_util::ToHex;
use rand::{RngCore, thread_rng};
use secp256k1zkp::Secp256k1;
use crate::crypto::dalek::DalekPublicKey;
use crate::crypto::secp;
use grin_core::core::hash::Hash;
use grin_util::ToHex;
use rand::{thread_rng, RngCore};
use secp256k1zkp::Secp256k1;
use super::*;
pub fn rand_onion() -> Onion {
let commit = rand_commit();
@ -116,20 +117,20 @@ pub mod test_util {
}
pub fn rand_commit() -> Commitment {
secp::commit(rand::thread_rng().next_u64(), &secp::random_secret()).unwrap()
secp::commit(thread_rng().next_u64(), &random_secret()).unwrap()
}
pub fn rand_hash() -> Hash {
Hash::from_hex(secp::random_secret().to_hex().as_str()).unwrap()
Hash::from_hex(random_secret().to_hex().as_str()).unwrap()
}
pub fn rand_proof() -> RangeProof {
let secp = Secp256k1::new();
secp.bullet_proof(
rand::thread_rng().next_u64(),
secp::random_secret(),
secp::random_secret(),
secp::random_secret(),
thread_rng().next_u64(),
random_secret(),
random_secret(),
random_secret(),
None,
None,
)
@ -153,8 +154,8 @@ pub mod test_util {
let rp = secp.bullet_proof(
out_value,
blind.clone(),
secp::random_secret(),
secp::random_secret(),
random_secret(),
random_secret(),
None,
None,
);

View file

@ -1,22 +1,23 @@
use crate::crypto::secp::{self, Commitment, RangeProof, SecretKey};
use crate::util::{read_optional, vec_to_array, write_optional};
use chacha20::cipher::{NewCipher, StreamCipher};
use chacha20::{ChaCha20, Key, Nonce};
use grin_core::core::FeeFields;
use grin_core::ser::{self, Readable, Reader, Writeable, Writer};
use grin_util::{self, ToHex};
use hmac::digest::InvalidLength;
use hmac::{Hmac, Mac};
use serde::ser::SerializeStruct;
use serde::Deserialize;
use sha2::Sha256;
use std::fmt;
use std::hash::{Hash, Hasher};
use std::result::Result;
use chacha20::{ChaCha20, Key, Nonce};
use chacha20::cipher::{NewCipher, StreamCipher};
use grin_core::core::FeeFields;
use grin_core::ser::{self, Readable, Reader, Writeable, Writer};
use grin_util::{self, ToHex};
use hmac::{Hmac, Mac};
use hmac::digest::InvalidLength;
use serde::Deserialize;
use serde::ser::SerializeStruct;
use sha2::Sha256;
use thiserror::Error;
use x25519_dalek::{PublicKey as xPublicKey, SharedSecret, StaticSecret};
use crate::crypto::secp::{self, Commitment, RangeProof, SecretKey};
use crate::util::{read_optional, vec_to_array, write_optional};
type HmacSha256 = Hmac<Sha256>;
pub type RawBytes = Vec<u8>;
@ -328,12 +329,13 @@ impl From<ser::Error> for OnionError {
#[cfg(test)]
pub mod tests {
use super::*;
use crate::crypto::secp::random_secret;
use crate::{create_onion, new_hop, Hop};
use grin_core::core::FeeFields;
use crate::{create_onion, Hop, new_hop};
use crate::crypto::secp::random_secret;
use super::*;
/// Test end-to-end Onion creation and unwrapping logic.
#[test]
fn onion() {
@ -381,7 +383,7 @@ pub mod tests {
let mut payload = Payload {
next_ephemeral_pk: onion_packet.ephemeral_pubkey.clone(),
excess: random_secret(),
fee: FeeFields::from(fee_per_hop as u32),
fee: FeeFields::from(fee_per_hop),
rangeproof: None,
};
for i in 0..5 {
@ -393,6 +395,6 @@ pub mod tests {
assert!(payload.rangeproof.is_some());
assert_eq!(payload.rangeproof.unwrap(), hops[4].rangeproof.unwrap());
assert_eq!(secp::commit(out_value, &final_blind).unwrap(), final_commit);
assert_eq!(payload.fee, FeeFields::from(fee_per_hop as u32));
assert_eq!(payload.fee, FeeFields::from(fee_per_hop));
}
}

View file

@ -1,28 +1,30 @@
use mwixnet::config::{self, ServerConfig};
use mwixnet::node::HttpGrinNode;
use mwixnet::servers;
use mwixnet::store::SwapStore;
use mwixnet::tor;
use mwixnet::wallet::HttpWallet;
#[macro_use]
extern crate clap;
use clap::App;
use grin_core::global;
use grin_core::global::ChainTypes;
use grin_onion::crypto;
use grin_onion::crypto::dalek::DalekPublicKey;
use grin_util::{StopState, ZeroingString};
use mwixnet::client::{MixClient, MixClientImpl};
use mwixnet::node::GrinNode;
use mwixnet::store::StoreError;
use rand::{thread_rng, Rng};
use rpassword;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use std::thread::{sleep, spawn};
use std::time::Duration;
#[macro_use]
extern crate clap;
use clap::App;
use grin_core::global;
use grin_core::global::ChainTypes;
use grin_util::{StopState, ZeroingString};
use rand::{Rng, thread_rng};
use rpassword;
use tor_rtcompat::PreferredRuntime;
use grin_onion::crypto;
use grin_onion::crypto::dalek::DalekPublicKey;
use mwixnet::config::{self, ServerConfig};
use mwixnet::mix_client::{MixClient, MixClientImpl};
use mwixnet::node::GrinNode;
use mwixnet::node::HttpGrinNode;
use mwixnet::servers;
use mwixnet::store::StoreError;
use mwixnet::store::SwapStore;
use mwixnet::tor;
use mwixnet::wallet::HttpWallet;
const DEFAULT_INTERVAL: u32 = 12 * 60 * 60;
@ -54,7 +56,6 @@ fn real_main() -> Result<(), Box<dyn std::error::Error>> {
.value_of("round_time")
.map(|t| t.parse::<u32>().unwrap());
let bind_addr = args.value_of("bind_addr");
let socks_addr = args.value_of("socks_addr");
let grin_node_url = args.value_of("grin_node_url");
let grin_node_secret_path = args.value_of("grin_node_secret_path");
let wallet_owner_url = args.value_of("wallet_owner_url");
@ -79,7 +80,6 @@ fn real_main() -> Result<(), Box<dyn std::error::Error>> {
key: crypto::secp::random_secret(),
interval_s: round_time.unwrap_or(DEFAULT_INTERVAL),
addr: bind_addr.unwrap_or("127.0.0.1:3000").parse()?,
socks_proxy_addr: socks_addr.unwrap_or("127.0.0.1:3001").parse()?,
grin_node_url: match grin_node_url {
Some(u) => u.parse()?,
None => config::grin_node_url(&chain_type),
@ -141,11 +141,6 @@ fn real_main() -> Result<(), Box<dyn std::error::Error>> {
server_config.addr = bind_addr.parse()?;
}
// Override socks_addr, if supplied
if let Some(socks_addr) = socks_addr {
server_config.socks_proxy_addr = socks_addr.parse()?;
}
// Override prev_server, if supplied
if let Some(prev_server) = prev_server {
server_config.prev_server = Some(prev_server);
@ -163,8 +158,7 @@ fn real_main() -> Result<(), Box<dyn std::error::Error>> {
);
// Node API health check
let mut rt = tokio::runtime::Builder::new()
.threaded_scheduler()
let rt = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()?;
if let Err(e) = rt.block_on(node.async_get_chain_tip()) {
@ -187,23 +181,29 @@ fn real_main() -> Result<(), Box<dyn std::error::Error>> {
}
};
let mut tor_process = tor::init_tor_listener(
let tor_instance = rt.block_on(tor::async_init_tor(
PreferredRuntime::current().unwrap(),
&config::get_grin_path(&chain_type).to_str().unwrap(),
&server_config,
)?;
))?;
let tor_instance = Arc::new(grin_util::Mutex::new(tor_instance));
let tor_clone = tor_instance.clone();
let stop_state = Arc::new(StopState::new());
let stop_state_clone = stop_state.clone();
rt.spawn(async move {
futures::executor::block_on(build_signals_fut());
let _ = tor_process.kill();
tor_clone.lock().stop();
stop_state_clone.stop();
});
let next_mixer: Option<Arc<dyn MixClient>> = server_config.next_server.clone().map(|pk| {
let client: Arc<dyn MixClient> =
Arc::new(MixClientImpl::new(server_config.clone(), pk.clone()));
let client: Arc<dyn MixClient> = Arc::new(MixClientImpl::new(
server_config.clone(),
tor_instance.clone(),
pk.clone(),
));
client
});
@ -302,7 +302,7 @@ fn real_main() -> Result<(), Box<dyn std::error::Error>> {
}; // Lock is dropped here
if let Some(tx) = tx_option {
let result = server_clone.lock().await.check_reorg(tx).await;
let result = server_clone.lock().await.check_reorg(&tx).await;
let mut prev_tx_lock = prev_tx_clone.lock().unwrap();
*prev_tx_lock = match result {
Ok(Some(tx)) => Some(tx),

View file

@ -38,10 +38,6 @@ args:
help: Address to bind the rpc server to (e.g. 127.0.0.1:3000)
long: bind_addr
takes_value: true
- socks_addr:
help: Address to bind the SOCKS5 tor proxy to (e.g. 127.0.0.1:3001)
long: socks_addr
takes_value: true
- prev_server:
help: Hex public key of the previous swap/mix server
long: prev_server

View file

@ -1,20 +1,21 @@
use grin_onion::crypto::dalek::DalekPublicKey;
use grin_onion::crypto::secp::SecretKey;
use core::num::NonZeroU32;
use grin_core::global::ChainTypes;
use grin_util::{file, ToHex, ZeroingString};
use grin_wallet_util::OnionV3Address;
use rand::{thread_rng, Rng};
use ring::{aead, pbkdf2};
use serde_derive::{Deserialize, Serialize};
use std::fs::File;
use std::io::prelude::*;
use std::net::SocketAddr;
use std::path::PathBuf;
use std::result::Result;
use grin_core::global::ChainTypes;
use grin_util::{file, ToHex, ZeroingString};
use grin_wallet_util::OnionV3Address;
use rand::{Rng, thread_rng};
use ring::{aead, pbkdf2};
use serde_derive::{Deserialize, Serialize};
use thiserror::Error;
use grin_onion::crypto::dalek::DalekPublicKey;
use grin_onion::crypto::secp::SecretKey;
const GRIN_HOME: &str = ".grin";
const NODE_API_SECRET_FILE_NAME: &str = ".api_secret";
const WALLET_OWNER_API_SECRET_FILE_NAME: &str = ".owner_api_secret";
@ -28,8 +29,6 @@ pub struct ServerConfig {
pub interval_s: u32,
/// socket address the server listener should bind to
pub addr: SocketAddr,
/// socket address the tor sender should bind to
pub socks_proxy_addr: SocketAddr,
/// foreign api address of the grin node
pub grin_node_url: SocketAddr,
/// path to file containing api secret for the grin node
@ -181,7 +180,6 @@ struct RawConfig {
nonce: String,
interval_s: u32,
addr: SocketAddr,
socks_proxy_addr: SocketAddr,
grin_node_url: SocketAddr,
grin_node_secret_path: Option<String>,
wallet_owner_url: SocketAddr,
@ -206,7 +204,6 @@ pub fn write_config(
nonce: encrypted.nonce,
interval_s: server_config.interval_s,
addr: server_config.addr,
socks_proxy_addr: server_config.socks_proxy_addr,
grin_node_url: server_config.grin_node_url,
grin_node_secret_path: server_config.grin_node_secret_path.clone(),
wallet_owner_url: server_config.wallet_owner_url,
@ -243,7 +240,6 @@ pub fn load_config(
key: secret_key,
interval_s: raw_config.interval_s,
addr: raw_config.addr,
socks_proxy_addr: raw_config.socks_proxy_addr,
grin_node_url: raw_config.grin_node_url,
grin_node_secret_path: raw_config.grin_node_secret_path,
wallet_owner_url: raw_config.wallet_owner_url,
@ -254,10 +250,7 @@ pub fn load_config(
}
pub fn get_grin_path(chain_type: &ChainTypes) -> PathBuf {
let mut grin_path = match dirs::home_dir() {
Some(p) => p,
None => PathBuf::new(),
};
let mut grin_path = dirs::home_dir().unwrap_or_else(|| PathBuf::new());
grin_path.push(GRIN_HOME);
grin_path.push(chain_type.shortname());
grin_path
@ -289,10 +282,12 @@ pub fn wallet_owner_url(_chain_type: &ChainTypes) -> SocketAddr {
#[cfg(test)]
pub mod test_util {
use crate::config::ServerConfig;
use std::net::TcpListener;
use grin_onion::crypto::dalek::DalekPublicKey;
use secp256k1zkp::SecretKey;
use std::net::TcpListener;
use crate::config::ServerConfig;
pub fn local_config(
server_key: &SecretKey,
@ -303,7 +298,6 @@ pub mod test_util {
key: server_key.clone(),
interval_s: 1,
addr: TcpListener::bind("127.0.0.1:0")?.local_addr()?,
socks_proxy_addr: TcpListener::bind("127.0.0.1:0")?.local_addr()?,
grin_node_url: "127.0.0.1:3413".parse()?,
grin_node_secret_path: None,
wallet_owner_url: "127.0.0.1:3420".parse()?,
@ -317,9 +311,10 @@ pub mod test_util {
#[cfg(test)]
mod tests {
use super::*;
use grin_onion::crypto::secp;
use super::*;
#[test]
fn server_key_encrypt() {
let password = ZeroingString::from("password");

127
src/http.rs Normal file
View file

@ -0,0 +1,127 @@
use std::time::Duration;
use grin_api::json_rpc;
use grin_util::to_base64;
use grin_wallet_api::{EncryptedRequest, EncryptedResponse, JsonId};
use hyper::body::Body as HyperBody;
use hyper::header::{ACCEPT, AUTHORIZATION, CONTENT_TYPE, USER_AGENT};
use hyper::Request;
use serde_json::json;
use thiserror::Error;
use secp256k1zkp::SecretKey;
/// Error types for HTTP client connections
#[derive(Error, Debug)]
pub enum HttpError {
#[error("Error decrypting response")]
DecryptResponseError(),
#[error("Hyper HTTP error: {0:?}")]
HyperHttpError(hyper::http::Error),
#[error("Hyper request failed with error: {0:?}")]
RequestFailed(hyper::Error),
#[error("Error with response body: {0:?}")]
ResponseBodyError(hyper::Error),
#[error("Error deserializing JSON response: {0:?}")]
ResponseJsonError(serde_json::Error),
#[error("Error decoding JSON-RPC response: {0:?}")]
ResponseParseError(json_rpc::Error),
#[error("Wrong response code: {0}")]
ResponseStatusError(hyper::StatusCode),
}
pub async fn async_send_enc_request<D: serde::de::DeserializeOwned>(
url: &String,
api_secret: &Option<String>,
method: &str,
params: &serde_json::Value,
shared_key: &SecretKey,
) -> Result<D, HttpError> {
let req = json!({
"method": method,
"params": params,
"id": JsonId::IntId(1),
"jsonrpc": "2.0",
});
let enc_req = EncryptedRequest::from_json(&JsonId::IntId(1), &req, &shared_key).unwrap();
let req = build_request(&url, &api_secret, serde_json::to_string(&enc_req).unwrap())?;
let response_str = send_request_async(req).await?;
let enc_res: EncryptedResponse =
serde_json::from_str(&response_str).map_err(HttpError::ResponseJsonError)?;
let decrypted = enc_res
.decrypt(&shared_key)
.map_err(|_| HttpError::DecryptResponseError())?;
let response: json_rpc::Response =
serde_json::from_value(decrypted).map_err(HttpError::ResponseJsonError)?;
let parsed = response
.clone()
.into_result()
.map_err(HttpError::ResponseParseError)?;
Ok(parsed)
}
pub async fn async_send_json_request<D: serde::de::DeserializeOwned>(
url: &String,
api_secret: &Option<String>,
method: &str,
params: &serde_json::Value,
) -> Result<D, HttpError> {
let req_body = json!({
"method": method,
"params": params,
"id": 1,
"jsonrpc": "2.0",
});
let req = build_request(&url, &api_secret, serde_json::to_string(&req_body).unwrap())?;
let data = send_request_async(req).await?;
let ser: json_rpc::Response =
serde_json::from_str(&data).map_err(HttpError::ResponseJsonError)?;
let parsed = ser
.clone()
.into_result()
.map_err(HttpError::ResponseParseError)?;
Ok(parsed)
}
pub fn build_request(
url: &String,
api_secret: &Option<String>,
req_body: String,
) -> Result<Request<HyperBody>, HttpError> {
let mut req_builder = hyper::Request::builder();
if let Some(api_secret) = api_secret {
let basic_auth = format!("Basic {}", to_base64(&format!("grin:{}", api_secret)));
req_builder = req_builder.header(AUTHORIZATION, basic_auth);
}
req_builder
.method(hyper::Method::POST)
.uri(url)
.header(USER_AGENT, "grin-client")
.header(ACCEPT, "application/json")
.header(CONTENT_TYPE, "application/json")
.body(HyperBody::from(req_body))
.map_err(HttpError::HyperHttpError)
}
async fn send_request_async(req: Request<HyperBody>) -> Result<String, HttpError> {
let client = hyper::Client::builder()
.pool_idle_timeout(Duration::from_secs(30))
.build_http();
let resp = client
.request(req)
.await
.map_err(HttpError::RequestFailed)?;
if !resp.status().is_success() {
return Err(HttpError::ResponseStatusError(resp.status()));
}
let raw = hyper::body::to_bytes(resp)
.await
.map_err(HttpError::ResponseBodyError)?;
Ok(String::from_utf8_lossy(&raw).to_string())
}

View file

@ -1,8 +1,9 @@
#[macro_use]
extern crate log;
pub mod client;
pub mod config;
pub mod http;
pub mod mix_client;
pub mod node;
pub mod servers;
pub mod store;
@ -10,8 +11,8 @@ pub mod tor;
pub mod tx;
pub mod wallet;
pub use client::MixClient;
pub use config::ServerConfig;
pub use mix_client::MixClient;
pub use node::{GrinNode, HttpGrinNode, NodeError};
pub use servers::mix::{MixError, MixServer};
pub use servers::mix_rpc::listen as mix_listen;

View file

@ -1,27 +1,30 @@
use crate::config::ServerConfig;
use crate::servers::mix_rpc::{MixReq, MixResp};
use crate::tor;
use grin_onion::crypto::dalek::{self, DalekPublicKey};
use grin_onion::onion::Onion;
use std::sync::Arc;
use async_trait::async_trait;
use grin_api::json_rpc::{build_request, Response};
use grin_core::ser;
use grin_core::ser::ProtocolVersion;
use grin_wallet_util::OnionV3Address;
use hyper::client::HttpConnector;
use hyper::header::{ACCEPT, CONTENT_TYPE, USER_AGENT};
use hyper_socks2::SocksConnector;
use serde_json;
use serde_json::json;
use thiserror::Error;
use tor_rtcompat::Runtime;
use grin_onion::crypto::dalek::{self, DalekPublicKey};
use grin_onion::onion::Onion;
use crate::config::ServerConfig;
use crate::servers::mix_rpc::{MixReq, MixResp};
use crate::tor::TorService;
use crate::{http, tor};
/// Error types for interacting with nodes
#[derive(Error, Debug)]
pub enum ClientError {
pub enum MixClientError {
#[error("Tor Error: {0:?}")]
Tor(tor::TorError),
#[error("API Error: {0:?}")]
API(grin_api::Error),
#[error("Communication Error: {0:?}")]
CommError(http::HttpError),
#[error("Dalek Error: {0:?}")]
Dalek(dalek::DalekError),
#[error("Error decoding JSON response: {0:?}")]
@ -36,18 +39,23 @@ pub enum ClientError {
#[async_trait]
pub trait MixClient: Send + Sync {
/// Swaps the outputs provided and returns the final swapped outputs and kernels.
async fn mix_outputs(&self, onions: &Vec<Onion>) -> Result<MixResp, ClientError>;
async fn mix_outputs(&self, onions: &Vec<Onion>) -> Result<MixResp, MixClientError>;
}
pub struct MixClientImpl {
pub struct MixClientImpl<R: Runtime> {
config: ServerConfig,
tor: Arc<grin_util::Mutex<TorService<R>>>,
addr: OnionV3Address,
}
impl MixClientImpl {
pub fn new(config: ServerConfig, next_pubkey: DalekPublicKey) -> Self {
impl<R: Runtime> MixClientImpl<R> {
pub fn new(
config: ServerConfig,
tor: Arc<grin_util::Mutex<TorService<R>>>,
next_pubkey: DalekPublicKey,
) -> Self {
let addr = OnionV3Address::from_bytes(next_pubkey.as_ref().to_bytes());
MixClientImpl { config, addr }
MixClientImpl { config, tor, addr }
}
async fn async_send_json_request<D: serde::de::DeserializeOwned>(
@ -55,61 +63,29 @@ impl MixClientImpl {
addr: &OnionV3Address,
method: &str,
params: &serde_json::Value,
) -> Result<D, ClientError> {
let proxy = {
let proxy_uri = format!(
"socks5://{}:{}",
self.config.socks_proxy_addr.ip(),
self.config.socks_proxy_addr.port()
)
.parse()
.unwrap();
let mut connector = HttpConnector::new();
connector.enforce_http(false);
let proxy_connector = SocksConnector {
proxy_addr: proxy_uri,
auth: None,
connector,
};
proxy_connector
};
) -> Result<D, MixClientError> {
let url = format!("{}/v1", addr.to_http_str());
let request_str = serde_json::to_string(&build_request(method, params)).unwrap();
let hyper_request =
http::build_request(&url, &None, request_str).map_err(MixClientError::CommError)?;
let body =
hyper::body::Body::from(serde_json::to_string(&build_request(method, params)).unwrap());
let req = hyper::Request::builder()
.method(hyper::Method::POST)
.uri(url)
.header(USER_AGENT, "grin-client")
.header(ACCEPT, "application/json")
.header(CONTENT_TYPE, "application/json")
.body(body)
.map_err(|e| {
ClientError::API(grin_api::Error::RequestError(format!(
"Cannot make request: {}",
e
)))
})?;
let client = hyper::Client::builder().build::<_, hyper::Body>(proxy);
let res = client.request(req).await.unwrap();
let hyper_client = self.tor.lock().new_hyper_client();
let res = hyper_client.request(hyper_request).await.unwrap();
let body_bytes = hyper::body::to_bytes(res.into_body()).await.unwrap();
let res = String::from_utf8(body_bytes.to_vec()).unwrap();
let response: Response =
serde_json::from_str(&res).map_err(ClientError::DecodeResponseError)?;
serde_json::from_str(&res).map_err(MixClientError::DecodeResponseError)?;
if let Some(ref e) = response.error {
return Err(ClientError::ResponseError(e.clone()));
return Err(MixClientError::ResponseError(e.clone()));
}
let result = match response.result.clone() {
Some(r) => serde_json::from_value(r).map_err(ClientError::DecodeResponseError),
Some(r) => serde_json::from_value(r).map_err(MixClientError::DecodeResponseError),
None => serde_json::from_value(serde_json::Value::Null)
.map_err(ClientError::DecodeResponseError),
.map_err(MixClientError::DecodeResponseError),
}?;
Ok(result)
@ -117,34 +93,29 @@ impl MixClientImpl {
}
#[async_trait]
impl MixClient for MixClientImpl {
async fn mix_outputs(&self, onions: &Vec<Onion>) -> Result<MixResp, ClientError> {
impl<R: Runtime> MixClient for MixClientImpl<R> {
async fn mix_outputs(&self, onions: &Vec<Onion>) -> Result<MixResp, MixClientError> {
let serialized = ser::ser_vec(&onions, ProtocolVersion::local()).unwrap();
let sig =
dalek::sign(&self.config.key, serialized.as_slice()).map_err(ClientError::Dalek)?;
// println!(
// "Created sig ({:?}) with public key ({}) for server ({})",
// &sig,
// DalekPublicKey::from_secret(&self.config.key).to_hex(),
// self.config.next_server.as_ref().unwrap().to_hex()
// );
dalek::sign(&self.config.key, serialized.as_slice()).map_err(MixClientError::Dalek)?;
let mix = MixReq::new(onions.clone(), sig);
let params = serde_json::json!([mix]);
self.async_send_json_request::<MixResp>(&self.addr, "mix", &params)
self.async_send_json_request::<MixResp>(&self.addr, "mix", &json!([mix]))
.await
}
}
#[cfg(test)]
pub mod mock {
use super::{ClientError, MixClient};
use std::collections::HashMap;
use async_trait::async_trait;
use grin_onion::onion::Onion;
use crate::servers::mix_rpc::MixResp;
use async_trait::async_trait;
use std::collections::HashMap;
use super::{MixClient, MixClientError};
pub struct MockMixClient {
results: HashMap<Vec<Onion>, MixResp>,
@ -164,27 +135,33 @@ pub mod mock {
#[async_trait]
impl MixClient for MockMixClient {
async fn mix_outputs(&self, onions: &Vec<Onion>) -> Result<MixResp, ClientError> {
async fn mix_outputs(&self, onions: &Vec<Onion>) -> Result<MixResp, MixClientError> {
self.results
.get(onions)
.map(|r| Ok(r.clone()))
.unwrap_or(Err(ClientError::Custom("No response set for input".into())))
.unwrap_or(Err(MixClientError::Custom(
"No response set for input".into(),
)))
}
}
}
#[cfg(test)]
pub mod test_util {
use super::{ClientError, MixClient};
use crate::servers::mix::MixServer;
use crate::servers::mix_rpc::MixResp;
use std::sync::Arc;
use async_trait::async_trait;
use grin_core::ser;
use grin_core::ser::ProtocolVersion;
use grin_onion::crypto::dalek::{self, DalekPublicKey};
use grin_onion::crypto::secp::SecretKey;
use grin_onion::onion::Onion;
use std::sync::Arc;
use crate::servers::mix::MixServer;
use crate::servers::mix_rpc::MixResp;
use super::{MixClient, MixClientError};
/// Implementation of the 'MixClient' trait that calls a mix server implementation directly.
/// No JSON-RPC serialization or socket communication occurs.
@ -196,9 +173,10 @@ pub mod test_util {
#[async_trait]
impl MixClient for DirectMixClient {
async fn mix_outputs(&self, onions: &Vec<Onion>) -> Result<MixResp, ClientError> {
async fn mix_outputs(&self, onions: &Vec<Onion>) -> Result<MixResp, MixClientError> {
let serialized = ser::ser_vec(&onions, ProtocolVersion::local()).unwrap();
let sig = dalek::sign(&self.key, serialized.as_slice()).map_err(ClientError::Dalek)?;
let sig =
dalek::sign(&self.key, serialized.as_slice()).map_err(MixClientError::Dalek)?;
sig.verify(
&DalekPublicKey::from_secret(&self.key),

View file

@ -1,19 +1,20 @@
use grin_onion::crypto::secp::Commitment;
use grin_api::json_rpc::{build_request, Request, Response};
use grin_api::{client, LocatedTxKernel};
use grin_api::{OutputPrintable, OutputType, Tip};
use grin_core::consensus::COINBASE_MATURITY;
use grin_core::core::{Committed, Input, OutputFeatures, Transaction};
use grin_util::ToHex;
use async_trait::async_trait;
use serde_json::json;
use std::net::SocketAddr;
use std::sync::Arc;
use async_trait::async_trait;
use grin_api::LocatedTxKernel;
use grin_api::{OutputPrintable, OutputType, Tip};
use grin_core::consensus::COINBASE_MATURITY;
use grin_core::core::hash::Hash;
use grin_core::core::{Committed, Input, OutputFeatures, Transaction};
use grin_util::ToHex;
use serde_json::json;
use thiserror::Error;
use grin_onion::crypto::secp::Commitment;
use crate::http;
#[async_trait]
pub trait GrinNode: Send + Sync {
/// Retrieves the unspent output with a matching commitment
@ -47,6 +48,8 @@ pub enum NodeError {
DecodeResponseError(serde_json::Error),
#[error("JSON-RPC API communication error: {0:?}")]
ApiCommError(grin_api::Error),
#[error("Client error: {0:?}")]
NodeCommError(http::HttpError),
#[error("Error decoding JSON-RPC response: {0:?}")]
ResponseParseError(grin_api::json_rpc::Error),
}
@ -109,7 +112,10 @@ pub async fn async_build_input(
Ok(None)
}
pub async fn async_is_tx_valid(node: &Arc<dyn GrinNode>, tx: &Transaction) -> Result<bool, NodeError> {
pub async fn async_is_tx_valid(
node: &Arc<dyn GrinNode>,
tx: &Transaction,
) -> Result<bool, NodeError> {
let next_block_height = node.async_get_chain_tip().await?.0 + 1;
for input_commit in &tx.inputs_committed() {
if !async_is_spendable(&node, &input_commit, next_block_height).await? {
@ -149,18 +155,9 @@ impl HttpGrinNode {
params: &serde_json::Value,
) -> Result<D, NodeError> {
let url = format!("http://{}{}", self.node_url, ENDPOINT);
let req = build_request(method, params);
let res = client::post_async::<Request, Response>(
url.as_str(),
&req,
self.node_api_secret.clone(),
)
let parsed = http::async_send_json_request(&url, &self.node_api_secret, &method, &params)
.await
.map_err(NodeError::ApiCommError)?;
let parsed = res
.clone()
.into_result()
.map_err(NodeError::ResponseParseError)?;
.map_err(NodeError::NodeCommError)?;
Ok(parsed)
}
}
@ -202,7 +199,10 @@ impl GrinNode for HttpGrinNode {
let tip =
serde_json::from_value::<Tip>(tip_json).map_err(NodeError::DecodeResponseError)?;
Ok((tip.height, Hash::from_hex(tip.last_block_pushed.as_str()).unwrap()))
Ok((
tip.height,
Hash::from_hex(tip.last_block_pushed.as_str()).unwrap(),
))
}
async fn async_post_tx(&self, tx: &Transaction) -> Result<(), NodeError> {
@ -236,15 +236,17 @@ impl GrinNode for HttpGrinNode {
#[cfg(test)]
pub mod mock {
use super::{GrinNode, NodeError};
use std::collections::HashMap;
use std::sync::RwLock;
use async_trait::async_trait;
use grin_api::{LocatedTxKernel, OutputPrintable, OutputType};
use grin_core::core::Transaction;
use grin_onion::crypto::secp::Commitment;
use std::collections::HashMap;
use std::sync::RwLock;
use grin_core::core::hash::Hash;
use grin_core::core::Transaction;
use grin_onion::crypto::secp::Commitment;
use super::{GrinNode, NodeError};
/// Implementation of 'GrinNode' trait that mocks a grin node instance.
/// Use only for testing purposes.

View file

@ -1,24 +1,26 @@
use crate::client::MixClient;
use crate::config::ServerConfig;
use crate::node::{self, GrinNode};
use crate::tx::{self, TxComponents};
use crate::wallet::Wallet;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use crate::servers::mix_rpc::MixResp;
use async_trait::async_trait;
use futures::stream::{self, StreamExt};
use grin_core::core::{Output, OutputFeatures, TransactionBody};
use grin_core::global::DEFAULT_ACCEPT_FEE_BASE;
use grin_core::ser;
use grin_core::ser::ProtocolVersion;
use itertools::Itertools;
use thiserror::Error;
use grin_onion::crypto::dalek::{self, DalekSignature};
use grin_onion::onion::{Onion, OnionError, PeeledOnion};
use itertools::Itertools;
use secp256k1zkp::key::ZERO_KEY;
use secp256k1zkp::Secp256k1;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use thiserror::Error;
use crate::config::ServerConfig;
use crate::mix_client::MixClient;
use crate::node::{self, GrinNode};
use crate::servers::mix_rpc::MixResp;
use crate::tx::{self, TxComponents};
use crate::wallet::Wallet;
/// Mixer error types
#[derive(Error, Debug)]
@ -46,7 +48,7 @@ pub enum MixError {
#[error("Wallet error: {0:?}")]
WalletError(crate::wallet::WalletError),
#[error("Client comm error: {0:?}")]
Client(crate::client::ClientError),
Client(crate::mix_client::MixClientError),
}
/// An internal MWixnet server - a "Mixer"
@ -298,16 +300,17 @@ impl MixServer for MixServerImpl {
#[cfg(test)]
mod test_util {
use crate::client::test_util::DirectMixClient;
use crate::client::MixClient;
use crate::config;
use crate::node::mock::MockGrinNode;
use crate::servers::mix::MixServerImpl;
use crate::wallet::mock::MockWallet;
use std::sync::Arc;
use grin_onion::crypto::dalek::DalekPublicKey;
use secp256k1zkp::SecretKey;
use std::sync::Arc;
use crate::config;
use crate::mix_client::MixClient;
use crate::mix_client::test_util::DirectMixClient;
use crate::node::mock::MockGrinNode;
use crate::servers::mix::MixServerImpl;
use crate::wallet::mock::MockWallet;
pub fn new_mixer(
server_key: &SecretKey,
@ -340,18 +343,20 @@ mod test_util {
#[cfg(test)]
mod tests {
use crate::client::MixClient;
use crate::node::mock::MockGrinNode;
use std::collections::HashSet;
use std::sync::Arc;
use ::function_name::named;
use grin_onion::{create_onion, Hop, new_hop};
use grin_onion::crypto::dalek::DalekPublicKey;
use grin_onion::crypto::secp::{self, Commitment};
use grin_onion::test_util as onion_test_util;
use grin_onion::{create_onion, new_hop, Hop};
use secp256k1zkp::pedersen::RangeProof;
use secp256k1zkp::SecretKey;
use std::collections::HashSet;
use std::sync::Arc;
use crate::mix_client::MixClient;
use crate::node::mock::MockGrinNode;
macro_rules! init_test {
() => {{

View file

@ -1,19 +1,20 @@
use crate::client::MixClient;
use crate::config::ServerConfig;
use crate::node::GrinNode;
use crate::servers::mix::{MixError, MixServer, MixServerImpl};
use crate::wallet::Wallet;
use std::sync::Arc;
use crate::tx::TxComponents;
use futures::FutureExt;
use jsonrpc_derive::rpc;
use jsonrpc_http_server::{DomainsValidation, ServerBuilder};
use jsonrpc_http_server::jsonrpc_core::{self, BoxFuture, IoHandler};
use serde::{Deserialize, Serialize};
use grin_onion::crypto::dalek::{self, DalekSignature};
use grin_onion::onion::Onion;
use jsonrpc_core::BoxFuture;
use jsonrpc_derive::rpc;
use jsonrpc_http_server::jsonrpc_core::{self as jsonrpc, IoHandler};
use jsonrpc_http_server::{DomainsValidation, ServerBuilder};
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use crate::config::ServerConfig;
use crate::mix_client::MixClient;
use crate::node::GrinNode;
use crate::servers::mix::{MixError, MixServer, MixServerImpl};
use crate::tx::TxComponents;
use crate::wallet::Wallet;
#[derive(Serialize, Deserialize)]
pub struct MixReq {
@ -37,7 +38,7 @@ impl MixReq {
#[rpc(server)]
pub trait MixAPI {
#[rpc(name = "mix")]
fn mix(&self, mix: MixReq) -> BoxFuture<jsonrpc::Result<MixResp>>;
fn mix(&self, mix: MixReq) -> BoxFuture<jsonrpc_core::Result<MixResp>>;
}
#[derive(Clone)]
@ -67,14 +68,14 @@ impl RPCMixServer {
}
}
impl From<MixError> for jsonrpc::Error {
impl From<MixError> for jsonrpc_core::Error {
fn from(e: MixError) -> Self {
jsonrpc::Error::invalid_params(e.to_string())
jsonrpc_core::Error::invalid_params(e.to_string())
}
}
impl MixAPI for RPCMixServer {
fn mix(&self, mix: MixReq) -> BoxFuture<jsonrpc::Result<MixResp>> {
fn mix(&self, mix: MixReq) -> BoxFuture<jsonrpc_core::Result<MixResp>> {
let server = self.server.clone();
async move {
let response = server

View file

@ -1,22 +1,24 @@
use crate::client::MixClient;
use crate::config::ServerConfig;
use crate::node::{self, GrinNode};
use crate::store::{StoreError, SwapData, SwapStatus, SwapStore, SwapTx};
use crate::tx;
use crate::wallet::Wallet;
use std::collections::HashSet;
use std::result::Result;
use std::sync::Arc;
use async_trait::async_trait;
use grin_core::core::{Committed, Input, Output, OutputFeatures, Transaction, TransactionBody};
use grin_core::global::DEFAULT_ACCEPT_FEE_BASE;
use itertools::Itertools;
use secp256k1zkp::key::ZERO_KEY;
use thiserror::Error;
use grin_onion::crypto::comsig::ComSignature;
use grin_onion::crypto::secp::{Commitment, Secp256k1, SecretKey};
use grin_onion::onion::{Onion, OnionError};
use itertools::Itertools;
use secp256k1zkp::key::ZERO_KEY;
use std::collections::HashSet;
use std::result::Result;
use std::sync::Arc;
use thiserror::Error;
use crate::config::ServerConfig;
use crate::mix_client::MixClient;
use crate::node::{self, GrinNode};
use crate::store::{StoreError, SwapData, SwapStatus, SwapStore, SwapTx};
use crate::tx;
use crate::wallet::Wallet;
/// Swap error types
#[derive(Clone, Error, Debug, PartialEq)]
@ -82,7 +84,10 @@ pub trait SwapServer: Send + Sync {
/// Verify the previous swap transaction is in the active chain or mempool.
/// If it's not, rebroacast the transaction if it's still valid.
/// If the transaction is no longer valid, perform the swap again.
async fn check_reorg(&self, tx: Arc<Transaction>) -> Result<Option<Arc<Transaction>>, SwapError>;
async fn check_reorg(
&self,
tx: &Arc<Transaction>,
) -> Result<Option<Arc<Transaction>>, SwapError>;
}
/// The standard MWixnet server implementation
@ -142,7 +147,11 @@ impl SwapServerImpl {
false
}
async fn async_execute_round(&self, store: &SwapStore, mut swaps: Vec<SwapData>) -> Result<Option<Arc<Transaction>>, SwapError> {
async fn async_execute_round(
&self,
store: &SwapStore,
mut swaps: Vec<SwapData>,
) -> Result<Option<Arc<Transaction>>, SwapError> {
swaps.sort_by(|a, b| a.output_commit.partial_cmp(&b.output_commit).unwrap());
if swaps.len() == 0 {
@ -215,7 +224,10 @@ impl SwapServerImpl {
let chain_tip = self.node.async_get_chain_tip().await?;
self.node.async_post_tx(&tx).await?;
store.save_swap_tx(&SwapTx { tx: tx.clone(), chain_tip })?;
store.save_swap_tx(&SwapTx {
tx: tx.clone(),
chain_tip,
})?;
// Update status to in process
let kernel_commit = tx.kernels().first().unwrap().excess;
@ -294,7 +306,7 @@ impl SwapServer for SwapServerImpl {
output_commit: peeled.onion.commit,
rangeproof: peeled.payload.rangeproof,
input,
fee: fee as u64,
fee,
onion: peeled.onion,
status: SwapStatus::Unprocessed,
},
@ -327,19 +339,27 @@ impl SwapServer for SwapServerImpl {
self.async_execute_round(&locked_store, swaps).await
}
async fn check_reorg(&self, tx: Arc<Transaction>) -> Result<Option<Arc<Transaction>>, SwapError> {
async fn check_reorg(
&self,
tx: &Arc<Transaction>,
) -> Result<Option<Arc<Transaction>>, SwapError> {
let excess = tx.kernels().first().unwrap().excess;
let locked_store = self.store.lock().await;
if let Ok(swap_tx) = locked_store.get_swap_tx(&excess) {
// If kernel is in active chain, return tx
if self.node.async_get_kernel(&excess, Some(swap_tx.chain_tip.0), None).await?.is_some() {
return Ok(Some(tx));
if self
.node
.async_get_kernel(&excess, Some(swap_tx.chain_tip.0), None)
.await?
.is_some()
{
return Ok(Some(tx.clone()));
}
// If transaction is still valid, rebroadcast and return tx
if node::async_is_tx_valid(&self.node, &tx).await? {
self.node.async_post_tx(&tx).await?;
return Ok(Some(tx));
return Ok(Some(tx.clone()));
}
// Collect all swaps based on tx's inputs, and execute_round with those swaps
@ -362,14 +382,16 @@ impl SwapServer for SwapServerImpl {
#[cfg(test)]
pub mod mock {
use super::{SwapError, SwapServer};
use std::collections::HashMap;
use std::sync::Arc;
use async_trait::async_trait;
use grin_core::core::Transaction;
use grin_onion::crypto::comsig::ComSignature;
use grin_onion::onion::Onion;
use std::collections::HashMap;
use std::sync::Arc;
use super::{SwapError, SwapServer};
pub struct MockSwapServer {
errors: HashMap<Onion, SwapError>,
@ -397,29 +419,33 @@ pub mod mock {
Ok(())
}
async fn execute_round(&self) -> Result<Option<std::sync::Arc<Transaction>>, SwapError> {
async fn execute_round(&self) -> Result<Option<Arc<Transaction>>, SwapError> {
Ok(None)
}
async fn check_reorg(&self, tx: Arc<Transaction>) -> Result<Option<Arc<Transaction>>, SwapError> {
Ok(Some(tx))
async fn check_reorg(
&self,
tx: &Arc<Transaction>,
) -> Result<Option<Arc<Transaction>>, SwapError> {
Ok(Some(tx.clone()))
}
}
}
#[cfg(test)]
pub mod test_util {
use crate::client::MixClient;
use std::sync::Arc;
use grin_onion::crypto::dalek::DalekPublicKey;
use grin_onion::crypto::secp::SecretKey;
use crate::config;
use crate::mix_client::MixClient;
use crate::node::GrinNode;
use crate::servers::swap::SwapServerImpl;
use crate::store::SwapStore;
use crate::wallet::mock::MockWallet;
use grin_onion::crypto::dalek::DalekPublicKey;
use grin_onion::crypto::secp::SecretKey;
use std::sync::Arc;
pub fn new_swapper(
test_dir: &str,
server_key: &SecretKey,
@ -446,7 +472,22 @@ pub mod test_util {
#[cfg(test)]
mod tests {
use crate::client::{self, MixClient};
use std::sync::Arc;
use ::function_name::named;
use grin_core::core::{
Committed, Input, Inputs, Output, OutputFeatures, Transaction, Weighting,
};
use secp256k1zkp::key::ZERO_KEY;
use x25519_dalek::PublicKey as xPublicKey;
use grin_onion::crypto::comsig::ComSignature;
use grin_onion::crypto::secp;
use grin_onion::onion::Onion;
use grin_onion::test_util as onion_test_util;
use grin_onion::{create_onion, new_hop, Hop};
use crate::mix_client::{self, MixClient};
use crate::node::mock::MockGrinNode;
use crate::servers::mix_rpc::MixResp;
use crate::servers::swap::{SwapError, SwapServer};
@ -454,17 +495,6 @@ mod tests {
use crate::tx;
use crate::tx::TxComponents;
use ::function_name::named;
use grin_core::core::{Committed, Input, Inputs, Output, OutputFeatures, Transaction, Weighting};
use grin_onion::crypto::comsig::ComSignature;
use grin_onion::crypto::secp;
use grin_onion::onion::Onion;
use grin_onion::test_util as onion_test_util;
use grin_onion::{create_onion, new_hop, Hop};
use secp256k1zkp::key::ZERO_KEY;
use std::sync::Arc;
use x25519_dalek::PublicKey as xPublicKey;
macro_rules! assert_error_type {
($result:expr, $error_type:pat) => {
assert!($result.is_err());
@ -594,7 +624,7 @@ mod tests {
// Mock mixer
let mixer_onion = onion.peel_layer(&swap_sk)?.onion;
let mut mock_mixer = client::mock::MockMixClient::new();
let mut mock_mixer = mix_client::mock::MockMixClient::new();
let mixer_response = TxComponents {
offset: ZERO_KEY,
outputs: vec![Output::new(
@ -845,12 +875,10 @@ mod tests {
let node: Arc<MockGrinNode> = Arc::new(MockGrinNode::new());
let (server, _) = super::test_util::new_swapper(&test_dir, &server_key, None, node.clone());
let kern = tx::build_kernel(&secp::random_secret(), 1000u64)?;
let tx: Arc<Transaction> = Arc::new(Transaction::new(Inputs::default(), &[], &[kern.clone()]));
let result = server.check_reorg(tx).await;
assert_eq!(
Err(SwapError::SwapTxNotFound(kern.excess())),
result
);
let tx: Arc<Transaction> =
Arc::new(Transaction::new(Inputs::default(), &[], &[kern.clone()]));
let result = server.check_reorg(&tx).await;
assert_eq!(Err(SwapError::SwapTxNotFound(kern.excess())), result);
Ok(())
}

View file

@ -1,20 +1,21 @@
use crate::client::MixClient;
use std::sync::Arc;
use futures::FutureExt;
use jsonrpc_core::{BoxFuture, Value};
use jsonrpc_derive::rpc;
use jsonrpc_http_server::{DomainsValidation, ServerBuilder};
use serde::{Deserialize, Serialize};
use grin_onion::crypto::comsig::{self, ComSignature};
use grin_onion::onion::Onion;
use crate::config::ServerConfig;
use crate::mix_client::MixClient;
use crate::node::GrinNode;
use crate::servers::swap::{SwapError, SwapServer, SwapServerImpl};
use crate::store::SwapStore;
use crate::wallet::Wallet;
use futures::FutureExt;
use grin_onion::crypto::comsig::{self, ComSignature};
use grin_onion::onion::Onion;
use jsonrpc_core::Value;
use jsonrpc_derive::rpc;
use jsonrpc_http_server::jsonrpc_core::*;
use jsonrpc_http_server::{DomainsValidation, ServerBuilder};
use serde::{Deserialize, Serialize};
use std::sync::Arc;
#[derive(Serialize, Deserialize)]
pub struct SwapReq {
onion: Onion,
@ -37,7 +38,7 @@ struct RPCSwapServer {
impl RPCSwapServer {
/// Spin up an instance of the JSON-RPC HTTP server.
fn start_http(&self, runtime_handle: tokio::runtime::Handle) -> jsonrpc_http_server::Server {
let mut io = IoHandler::new();
let mut io = jsonrpc_core::IoHandler::new();
io.extend_with(RPCSwapServer::to_delegate(self.clone()));
ServerBuilder::new(io)
@ -55,15 +56,15 @@ impl RPCSwapServer {
}
}
impl From<SwapError> for Error {
impl From<SwapError> for jsonrpc_core::Error {
fn from(e: SwapError) -> Self {
match e {
SwapError::UnknownError(_) => Error {
SwapError::UnknownError(_) => jsonrpc_core::Error {
message: e.to_string(),
code: ErrorCode::InternalError,
code: jsonrpc_core::ErrorCode::InternalError,
data: None,
},
_ => Error::invalid_params(e.to_string()),
_ => jsonrpc_core::Error::invalid_params(e.to_string()),
}
}
}
@ -115,20 +116,21 @@ pub fn listen(
#[cfg(test)]
mod tests {
use crate::config::ServerConfig;
use crate::servers::swap::mock::MockSwapServer;
use crate::servers::swap::{SwapError, SwapServer};
use crate::servers::swap_rpc::{RPCSwapServer, SwapReq};
use grin_onion::create_onion;
use grin_onion::crypto::comsig::ComSignature;
use grin_onion::crypto::secp;
use std::net::TcpListener;
use std::sync::Arc;
use hyper::{Body, Client, Request, Response};
use tokio::sync::Mutex;
use grin_onion::create_onion;
use grin_onion::crypto::comsig::ComSignature;
use grin_onion::crypto::secp;
use crate::config::ServerConfig;
use crate::servers::swap::{SwapError, SwapServer};
use crate::servers::swap::mock::MockSwapServer;
use crate::servers::swap_rpc::{RPCSwapServer, SwapReq};
async fn body_to_string(req: Response<Body>) -> String {
let body_bytes = hyper::body::to_bytes(req.into_body()).await.unwrap();
String::from_utf8(body_bytes.to_vec()).unwrap()
@ -144,7 +146,6 @@ mod tests {
key: secp::random_secret(),
interval_s: 1,
addr: TcpListener::bind("127.0.0.1:0")?.local_addr()?,
socks_proxy_addr: TcpListener::bind("127.0.0.1:0")?.local_addr()?,
grin_node_url: "127.0.0.1:3413".parse()?,
grin_node_secret_path: None,
wallet_owner_url: "127.0.0.1:3420".parse()?,
@ -186,8 +187,7 @@ mod tests {
/// Demonstrates a successful swap response
#[test]
fn swap_success() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let mut rt = tokio::runtime::Builder::new()
.threaded_scheduler()
let rt = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()?;
let commitment = secp::commit(1234, &secp::random_secret())?;
@ -214,8 +214,7 @@ mod tests {
#[test]
fn swap_bad_request() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let mut rt = tokio::runtime::Builder::new()
.threaded_scheduler()
let rt = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()?;
let server: Arc<Mutex<dyn SwapServer>> = Arc::new(Mutex::new(MockSwapServer::new()));
@ -235,8 +234,7 @@ mod tests {
/// Returns "Commitment not found" when there's no matching output in the UTXO set.
#[test]
fn swap_utxo_missing() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let mut rt = tokio::runtime::Builder::new()
.threaded_scheduler()
let rt = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()?;

View file

@ -1,9 +1,5 @@
use grin_core::core::hash::Hash;
use grin_onion::crypto::secp::{self, Commitment, RangeProof, SecretKey};
use grin_onion::onion::Onion;
use grin_onion::util::{read_optional, write_optional};
use grin_core::core::{Input, Transaction};
use grin_core::core::hash::Hash;
use grin_core::ser::{
self, DeserializationMode, ProtocolVersion, Readable, Reader, Writeable, Writer,
};
@ -11,6 +7,10 @@ use grin_store::{self as store, Store};
use grin_util::ToHex;
use thiserror::Error;
use grin_onion::crypto::secp::{self, Commitment, RangeProof, SecretKey};
use grin_onion::onion::Onion;
use grin_onion::util::{read_optional, write_optional};
const DB_NAME: &str = "swap";
const STORE_SUBPATH: &str = "swaps";
@ -191,6 +191,8 @@ pub struct SwapStore {
pub enum StoreError {
#[error("Swap entry already exists for '{0:?}'")]
AlreadyExists(Commitment),
#[error("Entry does not exist for '{0:?}'")]
NotFound(Commitment),
#[error("Error occurred while attempting to open db: {0}")]
OpenError(store::lmdb::Error),
#[error("Serialization error occurred: {0}")]
@ -286,8 +288,12 @@ impl SwapStore {
/// Saves a swap transaction to the database
pub fn save_swap_tx(&self, s: &SwapTx) -> Result<(), StoreError> {
let data = ser::ser_vec(&s, ProtocolVersion::local())?;
self
.write(TX_PREFIX, &s.tx.kernels().first().unwrap().excess, &data, true)
self.write(
TX_PREFIX,
&s.tx.kernels().first().unwrap().excess,
&data,
true,
)
.map_err(StoreError::WriteError)?;
Ok(())
@ -301,13 +307,16 @@ impl SwapStore {
#[cfg(test)]
mod tests {
use crate::store::{StoreError, SwapData, SwapStatus, SwapStore};
use std::cmp::Ordering;
use grin_core::core::{Input, OutputFeatures};
use grin_core::global::{self, ChainTypes};
use rand::RngCore;
use grin_onion::crypto::secp;
use grin_onion::test_util as onion_test_util;
use rand::RngCore;
use std::cmp::Ordering;
use crate::store::{StoreError, SwapData, SwapStatus, SwapStore};
fn new_store(test_name: &str) -> SwapStore {
global::set_local_chain_type(ChainTypes::AutomatedTesting);

View file

@ -1,9 +1,32 @@
use crate::config::ServerConfig;
use std::sync::Arc;
use grin_wallet_impls::tor::config as tor_config;
use grin_wallet_impls::tor::process::TorProcess;
use std::collections::HashMap;
use arti_client::config::TorClientConfigBuilder;
use arti_client::{TorClient, TorClientConfig};
use arti_hyper::ArtiHttpConnector;
use curve25519_dalek::digest::Digest;
use ed25519_dalek::hazmat::ExpandedSecretKey;
use futures::task::SpawnExt;
use sha2::Sha512;
use thiserror::Error;
use tls_api::{TlsConnector as TlsConnectorTrait, TlsConnectorBuilder};
use tls_api_native_tls::TlsConnector;
use tor_hscrypto::pk::{HsIdKey, HsIdKeypair};
use tor_hsrproxy::config::{
Encapsulation, ProxyAction, ProxyConfigBuilder, ProxyPattern, ProxyRule, TargetAddr,
};
use tor_hsrproxy::OnionServiceReverseProxy;
use tor_hsservice::config::OnionServiceConfigBuilder;
use tor_hsservice::{
HsIdKeypairSpecifier, HsIdPublicKeySpecifier, HsNickname, RunningOnionService,
};
use tor_keymgr::key_specifier_derive::internal;
use tor_keymgr::{ArtiNativeKeystore, KeyMgrBuilder, KeystoreSelector};
use tor_llcrypto::pk::ed25519::ExpandedKeypair;
use tor_rtcompat::Runtime;
use secp256k1zkp::SecretKey;
use crate::config::ServerConfig;
/// Tor error types
#[derive(Error, Debug)]
@ -14,93 +37,163 @@ pub enum TorError {
ProcessError(grin_wallet_impls::tor::process::Error),
}
pub fn init_tor_listener(
pub struct TorService<R: Runtime> {
tor_client: Option<TorClient<R>>,
hidden_services: Vec<Arc<RunningOnionService>>,
}
impl<R: Runtime> TorService<R> {
/// Builds a hyper::Client with an ArtiHttpConnector over the TorClient.
/// The returned Client makes HTTP requests through the TorClient directly, eliminating the need for a socks proxy.
pub fn new_hyper_client(
&self,
) -> hyper::Client<ArtiHttpConnector<R, TlsConnector>, hyper::Body> {
let tls_connector = TlsConnector::builder().unwrap().build().unwrap();
let tor_connector = ArtiHttpConnector::new(self.tor_client.clone().unwrap(), tls_connector);
hyper::Client::builder().build::<_, hyper::Body>(tor_connector)
}
pub fn stop(&mut self) {
self.tor_client = None;
self.hidden_services.clear();
}
}
pub async fn async_init_tor<R>(
runtime: R,
data_dir: &str,
server_config: &ServerConfig,
) -> Result<TorProcess, TorError> {
warn!("Initializing tor listener");
) -> Result<TorService<R>, TorError>
where
R: Runtime,
{
warn!("Initializing TOR");
let tor_dir = format!("{}/tor/listener", &data_dir);
trace!(
"Dir: {}, Proxy: {}",
&tor_dir,
server_config.socks_proxy_addr.to_string()
let state_dir = format!("{}/tor/state", &data_dir);
let cache_dir = format!("{}/tor/cache", &data_dir);
let hs_nickname = HsNickname::new("listener".to_string()).unwrap();
let mut client_config_builder =
TorClientConfigBuilder::from_directories(state_dir.clone(), cache_dir.clone());
client_config_builder
.address_filter()
.allow_onion_addrs(true);
let client_config = client_config_builder.build().unwrap();
add_key_to_store(&client_config, &state_dir, &server_config.key, &hs_nickname)?;
let tor_client = TorClient::with_runtime(runtime)
.config(client_config)
.create_bootstrapped()
.await
.unwrap();
let service =
async_launch_hidden_service(hs_nickname.clone(), &tor_client, &server_config).await?;
let tor_instance = TorService {
tor_client: Some(tor_client),
hidden_services: vec![service],
};
Ok(tor_instance)
}
async fn async_launch_hidden_service<R>(
hs_nickname: HsNickname,
tor_client: &TorClient<R>,
server_config: &ServerConfig,
) -> Result<Arc<RunningOnionService>, TorError>
where
R: Runtime,
{
let svc_cfg = OnionServiceConfigBuilder::default()
.nickname(hs_nickname.clone())
.build()
.unwrap();
let (service, request_stream) = tor_client.launch_onion_service(svc_cfg).unwrap();
let proxy_rule = ProxyRule::new(
ProxyPattern::one_port(80).unwrap(),
ProxyAction::Forward(Encapsulation::Simple, TargetAddr::Inet(server_config.addr)),
);
let mut proxy_cfg_builder = ProxyConfigBuilder::default();
proxy_cfg_builder.set_proxy_ports(vec![proxy_rule]);
let proxy = OnionServiceReverseProxy::new(proxy_cfg_builder.build().unwrap());
// create data directory if it doesn't exist
std::fs::create_dir_all(&format!("{}/data", tor_dir)).unwrap();
let service_dir = tor_config::output_onion_service_config(tor_dir.as_str(), &server_config.key)
.map_err(|e| TorError::ConfigError(e))?;
let service_dirs = vec![service_dir.to_string()];
tor_config::output_torrc(
tor_dir.as_str(),
server_config.addr.to_string().as_str(),
&server_config.socks_proxy_addr.to_string(),
&service_dirs,
HashMap::new(),
HashMap::new(),
)
.map_err(|e| TorError::ConfigError(e))?;
// Start TOR process
let mut process = TorProcess::new();
process
.torrc_path("./torrc")
.working_dir(tor_dir.as_str())
.timeout(30)
.completion_percent(100);
let mut attempts = 0;
let max_attempts = 3;
let mut result;
loop {
attempts += 1;
info!("Launching TorProcess... Attempt {}", attempts);
result = process.launch();
if result.is_ok() || attempts >= max_attempts {
break;
{
let proxy = proxy.clone();
let runtime_clone = tor_client.runtime().clone();
tor_client
.runtime()
.spawn(async move {
match proxy
.handle_requests(runtime_clone, hs_nickname.clone(), request_stream)
.await
{
Ok(()) => {
debug!("Onion service {} exited cleanly.", hs_nickname);
}
Err(e) => {
warn!("Onion service {} exited with an error: {}", hs_nickname, e);
}
}
result.map_err(TorError::ProcessError)?;
})
.unwrap();
}
warn!(
"Server listening at http://{}.onion",
server_config.onion_address().to_ov3_str()
);
Ok(process)
Ok(service)
}
pub fn init_tor_sender(
data_dir: &str,
server_config: &ServerConfig,
) -> Result<TorProcess, TorError> {
warn!(
"Starting TOR Process for send at {:?}",
server_config.socks_proxy_addr
// TODO: Add proper error handling
fn add_key_to_store(
tor_config: &TorClientConfig,
state_dir: &String,
secret_key: &SecretKey,
hs_nickname: &HsNickname,
) -> Result<(), TorError> {
let key_store_dir = format!("{}/keystore", &state_dir);
let arti_store =
ArtiNativeKeystore::from_path_and_mistrust(&key_store_dir, &tor_config.fs_mistrust())
.unwrap();
info!("Using keystore from {key_store_dir:?}");
let key_manager = KeyMgrBuilder::default()
.default_store(Box::new(arti_store))
.build()
.map_err(|_| internal!("failed to build keymgr"))
.unwrap();
let expanded_sk = ExpandedSecretKey::from_bytes(
Sha512::default()
.chain_update(secret_key)
.finalize()
.as_ref(),
);
let tor_dir = format!("{}/tor/sender", data_dir);
tor_config::output_tor_sender_config(
tor_dir.as_str(),
&server_config.socks_proxy_addr.to_string(),
HashMap::new(),
HashMap::new(),
)
.map_err(|e| TorError::ConfigError(e))?;
let mut sk_bytes = [0_u8; 64];
sk_bytes[0..32].copy_from_slice(&expanded_sk.scalar.to_bytes());
sk_bytes[32..64].copy_from_slice(&expanded_sk.hash_prefix);
let expanded_kp = ExpandedKeypair::from_secret_key_bytes(sk_bytes).unwrap();
// Start TOR process
let mut tor_process = TorProcess::new();
tor_process
.torrc_path("./torrc")
.working_dir(tor_dir.as_str())
.timeout(40)
.completion_percent(100)
.launch()
.map_err(TorError::ProcessError)?;
Ok(tor_process)
key_manager
.insert(
HsIdKey::from(expanded_kp.public().clone()),
&HsIdPublicKeySpecifier::new(hs_nickname.clone()),
KeystoreSelector::Default,
)
.unwrap();
key_manager
.insert(
HsIdKeypair::from(expanded_kp),
&HsIdKeypairSpecifier::new(hs_nickname.clone()),
KeystoreSelector::Default,
)
.unwrap();
Ok(())
}

View file

@ -1,18 +1,20 @@
use std::net::SocketAddr;
use async_trait::async_trait;
use grin_api::client;
use grin_api::json_rpc::{build_request, Request, Response, RpcError};
use grin_core::core::Output;
use grin_core::libtx::secp_ser;
use grin_keychain::BlindingFactor;
use grin_onion::crypto::secp;
use grin_util::{ToHex, ZeroingString};
use grin_wallet_api::{EncryptedRequest, EncryptedResponse, JsonId, Token};
use secp256k1zkp::{PublicKey, Secp256k1, SecretKey};
use grin_wallet_api::Token;
use serde::{Deserialize, Serialize};
use serde_json::json;
use std::net::SocketAddr;
use thiserror::Error;
use grin_onion::crypto::secp;
use secp256k1zkp::{PublicKey, Secp256k1, SecretKey};
use crate::http;
#[async_trait]
pub trait Wallet: Send + Sync {
/// Builds an output for the wallet with the provided amount.
@ -25,18 +27,8 @@ pub trait Wallet: Send + Sync {
/// Error types for interacting with wallets
#[derive(Error, Debug)]
pub enum WalletError {
#[error("Error encrypting request: {0:?}")]
EncryptRequestError(grin_wallet_libwallet::Error),
#[error("Error decrypting response: {0:?}")]
DecryptResponseError(grin_wallet_libwallet::Error),
#[error("Error decoding JSON response: {0:?}")]
DecodeResponseError(serde_json::Error),
#[error("JSON-RPC API communication error: {0:?}")]
ApiCommError(grin_api::Error),
#[error("Error decoding JSON-RPC response: {0:?}")]
ResponseParseError(grin_api::json_rpc::Error),
#[error("Unsucessful response returned: {0:?}")]
ResponseRpcError(Option<RpcError>),
#[error("Error communication with wallet: {0:?}")]
WalletCommError(http::HttpError),
}
/// HTTP (JSONRPC) implementation of the 'Wallet' trait.
@ -73,14 +65,16 @@ impl HttpWallet {
"name": null,
"password": wallet_pass.to_string()
});
let token: Token = HttpWallet::async_send_enc_request(
&wallet_owner_url,
let url = format!("http://{}{}", wallet_owner_url, ENDPOINT);
let token: Token = http::async_send_enc_request(
&url,
&wallet_owner_secret,
"open_wallet",
&open_wallet_params,
&shared_key,
)
.await?;
.await
.map_err(WalletError::WalletCommError)?;
info!("Connected to wallet");
Ok(HttpWallet {
@ -98,17 +92,20 @@ impl HttpWallet {
let secp = Secp256k1::new();
let ephemeral_sk = secp::random_secret();
let ephemeral_pk = PublicKey::from_secret_key(&secp, &ephemeral_sk).unwrap();
let ephemeral_pk_bytes = ephemeral_pk.serialize_vec(&secp, true);
let init_params = json!({
"ecdh_pubkey": ephemeral_pk.serialize_vec(&secp, true).to_hex()
"ecdh_pubkey": ephemeral_pk_bytes.to_hex()
});
let response_pk: ECDHPubkey = HttpWallet::async_send_json_request(
&wallet_owner_url,
let url = format!("http://{}{}", wallet_owner_url, ENDPOINT);
let response_pk: ECDHPubkey = http::async_send_json_request(
&url,
&wallet_owner_secret,
"init_secure_api",
&init_params,
)
.await?;
.await
.map_err(WalletError::WalletCommError)?;
let shared_key = {
let mut shared_pubkey = response_pk.ecdh_pubkey.clone();
@ -126,75 +123,16 @@ impl HttpWallet {
method: &str,
params: &serde_json::Value,
) -> Result<D, WalletError> {
HttpWallet::async_send_enc_request(
&self.wallet_owner_url,
let url = format!("http://{}{}", self.wallet_owner_url, ENDPOINT);
http::async_send_enc_request(
&url,
&self.wallet_owner_secret,
method,
params,
&self.shared_key,
)
.await
}
async fn async_send_enc_request<D: serde::de::DeserializeOwned>(
wallet_owner_url: &SocketAddr,
wallet_owner_secret: &Option<String>,
method: &str,
params: &serde_json::Value,
shared_key: &SecretKey,
) -> Result<D, WalletError> {
let url = format!("http://{}{}", wallet_owner_url, ENDPOINT);
let req = json!({
"method": method,
"params": params,
"id": JsonId::IntId(1),
"jsonrpc": "2.0",
});
let enc_req = EncryptedRequest::from_json(&JsonId::IntId(1), &req, &shared_key)
.map_err(WalletError::EncryptRequestError)?;
let res = client::post_async::<EncryptedRequest, EncryptedResponse>(
url.as_str(),
&enc_req,
wallet_owner_secret.clone(),
)
.await
.map_err(WalletError::ApiCommError)?;
let decrypted = res
.decrypt(&shared_key)
.map_err(WalletError::DecryptResponseError)?;
let response: Response =
serde_json::from_value(decrypted).map_err(WalletError::DecodeResponseError)?;
let result = response
.result
.ok_or(WalletError::ResponseRpcError(response.error.clone()))?;
let ok = result
.get("Ok")
.ok_or(WalletError::ResponseRpcError(response.error.clone()))?;
let parsed =
serde_json::from_value(ok.clone()).map_err(WalletError::DecodeResponseError)?;
Ok(parsed)
}
async fn async_send_json_request<D: serde::de::DeserializeOwned>(
wallet_owner_url: &SocketAddr,
wallet_owner_secret: &Option<String>,
method: &str,
params: &serde_json::Value,
) -> Result<D, WalletError> {
let url = format!("http://{}{}", wallet_owner_url, ENDPOINT);
let req = build_request(method, params);
let res = client::post_async::<Request, Response>(
url.as_str(),
&req,
wallet_owner_secret.clone(),
)
.await
.map_err(WalletError::ApiCommError)?;
let parsed = res
.clone()
.into_result()
.map_err(WalletError::ResponseParseError)?;
Ok(parsed)
.map_err(WalletError::WalletCommError)
}
pub fn get_token(&self) -> Token {
@ -219,35 +157,40 @@ impl Wallet for HttpWallet {
&self,
amount: u64,
) -> Result<(BlindingFactor, Output), WalletError> {
let req_json = json!({
let params = json!({
"token": self.token,
"features": "Plain",
"amount": amount
});
let output: OutputWithBlind = HttpWallet::async_send_enc_request(
&self.wallet_owner_url,
let url = format!("http://{}{}", self.wallet_owner_url, ENDPOINT);
let output: OutputWithBlind = http::async_send_enc_request(
&url,
&self.wallet_owner_secret,
"build_output",
&req_json,
&params,
&self.shared_key,
)
.await?;
.await
.map_err(WalletError::WalletCommError)?;
Ok((output.blind, output.output))
}
}
#[cfg(test)]
pub mod mock {
use super::{Wallet, WalletError};
use std::borrow::BorrowMut;
use std::sync::{Arc, Mutex};
use async_trait::async_trait;
use grin_core::core::{Output, OutputFeatures};
use grin_keychain::BlindingFactor;
use grin_onion::crypto::secp;
use secp256k1zkp::pedersen::Commitment;
use secp256k1zkp::Secp256k1;
use std::sync::{Arc, Mutex};
use super::{Wallet, WalletError};
/// Mock implementation of the 'Wallet' trait for unit-tests.
#[derive(Clone)]

View file

@ -1,27 +1,31 @@
use crate::common::node::IntegrationGrinNode;
use crate::common::wallet::{GrinWalletManager, IntegrationGrinWallet};
use grin_core::core::Transaction;
use grin_onion::crypto::comsig::ComSignature;
use grin_onion::crypto::dalek::DalekPublicKey;
use grin_onion::onion::Onion;
use grin_wallet_impls::tor::process::TorProcess;
use mwixnet::client::MixClientImpl;
use mwixnet::{tor, SwapError, SwapServer, SwapStore};
use secp256k1zkp::SecretKey;
use std::iter;
use std::net::TcpListener;
use std::sync::Arc;
use grin_core::core::Transaction;
use tor_rtcompat::PreferredRuntime;
use x25519_dalek::{PublicKey as xPublicKey, StaticSecret};
pub struct IntegrationSwapServer {
use grin_onion::crypto::comsig::ComSignature;
use grin_onion::crypto::dalek::DalekPublicKey;
use grin_onion::onion::Onion;
use mwixnet::mix_client::MixClientImpl;
use mwixnet::tor::TorService;
use mwixnet::{tor, SwapError, SwapServer, SwapStore};
use secp256k1zkp::SecretKey;
use crate::common::node::IntegrationGrinNode;
use crate::common::wallet::{GrinWalletManager, IntegrationGrinWallet};
pub struct IntegrationSwapServer<R: tor_rtcompat::Runtime> {
server_key: SecretKey,
tor_process: TorProcess,
tor_instance: Arc<grin_util::Mutex<TorService<R>>>,
swap_server: Arc<tokio::sync::Mutex<dyn SwapServer>>,
rpc_server: jsonrpc_http_server::Server,
_wallet: Arc<grin_util::Mutex<IntegrationGrinWallet>>,
}
impl IntegrationSwapServer {
impl<R: tor_rtcompat::Runtime> IntegrationSwapServer<R> {
pub async fn async_swap(&self, onion: &Onion, comsig: &ComSignature) -> Result<(), SwapError> {
self.swap_server.lock().await.swap(&onion, &comsig).await
}
@ -31,21 +35,25 @@ impl IntegrationSwapServer {
}
}
pub struct IntegrationMixServer {
pub struct IntegrationMixServer<R: tor_rtcompat::Runtime> {
server_key: SecretKey,
tor_process: TorProcess,
tor_instance: Arc<grin_util::Mutex<TorService<R>>>,
rpc_server: jsonrpc_http_server::Server,
_wallet: Arc<grin_util::Mutex<IntegrationGrinWallet>>,
}
async fn async_new_swap_server(
async fn async_new_swap_server<R>(
data_dir: &str,
rt_handle: &tokio::runtime::Handle,
tor_runtime: R,
wallets: &mut GrinWalletManager,
server_key: &SecretKey,
node: &Arc<grin_util::Mutex<IntegrationGrinNode>>,
next_server: Option<&IntegrationMixServer>,
) -> IntegrationSwapServer {
next_server: Option<&IntegrationMixServer<R>>,
) -> IntegrationSwapServer<R>
where
R: tor_rtcompat::Runtime,
{
let wallet = wallets.async_new_wallet(&node.lock().api_address()).await;
let server_config = mwixnet::ServerConfig {
@ -55,10 +63,6 @@ async fn async_new_swap_server(
.unwrap()
.local_addr()
.unwrap(),
socks_proxy_addr: TcpListener::bind("127.0.0.1:0")
.unwrap()
.local_addr()
.unwrap(),
grin_node_url: node.lock().api_address(),
grin_node_secret_path: None,
wallet_owner_url: wallet.lock().owner_address(),
@ -72,7 +76,10 @@ async fn async_new_swap_server(
// Open SwapStore
let store = SwapStore::new(format!("{}/db", data_dir).as_str()).unwrap();
let tor_process = tor::init_tor_listener(&data_dir, &server_config).unwrap();
let tor_instance = tor::async_init_tor(tor_runtime, &data_dir, &server_config)
.await
.unwrap();
let tor_instance = Arc::new(grin_util::Mutex::new(tor_instance));
let (swap_server, rpc_server) = mwixnet::swap_listen(
rt_handle,
@ -80,6 +87,7 @@ async fn async_new_swap_server(
match next_server {
Some(s) => Some(Arc::new(MixClientImpl::new(
server_config.clone(),
tor_instance.clone(),
DalekPublicKey::from_secret(&s.server_key),
))),
None => None,
@ -92,22 +100,26 @@ async fn async_new_swap_server(
IntegrationSwapServer {
server_key: server_key.clone(),
tor_process,
tor_instance,
swap_server,
rpc_server,
_wallet: wallet,
}
}
async fn async_new_mix_server(
async fn async_new_mix_server<R>(
data_dir: &str,
rt_handle: &tokio::runtime::Handle,
tor_runtime: R,
wallets: &mut GrinWalletManager,
server_key: &SecretKey,
node: &Arc<grin_util::Mutex<IntegrationGrinNode>>,
prev_server: DalekPublicKey,
next_server: Option<&IntegrationMixServer>,
) -> IntegrationMixServer {
next_server: Option<&IntegrationMixServer<R>>,
) -> IntegrationMixServer<R>
where
R: tor_rtcompat::Runtime,
{
let wallet = wallets.async_new_wallet(&node.lock().api_address()).await;
let server_config = mwixnet::ServerConfig {
key: server_key.clone(),
@ -116,10 +128,6 @@ async fn async_new_mix_server(
.unwrap()
.local_addr()
.unwrap(),
socks_proxy_addr: TcpListener::bind("127.0.0.1:0")
.unwrap()
.local_addr()
.unwrap(),
grin_node_url: node.lock().api_address(),
grin_node_secret_path: None,
wallet_owner_url: wallet.lock().owner_address(),
@ -131,7 +139,10 @@ async fn async_new_mix_server(
},
};
let tor_process = tor::init_tor_listener(&data_dir, &server_config).unwrap();
let tor_instance = tor::async_init_tor(tor_runtime, &data_dir, &server_config)
.await
.unwrap();
let tor_instance = Arc::new(grin_util::Mutex::new(tor_instance));
let (_, rpc_server) = mwixnet::mix_listen(
rt_handle,
@ -139,6 +150,7 @@ async fn async_new_mix_server(
match next_server {
Some(s) => Some(Arc::new(MixClientImpl::new(
server_config.clone(),
tor_instance.clone(),
DalekPublicKey::from_secret(&s.server_key),
))),
None => None,
@ -150,16 +162,16 @@ async fn async_new_mix_server(
IntegrationMixServer {
server_key: server_key.clone(),
tor_process,
tor_instance,
rpc_server,
_wallet: wallet,
}
}
pub struct Servers {
pub swapper: IntegrationSwapServer,
pub swapper: IntegrationSwapServer<PreferredRuntime>,
pub mixers: Vec<IntegrationMixServer>,
pub mixers: Vec<IntegrationMixServer<PreferredRuntime>>,
}
impl Servers {
@ -176,12 +188,16 @@ impl Servers {
.take(num_mixers + 1)
.collect();
// Setup mock tor network
let tor_runtime = PreferredRuntime::current().unwrap();
// Build mixers in reverse order
let mut mixers = Vec::new();
for i in (0..num_mixers).rev() {
let mix_server = async_new_mix_server(
format!("{}/mixers/{}", test_dir, i).as_str(),
rt_handle,
tor_runtime.clone(),
wallets,
&server_keys[i + 1],
&node,
@ -206,6 +222,7 @@ impl Servers {
let swapper = async_new_swap_server(
format!("{}/swapper", test_dir).as_str(),
rt_handle,
tor_runtime.clone(),
wallets,
&server_keys[0],
&node,
@ -234,11 +251,11 @@ impl Servers {
pub fn stop_all(&mut self) {
self.swapper.rpc_server.close_handle().close();
self.swapper.tor_process.kill().unwrap();
self.swapper.tor_instance.lock().stop();
self.mixers.iter_mut().for_each(|mixer| {
mixer.rpc_server.close_handle().close();
mixer.tor_process.kill().unwrap();
mixer.tor_instance.lock().stop();
});
}
}

View file

@ -1,13 +1,11 @@
use crate::common::types::BlockFees;
use grin_api::client;
use grin_api::json_rpc::Response;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::sync::Arc;
use std::thread;
use grin_core::core::{FeeFields, Output, OutputFeatures, Transaction, TxKernel};
use grin_core::global::ChainTypes;
use grin_core::libtx::tx_fee;
use grin_keychain::{BlindingFactor, ExtKeychain, Identifier, Keychain, SwitchCommitmentType};
use grin_onion::crypto::comsig::ComSignature;
use grin_onion::onion::Onion;
use grin_onion::Hop;
use grin_util::{Mutex, ZeroingString};
use grin_wallet_api::Owner;
use grin_wallet_config::WalletConfig;
@ -15,15 +13,19 @@ use grin_wallet_controller::controller;
use grin_wallet_impls::{DefaultLCProvider, DefaultWalletImpl, HTTPNodeClient};
use grin_wallet_libwallet::{InitTxArgs, Slate, VersionedSlate, WalletInfo, WalletInst};
use log::error;
use serde_derive::{Deserialize, Serialize};
use serde_json::json;
use x25519_dalek::PublicKey as xPublicKey;
use grin_onion::crypto::comsig::ComSignature;
use grin_onion::onion::Onion;
use grin_onion::Hop;
use mwixnet::http;
use mwixnet::wallet::HttpWallet;
use secp256k1zkp::pedersen::Commitment;
use secp256k1zkp::{Secp256k1, SecretKey};
use serde_derive::{Deserialize, Serialize};
use serde_json::json;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::sync::Arc;
use std::thread;
use x25519_dalek::PublicKey as xPublicKey;
use crate::common::types::BlockFees;
/// Response to build a coinbase output.
#[derive(Serialize, Deserialize, Debug, Clone)]
@ -202,14 +204,9 @@ impl IntegrationGrinWallet {
&self,
slate: &VersionedSlate,
) -> Result<VersionedSlate, grin_servers::common::types::Error> {
let req_body = json!({
"jsonrpc": "2.0",
"method": "receive_tx",
"id": 1,
"params": [slate, null, null]
});
let res: Response = client::post_async(self.foreign_api().as_str(), &req_body, None)
let params = json!([slate, null, null]);
let response =
http::async_send_json_request(&self.foreign_api(), &None, "receive_tx", &params)
.await
.map_err(|e| {
let report = format!("Failed to receive tx. Is the wallet listening? {}", e);
@ -217,13 +214,7 @@ impl IntegrationGrinWallet {
grin_servers::common::types::Error::WalletComm(report)
})?;
let parsed: VersionedSlate = res.clone().into_result().map_err(|e| {
let report = format!("Error parsing result: {}", e);
error!("{}", report);
grin_servers::common::types::Error::WalletComm(report)
})?;
Ok(parsed)
Ok(response)
}
async fn async_finalize_tx(
@ -241,6 +232,7 @@ impl IntegrationGrinWallet {
.await
}
#[allow(dead_code)]
async fn async_post_tx(
&self,
finalized_slate: &VersionedSlate,
@ -264,29 +256,19 @@ impl IntegrationGrinWallet {
&self,
block_fees: &BlockFees,
) -> Result<CbData, grin_servers::common::types::Error> {
let req_body = json!({
"jsonrpc": "2.0",
"method": "build_coinbase",
"id": 1,
"params": {
let params = json!({
"block_fees": block_fees
}
});
let res: Response = client::post_async(self.foreign_api().as_str(), &req_body, None)
let response =
http::async_send_json_request(&self.foreign_api(), &None, "build_coinbase", &params)
.await
.map_err(|e| {
let report = format!("Failed to get coinbase. Is the wallet listening? {}", e);
error!("{}", report);
grin_servers::common::types::Error::WalletComm(report)
})?;
let parsed: CbData = res.clone().into_result().map_err(|e| {
let report = format!("Error parsing result: {}", e);
error!("{}", report);
grin_servers::common::types::Error::WalletComm(report)
})?;
Ok(parsed)
Ok(response)
}
pub fn build_onion(

View file

@ -1,18 +1,20 @@
use crate::common::miner::Miner;
use crate::common::node::GrinNodeManager;
use crate::common::server::Servers;
use crate::common::wallet::GrinWalletManager;
#[macro_use]
extern crate log;
use std::ops::Deref;
use function_name::named;
use grin_core::global;
use grin_util::logger::LoggingConfig;
use log::Level;
use std::ops::Deref;
use crate::common::miner::Miner;
use crate::common::node::GrinNodeManager;
use crate::common::server::Servers;
use crate::common::wallet::GrinWalletManager;
mod common;
#[macro_use]
extern crate log;
/// Just removes all results from previous runs
fn clean_all_output(test_dir: &str) {
if let Err(e) = remove_dir_all::remove_dir_all(test_dir) {
@ -42,8 +44,7 @@ fn setup_test(test_name: &str) -> (GrinNodeManager, GrinWalletManager, String) {
#[named]
fn integration_test() -> Result<(), Box<dyn std::error::Error>> {
let (mut nodes, mut wallets, test_dir) = setup_test(function_name!());
let mut rt = tokio::runtime::Builder::new()
.threaded_scheduler()
let rt = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()?;