From e74c0e2571b9a79635553065d0974205bb151020 Mon Sep 17 00:00:00 2001 From: Yeastplume Date: Mon, 18 Nov 2019 10:49:51 +0000 Subject: [PATCH] Add updater thread + Updater related functions (#253) * first attempt at adding updater thread * rustfmt * rustfmt * many lifetimes made static to allow api to spawn separate thread * add first pass at updater thread functionality * rustfmt * add mpsc for returning status from update functions * rustfmt * add stop state, ensure update is triggered by wallet functions when wallet updater is running * rustfmt * add update status function to owner API, V3 owner api functions * rustfmt * change update warning * adding tests for new updater control functions * documentation updates * rustfmt --- api/src/foreign_rpc.rs | 11 +- api/src/owner.rs | 263 ++++++++++++++++++++++-- api/src/owner_rpc.rs | 19 +- api/src/owner_rpc_s.rs | 118 ++++++++++- controller/src/command.rs | 142 ++++++------- controller/src/controller.rs | 26 ++- controller/tests/updater_thread.rs | 122 +++++++++++ impls/src/backends/lmdb.rs | 2 +- impls/src/test_framework/mod.rs | 2 +- libwallet/src/api_impl.rs | 1 + libwallet/src/api_impl/owner.rs | 87 ++++++-- libwallet/src/api_impl/owner_updater.rs | 140 +++++++++++++ libwallet/src/internal/scan.rs | 109 +++++++--- libwallet/src/lib.rs | 1 + src/bin/grin-wallet.yml | 2 +- tests/common/mod.rs | 1 + tests/owner_v3_lifecycle.rs | 68 +++++- 17 files changed, 944 insertions(+), 170 deletions(-) create mode 100644 controller/tests/updater_thread.rs create mode 100644 libwallet/src/api_impl/owner_updater.rs diff --git a/api/src/foreign_rpc.rs b/api/src/foreign_rpc.rs index f1156af7..b90ee50c 100644 --- a/api/src/foreign_rpc.rs +++ b/api/src/foreign_rpc.rs @@ -691,9 +691,14 @@ pub fn run_doctest_foreign( false, ); //update local outputs after each block, so transaction IDs stay consistent - let (wallet_refreshed, _) = - api_impl::owner::retrieve_summary_info(wallet1.clone(), (&mask1).as_ref(), true, 1) - .unwrap(); + let (wallet_refreshed, _) = api_impl::owner::retrieve_summary_info( + wallet1.clone(), + (&mask1).as_ref(), + &None, + true, + 1, + ) + .unwrap(); assert!(wallet_refreshed); } diff --git a/api/src/owner.rs b/api/src/owner.rs index e9067088..b830bced 100644 --- a/api/src/owner.rs +++ b/api/src/owner.rs @@ -22,7 +22,8 @@ use crate::core::core::Transaction; use crate::core::global; use crate::impls::create_sender; use crate::keychain::{Identifier, Keychain}; -use crate::libwallet::api_impl::owner; +use crate::libwallet::api_impl::owner_updater::{start_updater_log_thread, StatusMessage}; +use crate::libwallet::api_impl::{owner, owner_updater}; use crate::libwallet::{ AcctPathMapping, Error, ErrorKind, InitTxArgs, IssueInvoiceTxArgs, NodeClient, NodeHeightResult, OutputCommitMapping, Slate, TxLogEntry, WalletInfo, WalletInst, @@ -30,7 +31,11 @@ use crate::libwallet::{ }; use crate::util::secp::key::SecretKey; use crate::util::{from_hex, static_secp_instance, LoggingConfig, Mutex, ZeroingString}; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::mpsc::{channel, Sender}; use std::sync::Arc; +use std::thread; +use std::time::Duration; /// Main interface into all wallet API functions. /// Wallet APIs are split into two seperate blocks of functionality @@ -46,23 +51,32 @@ use std::sync::Arc; /// its operation, then 'close' the wallet (unloading references to the keychain and master /// seed). -pub struct Owner<'a, L, C, K> +pub struct Owner where - L: WalletLCProvider<'a, C, K>, - C: NodeClient + 'a, - K: Keychain + 'a, + L: WalletLCProvider<'static, C, K> + 'static, + C: NodeClient + 'static, + K: Keychain + 'static, { /// contain all methods to manage the wallet - pub wallet_inst: Arc>>>, + pub wallet_inst: Arc>>>, /// Flag to normalize some output during testing. Can mostly be ignored. pub doctest_mode: bool, /// Share ECDH key pub shared_key: Arc>>, + /// Update thread + updater: Arc>>, + /// Stop state for update thread + pub updater_running: Arc, + /// Sender for update messages + status_tx: Mutex>>, + /// Holds all update and status messages returned by the + /// updater process + updater_messages: Arc>>, } -impl<'a, L, C, K> Owner<'a, L, C, K> +impl Owner where - L: WalletLCProvider<'a, C, K>, + L: WalletLCProvider<'static, C, K> + 'static, C: NodeClient, K: Keychain, { @@ -141,11 +155,26 @@ where /// /// ``` - pub fn new(wallet_inst: Arc>>>) -> Self { + pub fn new(wallet_inst: Arc>>>) -> Self { + let (tx, rx) = channel(); + + let updater_running = Arc::new(AtomicBool::new(false)); + let updater = Arc::new(Mutex::new(owner_updater::Updater::new( + wallet_inst.clone(), + updater_running.clone(), + ))); + + let updater_messages = Arc::new(Mutex::new(vec![])); + let _ = start_updater_log_thread(rx, updater_messages.clone()); + Owner { wallet_inst, doctest_mode: false, shared_key: Arc::new(Mutex::new(None)), + updater, + updater_running, + status_tx: Mutex::new(Some(tx)), + updater_messages, } } @@ -304,6 +333,8 @@ where /// provided during wallet instantiation). If `false`, the results will /// contain output information that may be out-of-date (from the last time /// the wallet's output set was refreshed against the node). + /// Note this setting is ignored if the updater process is running via a call to + /// [`start_updater`](struct.Owner.html#method.start_updater) /// * `tx_id` - If `Some(i)`, only return the outputs associated with /// the transaction log entry of id `i`. /// @@ -342,9 +373,18 @@ where refresh_from_node: bool, tx_id: Option, ) -> Result<(bool, Vec), Error> { + let tx = { + let t = self.status_tx.lock(); + t.clone() + }; + let refresh_from_node = match self.updater_running.load(Ordering::Relaxed) { + true => false, + false => refresh_from_node, + }; owner::retrieve_outputs( self.wallet_inst.clone(), keychain_mask, + &tx, include_spent, refresh_from_node, tx_id, @@ -362,6 +402,8 @@ where /// provided during wallet instantiation). If `false`, the results will /// contain transaction information that may be out-of-date (from the last time /// the wallet's output set was refreshed against the node). + /// Note this setting is ignored if the updater process is running via a call to + /// [`start_updater`](struct.Owner.html#method.start_updater) /// * `tx_id` - If `Some(i)`, only return the transactions associated with /// the transaction log entry of id `i`. /// * `tx_slate_id` - If `Some(uuid)`, only return transactions associated with @@ -400,9 +442,18 @@ where tx_id: Option, tx_slate_id: Option, ) -> Result<(bool, Vec), Error> { + let tx = { + let t = self.status_tx.lock(); + t.clone() + }; + let refresh_from_node = match self.updater_running.load(Ordering::Relaxed) { + true => false, + false => refresh_from_node, + }; let mut res = owner::retrieve_txs( self.wallet_inst.clone(), keychain_mask, + &tx, refresh_from_node, tx_id, tx_slate_id, @@ -431,6 +482,8 @@ where /// provided during wallet instantiation). If `false`, the results will /// contain transaction information that may be out-of-date (from the last time /// the wallet's output set was refreshed against the node). + /// Note this setting is ignored if the updater process is running via a call to + /// [`start_updater`](struct.Owner.html#method.start_updater) /// * `minimum_confirmations` - The minimum number of confirmations an output /// should have before it's included in the 'amount_currently_spendable' total /// @@ -464,9 +517,18 @@ where refresh_from_node: bool, minimum_confirmations: u64, ) -> Result<(bool, WalletInfo), Error> { + let tx = { + let t = self.status_tx.lock(); + t.clone() + }; + let refresh_from_node = match self.updater_running.load(Ordering::Relaxed) { + true => false, + false => refresh_from_node, + }; owner::retrieve_summary_info( self.wallet_inst.clone(), keychain_mask, + &tx, refresh_from_node, minimum_confirmations, ) @@ -531,7 +593,7 @@ where /// minimum_confirmations: 2, /// max_outputs: 500, /// num_change_outputs: 1, - /// selection_strategy_is_use_all: true, + /// selection_strategy_is_use_all: false, /// message: Some("Have some Grins. Love, Yeastplume".to_owned()), /// ..Default::default() /// }; @@ -680,7 +742,7 @@ where /// minimum_confirmations: 2, /// max_outputs: 500, /// num_change_outputs: 1, - /// selection_strategy_is_use_all: true, + /// selection_strategy_is_use_all: false, /// ..Default::default() /// }; /// @@ -741,7 +803,7 @@ where /// minimum_confirmations: 10, /// max_outputs: 500, /// num_change_outputs: 1, - /// selection_strategy_is_use_all: true, + /// selection_strategy_is_use_all: false, /// message: Some("Remember to lock this when we're happy this is sent".to_owned()), /// ..Default::default() /// }; @@ -805,7 +867,7 @@ where /// minimum_confirmations: 10, /// max_outputs: 500, /// num_change_outputs: 1, - /// selection_strategy_is_use_all: true, + /// selection_strategy_is_use_all: false, /// message: Some("Finalize this tx now".to_owned()), /// ..Default::default() /// }; @@ -865,7 +927,7 @@ where /// minimum_confirmations: 10, /// max_outputs: 500, /// num_change_outputs: 1, - /// selection_strategy_is_use_all: true, + /// selection_strategy_is_use_all: false, /// message: Some("Post this tx".to_owned()), /// ..Default::default() /// }; @@ -937,7 +999,7 @@ where /// minimum_confirmations: 10, /// max_outputs: 500, /// num_change_outputs: 1, - /// selection_strategy_is_use_all: true, + /// selection_strategy_is_use_all: false, /// message: Some("Cancel this tx".to_owned()), /// ..Default::default() /// }; @@ -964,7 +1026,17 @@ where tx_id: Option, tx_slate_id: Option, ) -> Result<(), Error> { - owner::cancel_tx(self.wallet_inst.clone(), keychain_mask, tx_id, tx_slate_id) + let tx = { + let t = self.status_tx.lock(); + t.clone() + }; + owner::cancel_tx( + self.wallet_inst.clone(), + keychain_mask, + &tx, + tx_id, + tx_slate_id, + ) } /// Retrieves the stored transaction associated with a TxLogEntry. Can be used even after the @@ -1043,7 +1115,7 @@ where /// minimum_confirmations: 10, /// max_outputs: 500, /// num_change_outputs: 1, - /// selection_strategy_is_use_all: true, + /// selection_strategy_is_use_all: false, /// message: Some("Just verify messages".to_owned()), /// ..Default::default() /// }; @@ -1140,6 +1212,7 @@ where keychain_mask, start_height, delete_unconfirmed, + &None, ) } @@ -1657,6 +1730,162 @@ where let lc = w_lock.lc_provider()?; lc.delete_wallet(name) } + + /// Starts a background wallet update thread, which performs the wallet update process + /// automatically at the frequency specified. + /// + /// The updater process is as follows: + /// + /// * Reconcile the wallet outputs against the node's current UTXO set, confirming + /// transactions if needs be. + /// * Look up transactions by kernel in cases where it's necessary (for instance, when + /// there are no change outputs for a transaction and transaction status can't be + /// inferred from the output state. + /// * Incrementally perform a scan of the UTXO set, correcting outputs and transactions + /// where their local state differs from what's on-chain. The wallet stores the last + /// position scanned, and will scan back 100 blocks worth of UTXOs on each update, to + /// correct any differences due to forks or otherwise. + /// + /// Note that an update process can take a long time, particularly when the entire + /// UTXO set is being scanned for correctness. The wallet status can be determined by + /// calling the [`get_updater_messages`](struct.Owner.html#method.get_updater_messages). + /// + /// # Arguments + /// + /// * `keychain_mask` - Wallet secret mask to XOR against the stored wallet seed before using, if + /// being used. + /// * `frequency`: The frequency at which to call the update process. Note this is + /// time elapsed since the last successful update process. If calling via the JSON-RPC + /// api, this represents milliseconds. + /// + /// # Returns + /// * Ok if successful + /// * or [`libwallet::Error`](../grin_wallet_libwallet/struct.Error.html) if an error is encountered. + /// + /// # Example + /// Set up as in [`new`](struct.Owner.html#method.new) method above. + /// ``` + /// # grin_wallet_api::doctest_helper_setup_doc_env!(wallet, wallet_config); + /// + /// use grin_core::global::ChainTypes; + /// + /// use std::time::Duration; + /// + /// // Set up as above + /// # let api_owner = Owner::new(wallet.clone()); + /// + /// let res = api_owner.start_updater(None, Duration::from_secs(60)); + /// + /// if let Ok(_) = res { + /// // ... + /// } + /// ``` + + pub fn start_updater( + &self, + keychain_mask: Option<&SecretKey>, + frequency: Duration, + ) -> Result<(), Error> { + let updater_inner = self.updater.clone(); + let tx_inner = { + let t = self.status_tx.lock(); + t.clone() + }; + let keychain_mask = match keychain_mask { + Some(m) => Some(m.clone()), + None => None, + }; + let _ = thread::Builder::new() + .name("wallet-updater".to_string()) + .spawn(move || { + let u = updater_inner.lock(); + if let Err(e) = u.run(frequency, keychain_mask, &tx_inner) { + error!("Wallet state updater failed with error: {:?}", e); + } + })?; + Ok(()) + } + + /// Stops the background update thread. If the updater is currently updating, the + /// thread will stop after the next update + /// + /// # Arguments + /// + /// * None + /// + /// # Returns + /// * Ok if successful + /// * or [`libwallet::Error`](../grin_wallet_libwallet/struct.Error.html) if an error is encountered. + /// + /// # Example + /// Set up as in [`new`](struct.Owner.html#method.new) method above. + /// ``` + /// # grin_wallet_api::doctest_helper_setup_doc_env!(wallet, wallet_config); + /// + /// use grin_core::global::ChainTypes; + /// + /// use std::time::Duration; + /// + /// // Set up as above + /// # let api_owner = Owner::new(wallet.clone()); + /// + /// let res = api_owner.start_updater(None, Duration::from_secs(60)); + /// + /// if let Ok(_) = res { + /// // ... + /// } + /// + /// let res = api_owner.stop_updater(); + /// ``` + + pub fn stop_updater(&self) -> Result<(), Error> { + self.updater_running.store(false, Ordering::Relaxed); + Ok(()) + } + + /// Retrieve messages from the updater thread, up to `count` number of messages. + /// The resulting array will be ordered newest messages first. The updater will + /// store a maximum of 10,000 messages, after which it will start removing the oldest + /// messages as newer ones are created. + /// + /// Messages retrieved via this method are removed from the internal queue, so calling + /// this function at a specified interval should result in a complete message history. + /// + /// # Arguments + /// + /// * `count` - The number of messages to retrieve. + /// + /// # Returns + /// * Ok with a Vec of [`StatusMessage`](../grin_wallet_libwallet/api_impl/owner_updater/enum.StatusMessage.html) + /// * or [`libwallet::Error`](../grin_wallet_libwallet/struct.Error.html) if an error is encountered. + /// + /// # Example + /// Set up as in [`new`](struct.Owner.html#method.new) method above. + /// ``` + /// # grin_wallet_api::doctest_helper_setup_doc_env!(wallet, wallet_config); + /// + /// use grin_core::global::ChainTypes; + /// + /// use std::time::Duration; + /// + /// // Set up as above + /// # let api_owner = Owner::new(wallet.clone()); + /// + /// let res = api_owner.start_updater(None, Duration::from_secs(60)); + /// + /// let messages = api_owner.get_updater_messages(10000); + /// + /// if let Ok(_) = res { + /// // ... + /// } + /// + /// ``` + + pub fn get_updater_messages(&self, count: usize) -> Result, Error> { + let mut q = self.updater_messages.lock(); + let index = q.len().saturating_sub(count); + Ok(q.split_off(index)) + } } #[doc(hidden)] diff --git a/api/src/owner_rpc.rs b/api/src/owner_rpc.rs index 4bf4171a..c2db3ac2 100644 --- a/api/src/owner_rpc.rs +++ b/api/src/owner_rpc.rs @@ -1223,11 +1223,11 @@ pub trait OwnerRpc: Sync + Send { fn node_height(&self) -> Result; } -impl<'a, L, C, K> OwnerRpc for Owner<'a, L, C, K> +impl<'a, L, C, K> OwnerRpc for Owner where - L: WalletLCProvider<'a, C, K>, - C: NodeClient + 'a, - K: Keychain + 'a, + L: WalletLCProvider<'static, C, K>, + C: NodeClient + 'static, + K: Keychain + 'static, { fn accounts(&self) -> Result, ErrorKind> { Owner::accounts(self, None).map_err(|e| e.kind()) @@ -1458,9 +1458,14 @@ pub fn run_doctest_owner( false, ); //update local outputs after each block, so transaction IDs stay consistent - let (wallet_refreshed, _) = - api_impl::owner::retrieve_summary_info(wallet1.clone(), (&mask1).as_ref(), true, 1) - .unwrap(); + let (wallet_refreshed, _) = api_impl::owner::retrieve_summary_info( + wallet1.clone(), + (&mask1).as_ref(), + &None, + true, + 1, + ) + .unwrap(); assert!(wallet_refreshed); } diff --git a/api/src/owner_rpc_s.rs b/api/src/owner_rpc_s.rs index 6931eed6..11d56ba0 100644 --- a/api/src/owner_rpc_s.rs +++ b/api/src/owner_rpc_s.rs @@ -22,14 +22,15 @@ use crate::keychain::{Identifier, Keychain}; use crate::libwallet::slate_versions::v2::TransactionV2; use crate::libwallet::{ AcctPathMapping, ErrorKind, InitTxArgs, IssueInvoiceTxArgs, NodeClient, NodeHeightResult, - OutputCommitMapping, Slate, SlateVersion, TxLogEntry, VersionedSlate, WalletInfo, - WalletLCProvider, + OutputCommitMapping, Slate, SlateVersion, StatusMessage, TxLogEntry, VersionedSlate, + WalletInfo, WalletLCProvider, }; use crate::util::secp::key::{PublicKey, SecretKey}; use crate::util::{static_secp_instance, LoggingConfig, ZeroingString}; use crate::{ECDHPubkey, Owner, Token}; use easy_jsonrpc_mw; use rand::thread_rng; +use std::time::Duration; /// Public definition used to generate Owner jsonrpc api. /// Secure version containing wallet lifecycle functions. All calls to this API must be encrypted. @@ -1671,13 +1672,101 @@ pub trait OwnerRpcS { ``` */ fn delete_wallet(&self, name: Option) -> Result<(), ErrorKind>; + + /** + Networked version of [Owner::start_updated](struct.Owner.html#method.start_updater). + ``` + # grin_wallet_api::doctest_helper_json_rpc_owner_assert_response!( + # r#" + { + "jsonrpc": "2.0", + "method": "start_updater", + "params": { + "token": "d202964900000000d302964900000000d402964900000000d502964900000000", + "frequency": 30000 + }, + "id": 1 + } + # "# + # , + # r#" + { + "id": 1, + "jsonrpc": "2.0", + "result": { + "Ok": null + } + } + # "# + # , true, 0, false, false, false); + ``` + */ + + fn start_updater(&self, token: Token, frequency: u32) -> Result<(), ErrorKind>; + + /** + Networked version of [Owner::stop_updater](struct.Owner.html#method.stop_updater). + ``` + # grin_wallet_api::doctest_helper_json_rpc_owner_assert_response!( + # r#" + { + "jsonrpc": "2.0", + "method": "stop_updater", + "params": null, + "id": 1 + } + # "# + # , + # r#" + { + "id": 1, + "jsonrpc": "2.0", + "result": { + "Ok": null + } + } + # "# + # , true, 0, false, false, false); + ``` + */ + fn stop_updater(&self) -> Result<(), ErrorKind>; + + /** + Networked version of [Owner::get_updater_messages](struct.Owner.html#method.get_updater_messages). + ``` + # grin_wallet_api::doctest_helper_json_rpc_owner_assert_response!( + # r#" + { + "jsonrpc": "2.0", + "method": "get_updater_messages", + "params": { + "count": 1 + }, + "id": 1 + } + # "# + # , + # r#" + { + "id": 1, + "jsonrpc": "2.0", + "result": { + "Ok": [] + } + } + # "# + # , true, 0, false, false, false); + ``` + */ + + fn get_updater_messages(&self, count: u32) -> Result, ErrorKind>; } -impl<'a, L, C, K> OwnerRpcS for Owner<'a, L, C, K> +impl OwnerRpcS for Owner where - L: WalletLCProvider<'a, C, K>, - C: NodeClient + 'a, - K: Keychain + 'a, + L: WalletLCProvider<'static, C, K>, + C: NodeClient + 'static, + K: Keychain + 'static, { fn accounts(&self, token: Token) -> Result, ErrorKind> { Owner::accounts(self, (&token.keychain_mask).as_ref()).map_err(|e| e.kind()) @@ -1958,4 +2047,21 @@ where let n = name.as_ref().map(|s| s.as_str()); Owner::delete_wallet(self, n).map_err(|e| e.kind()) } + + fn start_updater(&self, token: Token, frequency: u32) -> Result<(), ErrorKind> { + Owner::start_updater( + self, + (&token.keychain_mask).as_ref(), + Duration::from_millis(frequency as u64), + ) + .map_err(|e| e.kind()) + } + + fn stop_updater(&self) -> Result<(), ErrorKind> { + Owner::stop_updater(self).map_err(|e| e.kind()) + } + + fn get_updater_messages(&self, count: u32) -> Result, ErrorKind> { + Owner::get_updater_messages(self, count as usize).map_err(|e| e.kind()) + } } diff --git a/controller/src/command.rs b/controller/src/command.rs index 9c691bdb..a0ef94f1 100644 --- a/controller/src/command.rs +++ b/controller/src/command.rs @@ -63,15 +63,15 @@ pub struct InitArgs { pub restore: bool, } -pub fn init<'a, L, C, K>( - wallet: Arc>>>, +pub fn init( + wallet: Arc>>>, g_args: &GlobalArgs, args: InitArgs, ) -> Result<(), Error> where - L: WalletLCProvider<'a, C, K>, - C: NodeClient + 'a, - K: keychain::Keychain + 'a, + L: WalletLCProvider<'static, C, K> + 'static, + C: NodeClient + 'static, + K: keychain::Keychain + 'static, { let mut w_lock = wallet.lock(); let p = w_lock.lc_provider()?; @@ -101,14 +101,14 @@ pub struct RecoverArgs { pub passphrase: ZeroingString, } -pub fn recover<'a, L, C, K>( - wallet: Arc>>>, +pub fn recover( + wallet: Arc>>>, args: RecoverArgs, ) -> Result<(), Error> where - L: WalletLCProvider<'a, C, K>, - C: NodeClient + 'a, - K: keychain::Keychain + 'a, + L: WalletLCProvider<'static, C, K> + 'static, + C: NodeClient + 'static, + K: keychain::Keychain + 'static, { let mut w_lock = wallet.lock(); let p = w_lock.lc_provider()?; @@ -127,7 +127,7 @@ pub struct ListenArgs { pub method: String, } -pub fn listen<'a, L, C, K>( +pub fn listen( wallet: Arc>>>, keychain_mask: Arc>>, config: &WalletConfig, @@ -202,15 +202,15 @@ pub struct AccountArgs { pub create: Option, } -pub fn account<'a, L, C, K>( - wallet: Arc>>>, +pub fn account( + wallet: Arc>>>, keychain_mask: Option<&SecretKey>, args: AccountArgs, ) -> Result<(), Error> where - L: WalletLCProvider<'a, C, K>, - C: NodeClient + 'a, - K: keychain::Keychain + 'a, + L: WalletLCProvider<'static, C, K> + 'static, + C: NodeClient + 'static, + K: keychain::Keychain + 'static, { if args.create.is_none() { let res = controller::owner_single_use(wallet, keychain_mask, |api, m| { @@ -256,17 +256,17 @@ pub struct SendArgs { pub target_slate_version: Option, } -pub fn send<'a, L, C, K>( - wallet: Arc>>>, +pub fn send( + wallet: Arc>>>, keychain_mask: Option<&SecretKey>, tor_config: Option, args: SendArgs, dark_scheme: bool, ) -> Result<(), Error> where - L: WalletLCProvider<'a, C, K>, - C: NodeClient + 'a, - K: keychain::Keychain + 'a, + L: WalletLCProvider<'static, C, K> + 'static, + C: NodeClient + 'static, + K: keychain::Keychain + 'static, { controller::owner_single_use(wallet.clone(), keychain_mask, |api, m| { if args.estimate_selection_strategies { @@ -370,16 +370,16 @@ pub struct ReceiveArgs { pub message: Option, } -pub fn receive<'a, L, C, K>( - wallet: Arc>>>, +pub fn receive( + wallet: Arc>>>, keychain_mask: Option<&SecretKey>, g_args: &GlobalArgs, args: ReceiveArgs, ) -> Result<(), Error> where - L: WalletLCProvider<'a, C, K>, - C: NodeClient + 'a, - K: keychain::Keychain + 'a, + L: WalletLCProvider<'static, C, K>, + C: NodeClient + 'static, + K: keychain::Keychain + 'static, { let mut slate = PathToSlate((&args.input).into()).get_tx()?; let km = match keychain_mask.as_ref() { @@ -410,15 +410,15 @@ pub struct FinalizeArgs { pub dest: Option, } -pub fn finalize<'a, L, C, K>( - wallet: Arc>>>, +pub fn finalize( + wallet: Arc>>>, keychain_mask: Option<&SecretKey>, args: FinalizeArgs, ) -> Result<(), Error> where - L: WalletLCProvider<'a, C, K>, - C: NodeClient + 'a, - K: keychain::Keychain + 'a, + L: WalletLCProvider<'static, C, K> + 'static, + C: NodeClient + 'static, + K: keychain::Keychain + 'static, { let mut slate = PathToSlate((&args.input).into()).get_tx()?; @@ -497,15 +497,15 @@ pub struct IssueInvoiceArgs { pub issue_args: IssueInvoiceTxArgs, } -pub fn issue_invoice_tx<'a, L, C, K>( - wallet: Arc>>>, +pub fn issue_invoice_tx( + wallet: Arc>>>, keychain_mask: Option<&SecretKey>, args: IssueInvoiceArgs, ) -> Result<(), Error> where - L: WalletLCProvider<'a, C, K>, - C: NodeClient + 'a, - K: keychain::Keychain + 'a, + L: WalletLCProvider<'static, C, K> + 'static, + C: NodeClient + 'static, + K: keychain::Keychain + 'static, { controller::owner_single_use(wallet.clone(), keychain_mask, |api, m| { let slate = api.issue_invoice_tx(m, args.issue_args)?; @@ -530,17 +530,17 @@ pub struct ProcessInvoiceArgs { } /// Process invoice -pub fn process_invoice<'a, L, C, K>( - wallet: Arc>>>, +pub fn process_invoice( + wallet: Arc>>>, keychain_mask: Option<&SecretKey>, tor_config: Option, args: ProcessInvoiceArgs, dark_scheme: bool, ) -> Result<(), Error> where - L: WalletLCProvider<'a, C, K>, - C: NodeClient + 'a, - K: keychain::Keychain + 'a, + L: WalletLCProvider<'static, C, K> + 'static, + C: NodeClient + 'static, + K: keychain::Keychain + 'static, { let slate = PathToSlate((&args.input).into()).get_tx()?; controller::owner_single_use(wallet.clone(), keychain_mask, |api, m| { @@ -629,17 +629,17 @@ pub struct InfoArgs { pub minimum_confirmations: u64, } -pub fn info<'a, L, C, K>( - wallet: Arc>>>, +pub fn info( + wallet: Arc>>>, keychain_mask: Option<&SecretKey>, g_args: &GlobalArgs, args: InfoArgs, dark_scheme: bool, ) -> Result<(), Error> where - L: WalletLCProvider<'a, C, K>, - C: NodeClient + 'a, - K: keychain::Keychain + 'a, + L: WalletLCProvider<'static, C, K> + 'static, + C: NodeClient + 'static, + K: keychain::Keychain + 'static, { controller::owner_single_use(wallet.clone(), keychain_mask, |api, m| { let (validated, wallet_info) = @@ -650,16 +650,16 @@ where Ok(()) } -pub fn outputs<'a, L, C, K>( - wallet: Arc>>>, +pub fn outputs( + wallet: Arc>>>, keychain_mask: Option<&SecretKey>, g_args: &GlobalArgs, dark_scheme: bool, ) -> Result<(), Error> where - L: WalletLCProvider<'a, C, K>, - C: NodeClient + 'a, - K: keychain::Keychain + 'a, + L: WalletLCProvider<'static, C, K> + 'static, + C: NodeClient + 'static, + K: keychain::Keychain + 'static, { controller::owner_single_use(wallet.clone(), keychain_mask, |api, m| { let res = api.node_height(m)?; @@ -676,17 +676,17 @@ pub struct TxsArgs { pub tx_slate_id: Option, } -pub fn txs<'a, L, C, K>( - wallet: Arc>>>, +pub fn txs( + wallet: Arc>>>, keychain_mask: Option<&SecretKey>, g_args: &GlobalArgs, args: TxsArgs, dark_scheme: bool, ) -> Result<(), Error> where - L: WalletLCProvider<'a, C, K>, - C: NodeClient + 'a, - K: keychain::Keychain + 'a, + L: WalletLCProvider<'static, C, K> + 'static, + C: NodeClient + 'static, + K: keychain::Keychain + 'static, { controller::owner_single_use(wallet.clone(), keychain_mask, |api, m| { let res = api.node_height(m)?; @@ -763,15 +763,15 @@ pub struct RepostArgs { pub fluff: bool, } -pub fn repost<'a, L, C, K>( - wallet: Arc>>>, +pub fn repost( + wallet: Arc>>>, keychain_mask: Option<&SecretKey>, args: RepostArgs, ) -> Result<(), Error> where - L: WalletLCProvider<'a, C, K>, - C: NodeClient + 'a, - K: keychain::Keychain + 'a, + L: WalletLCProvider<'static, C, K> + 'static, + C: NodeClient + 'static, + K: keychain::Keychain + 'static, { controller::owner_single_use(wallet.clone(), keychain_mask, |api, m| { let (_, txs) = api.retrieve_txs(m, true, Some(args.id), None)?; @@ -815,15 +815,15 @@ pub struct CancelArgs { pub tx_id_string: String, } -pub fn cancel<'a, L, C, K>( - wallet: Arc>>>, +pub fn cancel( + wallet: Arc>>>, keychain_mask: Option<&SecretKey>, args: CancelArgs, ) -> Result<(), Error> where - L: WalletLCProvider<'a, C, K>, - C: NodeClient + 'a, - K: keychain::Keychain + 'a, + L: WalletLCProvider<'static, C, K> + 'static, + C: NodeClient + 'static, + K: keychain::Keychain + 'static, { controller::owner_single_use(wallet.clone(), keychain_mask, |api, m| { let result = api.cancel_tx(m, args.tx_id, args.tx_slate_id); @@ -847,15 +847,15 @@ pub struct CheckArgs { pub start_height: Option, } -pub fn scan<'a, L, C, K>( - wallet: Arc>>>, +pub fn scan( + wallet: Arc>>>, keychain_mask: Option<&SecretKey>, args: CheckArgs, ) -> Result<(), Error> where - L: WalletLCProvider<'a, C, K>, - C: NodeClient + 'a, - K: keychain::Keychain + 'a, + L: WalletLCProvider<'static, C, K> + 'static, + C: NodeClient + 'static, + K: keychain::Keychain + 'static, { controller::owner_single_use(wallet.clone(), keychain_mask, |api, m| { warn!("Starting output scan ...",); diff --git a/controller/src/controller.rs b/controller/src/controller.rs index 6619a115..bb23dc95 100644 --- a/controller/src/controller.rs +++ b/controller/src/controller.rs @@ -121,16 +121,16 @@ where /// Instantiate wallet Owner API for a single-use (command line) call /// Return a function containing a loaded API context to call -pub fn owner_single_use<'a, L, F, C, K>( - wallet: Arc>>>, +pub fn owner_single_use( + wallet: Arc>>>, keychain_mask: Option<&SecretKey>, f: F, ) -> Result<(), Error> where - L: WalletLCProvider<'a, C, K>, - F: FnOnce(&mut Owner<'a, L, C, K>, Option<&SecretKey>) -> Result<(), Error>, - C: NodeClient + 'a, - K: Keychain + 'a, + L: WalletLCProvider<'static, C, K> + 'static, + F: FnOnce(&mut Owner, Option<&SecretKey>) -> Result<(), Error>, + C: NodeClient + 'static, + K: Keychain + 'static, { f(&mut Owner::new(wallet), keychain_mask)?; Ok(()) @@ -305,7 +305,7 @@ where fn call_api( &self, req: Request, - api: Owner<'static, L, C, K>, + api: Owner, ) -> Box + Send> { Box::new(parse_body(req).and_then(move |val: serde_json::Value| { let owner_api = &api as &dyn OwnerRpc; @@ -362,6 +362,9 @@ where /// Wallet instance pub wallet: Arc + 'static>>>, + /// Handle to Owner API + owner_api: Arc>, + /// ECDH shared key pub shared_key: Arc>>, @@ -592,8 +595,10 @@ where keychain_mask: Arc>>, running_foreign: bool, ) -> OwnerAPIHandlerV3 { + let owner_api = Arc::new(Owner::new(wallet.clone())); OwnerAPIHandlerV3 { wallet, + owner_api, shared_key: Arc::new(Mutex::new(None)), keychain_mask: keychain_mask, running_foreign, @@ -603,14 +608,14 @@ where fn call_api( &self, req: Request, - api: Owner<'static, L, C, K>, + api: Arc>, ) -> Box + Send> { let key = self.shared_key.clone(); let mask = self.keychain_mask.clone(); let running_foreign = self.running_foreign; Box::new(parse_body(req).and_then(move |val: serde_json::Value| { let mut val = val; - let owner_api_s = &api as &dyn OwnerRpcS; + let owner_api_s = &*api as &dyn OwnerRpcS; let mut is_init_secure_api = OwnerV3Helpers::is_init_secure_api(&val); let mut was_encrypted = false; let mut encrypted_req_id = 0; @@ -671,9 +676,8 @@ where } fn handle_post_request(&self, req: Request) -> WalletResponseFuture { - let api = Owner::new(self.wallet.clone()); Box::new( - self.call_api(req, api) + self.call_api(req, self.owner_api.clone()) .and_then(|resp| ok(json_response_pretty(&resp))), ) } diff --git a/controller/tests/updater_thread.rs b/controller/tests/updater_thread.rs new file mode 100644 index 00000000..bec1921b --- /dev/null +++ b/controller/tests/updater_thread.rs @@ -0,0 +1,122 @@ +// Copyright 2019 The Grin Developers +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Test a wallet repost command +#[macro_use] +extern crate log; +extern crate grin_wallet_api as api; +extern crate grin_wallet_controller as wallet; +extern crate grin_wallet_impls as impls; +extern crate grin_wallet_libwallet as libwallet; + +use crate::libwallet::api_impl::owner_updater::{start_updater_log_thread, StatusMessage}; +use grin_wallet_util::grin_core as core; + +use self::libwallet::{InitTxArgs, Slate}; +use impls::test_framework::{self, LocalWalletClient}; +use impls::{PathToSlate, SlateGetter as _, SlatePutter as _}; +use std::sync::mpsc::channel; +use std::thread; +use std::time::Duration; + +#[macro_use] +mod common; +use common::{clean_output_dir, create_wallet_proxy, setup}; + +/// updater thread test impl +fn updater_thread_test_impl(test_dir: &'static str) -> Result<(), libwallet::Error> { + // Create a new proxy to simulate server and wallet responses + let mut wallet_proxy = create_wallet_proxy(test_dir); + let chain = wallet_proxy.chain.clone(); + + // Create a new wallet test client, and set its queues to communicate with the + // proxy + create_wallet_and_add!( + client1, + wallet1, + mask1_i, + test_dir, + "wallet1", + None, + &mut wallet_proxy, + false + ); + let mask1 = (&mask1_i).as_ref(); + create_wallet_and_add!( + client2, + wallet2, + mask2_i, + test_dir, + "wallet2", + None, + &mut wallet_proxy, + false + ); + let mask2 = (&mask2_i).as_ref(); + + // Set the wallet proxy listener running + thread::spawn(move || { + if let Err(e) = wallet_proxy.run() { + error!("Wallet Proxy error: {}", e); + } + }); + + // few values to keep things shorter + let reward = core::consensus::REWARD; + + // add some accounts + wallet::controller::owner_single_use(wallet1.clone(), mask1, |api, m| { + api.create_account_path(m, "mining")?; + api.create_account_path(m, "listener")?; + Ok(()) + })?; + + // add some accounts + wallet::controller::owner_single_use(wallet2.clone(), mask2, |api, m| { + api.create_account_path(m, "account1")?; + api.create_account_path(m, "account2")?; + Ok(()) + })?; + + // Get some mining done + { + wallet_inst!(wallet1, w); + w.set_parent_key_id_by_name("mining")?; + } + let mut bh = 10u64; + let _ = + test_framework::award_blocks_to_wallet(&chain, wallet1.clone(), mask1, bh as usize, false); + + let owner_api = api::Owner::new(wallet1); + owner_api.start_updater(mask1, Duration::from_secs(5))?; + + // let updater thread run a bit + thread::sleep(Duration::from_secs(10)); + + let messages = owner_api.get_updater_messages(1000)?; + assert_eq!(messages.len(), 34); + + owner_api.stop_updater()?; + thread::sleep(Duration::from_secs(2)); + Ok(()) +} + +#[test] +fn updater_thread() { + let test_dir = "test_output/updater_thread"; + setup(test_dir); + if let Err(e) = updater_thread_test_impl(test_dir) { + panic!("Libwallet Error: {} - {}", e, e.backtrace().unwrap()); + } + clean_output_dir(test_dir); +} diff --git a/impls/src/backends/lmdb.rs b/impls/src/backends/lmdb.rs index 41fd5319..b58ca903 100644 --- a/impls/src/backends/lmdb.rs +++ b/impls/src/backends/lmdb.rs @@ -362,7 +362,7 @@ where .join(filename); let path_buf = Path::new(&path).to_path_buf(); let mut stored_tx = File::create(path_buf)?; - let tx_hex = util::to_hex(ser::ser_vec(tx, ser::ProtocolVersion(1)).unwrap());; + let tx_hex = util::to_hex(ser::ser_vec(tx, ser::ProtocolVersion(1)).unwrap()); stored_tx.write_all(&tx_hex.as_bytes())?; stored_tx.sync_all()?; Ok(()) diff --git a/impls/src/test_framework/mod.rs b/impls/src/test_framework/mod.rs index 6b6a06ba..670bef05 100644 --- a/impls/src/test_framework/mod.rs +++ b/impls/src/test_framework/mod.rs @@ -244,7 +244,7 @@ where K: keychain::Keychain + 'a, { let (wallet_refreshed, wallet_info) = - owner::retrieve_summary_info(wallet, keychain_mask, true, 1)?; + owner::retrieve_summary_info(wallet, keychain_mask, &None, true, 1)?; assert!(wallet_refreshed); Ok(wallet_info) } diff --git a/libwallet/src/api_impl.rs b/libwallet/src/api_impl.rs index f2081c23..a5da6e28 100644 --- a/libwallet/src/api_impl.rs +++ b/libwallet/src/api_impl.rs @@ -23,4 +23,5 @@ pub mod foreign; pub mod owner; +pub mod owner_updater; pub mod types; diff --git a/libwallet/src/api_impl/owner.rs b/libwallet/src/api_impl/owner.rs index 1f5cfb3f..d42c58f5 100644 --- a/libwallet/src/api_impl/owner.rs +++ b/libwallet/src/api_impl/owner.rs @@ -1,4 +1,4 @@ -// Copyright 2019 The Grin Developers +// Copyright 2019 The Grin Develope; // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -23,6 +23,7 @@ use crate::grin_util; use crate::grin_util::secp::key::SecretKey; use crate::grin_util::Mutex; +use crate::api_impl::owner_updater::StatusMessage; use crate::grin_keychain::{Identifier, Keychain}; use crate::internal::{keys, scan, selection, tx, updater}; use crate::slate::Slate; @@ -32,6 +33,7 @@ use crate::{ ScannedBlockInfo, TxLogEntryType, WalletInitStatus, WalletInst, WalletLCProvider, }; use crate::{Error, ErrorKind}; +use std::sync::mpsc::Sender; use std::sync::Arc; const USER_MESSAGE_MAX_LEN: usize = 256; @@ -74,6 +76,7 @@ where pub fn retrieve_outputs<'a, L, C, K>( wallet_inst: Arc>>>, keychain_mask: Option<&SecretKey>, + status_send_channel: &Option>, include_spent: bool, refresh_from_node: bool, tx_id: Option, @@ -85,7 +88,12 @@ where { let mut validated = false; if refresh_from_node { - validated = update_wallet_state(wallet_inst.clone(), keychain_mask, false)?; + validated = update_wallet_state( + wallet_inst.clone(), + keychain_mask, + status_send_channel, + false, + )?; } wallet_lock!(wallet_inst, w); @@ -107,6 +115,7 @@ where pub fn retrieve_txs<'a, L, C, K>( wallet_inst: Arc>>>, keychain_mask: Option<&SecretKey>, + status_send_channel: &Option>, refresh_from_node: bool, tx_id: Option, tx_slate_id: Option, @@ -118,7 +127,12 @@ where { let mut validated = false; if refresh_from_node { - validated = update_wallet_state(wallet_inst.clone(), keychain_mask, false)?; + validated = update_wallet_state( + wallet_inst.clone(), + keychain_mask, + status_send_channel, + false, + )?; } wallet_lock!(wallet_inst, w); @@ -132,6 +146,7 @@ where pub fn retrieve_summary_info<'a, L, C, K>( wallet_inst: Arc>>>, keychain_mask: Option<&SecretKey>, + status_send_channel: &Option>, refresh_from_node: bool, minimum_confirmations: u64, ) -> Result<(bool, WalletInfo), Error> @@ -142,7 +157,12 @@ where { let mut validated = false; if refresh_from_node { - validated = update_wallet_state(wallet_inst.clone(), keychain_mask, false)?; + validated = update_wallet_state( + wallet_inst.clone(), + keychain_mask, + status_send_channel, + false, + )?; } wallet_lock!(wallet_inst, w); @@ -412,6 +432,7 @@ where pub fn cancel_tx<'a, L, C, K>( wallet_inst: Arc>>>, keychain_mask: Option<&SecretKey>, + status_send_channel: &Option>, tx_id: Option, tx_slate_id: Option, ) -> Result<(), Error> @@ -420,7 +441,12 @@ where C: NodeClient + 'a, K: Keychain + 'a, { - if !update_wallet_state(wallet_inst.clone(), keychain_mask, false)? { + if !update_wallet_state( + wallet_inst.clone(), + keychain_mask, + status_send_channel, + false, + )? { return Err(ErrorKind::TransactionCancellationError( "Can't contact running Grin node. Not Cancelling.", ))?; @@ -477,6 +503,7 @@ pub fn scan<'a, L, C, K>( keychain_mask: Option<&SecretKey>, start_height: Option, delete_unconfirmed: bool, + status_send_channel: &Option>, ) -> Result<(), Error> where L: WalletLCProvider<'a, C, K>, @@ -489,8 +516,6 @@ where w.w2n_client().get_chain_tip()? }; - let status_fn: fn(&str) = |m| warn!("{}", m); - let start_height = match start_height { Some(h) => h, None => 1, @@ -502,7 +527,7 @@ where delete_unconfirmed, start_height, tip.0, - status_fn, + status_send_channel, )?; info.hash = tip.1; @@ -535,7 +560,7 @@ where updated_from_node: true, }), Err(_) => { - let outputs = retrieve_outputs(wallet_inst, keychain_mask, true, false, None)?; + let outputs = retrieve_outputs(wallet_inst, keychain_mask, &None, true, false, None)?; let height = match outputs.1.iter().map(|m| m.output.height).max() { Some(height) => height, None => 0, @@ -549,9 +574,10 @@ where } } /// Experimental, wrap the entire definition of how a wallet's state is updated -fn update_wallet_state<'a, L, C, K>( +pub fn update_wallet_state<'a, L, C, K>( wallet_inst: Arc>>>, keychain_mask: Option<&SecretKey>, + status_send_channel: &Option>, update_all: bool, ) -> Result where @@ -569,12 +595,28 @@ where }; // Step 1: Update outputs and transactions purely based on UTXO state + if let Some(ref s) = status_send_channel { + let _ = s.send(StatusMessage::UpdatingOutputs( + "Updating outputs from node".to_owned(), + )); + } let mut result = update_outputs(wallet_inst.clone(), keychain_mask, update_all)?; if !result { + if let Some(ref s) = status_send_channel { + let _ = s.send(StatusMessage::UpdateWarning( + "Updater Thread unable to contact node".to_owned(), + )); + } return Ok(result); } + if let Some(ref s) = status_send_channel { + let _ = s.send(StatusMessage::UpdatingTransactions( + "Updating transactions".to_owned(), + )); + } + // Step 2: Update outstanding transactions with no change outputs by kernel let mut txs = { wallet_lock!(wallet_inst, w); @@ -582,6 +624,11 @@ where }; result = update_txs_via_kernel(wallet_inst.clone(), keychain_mask, &mut txs)?; if !result { + if let Some(ref s) = status_send_channel { + let _ = s.send(StatusMessage::UpdateWarning( + "Updater Thread unable to contact node".to_owned(), + )); + } return Ok(result); } @@ -590,7 +637,14 @@ where // if we can't get the tip, don't continue let tip = match res { Ok(t) => t, - Err(_) => return Ok(false), + Err(_) => { + if let Some(ref s) = status_send_channel { + let _ = s.send(StatusMessage::UpdateWarning( + "Updater Thread unable to contact node".to_owned(), + )); + } + return Ok(false); + } }; // Check if this is a restored wallet that needs a full scan @@ -615,11 +669,12 @@ where let start_index = last_scanned_block.height.saturating_sub(100); - let mut status_fn: fn(&str) = |m| debug!("{}", m); if last_scanned_block.height == 0 { - warn!("This wallet's contents has not been initialized with a full chain scan, performing scan now."); - warn!("This operation may take a while for the first scan, but should be much quicker once the initial scan is done."); - status_fn = |m| warn!("{}", m); + let msg = format!("This wallet's contents has not been initialized with a full chain scan, performing scan now. + This operation may take a while for the first scan, but should be much quicker once the initial scan is done."); + if let Some(ref s) = status_send_channel { + let _ = s.send(StatusMessage::FullScanWarn(msg)); + } } let mut info = scan::scan( @@ -628,7 +683,7 @@ where false, start_index, tip.0, - status_fn, + status_send_channel, )?; info.hash = tip.1; diff --git a/libwallet/src/api_impl/owner_updater.rs b/libwallet/src/api_impl/owner_updater.rs new file mode 100644 index 00000000..78f61c62 --- /dev/null +++ b/libwallet/src/api_impl/owner_updater.rs @@ -0,0 +1,140 @@ +// Copyright 2019 The Grin Developers +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! A threaded persistent Updater that can be controlled by a grin wallet +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::mpsc::{Receiver, Sender}; +use std::sync::Arc; +use std::thread; +use std::time::Duration; + +use crate::grin_keychain::Keychain; +use crate::grin_util::secp::key::SecretKey; +use crate::grin_util::Mutex; + +use crate::api_impl::owner; +use crate::types::NodeClient; +use crate::Error; +use crate::{WalletInst, WalletLCProvider}; + +const MESSAGE_QUEUE_MAX_LEN: usize = 10_000; + +/// Update status messages which can be returned to listening clients +#[derive(Clone, Debug, Serialize, Deserialize)] +pub enum StatusMessage { + /// Wallet is performing a regular update, matching the UTXO set against + /// current wallet outputs + UpdatingOutputs(String), + /// Wallet is updating transactions, potentially retrieving transactions + /// by kernel if needed + UpdatingTransactions(String), + /// Warning that the wallet is about to perform a full UTXO scan + FullScanWarn(String), + /// Status and percentage complete messages returned during the + /// scanning process + Scanning(String, u8), + /// UTXO scanning is complete + ScanningComplete(String), + /// Warning of issues that may have occured during an update + UpdateWarning(String), +} + +/// Helper function that starts a simple log thread for updater messages +pub fn start_updater_log_thread( + rx: Receiver, + queue: Arc>>, +) -> Result<(), Error> { + let _ = thread::Builder::new() + .name("wallet-updater-status".to_string()) + .spawn(move || loop { + while let Ok(m) = rx.try_recv() { + // save to our message queue to be read by other consumers + { + let mut q = queue.lock(); + q.insert(0, m.clone()); + while q.len() > MESSAGE_QUEUE_MAX_LEN { + q.pop(); + } + } + match m { + StatusMessage::UpdatingOutputs(s) => debug!("{}", s), + StatusMessage::UpdatingTransactions(s) => debug!("{}", s), + StatusMessage::FullScanWarn(s) => warn!("{}", s), + StatusMessage::Scanning(s, m) => { + debug!("{}", s); + warn!("Scanning - {}% complete", m); + } + StatusMessage::ScanningComplete(s) => warn!("{}", s), + StatusMessage::UpdateWarning(s) => warn!("{}", s), + } + } + thread::sleep(Duration::from_millis(500)); + })?; + + Ok(()) +} + +/// Handles and launches a background update thread +pub struct Updater<'a, L, C, K> +where + L: WalletLCProvider<'a, C, K>, + C: NodeClient + 'a, + K: Keychain + 'a, +{ + wallet_inst: Arc>>>, + is_running: Arc, +} + +impl<'a, L, C, K> Updater<'a, L, C, K> +where + L: WalletLCProvider<'a, C, K>, + C: NodeClient + 'a, + K: Keychain + 'a, +{ + /// create a new updater + pub fn new( + wallet_inst: Arc>>>, + is_running: Arc, + ) -> Self { + is_running.store(false, Ordering::Relaxed); + Updater { + wallet_inst, + is_running, + } + } + + /// Start the updater at the given frequency + pub fn run( + &self, + frequency: Duration, + keychain_mask: Option, + status_send_channel: &Option>, + ) -> Result<(), Error> { + self.is_running.store(true, Ordering::Relaxed); + loop { + // Business goes here + owner::update_wallet_state( + self.wallet_inst.clone(), + (&keychain_mask).as_ref(), + status_send_channel, + false, + )?; + if !self.is_running.load(Ordering::Relaxed) { + break; + } + thread::sleep(frequency); + } + Ok(()) + } +} diff --git a/libwallet/src/internal/scan.rs b/libwallet/src/internal/scan.rs index 1b17b507..d4fe8e98 100644 --- a/libwallet/src/internal/scan.rs +++ b/libwallet/src/internal/scan.rs @@ -13,6 +13,7 @@ // limitations under the License. //! Functions to restore a wallet's outputs from just the master seed +use crate::api_impl::owner_updater::StatusMessage; use crate::grin_core::consensus::{valid_header_version, WEEK_HEIGHT}; use crate::grin_core::core::HeaderVersion; use crate::grin_core::global; @@ -24,7 +25,9 @@ use crate::grin_util::Mutex; use crate::internal::{keys, updater}; use crate::types::*; use crate::{wallet_lock, Error, OutputCommitMapping}; +use std::cmp; use std::collections::HashMap; +use std::sync::mpsc::Sender; use std::sync::Arc; /// Utility struct for return values from below @@ -60,20 +63,23 @@ struct RestoredTxStats { pub num_outputs: usize, } -fn identify_utxo_outputs<'a, K, F>( +fn identify_utxo_outputs<'a, K>( keychain: &K, outputs: Vec<(pedersen::Commitment, pedersen::RangeProof, bool, u64, u64)>, - status_cb: &F, + status_send_channel: &Option>, + percentage_complete: u8, ) -> Result, Error> where K: Keychain + 'a, - F: Fn(&str), { let mut wallet_outputs: Vec = Vec::new(); - status_cb(&format!( + let msg = format!( "Scanning {} outputs in the current Grin utxo set", outputs.len(), - )); + ); + if let Some(ref s) = status_send_channel { + let _ = s.send(StatusMessage::Scanning(msg, percentage_complete)); + } let legacy_builder = proof::LegacyProofBuilder::new(keychain); let builder = proof::ProofBuilder::new(keychain); @@ -113,13 +119,20 @@ where *height }; - status_cb(&format!( + let msg = format!( "Output found: {:?}, amount: {:?}, key_id: {:?}, mmr_index: {},", commit, amount, key_id, mmr_index, - )); + ); + + if let Some(ref s) = status_send_channel { + let _ = s.send(StatusMessage::Scanning(msg, percentage_complete)); + } if switch != SwitchCommitmentType::Regular { - status_cb(&format!("Unexpected switch commitment type {:?}", switch)) + let msg = format!("Unexpected switch commitment type {:?}", switch); + if let Some(ref s) = status_send_channel { + let _ = s.send(StatusMessage::UpdateWarning(msg)); + } } wallet_outputs.push(OutputResult { @@ -136,17 +149,16 @@ where Ok(wallet_outputs) } -fn collect_chain_outputs<'a, C, K, F>( +fn collect_chain_outputs<'a, C, K>( keychain: &K, client: C, start_index: u64, end_index: Option, - status_cb: &F, + status_send_channel: &Option>, ) -> Result<(Vec, u64), Error> where C: NodeClient + 'a, K: Keychain + 'a, - F: Fn(&str), { let batch_size = 1000; let mut start_index = start_index; @@ -155,17 +167,26 @@ where loop { let (highest_index, last_retrieved_index, outputs) = client.get_outputs_by_pmmr_index(start_index, end_index, batch_size)?; - status_cb(&format!( + let perc_complete = cmp::min( + ((last_retrieved_index as f64 / highest_index as f64) * 100.0) as u8, + 99, + ); + + let msg = format!( "Checking {} outputs, up to index {}. (Highest index: {})", outputs.len(), highest_index, last_retrieved_index, - )); + ); + if let Some(ref s) = status_send_channel { + let _ = s.send(StatusMessage::Scanning(msg, perc_complete)); + } result_vec.append(&mut identify_utxo_outputs( keychain, outputs.clone(), - status_cb, + status_send_channel, + perc_complete as u8, )?); if highest_index <= last_retrieved_index { @@ -195,8 +216,6 @@ where let commit = w.calc_commit_for_cache(keychain_mask, output.value, &output.key_id)?; let mut batch = w.batch(keychain_mask)?; - error!("RESTORING OUTPUT: {:?}", output); - let parent_key_id = output.key_id.parent_path(); if !found_parents.contains_key(&parent_key_id) { found_parents.insert(parent_key_id.clone(), 0); @@ -311,22 +330,23 @@ where /// Check / repair wallet contents by scanning against chain /// assume wallet contents have been freshly updated with contents /// of latest block -pub fn scan<'a, L, C, K, F>( +pub fn scan<'a, L, C, K>( wallet_inst: Arc>>>, keychain_mask: Option<&SecretKey>, delete_unconfirmed: bool, start_height: u64, end_height: u64, - status_cb: F, + status_send_channel: &Option>, ) -> Result where L: WalletLCProvider<'a, C, K>, C: NodeClient + 'a, K: Keychain + 'a, - F: Fn(&str), { // First, get a definitive list of outputs we own from the chain - status_cb("Starting UTXO scan"); + if let Some(ref s) = status_send_channel { + let _ = s.send(StatusMessage::Scanning("Starting UTXO scan".to_owned(), 0)); + } let (client, keychain) = { wallet_lock!(wallet_inst, w); (w.w2n_client().clone(), w.keychain(keychain_mask)?.clone()) @@ -340,12 +360,16 @@ where client, pmmr_range.0, Some(pmmr_range.1), - &status_cb, + status_send_channel, )?; - status_cb(&format!( + let msg = format!( "Identified {} wallet_outputs as belonging to this wallet", chain_outs.len(), - )); + ); + + if let Some(ref s) = status_send_channel { + let _ = s.send(StatusMessage::Scanning(msg, 99)); + } // Now, get all outputs owned by this wallet (regardless of account) let wallet_outputs = { @@ -376,11 +400,14 @@ where // mark problem spent outputs as unspent (confirmed against a short-lived fork, for example) for m in accidental_spend_outs.into_iter() { let mut o = m.0; - status_cb(&format!( + let msg = format!( "Output for {} with ID {} ({:?}) marked as spent but exists in UTXO set. \ Marking unspent and cancelling any associated transaction log entries.", o.value, o.key_id, m.1.commit, - )); + ); + if let Some(ref s) = status_send_channel { + let _ = s.send(StatusMessage::Scanning(msg, 99)); + } o.status = OutputStatus::Unspent; // any transactions associated with this should be cancelled cancel_tx_log_entry(wallet_inst.clone(), keychain_mask, &o)?; @@ -394,11 +421,14 @@ where // Restore missing outputs, adding transaction for it back to the log for m in missing_outs.into_iter() { - status_cb(&format!( + let msg = format!( "Confirmed output for {} with ID {} ({:?}, index {}) exists in UTXO set but not in wallet. \ Restoring.", m.value, m.key_id, m.commit, m.mmr_index - )); + ); + if let Some(ref s) = status_send_channel { + let _ = s.send(StatusMessage::Scanning(msg, 99)); + } restore_missing_output( wallet_inst.clone(), keychain_mask, @@ -412,11 +442,14 @@ where // Unlock locked outputs for m in locked_outs.into_iter() { let mut o = m.0; - status_cb(&format!( + let msg = format!( "Confirmed output for {} with ID {} ({:?}) exists in UTXO set and is locked. \ Unlocking and cancelling associated transaction log entries.", o.value, o.key_id, m.1.commit, - )); + ); + if let Some(ref s) = status_send_channel { + let _ = s.send(StatusMessage::Scanning(msg, 99)); + } o.status = OutputStatus::Unspent; cancel_tx_log_entry(wallet_inst.clone(), keychain_mask, &o)?; wallet_lock!(wallet_inst, w); @@ -432,11 +465,14 @@ where // Delete unconfirmed outputs for m in unconfirmed_outs.into_iter() { let o = m.output.clone(); - status_cb(&format!( + let msg = format!( "Unconfirmed output for {} with ID {} ({:?}) not in UTXO set. \ Deleting and cancelling associated transaction log entries.", o.value, o.key_id, m.commit, - )); + ); + if let Some(ref s) = status_send_channel { + let _ = s.send(StatusMessage::Scanning(msg, 99)); + } cancel_tx_log_entry(wallet_inst.clone(), keychain_mask, &o)?; wallet_lock!(wallet_inst, w); let mut batch = w.batch(keychain_mask)?; @@ -454,7 +490,10 @@ where // Only restore paths that don't exist if !accounts.contains(path) { let label = format!("{}_{}", label_base, acct_index); - status_cb(&format!("Setting account {} at path {}", label, path)); + let msg = format!("Setting account {} at path {}", label, path); + if let Some(ref s) = status_send_channel { + let _ = s.send(StatusMessage::Scanning(msg, 99)); + } keys::set_acct_path(&mut **w, keychain_mask, &label, path)?; acct_index += 1; } @@ -467,6 +506,12 @@ where } } + if let Some(ref s) = status_send_channel { + let _ = s.send(StatusMessage::ScanningComplete( + "Scanning Complete".to_owned(), + )); + } + Ok(ScannedBlockInfo { height: end_height, hash: "".to_owned(), diff --git a/libwallet/src/lib.rs b/libwallet/src/lib.rs index 532ad36a..14b01876 100644 --- a/libwallet/src/lib.rs +++ b/libwallet/src/lib.rs @@ -57,6 +57,7 @@ pub use crate::slate_versions::{ SlateVersion, VersionedCoinbase, VersionedSlate, CURRENT_SLATE_VERSION, GRIN_BLOCK_HEADER_VERSION, }; +pub use api_impl::owner_updater::StatusMessage; pub use api_impl::types::{ BlockFees, InitTxArgs, InitTxSendArgs, IssueInvoiceTxArgs, NodeHeightResult, OutputCommitMapping, SendTXArgs, VersionInfo, diff --git a/src/bin/grin-wallet.yml b/src/bin/grin-wallet.yml index dcd89b5a..085dc9ef 100644 --- a/src/bin/grin-wallet.yml +++ b/src/bin/grin-wallet.yml @@ -104,7 +104,7 @@ subcommands: possible_values: - all - smallest - default_value: all + default_value: smallest takes_value: true - estimate_selection_strategies: help: Estimates all possible Coin/Output selection strategies. diff --git a/tests/common/mod.rs b/tests/common/mod.rs index 73e276d7..33ebb7a4 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -391,6 +391,7 @@ where })?; let res_val: Value = serde_json::from_str(&res).unwrap(); + //println!("RES_VAL: {}", res_val); // encryption error, just return the string if res_val["error"] != json!(null) { return Ok(Err(WalletAPIReturnError { diff --git a/tests/owner_v3_lifecycle.rs b/tests/owner_v3_lifecycle.rs index 5892eb63..f1733a88 100644 --- a/tests/owner_v3_lifecycle.rs +++ b/tests/owner_v3_lifecycle.rs @@ -438,7 +438,67 @@ fn owner_v3_lifecycle() -> Result<(), grin_wallet_controller::Error> { thread::sleep(Duration::from_millis(200)); assert_eq!(res.unwrap().1.amount_awaiting_finalization, 6000000000); - // 21) Delete the wallet (close first) + // 21) Start the automatic updater, let it run for a bit + let req = serde_json::json!({ + "jsonrpc": "2.0", + "id": 1, + "method": "start_updater", + "params": { + "token": token, + "frequency": 3000, + } + }); + + let res = send_request_enc::( + 1, + 1, + "http://127.0.0.1:43420/v3/owner", + &req.to_string(), + &shared_key, + )?; + assert!(res.is_ok()); + println!("RES 21: {:?}", res); + thread::sleep(Duration::from_millis(5000)); + + // 22) Retrieve some messages about updater status + let req = serde_json::json!({ + "jsonrpc": "2.0", + "id": 1, + "method": "get_updater_messages", + "params": { + "count": 1000, + } + }); + + let res = send_request_enc::( + 1, + 1, + "http://127.0.0.1:43420/v3/owner", + &req.to_string(), + &shared_key, + )?; + assert!(res.is_ok()); + println!("RES 22: {:?}", res); + + // 23) Stop Updater + let req = serde_json::json!({ + "jsonrpc": "2.0", + "id": 1, + "method": "stop_updater", + "params": null + }); + + let res = send_request_enc::( + 1, + 1, + "http://127.0.0.1:43420/v3/owner", + &req.to_string(), + &shared_key, + )?; + assert!(res.is_ok()); + println!("RES 23: {:?}", res); + + // 24) Delete the wallet (close first) let req = include_str!("data/v3_reqs/close_wallet.req.json"); let res = send_request_enc::(1, 1, "http://127.0.0.1:43420/v3/owner", &req, &shared_key)?; @@ -447,14 +507,14 @@ fn owner_v3_lifecycle() -> Result<(), grin_wallet_controller::Error> { let req = include_str!("data/v3_reqs/delete_wallet.req.json"); let res = send_request_enc::(1, 1, "http://127.0.0.1:43420/v3/owner", &req, &shared_key)?; - println!("RES 21: {:?}", res); + println!("RES 24: {:?}", res); assert!(res.is_ok()); - // 22) Wallet should be gone + // 25) Wallet should be gone let req = include_str!("data/v3_reqs/open_wallet.req.json"); let res = send_request_enc::(1, 1, "http://127.0.0.1:43420/v3/owner", &req, &shared_key)?; - println!("RES 22: {:?}", res); + println!("RES 25: {:?}", res); assert!(res.is_err()); clean_output_dir(test_dir);