wallet: optimize rwlock, api server port randomize

This commit is contained in:
ardocrat 2024-05-15 14:31:10 +03:00
parent 880f5629f5
commit 7ee8fb2ff6

View file

@ -17,15 +17,18 @@ use std::fs::File;
use std::io::Write;
use std::net::{SocketAddr, TcpListener};
use std::path::PathBuf;
use std::sync::{Arc, mpsc, RwLock};
use std::sync::{Arc, mpsc};
use parking_lot::RwLock;
use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU8, Ordering};
use std::thread::Thread;
use std::time::Duration;
use futures::channel::oneshot;
use serde_json::{json, Value};
use grin_api::{ApiServer, Router};
use grin_chain::SyncStatus;
use grin_core::global;
use grin_core::global::ChainTypes;
use grin_keychain::{ExtKeychain, Identifier, Keychain};
use grin_util::{Mutex, ToHex};
use grin_util::secp::SecretKey;
@ -37,7 +40,7 @@ use grin_wallet_impls::{DefaultLCProvider, DefaultWalletImpl, HTTPNodeClient};
use grin_wallet_libwallet::{address, Error, InitTxArgs, IssueInvoiceTxArgs, NodeClient, RetrieveTxQueryArgs, RetrieveTxQuerySortField, RetrieveTxQuerySortOrder, Slate, SlatepackAddress, SlateState, SlateVersion, StatusMessage, TxLogEntry, TxLogEntryType, VersionedSlate, WalletInst, WalletLCProvider};
use grin_wallet_libwallet::api_impl::owner::{cancel_tx, retrieve_summary_info, retrieve_txs};
use grin_wallet_util::OnionV3Address;
use serde_json::{json, Value};
use rand::Rng;
use crate::AppConfig;
use crate::node::{Node, NodeConfig};
@ -95,11 +98,6 @@ pub struct Wallet {
repair_progress: Arc<AtomicU8>
}
/// Default Foreign API server host.
const DEFAULT_FOREIGN_API_HOST: &str = "127.0.0.1";
/// Default Foreign API server port.
const DEFAULT_FOREIGN_API_PORT: u16 = 3415;
impl Wallet {
/// Create new [`Wallet`] instance with provided [`WalletConfig`].
fn new(config: WalletConfig) -> Self {
@ -239,62 +237,62 @@ impl Wallet {
/// Get Slatepack address to receive txs at transport.
pub fn slatepack_address(&self) -> Option<String> {
let r_address = self.slatepack_address.read().unwrap();
let r_address = self.slatepack_address.read();
if r_address.is_some() {
let addr = r_address.clone().unwrap();
return Some(addr)
let addr = r_address.clone();
return addr
}
None
}
/// Get wallet config.
pub fn get_config(&self) -> WalletConfig {
self.config.read().unwrap().clone()
self.config.read().clone()
}
/// Change wallet name.
pub fn change_name(&self, name: String) {
let mut w_config = self.config.write().unwrap();
let mut w_config = self.config.write();
w_config.name = name;
w_config.save();
}
/// Check if start of Tor listener on wallet opening is needed.
pub fn auto_start_tor_listener(&self) -> bool {
let r_config = self.config.read().unwrap();
let r_config = self.config.read();
r_config.enable_tor_listener.unwrap_or(true)
}
/// Update start of Tor listener on wallet opening.
pub fn update_auto_start_tor_listener(&self, start: bool) {
let mut w_config = self.config.write().unwrap();
let mut w_config = self.config.write();
w_config.enable_tor_listener = Some(start);
w_config.save();
}
/// Check if Dandelion usage is needed to post transactions.
pub fn can_use_dandelion(&self) -> bool {
let r_config = self.config.read().unwrap();
let r_config = self.config.read();
r_config.use_dandelion.unwrap_or(true)
}
/// Update usage of Dandelion to post transactions.
pub fn update_use_dandelion(&self, use_dandelion: bool) {
let mut w_config = self.config.write().unwrap();
let mut w_config = self.config.write();
w_config.use_dandelion = Some(use_dandelion);
w_config.save();
}
/// Update minimal amount of confirmations.
pub fn update_min_confirmations(&self, min_confirmations: u64) {
let mut w_config = self.config.write().unwrap();
let mut w_config = self.config.write();
w_config.min_confirmations = min_confirmations;
w_config.save();
}
/// Update external connection identifier.
pub fn update_ext_conn_id(&self, id: Option<i64>) {
let mut w_config = self.config.write().unwrap();
let mut w_config = self.config.write();
w_config.ext_conn_id = id;
w_config.save();
}
@ -306,7 +304,7 @@ impl Wallet {
}
// Create new wallet instance if sync thread was stopped or instance was not created.
if self.sync_thread.read().unwrap().is_none() || self.instance.is_none() {
if self.sync_thread.read().is_none() || self.instance.is_none() {
let config = self.get_config();
let new_instance = Self::create_wallet_instance(config.clone())?;
self.instance = Some(new_instance);
@ -333,12 +331,11 @@ impl Wallet {
wallet_inst.set_parent_key_id_by_name(label.as_str())?;
// Start new synchronization thread or wake up existing one.
let mut thread_w = self.sync_thread.write().unwrap();
let mut thread_w = self.sync_thread.write();
if thread_w.is_none() {
let thread = start_sync(self.clone());
*thread_w = Some(thread);
} else {
println!("unfreeze thread");
thread_w.clone().unwrap().unpark();
}
self.is_open.store(true, Ordering::Relaxed);
@ -353,7 +350,7 @@ impl Wallet {
// Set slatepack address.
let mut api = Owner::new(self.instance.clone().unwrap(), None);
controller::owner_single_use(None, None, Some(&mut api), |api, m| {
let mut w_address = self.slatepack_address.write().unwrap();
let mut w_address = self.slatepack_address.write();
*w_address = Some(api.get_slatepack_address(m, 0)?.to_string());
Ok(())
})?;
@ -364,7 +361,7 @@ impl Wallet {
/// Get external connection id applied to [`WalletInstance`]
/// after wallet opening if sync is running or get it from config.
pub fn get_current_ext_conn_id(&self) -> Option<i64> {
if self.sync_thread.read().unwrap().is_some() {
if self.sync_thread.read().is_some() {
let ext_conn_id = self.instance_ext_conn_id.load(Ordering::Relaxed);
if ext_conn_id == 0 {
None
@ -400,10 +397,10 @@ impl Wallet {
thread::spawn(move || {
// Stop running API server.
let api_server_exists = {
wallet_close.foreign_api_server.read().unwrap().is_some()
wallet_close.foreign_api_server.read().is_some()
};
if api_server_exists {
let mut w_api_server = wallet_close.foreign_api_server.write().unwrap();
let mut w_api_server = wallet_close.foreign_api_server.write();
w_api_server.as_mut().unwrap().0.stop();
*w_api_server = None;
}
@ -457,7 +454,7 @@ impl Wallet {
controller::owner_single_use(None, None, Some(&mut api), |api, m| {
api.set_active_account(m, label)?;
// Set Slatepack address.
let mut w_address = self.slatepack_address.write().unwrap();
let mut w_address = self.slatepack_address.write();
*w_address = Some(api.get_slatepack_address(m, 0)?.to_string());
Ok(())
})?;
@ -467,12 +464,12 @@ impl Wallet {
Tor::stop_service(&cur_service_id);
// Save account label into config.
let mut w_config = self.config.write().unwrap();
let mut w_config = self.config.write();
w_config.account = label.to_owned();
w_config.save();
// Clear wallet info.
let mut w_data = self.data.write().unwrap();
let mut w_data = self.data.write();
*w_data = None;
// Reset progress values.
@ -486,7 +483,7 @@ impl Wallet {
/// Get list of accounts for the wallet.
pub fn accounts(&self) -> Vec<WalletAccount> {
self.accounts.read().unwrap().clone()
self.accounts.read().clone()
}
/// Set wallet reopen status.
@ -538,13 +535,13 @@ impl Wallet {
/// Get wallet data.
pub fn get_data(&self) -> Option<WalletData> {
let r_data = self.data.read().unwrap();
let r_data = self.data.read();
r_data.clone()
}
/// Wake up wallet thread to sync wallet data and update statuses.
pub fn sync(&self) {
let thread_r = self.sync_thread.read().unwrap();
let thread_r = self.sync_thread.read();
if let Some(thread) = thread_r.as_ref() {
thread.unpark();
}
@ -552,7 +549,7 @@ impl Wallet {
/// Get running Foreign API server port.
pub fn foreign_api_port(&self) -> Option<u16> {
let r_api = self.foreign_api_server.read().unwrap();
let r_api = self.foreign_api_server.read();
if r_api.is_some() {
let api = r_api.as_ref().unwrap();
return Some(api.1);
@ -717,13 +714,11 @@ impl Wallet {
// Parse response and finalize transaction.
let res: Value = serde_json::from_str(&req_res.unwrap()).unwrap();
println!("Response: {}", res);
if res["error"] != json!(null) {
let report = format!(
"Posting transaction slate: Error: {}, Message: {}",
res["error"]["code"], res["error"]["message"]
);
println!("{}", report);
cancel_tx();
return None;
}
@ -745,11 +740,9 @@ impl Wallet {
let result = self.post(&slate, self.can_use_dandelion());
match result {
Ok(_) => {
println!("Tx sent successfully", );
Ok(())
}
Err(e) => {
eprintln!("Tx sent fail: {}", e);
Err(e)
}
}
@ -854,7 +847,7 @@ impl Wallet {
slate.state = SlateState::Standard3
};
if let Some(tx) = self.tx_by_slate(&slate) {
let mut w_data = self.data.write().unwrap();
let mut w_data = self.data.write();
let mut data = w_data.clone().unwrap();
for t in &mut data.txs {
if t.data.id == tx.data.id {
@ -874,7 +867,7 @@ impl Wallet {
pub fn cancel(&mut self, id: u32) {
// Setup cancelling status.
{
let mut w_data = self.data.write().unwrap();
let mut w_data = self.data.write();
let mut data = w_data.clone().unwrap();
let txs = data.txs.iter_mut().map(|tx| {
if tx.data.id == id {
@ -892,7 +885,7 @@ impl Wallet {
let instance = wallet.instance.clone().unwrap();
let _ = cancel_tx(instance, None, &None, Some(id), None);
// Setup posting flag, and ability to finalize.
let mut w_data = wallet.data.write().unwrap();
let mut w_data = wallet.data.write();
let mut data = w_data.clone().unwrap();
let txs = data.txs.iter_mut().map(|tx| {
if tx.data.id == id {
@ -996,8 +989,6 @@ fn start_sync(mut wallet: Wallet) -> Thread {
wallet.repair_progress.store(0, Ordering::Relaxed);
thread::spawn(move || loop {
println!("SYNC {}, attempts: {}", wallet.get_config().name, wallet.get_sync_attempts());
// Close wallet on chain type change.
if wallet.get_config().chain_type != AppConfig::chain_type() {
wallet.close();
@ -1006,11 +997,11 @@ fn start_sync(mut wallet: Wallet) -> Thread {
// Stop syncing if wallet was closed.
if !wallet.is_open() {
// Clear thread instance.
let mut thread_w = wallet.sync_thread.write().unwrap();
let mut thread_w = wallet.sync_thread.write();
*thread_w = None;
// Clear wallet info.
let mut w_data = wallet.data.write().unwrap();
let mut w_data = wallet.data.write();
*w_data = None;
return;
}
@ -1034,12 +1025,12 @@ fn start_sync(mut wallet: Wallet) -> Thread {
// Start Foreign API listener if API server is not running.
let mut api_server_running = {
wallet.foreign_api_server.read().unwrap().is_some()
wallet.foreign_api_server.read().is_some()
};
if !api_server_running && wallet.is_open() {
match start_api_server(&mut wallet) {
Ok(api_server) => {
let mut api_server_w = wallet.foreign_api_server.write().unwrap();
let mut api_server_w = wallet.foreign_api_server.write();
*api_server_w = Some(api_server);
api_server_running = true;
}
@ -1050,7 +1041,7 @@ fn start_sync(mut wallet: Wallet) -> Thread {
// Start Tor service if API server is running and wallet is open.
if wallet.auto_start_tor_listener() && wallet.is_open() && api_server_running &&
!Tor::is_service_running(&wallet.identifier()) {
let r_foreign_api = wallet.foreign_api_server.read().unwrap();
let r_foreign_api = wallet.foreign_api_server.read();
let api = r_foreign_api.as_ref().unwrap();
if let Ok(sec_key) = wallet.secret_key() {
Tor::start_service(api.1, sec_key, &wallet.identifier());
@ -1069,11 +1060,11 @@ fn start_sync(mut wallet: Wallet) -> Thread {
// Stop sync if wallet was closed.
if !wallet.is_open() {
// Clear thread instance.
let mut thread_w = wallet.sync_thread.write().unwrap();
let mut thread_w = wallet.sync_thread.write();
*thread_w = None;
// Clear wallet info.
let mut w_data = wallet.data.write().unwrap();
let mut w_data = wallet.data.write();
*w_data = None;
return;
}
@ -1085,16 +1076,6 @@ fn start_sync(mut wallet: Wallet) -> Thread {
} else {
SYNC_DELAY
};
if failed_sync {
println!("SYNC {} failed, attempts: {}, wait {}ms",
wallet.get_config().name,
wallet.get_sync_attempts(),
delay.as_millis());
} else {
println!("SYNC success for {}, wait {}ms",
wallet.get_config().name,
delay.as_millis());
}
thread::park_timeout(delay);
}).thread().clone()
}
@ -1217,8 +1198,6 @@ fn sync_wallet_data(wallet: &Wallet) {
// Setup transaction posting status based on slate state.
let posting = if unconfirmed_sent_or_received {
println!("{}", serde_json::to_string(tx).unwrap());
// Create slate to check existing file.
let is_invoice = tx.tx_type == TxLogEntryType::TxReceived;
let mut slate = Slate::blank(0, is_invoice);
@ -1285,7 +1264,7 @@ fn sync_wallet_data(wallet: &Wallet) {
}
// Update wallet data.
let mut w_data = wallet.data.write().unwrap();
let mut w_data = wallet.data.write();
*w_data = Some(WalletData { info: info.1, txs: new_txs });
return;
}
@ -1317,11 +1296,17 @@ fn sync_wallet_data(wallet: &Wallet) {
}
}
/// Start Foreign API server to receive mining rewards from Stratum server.
/// Start Foreign API server to receive txs over transport and mining rewards.
fn start_api_server(wallet: &mut Wallet) -> Result<(ApiServer, u16), Error> {
let host = "127.0.0.1";
// Find free port.
let free_port = (DEFAULT_FOREIGN_API_PORT..).find(|port| {
return match TcpListener::bind((DEFAULT_FOREIGN_API_HOST, port.to_owned())) {
let port = if wallet.get_config().chain_type == ChainTypes::Mainnet {
rand::thread_rng().gen_range(37000..40000)
} else {
rand::thread_rng().gen_range(47000..50000)
};
let free_port = (port..).find(|port| {
return match TcpListener::bind((host, port.to_owned())) {
Ok(_) => {
let node_p2p_port = NodeConfig::get_p2p_port();
let node_api_port = NodeConfig::get_api_ip_port().1;
@ -1332,7 +1317,7 @@ fn start_api_server(wallet: &mut Wallet) -> Result<(ApiServer, u16), Error> {
}).unwrap();
// Setup API server address.
let api_addr = format!("{}:{}", DEFAULT_FOREIGN_API_HOST, free_port);
let api_addr = format!("{}:{}", host, free_port);
// Start Foreign API server thread.
let instance = wallet.instance.clone().unwrap();
@ -1362,14 +1347,14 @@ fn start_api_server(wallet: &mut Wallet) -> Result<(ApiServer, u16), Error> {
fn update_accounts(wallet: &Wallet, current_height: u64, current_spendable: Option<u64>) {
// Update only current account if list is not empty.
if current_spendable.is_some() {
let mut accounts = wallet.accounts.read().unwrap().clone();
let mut accounts = wallet.accounts.read().clone();
for mut a in accounts.iter_mut() {
if a.label == wallet.get_config().account {
a.spendable_amount = current_spendable.unwrap();
}
}
// Save accounts data.
let mut w_data = wallet.accounts.write().unwrap();
let mut w_data = wallet.accounts.write();
*w_data = accounts;
} else {
let mut api = Owner::new(wallet.instance.clone().unwrap(), None);
@ -1402,7 +1387,7 @@ fn update_accounts(wallet: &Wallet, current_height: u64, current_spendable: Opti
accounts.reverse();
// Save accounts data.
let mut w_data = wallet.accounts.write().unwrap();
let mut w_data = wallet.accounts.write();
*w_data = accounts;
// Set current active account from config.