From 36d6b75c6518b8dd6cf9a4ebce61ccacaefd4914 Mon Sep 17 00:00:00 2001 From: ardocrat Date: Thu, 16 May 2024 19:37:28 +0300 Subject: [PATCH] tor: single client and config, tx status setup after cancellation at tor sending --- src/gui/views/wallets/wallet/transport.rs | 8 +- src/tor/tor.rs | 193 ++++++++++++++-------- src/wallet/wallet.rs | 36 +++- 3 files changed, 152 insertions(+), 85 deletions(-) diff --git a/src/gui/views/wallets/wallet/transport.rs b/src/gui/views/wallets/wallet/transport.rs index 2e078a2..65af4b2 100644 --- a/src/gui/views/wallets/wallet/transport.rs +++ b/src/gui/views/wallets/wallet/transport.rs @@ -308,6 +308,7 @@ impl WalletTransport { if let Ok(key) = wallet.secret_key() { let service_id = &wallet.identifier(); Tor::stop_service(service_id); + Tor::rebuild_client(); let api_port = wallet.foreign_api_port().unwrap(); Tor::start_service(api_port, key, service_id); } @@ -344,6 +345,7 @@ impl WalletTransport { if let Ok(key) = wallet.secret_key() { let service_id = &wallet.identifier(); Tor::stop_service(service_id); + Tor::rebuild_client(); let api_port = wallet.foreign_api_port().unwrap(); Tor::start_service(api_port, key, service_id); } @@ -687,10 +689,9 @@ impl WalletTransport { let mut wallet = wallet.clone(); thread::spawn(move || { let runtime = TokioNativeTlsRuntime::create().unwrap(); - let runtime_tor = runtime.clone(); runtime .block_on(async { - if wallet.send_tor(a, &addr, runtime_tor) + if wallet.send_tor(a, &addr) .await .is_some() { let mut w_send_success = send_success.write(); @@ -746,13 +747,12 @@ impl WalletTransport { let mut wallet = wallet.clone(); thread::spawn(move || { let runtime = TokioNativeTlsRuntime::create().unwrap(); - let runtime_tor = runtime.clone(); runtime .block_on(async { let addr_str = addr_text.as_str(); let addr = &SlatepackAddress::try_from(addr_str) .unwrap(); - if wallet.send_tor(a, &addr, runtime_tor) + if wallet.send_tor(a, &addr) .await .is_some() { let mut w_send_success = send_success.write(); diff --git a/src/tor/tor.rs b/src/tor/tor.rs index 18f95ed..1f8e9e6 100644 --- a/src/tor/tor.rs +++ b/src/tor/tor.rs @@ -18,6 +18,7 @@ use std::sync::Arc; use std::thread; use std::time::Duration; use arti_client::config::pt::TransportConfigBuilder; +use grin_api::client; use lazy_static::lazy_static; use futures::task::SpawnExt; @@ -33,7 +34,7 @@ use sha2::Sha512; use tokio::time::sleep; use tor_config::CfgPath; use tor_rtcompat::tokio::TokioNativeTlsRuntime; -use tor_rtcompat::Runtime; +use tor_rtcompat::{BlockOn, Runtime}; use tor_hsrproxy::OnionServiceReverseProxy; use tor_hsrproxy::config::{Encapsulation, ProxyAction, ProxyPattern, ProxyRule, TargetAddr, ProxyConfigBuilder}; use tor_hsservice::config::OnionServiceConfigBuilder; @@ -63,6 +64,8 @@ lazy_static! { /// Tor server to use as SOCKS proxy for requests and to launch Onion services. pub struct Tor { + /// Tor client and config. + client_config: Arc, TorClientConfig)>>, /// Mapping of running Onion services identifiers to proxy. running_services: Arc, Arc)>>>, @@ -76,18 +79,20 @@ pub struct Tor { impl Default for Tor { fn default() -> Self { + let client_config = Self::build_client(TokioNativeTlsRuntime::create().unwrap()); Self { running_services: Arc::new(RwLock::new(BTreeMap::new())), starting_services: Arc::new(RwLock::new(BTreeSet::new())), failed_services: Arc::new(RwLock::new(BTreeSet::new())), - checking_services: Arc::new(RwLock::new(BTreeSet::new())) + checking_services: Arc::new(RwLock::new(BTreeSet::new())), + client_config: Arc::new(RwLock::new(client_config)), } } } impl Tor { - async fn build_client(runtime: TokioNativeTlsRuntime) - -> (TorClient, TorClientConfig) { + fn build_client(runtime: TokioNativeTlsRuntime) + -> (TorClient, TorClientConfig) { // Create Tor client config. let mut builder = TorClientConfigBuilder::from_directories(TorConfig::state_path(), @@ -105,16 +110,23 @@ impl Tor { // Create connected Tor client from config. let config = builder.build().unwrap(); (TorClient::with_runtime(runtime) - .config(config.clone()) - .create_bootstrapped() - .await - .unwrap(), config) + .config(config.clone()) + .create_unbootstrapped() + .unwrap(), config) + } + + /// Recreate Tor client on configuration change. + pub fn rebuild_client() { + let client_config = Self::build_client(TokioNativeTlsRuntime::create().unwrap()); + let mut w_client = TOR_SERVER_STATE.client_config.write(); + *w_client = client_config; } /// Send post request using Tor. - pub async fn post(body: String, url: String, runtime: TokioNativeTlsRuntime) -> Option { - // Create client. - let (client, _) = Self::build_client(runtime).await; + pub async fn post(body: String, url: String) -> Option { + // Bootstrap client. + let (client, _) = Self::client_config(); + client.bootstrap().await.unwrap(); // Create http tor-powered client to post data. let tls_connector = TlsConnector::builder().unwrap().build().unwrap(); let tor_connector = ArtiHttpConnector::new(client, tls_connector); @@ -141,6 +153,11 @@ impl Tor { resp } + fn client_config() -> (TorClient, TorClientConfig) { + let r_client_config = TOR_SERVER_STATE.client_config.read(); + r_client_config.clone() + } + /// Check if Onion service is starting. pub fn is_service_starting(id: &String) -> bool { let r_services = TOR_SERVER_STATE.starting_services.read(); @@ -190,78 +207,110 @@ impl Tor { let service_id = id.clone(); thread::spawn(move || { - let runtime = TokioNativeTlsRuntime::create().unwrap(); - let runtime_client = runtime.clone(); - runtime.spawn(async move { - let (client, config) = Self::build_client(runtime_client.clone()).await; + let (client, config) = Self::client_config(); + let client_thread = client.clone(); + client.runtime().spawn(async move { // Add service key to keystore. let hs_nickname = HsNickname::new(service_id.clone()).unwrap(); Self::add_service_key(config.fs_mistrust(), &key, &hs_nickname); + // Bootstrap client. + client_thread.bootstrap().await.unwrap(); // Launch Onion service. let service_config = OnionServiceConfigBuilder::default() .nickname(hs_nickname.clone()) .build() .unwrap(); - let (service, request) = client.launch_onion_service(service_config).unwrap(); - - // Launch service proxy. - let addr = SocketAddr::new(IpAddr::from(Ipv4Addr::LOCALHOST), port); - tokio::spawn( - Self::run_service_proxy(addr, - client.clone(), - service.clone(), - request, - hs_nickname.clone()) - ).await.unwrap(); - - // Check service availability if not checking. - if Self::is_service_checking(&service_id) { - return; - } - let url = format!("http://{}/v2/foreign", service.onion_name().unwrap().to_string()); - thread::spawn(move || { - // Wait 5 seconds for service to start. - thread::sleep(Duration::from_millis(5000)); - let runtime = TokioNativeTlsRuntime::create().unwrap(); - let client_runtime = runtime.clone(); - // Put service to checking. - { - let mut w_services = TOR_SERVER_STATE.checking_services.write(); - w_services.insert(service_id.clone()); + if let Ok((service, request)) = client_thread.launch_onion_service(service_config) { + // Launch service proxy. + let addr = SocketAddr::new(IpAddr::from(Ipv4Addr::LOCALHOST), port); + tokio::spawn( + Self::run_service_proxy(addr, + client_thread.clone(), + service.clone(), + request, + hs_nickname.clone()) + ).await.unwrap(); + + // Check service availability if not checking. + if Self::is_service_checking(&service_id) { + return; } - runtime - .spawn(async move { - loop { - if !Self::is_service_running(&service_id) && - !Self::is_service_starting(&service_id) { - // Remove service from checking. - let mut w_services = TOR_SERVER_STATE.checking_services.write(); - w_services.remove(&service_id); - break; - } - let data = json!({ - "id": 1, - "jsonrpc": "2.0", - "method": "check_version", - "params": [] - }).to_string(); - match Self::post(data, url.clone(), client_runtime.clone()).await { - Some(_) => { - // Remove service from starting. - let mut w_services = TOR_SERVER_STATE.starting_services.write(); + let url = format!("http://{}/v2/foreign", service.onion_name().unwrap().to_string()); + thread::spawn(move || { + // Wait 5 seconds to start. + thread::sleep(Duration::from_millis(5000)); + let runtime = TokioNativeTlsRuntime::create().unwrap(); + let client_runtime = runtime.clone(); + // Put service to checking. + { + let mut w_services = TOR_SERVER_STATE.checking_services.write(); + w_services.insert(service_id.clone()); + } + runtime + .spawn(async move { + loop { + if !Self::is_service_running(&service_id) && + !Self::is_service_starting(&service_id) { + // Remove service from checking. + let mut w_services = TOR_SERVER_STATE.checking_services.write(); w_services.remove(&service_id); - }, - None => { - // Put service to starting. - let mut w_services = TOR_SERVER_STATE.starting_services.write(); - w_services.insert(service_id.clone()); - }, + break; + } + let data = json!({ + "id": 1, + "jsonrpc": "2.0", + "method": "check_version", + "params": [] + }).to_string(); + // Bootstrap client. + let (client, _) = Self::build_client(client_runtime.clone()); + client.bootstrap().await.unwrap(); + // Create http tor-powered client to post data. + let tls_connector = TlsConnector::builder().unwrap().build().unwrap(); + let tor_connector = ArtiHttpConnector::new(client, tls_connector); + let http = hyper::Client::builder().build::<_, Body>(tor_connector); + // Create request. + let req = hyper::Request::builder() + .method(hyper::Method::POST) + .uri(url.clone()) + .body(Body::from(data)) + .unwrap(); + // Send request. + match http.request(req).await { + Ok(r) => { + match hyper::body::to_bytes(r).await { + Ok(_) => { + // Remove service from starting. + let mut w_services = TOR_SERVER_STATE.starting_services.write(); + w_services.remove(&service_id); + }, + Err(_) => { + // Put service to starting. + let mut w_services = TOR_SERVER_STATE.starting_services.write(); + w_services.insert(service_id.clone()); + }, + } + }, + Err(_) => { + // Put service to starting. + let mut w_services = TOR_SERVER_STATE.starting_services.write(); + w_services.insert(service_id.clone()); + }, + } + // Check once per 5 second. + sleep(Duration::from_millis(5000)).await; } - // Ping once per 10 second. - sleep(Duration::from_millis(10000)).await; - } - }).unwrap(); - }); + }).unwrap(); + }); + } else { + // Remove service from starting. + let mut w_services = TOR_SERVER_STATE.starting_services.write(); + w_services.remove(&service_id); + // Save failed service. + let mut w_services = TOR_SERVER_STATE.failed_services.write(); + w_services.insert(service_id); + } + }).unwrap(); }); } diff --git a/src/wallet/wallet.rs b/src/wallet/wallet.rs index 06be0b5..fd8e84b 100644 --- a/src/wallet/wallet.rs +++ b/src/wallet/wallet.rs @@ -676,9 +676,7 @@ impl Wallet { } /// Send amount to provided address with Tor transport. - pub async fn send_tor(&mut self, - amount: u64, addr: &SlatepackAddress, - runtime: TokioNativeTlsRuntime) -> Option { + pub async fn send_tor(&mut self, amount: u64, addr: &SlatepackAddress) -> Option { // Initialize transaction. let send_res = self.send(amount); @@ -690,7 +688,30 @@ impl Wallet { // Function to cancel initialized tx in case of error. let cancel_tx = || { let instance = self.instance.clone().unwrap(); - let _ = cancel_tx(instance, None, &None, None, Some(slate.clone().id)); + let id = slate.clone().id; + cancel_tx(instance, None, &None, None, Some(id.clone())).unwrap(); + // Setup posting flag, and ability to finalize. + { + let mut w_data = self.data.write(); + let mut data = w_data.clone().unwrap(); + let txs = data.txs.iter_mut().map(|tx| { + if tx.data.tx_slate_id == Some(id) { + tx.cancelling = false; + tx.posting = false; + tx.can_finalize = false; + tx.data.tx_type = if tx.data.tx_type == TxLogEntryType::TxReceived { + TxLogEntryType::TxReceivedCancelled + } else { + TxLogEntryType::TxSentCancelled + }; + } + tx.clone() + }).collect::>(); + data.txs = txs; + *w_data = Some(data); + } + // Refresh wallet info to update statuses. + self.sync(); }; // Initialize parameters. @@ -709,7 +730,7 @@ impl Wallet { }).to_string(); // Send request to receiver. - let req_res = Tor::post(body, url, runtime).await; + let req_res = Tor::post(body, url).await; if req_res.is_none() { cancel_tx(); return None; @@ -886,7 +907,7 @@ impl Wallet { let wallet = self.clone(); thread::spawn(move || { let instance = wallet.instance.clone().unwrap(); - let _ = cancel_tx(instance, None, &None, Some(id), None); + cancel_tx(instance, None, &None, Some(id), None).unwrap(); // Setup posting flag, and ability to finalize. let mut w_data = wallet.data.write(); let mut data = w_data.clone().unwrap(); @@ -1337,12 +1358,9 @@ fn start_api_server(wallet: &mut Wallet) -> Result<(ApiServer, u16), Error> { Box::leak(Box::new(oneshot::channel::<()>())); let mut apis = ApiServer::new(); - println!("Starting HTTP Foreign listener API server at {}.", api_addr); let socket_addr: SocketAddr = api_addr.parse().unwrap(); let _ = apis.start(socket_addr, router, None, api_chan) .map_err(|_| Error::GenericError("API thread failed to start".to_string()))?; - - println!("HTTP Foreign listener started."); Ok((apis, free_port)) }