tor: fix service availability check

This commit is contained in:
ardocrat 2024-05-15 20:27:58 +03:00
parent 5749c5a367
commit 2bb51d6757
3 changed files with 112 additions and 89 deletions

View file

@ -18,6 +18,8 @@ use egui::{Align, Id, Layout, Margin, RichText, Rounding, ScrollArea};
use egui::os::OperatingSystem;
use egui::scroll_area::ScrollBarVisibility;
use parking_lot::RwLock;
use tor_rtcompat::BlockOn;
use tor_rtcompat::tokio::TokioNativeTlsRuntime;
use grin_core::core::{amount_from_hr_string, amount_to_hr_string};
use grin_wallet_libwallet::SlatepackAddress;
@ -402,7 +404,7 @@ impl WalletTransport {
ui.add_space(6.0);
}
/// Draw Tor send content.
/// Draw Tor receive content.
fn tor_receive_ui(&mut self,
ui: &mut egui::Ui,
wallet: &Wallet,
@ -495,7 +497,7 @@ impl WalletTransport {
});
}
/// Draw Tor receive content.
/// Draw Tor send content.
fn tor_send_ui(&mut self, ui: &mut egui::Ui, cb: &dyn PlatformCallbacks) {
// Setup layout size.
let mut rect = ui.available_rect_before_wrap();
@ -684,12 +686,13 @@ impl WalletTransport {
let send_success = self.tor_success.clone();
let mut wallet = wallet.clone();
thread::spawn(move || {
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap()
let runtime = TokioNativeTlsRuntime::create().unwrap();
let runtime_tor = runtime.clone();
runtime
.block_on(async {
if wallet.send_tor(a, &addr).await.is_some() {
if wallet.send_tor(a, &addr, runtime_tor)
.await
.is_some() {
let mut w_send_success = send_success.write();
*w_send_success = true;
} else {
@ -742,15 +745,16 @@ impl WalletTransport {
let send_success = self.tor_success.clone();
let mut wallet = wallet.clone();
thread::spawn(move || {
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap()
let runtime = TokioNativeTlsRuntime::create().unwrap();
let runtime_tor = runtime.clone();
runtime
.block_on(async {
let addr_str = addr_text.as_str();
let addr = &SlatepackAddress::try_from(addr_str)
.unwrap();
if wallet.send_tor(a, &addr).await.is_some() {
if wallet.send_tor(a, &addr, runtime_tor)
.await
.is_some() {
let mut w_send_success = send_success.write();
*w_send_success = true;
} else {

View file

@ -32,7 +32,7 @@ use sha2::Sha512;
use tokio::time::{sleep, sleep_until};
use tor_config::CfgPath;
use tor_rtcompat::tokio::TokioNativeTlsRuntime;
use tor_rtcompat::Runtime;
use tor_rtcompat::{BlockOn, PreferredRuntime, Runtime};
use tor_hsrproxy::OnionServiceReverseProxy;
use tor_hsrproxy::config::{Encapsulation, ProxyAction, ProxyPattern, ProxyRule, TargetAddr, ProxyConfigBuilder};
use tor_hsservice::config::OnionServiceConfigBuilder;
@ -68,7 +68,9 @@ pub struct Tor {
/// Starting Onion services identifiers.
starting_services: Arc<RwLock<BTreeSet<String>>>,
/// Failed Onion services identifiers.
failed_services: Arc<RwLock<BTreeSet<String>>>
failed_services: Arc<RwLock<BTreeSet<String>>>,
/// Checking Onion services identifiers.
checking_services: Arc<RwLock<BTreeSet<String>>>
}
impl Default for Tor {
@ -76,7 +78,8 @@ impl Default for Tor {
Self {
running_services: Arc::new(RwLock::new(BTreeMap::new())),
starting_services: Arc::new(RwLock::new(BTreeSet::new())),
failed_services: Arc::new(RwLock::new(BTreeSet::new()))
failed_services: Arc::new(RwLock::new(BTreeSet::new())),
checking_services: Arc::new(RwLock::new(BTreeSet::new()))
}
}
}
@ -108,9 +111,8 @@ impl Tor {
}
/// Send post request using Tor.
pub async fn post(body: String, url: String) -> Option<String> {
pub async fn post(body: String, url: String, runtime: TokioNativeTlsRuntime) -> Option<String> {
// Create client.
let runtime = TokioNativeTlsRuntime::create().unwrap();
let (client, _) = Self::build_client(runtime).await;
// Create http tor-powered client to post data.
let tls_connector = TlsConnector::builder().unwrap().build().unwrap();
@ -156,6 +158,12 @@ impl Tor {
r_services.contains(id)
}
/// Check if Onion service is checking.
pub fn is_service_checking(id: &String) -> bool {
let r_services = TOR_SERVER_STATE.checking_services.read().unwrap();
r_services.contains(id)
}
/// Stop running Onion service.
pub fn stop_service(id: &String) {
let mut w_services = TOR_SERVER_STATE.running_services.write().unwrap();
@ -180,6 +188,7 @@ impl Tor {
}
let service_id = id.clone();
thread::spawn(move || {
let runtime = TokioNativeTlsRuntime::create().unwrap();
let runtime_client = runtime.clone();
runtime.spawn(async move {
@ -200,50 +209,58 @@ impl Tor {
Self::run_service_proxy(addr, client, service.clone(), request, hs_nickname.clone())
).await.unwrap();
// Check service availability.
let url = format!("http://{}", service.onion_name().unwrap().to_string());
std::thread::spawn(move || {
thread::sleep(Duration::from_millis(3000));
let runtime = TokioNativeTlsRuntime::create().unwrap();
let runtime_client = runtime.clone();
runtime.spawn(async move {
loop {
if !Self::is_service_running(&service_id) &&
!Self::is_service_starting(&service_id) {
break;
// Check service availability if not checking.
if Self::is_service_checking(&service_id) {
return;
}
let url = format!("http://{}/v2/foreign", service.onion_name().unwrap().to_string());
thread::spawn(move || {
let runtime = TokioNativeTlsRuntime::create().unwrap();
let client_runtime = runtime.clone();
// Put service to checking.
{
let mut w_services = TOR_SERVER_STATE.checking_services.write().unwrap();
w_services.insert(service_id.clone());
}
runtime
.spawn(async move {
loop {
println!("start loop");
// Create client.
let (client, _) = Self::build_client(runtime_client.clone()).await;
let (client, _) = Self::build_client(client_runtime.clone()).await;
// Create http tor-powered client to ping service.
let tls_connector = TlsConnector::builder().unwrap().build().unwrap();
let tor_connector = ArtiHttpConnector::new(client, tls_connector);
let http = hyper::Client::builder().build::<_, Body>(tor_connector);
println!("start get {}", url);
match http.get(Uri::from_str(url.as_str()).unwrap()).await {
Ok(_) => {
// Remove service from starting.
let mut w_services = TOR_SERVER_STATE.starting_services.write().unwrap();
w_services.remove(&service_id);
println!("check success");
println!("OK");
},
Err(e) => {
// Put service to starting.
let mut w_services = TOR_SERVER_STATE.starting_services.write().unwrap();
w_services.insert(service_id.clone());
println!("check err: {}", e);
println!("err: {}", e);
},
}
if !Self::is_service_running(&service_id) &&
!Self::is_service_starting(&service_id) {
// Remove service from checking.
let mut w_services = TOR_SERVER_STATE.checking_services.write().unwrap();
w_services.remove(&service_id);
break;
}
}
}).unwrap();
});
}).unwrap();
});
}
/// Launch Onion service proxy.
@ -282,7 +299,6 @@ impl Tor {
.handle_requests(runtime, nickname.clone(), request)
.await {
Ok(()) => {
println!("service stopped");
// Remove service from running.
let mut w_services =
TOR_SERVER_STATE.running_services.write().unwrap();

View file

@ -24,6 +24,8 @@ use std::thread::Thread;
use std::time::Duration;
use futures::channel::oneshot;
use serde_json::{json, Value};
use tor_rtcompat::tokio::TokioNativeTlsRuntime;
use rand::Rng;
use grin_api::{ApiServer, Router};
use grin_chain::SyncStatus;
@ -40,7 +42,6 @@ use grin_wallet_impls::{DefaultLCProvider, DefaultWalletImpl, HTTPNodeClient};
use grin_wallet_libwallet::{address, Error, InitTxArgs, IssueInvoiceTxArgs, NodeClient, RetrieveTxQueryArgs, RetrieveTxQuerySortField, RetrieveTxQuerySortOrder, Slate, SlatepackAddress, SlateState, SlateVersion, StatusMessage, TxLogEntry, TxLogEntryType, VersionedSlate, WalletInst, WalletLCProvider};
use grin_wallet_libwallet::api_impl::owner::{cancel_tx, retrieve_summary_info, retrieve_txs};
use grin_wallet_util::OnionV3Address;
use rand::Rng;
use crate::AppConfig;
use crate::node::{Node, NodeConfig};
@ -675,7 +676,9 @@ impl Wallet {
}
/// Send amount to provided address with Tor transport.
pub async fn send_tor(&mut self, amount: u64, addr: &SlatepackAddress) -> Option<Slate> {
pub async fn send_tor(&mut self,
amount: u64, addr: &SlatepackAddress,
runtime: TokioNativeTlsRuntime) -> Option<Slate> {
// Initialize transaction.
let send_res = self.send(amount);
@ -706,7 +709,7 @@ impl Wallet {
}).to_string();
// Send request to receiver.
let req_res = Tor::post(body, url).await;
let req_res = Tor::post(body, url, runtime).await;
if req_res.is_none() {
cancel_tx();
return None;