tor: single client and config, tx status setup after cancellation at tor sending

This commit is contained in:
ardocrat 2024-05-16 19:37:28 +03:00
parent 50099da88c
commit 36d6b75c65
3 changed files with 152 additions and 85 deletions

View file

@ -308,6 +308,7 @@ impl WalletTransport {
if let Ok(key) = wallet.secret_key() { if let Ok(key) = wallet.secret_key() {
let service_id = &wallet.identifier(); let service_id = &wallet.identifier();
Tor::stop_service(service_id); Tor::stop_service(service_id);
Tor::rebuild_client();
let api_port = wallet.foreign_api_port().unwrap(); let api_port = wallet.foreign_api_port().unwrap();
Tor::start_service(api_port, key, service_id); Tor::start_service(api_port, key, service_id);
} }
@ -344,6 +345,7 @@ impl WalletTransport {
if let Ok(key) = wallet.secret_key() { if let Ok(key) = wallet.secret_key() {
let service_id = &wallet.identifier(); let service_id = &wallet.identifier();
Tor::stop_service(service_id); Tor::stop_service(service_id);
Tor::rebuild_client();
let api_port = wallet.foreign_api_port().unwrap(); let api_port = wallet.foreign_api_port().unwrap();
Tor::start_service(api_port, key, service_id); Tor::start_service(api_port, key, service_id);
} }
@ -687,10 +689,9 @@ impl WalletTransport {
let mut wallet = wallet.clone(); let mut wallet = wallet.clone();
thread::spawn(move || { thread::spawn(move || {
let runtime = TokioNativeTlsRuntime::create().unwrap(); let runtime = TokioNativeTlsRuntime::create().unwrap();
let runtime_tor = runtime.clone();
runtime runtime
.block_on(async { .block_on(async {
if wallet.send_tor(a, &addr, runtime_tor) if wallet.send_tor(a, &addr)
.await .await
.is_some() { .is_some() {
let mut w_send_success = send_success.write(); let mut w_send_success = send_success.write();
@ -746,13 +747,12 @@ impl WalletTransport {
let mut wallet = wallet.clone(); let mut wallet = wallet.clone();
thread::spawn(move || { thread::spawn(move || {
let runtime = TokioNativeTlsRuntime::create().unwrap(); let runtime = TokioNativeTlsRuntime::create().unwrap();
let runtime_tor = runtime.clone();
runtime runtime
.block_on(async { .block_on(async {
let addr_str = addr_text.as_str(); let addr_str = addr_text.as_str();
let addr = &SlatepackAddress::try_from(addr_str) let addr = &SlatepackAddress::try_from(addr_str)
.unwrap(); .unwrap();
if wallet.send_tor(a, &addr, runtime_tor) if wallet.send_tor(a, &addr)
.await .await
.is_some() { .is_some() {
let mut w_send_success = send_success.write(); let mut w_send_success = send_success.write();

View file

@ -18,6 +18,7 @@ use std::sync::Arc;
use std::thread; use std::thread;
use std::time::Duration; use std::time::Duration;
use arti_client::config::pt::TransportConfigBuilder; use arti_client::config::pt::TransportConfigBuilder;
use grin_api::client;
use lazy_static::lazy_static; use lazy_static::lazy_static;
use futures::task::SpawnExt; use futures::task::SpawnExt;
@ -33,7 +34,7 @@ use sha2::Sha512;
use tokio::time::sleep; use tokio::time::sleep;
use tor_config::CfgPath; use tor_config::CfgPath;
use tor_rtcompat::tokio::TokioNativeTlsRuntime; use tor_rtcompat::tokio::TokioNativeTlsRuntime;
use tor_rtcompat::Runtime; use tor_rtcompat::{BlockOn, Runtime};
use tor_hsrproxy::OnionServiceReverseProxy; use tor_hsrproxy::OnionServiceReverseProxy;
use tor_hsrproxy::config::{Encapsulation, ProxyAction, ProxyPattern, ProxyRule, TargetAddr, ProxyConfigBuilder}; use tor_hsrproxy::config::{Encapsulation, ProxyAction, ProxyPattern, ProxyRule, TargetAddr, ProxyConfigBuilder};
use tor_hsservice::config::OnionServiceConfigBuilder; use tor_hsservice::config::OnionServiceConfigBuilder;
@ -63,6 +64,8 @@ lazy_static! {
/// Tor server to use as SOCKS proxy for requests and to launch Onion services. /// Tor server to use as SOCKS proxy for requests and to launch Onion services.
pub struct Tor { pub struct Tor {
/// Tor client and config.
client_config: Arc<RwLock<(TorClient<TokioNativeTlsRuntime>, TorClientConfig)>>,
/// Mapping of running Onion services identifiers to proxy. /// Mapping of running Onion services identifiers to proxy.
running_services: Arc<RwLock<BTreeMap<String, running_services: Arc<RwLock<BTreeMap<String,
(Arc<RunningOnionService>, Arc<OnionServiceReverseProxy>)>>>, (Arc<RunningOnionService>, Arc<OnionServiceReverseProxy>)>>>,
@ -76,17 +79,19 @@ pub struct Tor {
impl Default for Tor { impl Default for Tor {
fn default() -> Self { fn default() -> Self {
let client_config = Self::build_client(TokioNativeTlsRuntime::create().unwrap());
Self { Self {
running_services: Arc::new(RwLock::new(BTreeMap::new())), running_services: Arc::new(RwLock::new(BTreeMap::new())),
starting_services: Arc::new(RwLock::new(BTreeSet::new())), starting_services: Arc::new(RwLock::new(BTreeSet::new())),
failed_services: Arc::new(RwLock::new(BTreeSet::new())), failed_services: Arc::new(RwLock::new(BTreeSet::new())),
checking_services: Arc::new(RwLock::new(BTreeSet::new())) checking_services: Arc::new(RwLock::new(BTreeSet::new())),
client_config: Arc::new(RwLock::new(client_config)),
} }
} }
} }
impl Tor { impl Tor {
async fn build_client(runtime: TokioNativeTlsRuntime) fn build_client(runtime: TokioNativeTlsRuntime)
-> (TorClient<TokioNativeTlsRuntime>, TorClientConfig) { -> (TorClient<TokioNativeTlsRuntime>, TorClientConfig) {
// Create Tor client config. // Create Tor client config.
let mut builder = let mut builder =
@ -106,15 +111,22 @@ impl Tor {
let config = builder.build().unwrap(); let config = builder.build().unwrap();
(TorClient::with_runtime(runtime) (TorClient::with_runtime(runtime)
.config(config.clone()) .config(config.clone())
.create_bootstrapped() .create_unbootstrapped()
.await
.unwrap(), config) .unwrap(), config)
} }
/// Recreate Tor client on configuration change.
pub fn rebuild_client() {
let client_config = Self::build_client(TokioNativeTlsRuntime::create().unwrap());
let mut w_client = TOR_SERVER_STATE.client_config.write();
*w_client = client_config;
}
/// Send post request using Tor. /// Send post request using Tor.
pub async fn post(body: String, url: String, runtime: TokioNativeTlsRuntime) -> Option<String> { pub async fn post(body: String, url: String) -> Option<String> {
// Create client. // Bootstrap client.
let (client, _) = Self::build_client(runtime).await; let (client, _) = Self::client_config();
client.bootstrap().await.unwrap();
// Create http tor-powered client to post data. // Create http tor-powered client to post data.
let tls_connector = TlsConnector::builder().unwrap().build().unwrap(); let tls_connector = TlsConnector::builder().unwrap().build().unwrap();
let tor_connector = ArtiHttpConnector::new(client, tls_connector); let tor_connector = ArtiHttpConnector::new(client, tls_connector);
@ -141,6 +153,11 @@ impl Tor {
resp resp
} }
fn client_config() -> (TorClient<TokioNativeTlsRuntime>, TorClientConfig) {
let r_client_config = TOR_SERVER_STATE.client_config.read();
r_client_config.clone()
}
/// Check if Onion service is starting. /// Check if Onion service is starting.
pub fn is_service_starting(id: &String) -> bool { pub fn is_service_starting(id: &String) -> bool {
let r_services = TOR_SERVER_STATE.starting_services.read(); let r_services = TOR_SERVER_STATE.starting_services.read();
@ -190,25 +207,25 @@ impl Tor {
let service_id = id.clone(); let service_id = id.clone();
thread::spawn(move || { thread::spawn(move || {
let runtime = TokioNativeTlsRuntime::create().unwrap(); let (client, config) = Self::client_config();
let runtime_client = runtime.clone(); let client_thread = client.clone();
runtime.spawn(async move { client.runtime().spawn(async move {
let (client, config) = Self::build_client(runtime_client.clone()).await;
// Add service key to keystore. // Add service key to keystore.
let hs_nickname = HsNickname::new(service_id.clone()).unwrap(); let hs_nickname = HsNickname::new(service_id.clone()).unwrap();
Self::add_service_key(config.fs_mistrust(), &key, &hs_nickname); Self::add_service_key(config.fs_mistrust(), &key, &hs_nickname);
// Bootstrap client.
client_thread.bootstrap().await.unwrap();
// Launch Onion service. // Launch Onion service.
let service_config = OnionServiceConfigBuilder::default() let service_config = OnionServiceConfigBuilder::default()
.nickname(hs_nickname.clone()) .nickname(hs_nickname.clone())
.build() .build()
.unwrap(); .unwrap();
let (service, request) = client.launch_onion_service(service_config).unwrap(); if let Ok((service, request)) = client_thread.launch_onion_service(service_config) {
// Launch service proxy. // Launch service proxy.
let addr = SocketAddr::new(IpAddr::from(Ipv4Addr::LOCALHOST), port); let addr = SocketAddr::new(IpAddr::from(Ipv4Addr::LOCALHOST), port);
tokio::spawn( tokio::spawn(
Self::run_service_proxy(addr, Self::run_service_proxy(addr,
client.clone(), client_thread.clone(),
service.clone(), service.clone(),
request, request,
hs_nickname.clone()) hs_nickname.clone())
@ -220,7 +237,7 @@ impl Tor {
} }
let url = format!("http://{}/v2/foreign", service.onion_name().unwrap().to_string()); let url = format!("http://{}/v2/foreign", service.onion_name().unwrap().to_string());
thread::spawn(move || { thread::spawn(move || {
// Wait 5 seconds for service to start. // Wait 5 seconds to start.
thread::sleep(Duration::from_millis(5000)); thread::sleep(Duration::from_millis(5000));
let runtime = TokioNativeTlsRuntime::create().unwrap(); let runtime = TokioNativeTlsRuntime::create().unwrap();
let client_runtime = runtime.clone(); let client_runtime = runtime.clone();
@ -245,23 +262,55 @@ impl Tor {
"method": "check_version", "method": "check_version",
"params": [] "params": []
}).to_string(); }).to_string();
match Self::post(data, url.clone(), client_runtime.clone()).await { // Bootstrap client.
Some(_) => { let (client, _) = Self::build_client(client_runtime.clone());
client.bootstrap().await.unwrap();
// Create http tor-powered client to post data.
let tls_connector = TlsConnector::builder().unwrap().build().unwrap();
let tor_connector = ArtiHttpConnector::new(client, tls_connector);
let http = hyper::Client::builder().build::<_, Body>(tor_connector);
// Create request.
let req = hyper::Request::builder()
.method(hyper::Method::POST)
.uri(url.clone())
.body(Body::from(data))
.unwrap();
// Send request.
match http.request(req).await {
Ok(r) => {
match hyper::body::to_bytes(r).await {
Ok(_) => {
// Remove service from starting. // Remove service from starting.
let mut w_services = TOR_SERVER_STATE.starting_services.write(); let mut w_services = TOR_SERVER_STATE.starting_services.write();
w_services.remove(&service_id); w_services.remove(&service_id);
}, },
None => { Err(_) => {
// Put service to starting. // Put service to starting.
let mut w_services = TOR_SERVER_STATE.starting_services.write(); let mut w_services = TOR_SERVER_STATE.starting_services.write();
w_services.insert(service_id.clone()); w_services.insert(service_id.clone());
}, },
} }
// Ping once per 10 second. },
sleep(Duration::from_millis(10000)).await; Err(_) => {
// Put service to starting.
let mut w_services = TOR_SERVER_STATE.starting_services.write();
w_services.insert(service_id.clone());
},
}
// Check once per 5 second.
sleep(Duration::from_millis(5000)).await;
} }
}).unwrap(); }).unwrap();
}); });
} else {
// Remove service from starting.
let mut w_services = TOR_SERVER_STATE.starting_services.write();
w_services.remove(&service_id);
// Save failed service.
let mut w_services = TOR_SERVER_STATE.failed_services.write();
w_services.insert(service_id);
}
}).unwrap(); }).unwrap();
}); });
} }

View file

@ -676,9 +676,7 @@ impl Wallet {
} }
/// Send amount to provided address with Tor transport. /// Send amount to provided address with Tor transport.
pub async fn send_tor(&mut self, pub async fn send_tor(&mut self, amount: u64, addr: &SlatepackAddress) -> Option<Slate> {
amount: u64, addr: &SlatepackAddress,
runtime: TokioNativeTlsRuntime) -> Option<Slate> {
// Initialize transaction. // Initialize transaction.
let send_res = self.send(amount); let send_res = self.send(amount);
@ -690,7 +688,30 @@ impl Wallet {
// Function to cancel initialized tx in case of error. // Function to cancel initialized tx in case of error.
let cancel_tx = || { let cancel_tx = || {
let instance = self.instance.clone().unwrap(); let instance = self.instance.clone().unwrap();
let _ = cancel_tx(instance, None, &None, None, Some(slate.clone().id)); let id = slate.clone().id;
cancel_tx(instance, None, &None, None, Some(id.clone())).unwrap();
// Setup posting flag, and ability to finalize.
{
let mut w_data = self.data.write();
let mut data = w_data.clone().unwrap();
let txs = data.txs.iter_mut().map(|tx| {
if tx.data.tx_slate_id == Some(id) {
tx.cancelling = false;
tx.posting = false;
tx.can_finalize = false;
tx.data.tx_type = if tx.data.tx_type == TxLogEntryType::TxReceived {
TxLogEntryType::TxReceivedCancelled
} else {
TxLogEntryType::TxSentCancelled
};
}
tx.clone()
}).collect::<Vec<WalletTransaction>>();
data.txs = txs;
*w_data = Some(data);
}
// Refresh wallet info to update statuses.
self.sync();
}; };
// Initialize parameters. // Initialize parameters.
@ -709,7 +730,7 @@ impl Wallet {
}).to_string(); }).to_string();
// Send request to receiver. // Send request to receiver.
let req_res = Tor::post(body, url, runtime).await; let req_res = Tor::post(body, url).await;
if req_res.is_none() { if req_res.is_none() {
cancel_tx(); cancel_tx();
return None; return None;
@ -886,7 +907,7 @@ impl Wallet {
let wallet = self.clone(); let wallet = self.clone();
thread::spawn(move || { thread::spawn(move || {
let instance = wallet.instance.clone().unwrap(); let instance = wallet.instance.clone().unwrap();
let _ = cancel_tx(instance, None, &None, Some(id), None); cancel_tx(instance, None, &None, Some(id), None).unwrap();
// Setup posting flag, and ability to finalize. // Setup posting flag, and ability to finalize.
let mut w_data = wallet.data.write(); let mut w_data = wallet.data.write();
let mut data = w_data.clone().unwrap(); let mut data = w_data.clone().unwrap();
@ -1337,12 +1358,9 @@ fn start_api_server(wallet: &mut Wallet) -> Result<(ApiServer, u16), Error> {
Box::leak(Box::new(oneshot::channel::<()>())); Box::leak(Box::new(oneshot::channel::<()>()));
let mut apis = ApiServer::new(); let mut apis = ApiServer::new();
println!("Starting HTTP Foreign listener API server at {}.", api_addr);
let socket_addr: SocketAddr = api_addr.parse().unwrap(); let socket_addr: SocketAddr = api_addr.parse().unwrap();
let _ = apis.start(socket_addr, router, None, api_chan) let _ = apis.start(socket_addr, router, None, api_chan)
.map_err(|_| Error::GenericError("API thread failed to start".to_string()))?; .map_err(|_| Error::GenericError("API thread failed to start".to_string()))?;
println!("HTTP Foreign listener started.");
Ok((apis, free_port)) Ok((apis, free_port))
} }