Add updater thread + Updater related functions (#253)

* first attempt at adding updater thread

* rustfmt

* rustfmt

* many lifetimes made static to allow api to spawn separate thread

* add first pass at updater thread functionality

* rustfmt

* add mpsc for returning status from update functions

* rustfmt

* add stop state, ensure update is triggered by wallet functions when wallet updater is running

* rustfmt

* add update status function to owner API, V3 owner api functions

* rustfmt

* change update warning

* adding tests for new updater control functions

* documentation updates

* rustfmt
This commit is contained in:
Yeastplume 2019-11-18 10:49:51 +00:00 committed by GitHub
parent 355f08498c
commit e74c0e2571
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
17 changed files with 944 additions and 170 deletions

View file

@ -691,8 +691,13 @@ pub fn run_doctest_foreign(
false,
);
//update local outputs after each block, so transaction IDs stay consistent
let (wallet_refreshed, _) =
api_impl::owner::retrieve_summary_info(wallet1.clone(), (&mask1).as_ref(), true, 1)
let (wallet_refreshed, _) = api_impl::owner::retrieve_summary_info(
wallet1.clone(),
(&mask1).as_ref(),
&None,
true,
1,
)
.unwrap();
assert!(wallet_refreshed);
}

View file

@ -22,7 +22,8 @@ use crate::core::core::Transaction;
use crate::core::global;
use crate::impls::create_sender;
use crate::keychain::{Identifier, Keychain};
use crate::libwallet::api_impl::owner;
use crate::libwallet::api_impl::owner_updater::{start_updater_log_thread, StatusMessage};
use crate::libwallet::api_impl::{owner, owner_updater};
use crate::libwallet::{
AcctPathMapping, Error, ErrorKind, InitTxArgs, IssueInvoiceTxArgs, NodeClient,
NodeHeightResult, OutputCommitMapping, Slate, TxLogEntry, WalletInfo, WalletInst,
@ -30,7 +31,11 @@ use crate::libwallet::{
};
use crate::util::secp::key::SecretKey;
use crate::util::{from_hex, static_secp_instance, LoggingConfig, Mutex, ZeroingString};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, Sender};
use std::sync::Arc;
use std::thread;
use std::time::Duration;
/// Main interface into all wallet API functions.
/// Wallet APIs are split into two seperate blocks of functionality
@ -46,23 +51,32 @@ use std::sync::Arc;
/// its operation, then 'close' the wallet (unloading references to the keychain and master
/// seed).
pub struct Owner<'a, L, C, K>
pub struct Owner<L, C, K>
where
L: WalletLCProvider<'a, C, K>,
C: NodeClient + 'a,
K: Keychain + 'a,
L: WalletLCProvider<'static, C, K> + 'static,
C: NodeClient + 'static,
K: Keychain + 'static,
{
/// contain all methods to manage the wallet
pub wallet_inst: Arc<Mutex<Box<dyn WalletInst<'a, L, C, K>>>>,
pub wallet_inst: Arc<Mutex<Box<dyn WalletInst<'static, L, C, K>>>>,
/// Flag to normalize some output during testing. Can mostly be ignored.
pub doctest_mode: bool,
/// Share ECDH key
pub shared_key: Arc<Mutex<Option<SecretKey>>>,
/// Update thread
updater: Arc<Mutex<owner_updater::Updater<'static, L, C, K>>>,
/// Stop state for update thread
pub updater_running: Arc<AtomicBool>,
/// Sender for update messages
status_tx: Mutex<Option<Sender<StatusMessage>>>,
/// Holds all update and status messages returned by the
/// updater process
updater_messages: Arc<Mutex<Vec<StatusMessage>>>,
}
impl<'a, L, C, K> Owner<'a, L, C, K>
impl<L, C, K> Owner<L, C, K>
where
L: WalletLCProvider<'a, C, K>,
L: WalletLCProvider<'static, C, K> + 'static,
C: NodeClient,
K: Keychain,
{
@ -141,11 +155,26 @@ where
///
/// ```
pub fn new(wallet_inst: Arc<Mutex<Box<dyn WalletInst<'a, L, C, K>>>>) -> Self {
pub fn new(wallet_inst: Arc<Mutex<Box<dyn WalletInst<'static, L, C, K>>>>) -> Self {
let (tx, rx) = channel();
let updater_running = Arc::new(AtomicBool::new(false));
let updater = Arc::new(Mutex::new(owner_updater::Updater::new(
wallet_inst.clone(),
updater_running.clone(),
)));
let updater_messages = Arc::new(Mutex::new(vec![]));
let _ = start_updater_log_thread(rx, updater_messages.clone());
Owner {
wallet_inst,
doctest_mode: false,
shared_key: Arc::new(Mutex::new(None)),
updater,
updater_running,
status_tx: Mutex::new(Some(tx)),
updater_messages,
}
}
@ -304,6 +333,8 @@ where
/// provided during wallet instantiation). If `false`, the results will
/// contain output information that may be out-of-date (from the last time
/// the wallet's output set was refreshed against the node).
/// Note this setting is ignored if the updater process is running via a call to
/// [`start_updater`](struct.Owner.html#method.start_updater)
/// * `tx_id` - If `Some(i)`, only return the outputs associated with
/// the transaction log entry of id `i`.
///
@ -342,9 +373,18 @@ where
refresh_from_node: bool,
tx_id: Option<u32>,
) -> Result<(bool, Vec<OutputCommitMapping>), Error> {
let tx = {
let t = self.status_tx.lock();
t.clone()
};
let refresh_from_node = match self.updater_running.load(Ordering::Relaxed) {
true => false,
false => refresh_from_node,
};
owner::retrieve_outputs(
self.wallet_inst.clone(),
keychain_mask,
&tx,
include_spent,
refresh_from_node,
tx_id,
@ -362,6 +402,8 @@ where
/// provided during wallet instantiation). If `false`, the results will
/// contain transaction information that may be out-of-date (from the last time
/// the wallet's output set was refreshed against the node).
/// Note this setting is ignored if the updater process is running via a call to
/// [`start_updater`](struct.Owner.html#method.start_updater)
/// * `tx_id` - If `Some(i)`, only return the transactions associated with
/// the transaction log entry of id `i`.
/// * `tx_slate_id` - If `Some(uuid)`, only return transactions associated with
@ -400,9 +442,18 @@ where
tx_id: Option<u32>,
tx_slate_id: Option<Uuid>,
) -> Result<(bool, Vec<TxLogEntry>), Error> {
let tx = {
let t = self.status_tx.lock();
t.clone()
};
let refresh_from_node = match self.updater_running.load(Ordering::Relaxed) {
true => false,
false => refresh_from_node,
};
let mut res = owner::retrieve_txs(
self.wallet_inst.clone(),
keychain_mask,
&tx,
refresh_from_node,
tx_id,
tx_slate_id,
@ -431,6 +482,8 @@ where
/// provided during wallet instantiation). If `false`, the results will
/// contain transaction information that may be out-of-date (from the last time
/// the wallet's output set was refreshed against the node).
/// Note this setting is ignored if the updater process is running via a call to
/// [`start_updater`](struct.Owner.html#method.start_updater)
/// * `minimum_confirmations` - The minimum number of confirmations an output
/// should have before it's included in the 'amount_currently_spendable' total
///
@ -464,9 +517,18 @@ where
refresh_from_node: bool,
minimum_confirmations: u64,
) -> Result<(bool, WalletInfo), Error> {
let tx = {
let t = self.status_tx.lock();
t.clone()
};
let refresh_from_node = match self.updater_running.load(Ordering::Relaxed) {
true => false,
false => refresh_from_node,
};
owner::retrieve_summary_info(
self.wallet_inst.clone(),
keychain_mask,
&tx,
refresh_from_node,
minimum_confirmations,
)
@ -531,7 +593,7 @@ where
/// minimum_confirmations: 2,
/// max_outputs: 500,
/// num_change_outputs: 1,
/// selection_strategy_is_use_all: true,
/// selection_strategy_is_use_all: false,
/// message: Some("Have some Grins. Love, Yeastplume".to_owned()),
/// ..Default::default()
/// };
@ -680,7 +742,7 @@ where
/// minimum_confirmations: 2,
/// max_outputs: 500,
/// num_change_outputs: 1,
/// selection_strategy_is_use_all: true,
/// selection_strategy_is_use_all: false,
/// ..Default::default()
/// };
///
@ -741,7 +803,7 @@ where
/// minimum_confirmations: 10,
/// max_outputs: 500,
/// num_change_outputs: 1,
/// selection_strategy_is_use_all: true,
/// selection_strategy_is_use_all: false,
/// message: Some("Remember to lock this when we're happy this is sent".to_owned()),
/// ..Default::default()
/// };
@ -805,7 +867,7 @@ where
/// minimum_confirmations: 10,
/// max_outputs: 500,
/// num_change_outputs: 1,
/// selection_strategy_is_use_all: true,
/// selection_strategy_is_use_all: false,
/// message: Some("Finalize this tx now".to_owned()),
/// ..Default::default()
/// };
@ -865,7 +927,7 @@ where
/// minimum_confirmations: 10,
/// max_outputs: 500,
/// num_change_outputs: 1,
/// selection_strategy_is_use_all: true,
/// selection_strategy_is_use_all: false,
/// message: Some("Post this tx".to_owned()),
/// ..Default::default()
/// };
@ -937,7 +999,7 @@ where
/// minimum_confirmations: 10,
/// max_outputs: 500,
/// num_change_outputs: 1,
/// selection_strategy_is_use_all: true,
/// selection_strategy_is_use_all: false,
/// message: Some("Cancel this tx".to_owned()),
/// ..Default::default()
/// };
@ -964,7 +1026,17 @@ where
tx_id: Option<u32>,
tx_slate_id: Option<Uuid>,
) -> Result<(), Error> {
owner::cancel_tx(self.wallet_inst.clone(), keychain_mask, tx_id, tx_slate_id)
let tx = {
let t = self.status_tx.lock();
t.clone()
};
owner::cancel_tx(
self.wallet_inst.clone(),
keychain_mask,
&tx,
tx_id,
tx_slate_id,
)
}
/// Retrieves the stored transaction associated with a TxLogEntry. Can be used even after the
@ -1043,7 +1115,7 @@ where
/// minimum_confirmations: 10,
/// max_outputs: 500,
/// num_change_outputs: 1,
/// selection_strategy_is_use_all: true,
/// selection_strategy_is_use_all: false,
/// message: Some("Just verify messages".to_owned()),
/// ..Default::default()
/// };
@ -1140,6 +1212,7 @@ where
keychain_mask,
start_height,
delete_unconfirmed,
&None,
)
}
@ -1657,6 +1730,162 @@ where
let lc = w_lock.lc_provider()?;
lc.delete_wallet(name)
}
/// Starts a background wallet update thread, which performs the wallet update process
/// automatically at the frequency specified.
///
/// The updater process is as follows:
///
/// * Reconcile the wallet outputs against the node's current UTXO set, confirming
/// transactions if needs be.
/// * Look up transactions by kernel in cases where it's necessary (for instance, when
/// there are no change outputs for a transaction and transaction status can't be
/// inferred from the output state.
/// * Incrementally perform a scan of the UTXO set, correcting outputs and transactions
/// where their local state differs from what's on-chain. The wallet stores the last
/// position scanned, and will scan back 100 blocks worth of UTXOs on each update, to
/// correct any differences due to forks or otherwise.
///
/// Note that an update process can take a long time, particularly when the entire
/// UTXO set is being scanned for correctness. The wallet status can be determined by
/// calling the [`get_updater_messages`](struct.Owner.html#method.get_updater_messages).
///
/// # Arguments
///
/// * `keychain_mask` - Wallet secret mask to XOR against the stored wallet seed before using, if
/// being used.
/// * `frequency`: The frequency at which to call the update process. Note this is
/// time elapsed since the last successful update process. If calling via the JSON-RPC
/// api, this represents milliseconds.
///
/// # Returns
/// * Ok if successful
/// * or [`libwallet::Error`](../grin_wallet_libwallet/struct.Error.html) if an error is encountered.
///
/// # Example
/// Set up as in [`new`](struct.Owner.html#method.new) method above.
/// ```
/// # grin_wallet_api::doctest_helper_setup_doc_env!(wallet, wallet_config);
///
/// use grin_core::global::ChainTypes;
///
/// use std::time::Duration;
///
/// // Set up as above
/// # let api_owner = Owner::new(wallet.clone());
///
/// let res = api_owner.start_updater(None, Duration::from_secs(60));
///
/// if let Ok(_) = res {
/// // ...
/// }
/// ```
pub fn start_updater(
&self,
keychain_mask: Option<&SecretKey>,
frequency: Duration,
) -> Result<(), Error> {
let updater_inner = self.updater.clone();
let tx_inner = {
let t = self.status_tx.lock();
t.clone()
};
let keychain_mask = match keychain_mask {
Some(m) => Some(m.clone()),
None => None,
};
let _ = thread::Builder::new()
.name("wallet-updater".to_string())
.spawn(move || {
let u = updater_inner.lock();
if let Err(e) = u.run(frequency, keychain_mask, &tx_inner) {
error!("Wallet state updater failed with error: {:?}", e);
}
})?;
Ok(())
}
/// Stops the background update thread. If the updater is currently updating, the
/// thread will stop after the next update
///
/// # Arguments
///
/// * None
///
/// # Returns
/// * Ok if successful
/// * or [`libwallet::Error`](../grin_wallet_libwallet/struct.Error.html) if an error is encountered.
///
/// # Example
/// Set up as in [`new`](struct.Owner.html#method.new) method above.
/// ```
/// # grin_wallet_api::doctest_helper_setup_doc_env!(wallet, wallet_config);
///
/// use grin_core::global::ChainTypes;
///
/// use std::time::Duration;
///
/// // Set up as above
/// # let api_owner = Owner::new(wallet.clone());
///
/// let res = api_owner.start_updater(None, Duration::from_secs(60));
///
/// if let Ok(_) = res {
/// // ...
/// }
///
/// let res = api_owner.stop_updater();
/// ```
pub fn stop_updater(&self) -> Result<(), Error> {
self.updater_running.store(false, Ordering::Relaxed);
Ok(())
}
/// Retrieve messages from the updater thread, up to `count` number of messages.
/// The resulting array will be ordered newest messages first. The updater will
/// store a maximum of 10,000 messages, after which it will start removing the oldest
/// messages as newer ones are created.
///
/// Messages retrieved via this method are removed from the internal queue, so calling
/// this function at a specified interval should result in a complete message history.
///
/// # Arguments
///
/// * `count` - The number of messages to retrieve.
///
/// # Returns
/// * Ok with a Vec of [`StatusMessage`](../grin_wallet_libwallet/api_impl/owner_updater/enum.StatusMessage.html)
/// * or [`libwallet::Error`](../grin_wallet_libwallet/struct.Error.html) if an error is encountered.
///
/// # Example
/// Set up as in [`new`](struct.Owner.html#method.new) method above.
/// ```
/// # grin_wallet_api::doctest_helper_setup_doc_env!(wallet, wallet_config);
///
/// use grin_core::global::ChainTypes;
///
/// use std::time::Duration;
///
/// // Set up as above
/// # let api_owner = Owner::new(wallet.clone());
///
/// let res = api_owner.start_updater(None, Duration::from_secs(60));
///
/// let messages = api_owner.get_updater_messages(10000);
///
/// if let Ok(_) = res {
/// // ...
/// }
///
/// ```
pub fn get_updater_messages(&self, count: usize) -> Result<Vec<StatusMessage>, Error> {
let mut q = self.updater_messages.lock();
let index = q.len().saturating_sub(count);
Ok(q.split_off(index))
}
}
#[doc(hidden)]

View file

@ -1223,11 +1223,11 @@ pub trait OwnerRpc: Sync + Send {
fn node_height(&self) -> Result<NodeHeightResult, ErrorKind>;
}
impl<'a, L, C, K> OwnerRpc for Owner<'a, L, C, K>
impl<'a, L, C, K> OwnerRpc for Owner<L, C, K>
where
L: WalletLCProvider<'a, C, K>,
C: NodeClient + 'a,
K: Keychain + 'a,
L: WalletLCProvider<'static, C, K>,
C: NodeClient + 'static,
K: Keychain + 'static,
{
fn accounts(&self) -> Result<Vec<AcctPathMapping>, ErrorKind> {
Owner::accounts(self, None).map_err(|e| e.kind())
@ -1458,8 +1458,13 @@ pub fn run_doctest_owner(
false,
);
//update local outputs after each block, so transaction IDs stay consistent
let (wallet_refreshed, _) =
api_impl::owner::retrieve_summary_info(wallet1.clone(), (&mask1).as_ref(), true, 1)
let (wallet_refreshed, _) = api_impl::owner::retrieve_summary_info(
wallet1.clone(),
(&mask1).as_ref(),
&None,
true,
1,
)
.unwrap();
assert!(wallet_refreshed);
}

View file

@ -22,14 +22,15 @@ use crate::keychain::{Identifier, Keychain};
use crate::libwallet::slate_versions::v2::TransactionV2;
use crate::libwallet::{
AcctPathMapping, ErrorKind, InitTxArgs, IssueInvoiceTxArgs, NodeClient, NodeHeightResult,
OutputCommitMapping, Slate, SlateVersion, TxLogEntry, VersionedSlate, WalletInfo,
WalletLCProvider,
OutputCommitMapping, Slate, SlateVersion, StatusMessage, TxLogEntry, VersionedSlate,
WalletInfo, WalletLCProvider,
};
use crate::util::secp::key::{PublicKey, SecretKey};
use crate::util::{static_secp_instance, LoggingConfig, ZeroingString};
use crate::{ECDHPubkey, Owner, Token};
use easy_jsonrpc_mw;
use rand::thread_rng;
use std::time::Duration;
/// Public definition used to generate Owner jsonrpc api.
/// Secure version containing wallet lifecycle functions. All calls to this API must be encrypted.
@ -1671,13 +1672,101 @@ pub trait OwnerRpcS {
```
*/
fn delete_wallet(&self, name: Option<String>) -> Result<(), ErrorKind>;
/**
Networked version of [Owner::start_updated](struct.Owner.html#method.start_updater).
```
# grin_wallet_api::doctest_helper_json_rpc_owner_assert_response!(
# r#"
{
"jsonrpc": "2.0",
"method": "start_updater",
"params": {
"token": "d202964900000000d302964900000000d402964900000000d502964900000000",
"frequency": 30000
},
"id": 1
}
# "#
# ,
# r#"
{
"id": 1,
"jsonrpc": "2.0",
"result": {
"Ok": null
}
}
# "#
# , true, 0, false, false, false);
```
*/
fn start_updater(&self, token: Token, frequency: u32) -> Result<(), ErrorKind>;
/**
Networked version of [Owner::stop_updater](struct.Owner.html#method.stop_updater).
```
# grin_wallet_api::doctest_helper_json_rpc_owner_assert_response!(
# r#"
{
"jsonrpc": "2.0",
"method": "stop_updater",
"params": null,
"id": 1
}
# "#
# ,
# r#"
{
"id": 1,
"jsonrpc": "2.0",
"result": {
"Ok": null
}
}
# "#
# , true, 0, false, false, false);
```
*/
fn stop_updater(&self) -> Result<(), ErrorKind>;
/**
Networked version of [Owner::get_updater_messages](struct.Owner.html#method.get_updater_messages).
```
# grin_wallet_api::doctest_helper_json_rpc_owner_assert_response!(
# r#"
{
"jsonrpc": "2.0",
"method": "get_updater_messages",
"params": {
"count": 1
},
"id": 1
}
# "#
# ,
# r#"
{
"id": 1,
"jsonrpc": "2.0",
"result": {
"Ok": []
}
}
# "#
# , true, 0, false, false, false);
```
*/
fn get_updater_messages(&self, count: u32) -> Result<Vec<StatusMessage>, ErrorKind>;
}
impl<'a, L, C, K> OwnerRpcS for Owner<'a, L, C, K>
impl<L, C, K> OwnerRpcS for Owner<L, C, K>
where
L: WalletLCProvider<'a, C, K>,
C: NodeClient + 'a,
K: Keychain + 'a,
L: WalletLCProvider<'static, C, K>,
C: NodeClient + 'static,
K: Keychain + 'static,
{
fn accounts(&self, token: Token) -> Result<Vec<AcctPathMapping>, ErrorKind> {
Owner::accounts(self, (&token.keychain_mask).as_ref()).map_err(|e| e.kind())
@ -1958,4 +2047,21 @@ where
let n = name.as_ref().map(|s| s.as_str());
Owner::delete_wallet(self, n).map_err(|e| e.kind())
}
fn start_updater(&self, token: Token, frequency: u32) -> Result<(), ErrorKind> {
Owner::start_updater(
self,
(&token.keychain_mask).as_ref(),
Duration::from_millis(frequency as u64),
)
.map_err(|e| e.kind())
}
fn stop_updater(&self) -> Result<(), ErrorKind> {
Owner::stop_updater(self).map_err(|e| e.kind())
}
fn get_updater_messages(&self, count: u32) -> Result<Vec<StatusMessage>, ErrorKind> {
Owner::get_updater_messages(self, count as usize).map_err(|e| e.kind())
}
}

View file

@ -63,15 +63,15 @@ pub struct InitArgs {
pub restore: bool,
}
pub fn init<'a, L, C, K>(
wallet: Arc<Mutex<Box<dyn WalletInst<'a, L, C, K>>>>,
pub fn init<L, C, K>(
wallet: Arc<Mutex<Box<dyn WalletInst<'static, L, C, K>>>>,
g_args: &GlobalArgs,
args: InitArgs,
) -> Result<(), Error>
where
L: WalletLCProvider<'a, C, K>,
C: NodeClient + 'a,
K: keychain::Keychain + 'a,
L: WalletLCProvider<'static, C, K> + 'static,
C: NodeClient + 'static,
K: keychain::Keychain + 'static,
{
let mut w_lock = wallet.lock();
let p = w_lock.lc_provider()?;
@ -101,14 +101,14 @@ pub struct RecoverArgs {
pub passphrase: ZeroingString,
}
pub fn recover<'a, L, C, K>(
wallet: Arc<Mutex<Box<dyn WalletInst<'a, L, C, K>>>>,
pub fn recover<L, C, K>(
wallet: Arc<Mutex<Box<dyn WalletInst<'static, L, C, K>>>>,
args: RecoverArgs,
) -> Result<(), Error>
where
L: WalletLCProvider<'a, C, K>,
C: NodeClient + 'a,
K: keychain::Keychain + 'a,
L: WalletLCProvider<'static, C, K> + 'static,
C: NodeClient + 'static,
K: keychain::Keychain + 'static,
{
let mut w_lock = wallet.lock();
let p = w_lock.lc_provider()?;
@ -127,7 +127,7 @@ pub struct ListenArgs {
pub method: String,
}
pub fn listen<'a, L, C, K>(
pub fn listen<L, C, K>(
wallet: Arc<Mutex<Box<dyn WalletInst<'static, L, C, K>>>>,
keychain_mask: Arc<Mutex<Option<SecretKey>>>,
config: &WalletConfig,
@ -202,15 +202,15 @@ pub struct AccountArgs {
pub create: Option<String>,
}
pub fn account<'a, L, C, K>(
wallet: Arc<Mutex<Box<dyn WalletInst<'a, L, C, K>>>>,
pub fn account<L, C, K>(
wallet: Arc<Mutex<Box<dyn WalletInst<'static, L, C, K>>>>,
keychain_mask: Option<&SecretKey>,
args: AccountArgs,
) -> Result<(), Error>
where
L: WalletLCProvider<'a, C, K>,
C: NodeClient + 'a,
K: keychain::Keychain + 'a,
L: WalletLCProvider<'static, C, K> + 'static,
C: NodeClient + 'static,
K: keychain::Keychain + 'static,
{
if args.create.is_none() {
let res = controller::owner_single_use(wallet, keychain_mask, |api, m| {
@ -256,17 +256,17 @@ pub struct SendArgs {
pub target_slate_version: Option<u16>,
}
pub fn send<'a, L, C, K>(
wallet: Arc<Mutex<Box<dyn WalletInst<'a, L, C, K>>>>,
pub fn send<L, C, K>(
wallet: Arc<Mutex<Box<dyn WalletInst<'static, L, C, K>>>>,
keychain_mask: Option<&SecretKey>,
tor_config: Option<TorConfig>,
args: SendArgs,
dark_scheme: bool,
) -> Result<(), Error>
where
L: WalletLCProvider<'a, C, K>,
C: NodeClient + 'a,
K: keychain::Keychain + 'a,
L: WalletLCProvider<'static, C, K> + 'static,
C: NodeClient + 'static,
K: keychain::Keychain + 'static,
{
controller::owner_single_use(wallet.clone(), keychain_mask, |api, m| {
if args.estimate_selection_strategies {
@ -370,16 +370,16 @@ pub struct ReceiveArgs {
pub message: Option<String>,
}
pub fn receive<'a, L, C, K>(
wallet: Arc<Mutex<Box<dyn WalletInst<'a, L, C, K>>>>,
pub fn receive<L, C, K>(
wallet: Arc<Mutex<Box<dyn WalletInst<'static, L, C, K>>>>,
keychain_mask: Option<&SecretKey>,
g_args: &GlobalArgs,
args: ReceiveArgs,
) -> Result<(), Error>
where
L: WalletLCProvider<'a, C, K>,
C: NodeClient + 'a,
K: keychain::Keychain + 'a,
L: WalletLCProvider<'static, C, K>,
C: NodeClient + 'static,
K: keychain::Keychain + 'static,
{
let mut slate = PathToSlate((&args.input).into()).get_tx()?;
let km = match keychain_mask.as_ref() {
@ -410,15 +410,15 @@ pub struct FinalizeArgs {
pub dest: Option<String>,
}
pub fn finalize<'a, L, C, K>(
wallet: Arc<Mutex<Box<dyn WalletInst<'a, L, C, K>>>>,
pub fn finalize<L, C, K>(
wallet: Arc<Mutex<Box<dyn WalletInst<'static, L, C, K>>>>,
keychain_mask: Option<&SecretKey>,
args: FinalizeArgs,
) -> Result<(), Error>
where
L: WalletLCProvider<'a, C, K>,
C: NodeClient + 'a,
K: keychain::Keychain + 'a,
L: WalletLCProvider<'static, C, K> + 'static,
C: NodeClient + 'static,
K: keychain::Keychain + 'static,
{
let mut slate = PathToSlate((&args.input).into()).get_tx()?;
@ -497,15 +497,15 @@ pub struct IssueInvoiceArgs {
pub issue_args: IssueInvoiceTxArgs,
}
pub fn issue_invoice_tx<'a, L, C, K>(
wallet: Arc<Mutex<Box<dyn WalletInst<'a, L, C, K>>>>,
pub fn issue_invoice_tx<L, C, K>(
wallet: Arc<Mutex<Box<dyn WalletInst<'static, L, C, K>>>>,
keychain_mask: Option<&SecretKey>,
args: IssueInvoiceArgs,
) -> Result<(), Error>
where
L: WalletLCProvider<'a, C, K>,
C: NodeClient + 'a,
K: keychain::Keychain + 'a,
L: WalletLCProvider<'static, C, K> + 'static,
C: NodeClient + 'static,
K: keychain::Keychain + 'static,
{
controller::owner_single_use(wallet.clone(), keychain_mask, |api, m| {
let slate = api.issue_invoice_tx(m, args.issue_args)?;
@ -530,17 +530,17 @@ pub struct ProcessInvoiceArgs {
}
/// Process invoice
pub fn process_invoice<'a, L, C, K>(
wallet: Arc<Mutex<Box<dyn WalletInst<'a, L, C, K>>>>,
pub fn process_invoice<L, C, K>(
wallet: Arc<Mutex<Box<dyn WalletInst<'static, L, C, K>>>>,
keychain_mask: Option<&SecretKey>,
tor_config: Option<TorConfig>,
args: ProcessInvoiceArgs,
dark_scheme: bool,
) -> Result<(), Error>
where
L: WalletLCProvider<'a, C, K>,
C: NodeClient + 'a,
K: keychain::Keychain + 'a,
L: WalletLCProvider<'static, C, K> + 'static,
C: NodeClient + 'static,
K: keychain::Keychain + 'static,
{
let slate = PathToSlate((&args.input).into()).get_tx()?;
controller::owner_single_use(wallet.clone(), keychain_mask, |api, m| {
@ -629,17 +629,17 @@ pub struct InfoArgs {
pub minimum_confirmations: u64,
}
pub fn info<'a, L, C, K>(
wallet: Arc<Mutex<Box<dyn WalletInst<'a, L, C, K>>>>,
pub fn info<L, C, K>(
wallet: Arc<Mutex<Box<dyn WalletInst<'static, L, C, K>>>>,
keychain_mask: Option<&SecretKey>,
g_args: &GlobalArgs,
args: InfoArgs,
dark_scheme: bool,
) -> Result<(), Error>
where
L: WalletLCProvider<'a, C, K>,
C: NodeClient + 'a,
K: keychain::Keychain + 'a,
L: WalletLCProvider<'static, C, K> + 'static,
C: NodeClient + 'static,
K: keychain::Keychain + 'static,
{
controller::owner_single_use(wallet.clone(), keychain_mask, |api, m| {
let (validated, wallet_info) =
@ -650,16 +650,16 @@ where
Ok(())
}
pub fn outputs<'a, L, C, K>(
wallet: Arc<Mutex<Box<dyn WalletInst<'a, L, C, K>>>>,
pub fn outputs<L, C, K>(
wallet: Arc<Mutex<Box<dyn WalletInst<'static, L, C, K>>>>,
keychain_mask: Option<&SecretKey>,
g_args: &GlobalArgs,
dark_scheme: bool,
) -> Result<(), Error>
where
L: WalletLCProvider<'a, C, K>,
C: NodeClient + 'a,
K: keychain::Keychain + 'a,
L: WalletLCProvider<'static, C, K> + 'static,
C: NodeClient + 'static,
K: keychain::Keychain + 'static,
{
controller::owner_single_use(wallet.clone(), keychain_mask, |api, m| {
let res = api.node_height(m)?;
@ -676,17 +676,17 @@ pub struct TxsArgs {
pub tx_slate_id: Option<Uuid>,
}
pub fn txs<'a, L, C, K>(
wallet: Arc<Mutex<Box<dyn WalletInst<'a, L, C, K>>>>,
pub fn txs<L, C, K>(
wallet: Arc<Mutex<Box<dyn WalletInst<'static, L, C, K>>>>,
keychain_mask: Option<&SecretKey>,
g_args: &GlobalArgs,
args: TxsArgs,
dark_scheme: bool,
) -> Result<(), Error>
where
L: WalletLCProvider<'a, C, K>,
C: NodeClient + 'a,
K: keychain::Keychain + 'a,
L: WalletLCProvider<'static, C, K> + 'static,
C: NodeClient + 'static,
K: keychain::Keychain + 'static,
{
controller::owner_single_use(wallet.clone(), keychain_mask, |api, m| {
let res = api.node_height(m)?;
@ -763,15 +763,15 @@ pub struct RepostArgs {
pub fluff: bool,
}
pub fn repost<'a, L, C, K>(
wallet: Arc<Mutex<Box<dyn WalletInst<'a, L, C, K>>>>,
pub fn repost<L, C, K>(
wallet: Arc<Mutex<Box<dyn WalletInst<'static, L, C, K>>>>,
keychain_mask: Option<&SecretKey>,
args: RepostArgs,
) -> Result<(), Error>
where
L: WalletLCProvider<'a, C, K>,
C: NodeClient + 'a,
K: keychain::Keychain + 'a,
L: WalletLCProvider<'static, C, K> + 'static,
C: NodeClient + 'static,
K: keychain::Keychain + 'static,
{
controller::owner_single_use(wallet.clone(), keychain_mask, |api, m| {
let (_, txs) = api.retrieve_txs(m, true, Some(args.id), None)?;
@ -815,15 +815,15 @@ pub struct CancelArgs {
pub tx_id_string: String,
}
pub fn cancel<'a, L, C, K>(
wallet: Arc<Mutex<Box<dyn WalletInst<'a, L, C, K>>>>,
pub fn cancel<L, C, K>(
wallet: Arc<Mutex<Box<dyn WalletInst<'static, L, C, K>>>>,
keychain_mask: Option<&SecretKey>,
args: CancelArgs,
) -> Result<(), Error>
where
L: WalletLCProvider<'a, C, K>,
C: NodeClient + 'a,
K: keychain::Keychain + 'a,
L: WalletLCProvider<'static, C, K> + 'static,
C: NodeClient + 'static,
K: keychain::Keychain + 'static,
{
controller::owner_single_use(wallet.clone(), keychain_mask, |api, m| {
let result = api.cancel_tx(m, args.tx_id, args.tx_slate_id);
@ -847,15 +847,15 @@ pub struct CheckArgs {
pub start_height: Option<u64>,
}
pub fn scan<'a, L, C, K>(
wallet: Arc<Mutex<Box<dyn WalletInst<'a, L, C, K>>>>,
pub fn scan<L, C, K>(
wallet: Arc<Mutex<Box<dyn WalletInst<'static, L, C, K>>>>,
keychain_mask: Option<&SecretKey>,
args: CheckArgs,
) -> Result<(), Error>
where
L: WalletLCProvider<'a, C, K>,
C: NodeClient + 'a,
K: keychain::Keychain + 'a,
L: WalletLCProvider<'static, C, K> + 'static,
C: NodeClient + 'static,
K: keychain::Keychain + 'static,
{
controller::owner_single_use(wallet.clone(), keychain_mask, |api, m| {
warn!("Starting output scan ...",);

View file

@ -121,16 +121,16 @@ where
/// Instantiate wallet Owner API for a single-use (command line) call
/// Return a function containing a loaded API context to call
pub fn owner_single_use<'a, L, F, C, K>(
wallet: Arc<Mutex<Box<dyn WalletInst<'a, L, C, K>>>>,
pub fn owner_single_use<L, F, C, K>(
wallet: Arc<Mutex<Box<dyn WalletInst<'static, L, C, K>>>>,
keychain_mask: Option<&SecretKey>,
f: F,
) -> Result<(), Error>
where
L: WalletLCProvider<'a, C, K>,
F: FnOnce(&mut Owner<'a, L, C, K>, Option<&SecretKey>) -> Result<(), Error>,
C: NodeClient + 'a,
K: Keychain + 'a,
L: WalletLCProvider<'static, C, K> + 'static,
F: FnOnce(&mut Owner<L, C, K>, Option<&SecretKey>) -> Result<(), Error>,
C: NodeClient + 'static,
K: Keychain + 'static,
{
f(&mut Owner::new(wallet), keychain_mask)?;
Ok(())
@ -305,7 +305,7 @@ where
fn call_api(
&self,
req: Request<Body>,
api: Owner<'static, L, C, K>,
api: Owner<L, C, K>,
) -> Box<dyn Future<Item = serde_json::Value, Error = Error> + Send> {
Box::new(parse_body(req).and_then(move |val: serde_json::Value| {
let owner_api = &api as &dyn OwnerRpc;
@ -362,6 +362,9 @@ where
/// Wallet instance
pub wallet: Arc<Mutex<Box<dyn WalletInst<'static, L, C, K> + 'static>>>,
/// Handle to Owner API
owner_api: Arc<Owner<L, C, K>>,
/// ECDH shared key
pub shared_key: Arc<Mutex<Option<SecretKey>>>,
@ -592,8 +595,10 @@ where
keychain_mask: Arc<Mutex<Option<SecretKey>>>,
running_foreign: bool,
) -> OwnerAPIHandlerV3<L, C, K> {
let owner_api = Arc::new(Owner::new(wallet.clone()));
OwnerAPIHandlerV3 {
wallet,
owner_api,
shared_key: Arc::new(Mutex::new(None)),
keychain_mask: keychain_mask,
running_foreign,
@ -603,14 +608,14 @@ where
fn call_api(
&self,
req: Request<Body>,
api: Owner<'static, L, C, K>,
api: Arc<Owner<L, C, K>>,
) -> Box<dyn Future<Item = serde_json::Value, Error = Error> + Send> {
let key = self.shared_key.clone();
let mask = self.keychain_mask.clone();
let running_foreign = self.running_foreign;
Box::new(parse_body(req).and_then(move |val: serde_json::Value| {
let mut val = val;
let owner_api_s = &api as &dyn OwnerRpcS;
let owner_api_s = &*api as &dyn OwnerRpcS;
let mut is_init_secure_api = OwnerV3Helpers::is_init_secure_api(&val);
let mut was_encrypted = false;
let mut encrypted_req_id = 0;
@ -671,9 +676,8 @@ where
}
fn handle_post_request(&self, req: Request<Body>) -> WalletResponseFuture {
let api = Owner::new(self.wallet.clone());
Box::new(
self.call_api(req, api)
self.call_api(req, self.owner_api.clone())
.and_then(|resp| ok(json_response_pretty(&resp))),
)
}

View file

@ -0,0 +1,122 @@
// Copyright 2019 The Grin Developers
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Test a wallet repost command
#[macro_use]
extern crate log;
extern crate grin_wallet_api as api;
extern crate grin_wallet_controller as wallet;
extern crate grin_wallet_impls as impls;
extern crate grin_wallet_libwallet as libwallet;
use crate::libwallet::api_impl::owner_updater::{start_updater_log_thread, StatusMessage};
use grin_wallet_util::grin_core as core;
use self::libwallet::{InitTxArgs, Slate};
use impls::test_framework::{self, LocalWalletClient};
use impls::{PathToSlate, SlateGetter as _, SlatePutter as _};
use std::sync::mpsc::channel;
use std::thread;
use std::time::Duration;
#[macro_use]
mod common;
use common::{clean_output_dir, create_wallet_proxy, setup};
/// updater thread test impl
fn updater_thread_test_impl(test_dir: &'static str) -> Result<(), libwallet::Error> {
// Create a new proxy to simulate server and wallet responses
let mut wallet_proxy = create_wallet_proxy(test_dir);
let chain = wallet_proxy.chain.clone();
// Create a new wallet test client, and set its queues to communicate with the
// proxy
create_wallet_and_add!(
client1,
wallet1,
mask1_i,
test_dir,
"wallet1",
None,
&mut wallet_proxy,
false
);
let mask1 = (&mask1_i).as_ref();
create_wallet_and_add!(
client2,
wallet2,
mask2_i,
test_dir,
"wallet2",
None,
&mut wallet_proxy,
false
);
let mask2 = (&mask2_i).as_ref();
// Set the wallet proxy listener running
thread::spawn(move || {
if let Err(e) = wallet_proxy.run() {
error!("Wallet Proxy error: {}", e);
}
});
// few values to keep things shorter
let reward = core::consensus::REWARD;
// add some accounts
wallet::controller::owner_single_use(wallet1.clone(), mask1, |api, m| {
api.create_account_path(m, "mining")?;
api.create_account_path(m, "listener")?;
Ok(())
})?;
// add some accounts
wallet::controller::owner_single_use(wallet2.clone(), mask2, |api, m| {
api.create_account_path(m, "account1")?;
api.create_account_path(m, "account2")?;
Ok(())
})?;
// Get some mining done
{
wallet_inst!(wallet1, w);
w.set_parent_key_id_by_name("mining")?;
}
let mut bh = 10u64;
let _ =
test_framework::award_blocks_to_wallet(&chain, wallet1.clone(), mask1, bh as usize, false);
let owner_api = api::Owner::new(wallet1);
owner_api.start_updater(mask1, Duration::from_secs(5))?;
// let updater thread run a bit
thread::sleep(Duration::from_secs(10));
let messages = owner_api.get_updater_messages(1000)?;
assert_eq!(messages.len(), 34);
owner_api.stop_updater()?;
thread::sleep(Duration::from_secs(2));
Ok(())
}
#[test]
fn updater_thread() {
let test_dir = "test_output/updater_thread";
setup(test_dir);
if let Err(e) = updater_thread_test_impl(test_dir) {
panic!("Libwallet Error: {} - {}", e, e.backtrace().unwrap());
}
clean_output_dir(test_dir);
}

View file

@ -362,7 +362,7 @@ where
.join(filename);
let path_buf = Path::new(&path).to_path_buf();
let mut stored_tx = File::create(path_buf)?;
let tx_hex = util::to_hex(ser::ser_vec(tx, ser::ProtocolVersion(1)).unwrap());;
let tx_hex = util::to_hex(ser::ser_vec(tx, ser::ProtocolVersion(1)).unwrap());
stored_tx.write_all(&tx_hex.as_bytes())?;
stored_tx.sync_all()?;
Ok(())

View file

@ -244,7 +244,7 @@ where
K: keychain::Keychain + 'a,
{
let (wallet_refreshed, wallet_info) =
owner::retrieve_summary_info(wallet, keychain_mask, true, 1)?;
owner::retrieve_summary_info(wallet, keychain_mask, &None, true, 1)?;
assert!(wallet_refreshed);
Ok(wallet_info)
}

View file

@ -23,4 +23,5 @@
pub mod foreign;
pub mod owner;
pub mod owner_updater;
pub mod types;

View file

@ -1,4 +1,4 @@
// Copyright 2019 The Grin Developers
// Copyright 2019 The Grin Develope;
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@ -23,6 +23,7 @@ use crate::grin_util;
use crate::grin_util::secp::key::SecretKey;
use crate::grin_util::Mutex;
use crate::api_impl::owner_updater::StatusMessage;
use crate::grin_keychain::{Identifier, Keychain};
use crate::internal::{keys, scan, selection, tx, updater};
use crate::slate::Slate;
@ -32,6 +33,7 @@ use crate::{
ScannedBlockInfo, TxLogEntryType, WalletInitStatus, WalletInst, WalletLCProvider,
};
use crate::{Error, ErrorKind};
use std::sync::mpsc::Sender;
use std::sync::Arc;
const USER_MESSAGE_MAX_LEN: usize = 256;
@ -74,6 +76,7 @@ where
pub fn retrieve_outputs<'a, L, C, K>(
wallet_inst: Arc<Mutex<Box<dyn WalletInst<'a, L, C, K>>>>,
keychain_mask: Option<&SecretKey>,
status_send_channel: &Option<Sender<StatusMessage>>,
include_spent: bool,
refresh_from_node: bool,
tx_id: Option<u32>,
@ -85,7 +88,12 @@ where
{
let mut validated = false;
if refresh_from_node {
validated = update_wallet_state(wallet_inst.clone(), keychain_mask, false)?;
validated = update_wallet_state(
wallet_inst.clone(),
keychain_mask,
status_send_channel,
false,
)?;
}
wallet_lock!(wallet_inst, w);
@ -107,6 +115,7 @@ where
pub fn retrieve_txs<'a, L, C, K>(
wallet_inst: Arc<Mutex<Box<dyn WalletInst<'a, L, C, K>>>>,
keychain_mask: Option<&SecretKey>,
status_send_channel: &Option<Sender<StatusMessage>>,
refresh_from_node: bool,
tx_id: Option<u32>,
tx_slate_id: Option<Uuid>,
@ -118,7 +127,12 @@ where
{
let mut validated = false;
if refresh_from_node {
validated = update_wallet_state(wallet_inst.clone(), keychain_mask, false)?;
validated = update_wallet_state(
wallet_inst.clone(),
keychain_mask,
status_send_channel,
false,
)?;
}
wallet_lock!(wallet_inst, w);
@ -132,6 +146,7 @@ where
pub fn retrieve_summary_info<'a, L, C, K>(
wallet_inst: Arc<Mutex<Box<dyn WalletInst<'a, L, C, K>>>>,
keychain_mask: Option<&SecretKey>,
status_send_channel: &Option<Sender<StatusMessage>>,
refresh_from_node: bool,
minimum_confirmations: u64,
) -> Result<(bool, WalletInfo), Error>
@ -142,7 +157,12 @@ where
{
let mut validated = false;
if refresh_from_node {
validated = update_wallet_state(wallet_inst.clone(), keychain_mask, false)?;
validated = update_wallet_state(
wallet_inst.clone(),
keychain_mask,
status_send_channel,
false,
)?;
}
wallet_lock!(wallet_inst, w);
@ -412,6 +432,7 @@ where
pub fn cancel_tx<'a, L, C, K>(
wallet_inst: Arc<Mutex<Box<dyn WalletInst<'a, L, C, K>>>>,
keychain_mask: Option<&SecretKey>,
status_send_channel: &Option<Sender<StatusMessage>>,
tx_id: Option<u32>,
tx_slate_id: Option<Uuid>,
) -> Result<(), Error>
@ -420,7 +441,12 @@ where
C: NodeClient + 'a,
K: Keychain + 'a,
{
if !update_wallet_state(wallet_inst.clone(), keychain_mask, false)? {
if !update_wallet_state(
wallet_inst.clone(),
keychain_mask,
status_send_channel,
false,
)? {
return Err(ErrorKind::TransactionCancellationError(
"Can't contact running Grin node. Not Cancelling.",
))?;
@ -477,6 +503,7 @@ pub fn scan<'a, L, C, K>(
keychain_mask: Option<&SecretKey>,
start_height: Option<u64>,
delete_unconfirmed: bool,
status_send_channel: &Option<Sender<StatusMessage>>,
) -> Result<(), Error>
where
L: WalletLCProvider<'a, C, K>,
@ -489,8 +516,6 @@ where
w.w2n_client().get_chain_tip()?
};
let status_fn: fn(&str) = |m| warn!("{}", m);
let start_height = match start_height {
Some(h) => h,
None => 1,
@ -502,7 +527,7 @@ where
delete_unconfirmed,
start_height,
tip.0,
status_fn,
status_send_channel,
)?;
info.hash = tip.1;
@ -535,7 +560,7 @@ where
updated_from_node: true,
}),
Err(_) => {
let outputs = retrieve_outputs(wallet_inst, keychain_mask, true, false, None)?;
let outputs = retrieve_outputs(wallet_inst, keychain_mask, &None, true, false, None)?;
let height = match outputs.1.iter().map(|m| m.output.height).max() {
Some(height) => height,
None => 0,
@ -549,9 +574,10 @@ where
}
}
/// Experimental, wrap the entire definition of how a wallet's state is updated
fn update_wallet_state<'a, L, C, K>(
pub fn update_wallet_state<'a, L, C, K>(
wallet_inst: Arc<Mutex<Box<dyn WalletInst<'a, L, C, K>>>>,
keychain_mask: Option<&SecretKey>,
status_send_channel: &Option<Sender<StatusMessage>>,
update_all: bool,
) -> Result<bool, Error>
where
@ -569,12 +595,28 @@ where
};
// Step 1: Update outputs and transactions purely based on UTXO state
if let Some(ref s) = status_send_channel {
let _ = s.send(StatusMessage::UpdatingOutputs(
"Updating outputs from node".to_owned(),
));
}
let mut result = update_outputs(wallet_inst.clone(), keychain_mask, update_all)?;
if !result {
if let Some(ref s) = status_send_channel {
let _ = s.send(StatusMessage::UpdateWarning(
"Updater Thread unable to contact node".to_owned(),
));
}
return Ok(result);
}
if let Some(ref s) = status_send_channel {
let _ = s.send(StatusMessage::UpdatingTransactions(
"Updating transactions".to_owned(),
));
}
// Step 2: Update outstanding transactions with no change outputs by kernel
let mut txs = {
wallet_lock!(wallet_inst, w);
@ -582,6 +624,11 @@ where
};
result = update_txs_via_kernel(wallet_inst.clone(), keychain_mask, &mut txs)?;
if !result {
if let Some(ref s) = status_send_channel {
let _ = s.send(StatusMessage::UpdateWarning(
"Updater Thread unable to contact node".to_owned(),
));
}
return Ok(result);
}
@ -590,7 +637,14 @@ where
// if we can't get the tip, don't continue
let tip = match res {
Ok(t) => t,
Err(_) => return Ok(false),
Err(_) => {
if let Some(ref s) = status_send_channel {
let _ = s.send(StatusMessage::UpdateWarning(
"Updater Thread unable to contact node".to_owned(),
));
}
return Ok(false);
}
};
// Check if this is a restored wallet that needs a full scan
@ -615,11 +669,12 @@ where
let start_index = last_scanned_block.height.saturating_sub(100);
let mut status_fn: fn(&str) = |m| debug!("{}", m);
if last_scanned_block.height == 0 {
warn!("This wallet's contents has not been initialized with a full chain scan, performing scan now.");
warn!("This operation may take a while for the first scan, but should be much quicker once the initial scan is done.");
status_fn = |m| warn!("{}", m);
let msg = format!("This wallet's contents has not been initialized with a full chain scan, performing scan now.
This operation may take a while for the first scan, but should be much quicker once the initial scan is done.");
if let Some(ref s) = status_send_channel {
let _ = s.send(StatusMessage::FullScanWarn(msg));
}
}
let mut info = scan::scan(
@ -628,7 +683,7 @@ where
false,
start_index,
tip.0,
status_fn,
status_send_channel,
)?;
info.hash = tip.1;

View file

@ -0,0 +1,140 @@
// Copyright 2019 The Grin Developers
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! A threaded persistent Updater that can be controlled by a grin wallet
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{Receiver, Sender};
use std::sync::Arc;
use std::thread;
use std::time::Duration;
use crate::grin_keychain::Keychain;
use crate::grin_util::secp::key::SecretKey;
use crate::grin_util::Mutex;
use crate::api_impl::owner;
use crate::types::NodeClient;
use crate::Error;
use crate::{WalletInst, WalletLCProvider};
const MESSAGE_QUEUE_MAX_LEN: usize = 10_000;
/// Update status messages which can be returned to listening clients
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum StatusMessage {
/// Wallet is performing a regular update, matching the UTXO set against
/// current wallet outputs
UpdatingOutputs(String),
/// Wallet is updating transactions, potentially retrieving transactions
/// by kernel if needed
UpdatingTransactions(String),
/// Warning that the wallet is about to perform a full UTXO scan
FullScanWarn(String),
/// Status and percentage complete messages returned during the
/// scanning process
Scanning(String, u8),
/// UTXO scanning is complete
ScanningComplete(String),
/// Warning of issues that may have occured during an update
UpdateWarning(String),
}
/// Helper function that starts a simple log thread for updater messages
pub fn start_updater_log_thread(
rx: Receiver<StatusMessage>,
queue: Arc<Mutex<Vec<StatusMessage>>>,
) -> Result<(), Error> {
let _ = thread::Builder::new()
.name("wallet-updater-status".to_string())
.spawn(move || loop {
while let Ok(m) = rx.try_recv() {
// save to our message queue to be read by other consumers
{
let mut q = queue.lock();
q.insert(0, m.clone());
while q.len() > MESSAGE_QUEUE_MAX_LEN {
q.pop();
}
}
match m {
StatusMessage::UpdatingOutputs(s) => debug!("{}", s),
StatusMessage::UpdatingTransactions(s) => debug!("{}", s),
StatusMessage::FullScanWarn(s) => warn!("{}", s),
StatusMessage::Scanning(s, m) => {
debug!("{}", s);
warn!("Scanning - {}% complete", m);
}
StatusMessage::ScanningComplete(s) => warn!("{}", s),
StatusMessage::UpdateWarning(s) => warn!("{}", s),
}
}
thread::sleep(Duration::from_millis(500));
})?;
Ok(())
}
/// Handles and launches a background update thread
pub struct Updater<'a, L, C, K>
where
L: WalletLCProvider<'a, C, K>,
C: NodeClient + 'a,
K: Keychain + 'a,
{
wallet_inst: Arc<Mutex<Box<dyn WalletInst<'a, L, C, K>>>>,
is_running: Arc<AtomicBool>,
}
impl<'a, L, C, K> Updater<'a, L, C, K>
where
L: WalletLCProvider<'a, C, K>,
C: NodeClient + 'a,
K: Keychain + 'a,
{
/// create a new updater
pub fn new(
wallet_inst: Arc<Mutex<Box<dyn WalletInst<'a, L, C, K>>>>,
is_running: Arc<AtomicBool>,
) -> Self {
is_running.store(false, Ordering::Relaxed);
Updater {
wallet_inst,
is_running,
}
}
/// Start the updater at the given frequency
pub fn run(
&self,
frequency: Duration,
keychain_mask: Option<SecretKey>,
status_send_channel: &Option<Sender<StatusMessage>>,
) -> Result<(), Error> {
self.is_running.store(true, Ordering::Relaxed);
loop {
// Business goes here
owner::update_wallet_state(
self.wallet_inst.clone(),
(&keychain_mask).as_ref(),
status_send_channel,
false,
)?;
if !self.is_running.load(Ordering::Relaxed) {
break;
}
thread::sleep(frequency);
}
Ok(())
}
}

View file

@ -13,6 +13,7 @@
// limitations under the License.
//! Functions to restore a wallet's outputs from just the master seed
use crate::api_impl::owner_updater::StatusMessage;
use crate::grin_core::consensus::{valid_header_version, WEEK_HEIGHT};
use crate::grin_core::core::HeaderVersion;
use crate::grin_core::global;
@ -24,7 +25,9 @@ use crate::grin_util::Mutex;
use crate::internal::{keys, updater};
use crate::types::*;
use crate::{wallet_lock, Error, OutputCommitMapping};
use std::cmp;
use std::collections::HashMap;
use std::sync::mpsc::Sender;
use std::sync::Arc;
/// Utility struct for return values from below
@ -60,20 +63,23 @@ struct RestoredTxStats {
pub num_outputs: usize,
}
fn identify_utxo_outputs<'a, K, F>(
fn identify_utxo_outputs<'a, K>(
keychain: &K,
outputs: Vec<(pedersen::Commitment, pedersen::RangeProof, bool, u64, u64)>,
status_cb: &F,
status_send_channel: &Option<Sender<StatusMessage>>,
percentage_complete: u8,
) -> Result<Vec<OutputResult>, Error>
where
K: Keychain + 'a,
F: Fn(&str),
{
let mut wallet_outputs: Vec<OutputResult> = Vec::new();
status_cb(&format!(
let msg = format!(
"Scanning {} outputs in the current Grin utxo set",
outputs.len(),
));
);
if let Some(ref s) = status_send_channel {
let _ = s.send(StatusMessage::Scanning(msg, percentage_complete));
}
let legacy_builder = proof::LegacyProofBuilder::new(keychain);
let builder = proof::ProofBuilder::new(keychain);
@ -113,13 +119,20 @@ where
*height
};
status_cb(&format!(
let msg = format!(
"Output found: {:?}, amount: {:?}, key_id: {:?}, mmr_index: {},",
commit, amount, key_id, mmr_index,
));
);
if let Some(ref s) = status_send_channel {
let _ = s.send(StatusMessage::Scanning(msg, percentage_complete));
}
if switch != SwitchCommitmentType::Regular {
status_cb(&format!("Unexpected switch commitment type {:?}", switch))
let msg = format!("Unexpected switch commitment type {:?}", switch);
if let Some(ref s) = status_send_channel {
let _ = s.send(StatusMessage::UpdateWarning(msg));
}
}
wallet_outputs.push(OutputResult {
@ -136,17 +149,16 @@ where
Ok(wallet_outputs)
}
fn collect_chain_outputs<'a, C, K, F>(
fn collect_chain_outputs<'a, C, K>(
keychain: &K,
client: C,
start_index: u64,
end_index: Option<u64>,
status_cb: &F,
status_send_channel: &Option<Sender<StatusMessage>>,
) -> Result<(Vec<OutputResult>, u64), Error>
where
C: NodeClient + 'a,
K: Keychain + 'a,
F: Fn(&str),
{
let batch_size = 1000;
let mut start_index = start_index;
@ -155,17 +167,26 @@ where
loop {
let (highest_index, last_retrieved_index, outputs) =
client.get_outputs_by_pmmr_index(start_index, end_index, batch_size)?;
status_cb(&format!(
let perc_complete = cmp::min(
((last_retrieved_index as f64 / highest_index as f64) * 100.0) as u8,
99,
);
let msg = format!(
"Checking {} outputs, up to index {}. (Highest index: {})",
outputs.len(),
highest_index,
last_retrieved_index,
));
);
if let Some(ref s) = status_send_channel {
let _ = s.send(StatusMessage::Scanning(msg, perc_complete));
}
result_vec.append(&mut identify_utxo_outputs(
keychain,
outputs.clone(),
status_cb,
status_send_channel,
perc_complete as u8,
)?);
if highest_index <= last_retrieved_index {
@ -195,8 +216,6 @@ where
let commit = w.calc_commit_for_cache(keychain_mask, output.value, &output.key_id)?;
let mut batch = w.batch(keychain_mask)?;
error!("RESTORING OUTPUT: {:?}", output);
let parent_key_id = output.key_id.parent_path();
if !found_parents.contains_key(&parent_key_id) {
found_parents.insert(parent_key_id.clone(), 0);
@ -311,22 +330,23 @@ where
/// Check / repair wallet contents by scanning against chain
/// assume wallet contents have been freshly updated with contents
/// of latest block
pub fn scan<'a, L, C, K, F>(
pub fn scan<'a, L, C, K>(
wallet_inst: Arc<Mutex<Box<dyn WalletInst<'a, L, C, K>>>>,
keychain_mask: Option<&SecretKey>,
delete_unconfirmed: bool,
start_height: u64,
end_height: u64,
status_cb: F,
status_send_channel: &Option<Sender<StatusMessage>>,
) -> Result<ScannedBlockInfo, Error>
where
L: WalletLCProvider<'a, C, K>,
C: NodeClient + 'a,
K: Keychain + 'a,
F: Fn(&str),
{
// First, get a definitive list of outputs we own from the chain
status_cb("Starting UTXO scan");
if let Some(ref s) = status_send_channel {
let _ = s.send(StatusMessage::Scanning("Starting UTXO scan".to_owned(), 0));
}
let (client, keychain) = {
wallet_lock!(wallet_inst, w);
(w.w2n_client().clone(), w.keychain(keychain_mask)?.clone())
@ -340,12 +360,16 @@ where
client,
pmmr_range.0,
Some(pmmr_range.1),
&status_cb,
status_send_channel,
)?;
status_cb(&format!(
let msg = format!(
"Identified {} wallet_outputs as belonging to this wallet",
chain_outs.len(),
));
);
if let Some(ref s) = status_send_channel {
let _ = s.send(StatusMessage::Scanning(msg, 99));
}
// Now, get all outputs owned by this wallet (regardless of account)
let wallet_outputs = {
@ -376,11 +400,14 @@ where
// mark problem spent outputs as unspent (confirmed against a short-lived fork, for example)
for m in accidental_spend_outs.into_iter() {
let mut o = m.0;
status_cb(&format!(
let msg = format!(
"Output for {} with ID {} ({:?}) marked as spent but exists in UTXO set. \
Marking unspent and cancelling any associated transaction log entries.",
o.value, o.key_id, m.1.commit,
));
);
if let Some(ref s) = status_send_channel {
let _ = s.send(StatusMessage::Scanning(msg, 99));
}
o.status = OutputStatus::Unspent;
// any transactions associated with this should be cancelled
cancel_tx_log_entry(wallet_inst.clone(), keychain_mask, &o)?;
@ -394,11 +421,14 @@ where
// Restore missing outputs, adding transaction for it back to the log
for m in missing_outs.into_iter() {
status_cb(&format!(
let msg = format!(
"Confirmed output for {} with ID {} ({:?}, index {}) exists in UTXO set but not in wallet. \
Restoring.",
m.value, m.key_id, m.commit, m.mmr_index
));
);
if let Some(ref s) = status_send_channel {
let _ = s.send(StatusMessage::Scanning(msg, 99));
}
restore_missing_output(
wallet_inst.clone(),
keychain_mask,
@ -412,11 +442,14 @@ where
// Unlock locked outputs
for m in locked_outs.into_iter() {
let mut o = m.0;
status_cb(&format!(
let msg = format!(
"Confirmed output for {} with ID {} ({:?}) exists in UTXO set and is locked. \
Unlocking and cancelling associated transaction log entries.",
o.value, o.key_id, m.1.commit,
));
);
if let Some(ref s) = status_send_channel {
let _ = s.send(StatusMessage::Scanning(msg, 99));
}
o.status = OutputStatus::Unspent;
cancel_tx_log_entry(wallet_inst.clone(), keychain_mask, &o)?;
wallet_lock!(wallet_inst, w);
@ -432,11 +465,14 @@ where
// Delete unconfirmed outputs
for m in unconfirmed_outs.into_iter() {
let o = m.output.clone();
status_cb(&format!(
let msg = format!(
"Unconfirmed output for {} with ID {} ({:?}) not in UTXO set. \
Deleting and cancelling associated transaction log entries.",
o.value, o.key_id, m.commit,
));
);
if let Some(ref s) = status_send_channel {
let _ = s.send(StatusMessage::Scanning(msg, 99));
}
cancel_tx_log_entry(wallet_inst.clone(), keychain_mask, &o)?;
wallet_lock!(wallet_inst, w);
let mut batch = w.batch(keychain_mask)?;
@ -454,7 +490,10 @@ where
// Only restore paths that don't exist
if !accounts.contains(path) {
let label = format!("{}_{}", label_base, acct_index);
status_cb(&format!("Setting account {} at path {}", label, path));
let msg = format!("Setting account {} at path {}", label, path);
if let Some(ref s) = status_send_channel {
let _ = s.send(StatusMessage::Scanning(msg, 99));
}
keys::set_acct_path(&mut **w, keychain_mask, &label, path)?;
acct_index += 1;
}
@ -467,6 +506,12 @@ where
}
}
if let Some(ref s) = status_send_channel {
let _ = s.send(StatusMessage::ScanningComplete(
"Scanning Complete".to_owned(),
));
}
Ok(ScannedBlockInfo {
height: end_height,
hash: "".to_owned(),

View file

@ -57,6 +57,7 @@ pub use crate::slate_versions::{
SlateVersion, VersionedCoinbase, VersionedSlate, CURRENT_SLATE_VERSION,
GRIN_BLOCK_HEADER_VERSION,
};
pub use api_impl::owner_updater::StatusMessage;
pub use api_impl::types::{
BlockFees, InitTxArgs, InitTxSendArgs, IssueInvoiceTxArgs, NodeHeightResult,
OutputCommitMapping, SendTXArgs, VersionInfo,

View file

@ -104,7 +104,7 @@ subcommands:
possible_values:
- all
- smallest
default_value: all
default_value: smallest
takes_value: true
- estimate_selection_strategies:
help: Estimates all possible Coin/Output selection strategies.

View file

@ -391,6 +391,7 @@ where
})?;
let res_val: Value = serde_json::from_str(&res).unwrap();
//println!("RES_VAL: {}", res_val);
// encryption error, just return the string
if res_val["error"] != json!(null) {
return Ok(Err(WalletAPIReturnError {

View file

@ -438,7 +438,67 @@ fn owner_v3_lifecycle() -> Result<(), grin_wallet_controller::Error> {
thread::sleep(Duration::from_millis(200));
assert_eq!(res.unwrap().1.amount_awaiting_finalization, 6000000000);
// 21) Delete the wallet (close first)
// 21) Start the automatic updater, let it run for a bit
let req = serde_json::json!({
"jsonrpc": "2.0",
"id": 1,
"method": "start_updater",
"params": {
"token": token,
"frequency": 3000,
}
});
let res = send_request_enc::<String>(
1,
1,
"http://127.0.0.1:43420/v3/owner",
&req.to_string(),
&shared_key,
)?;
assert!(res.is_ok());
println!("RES 21: {:?}", res);
thread::sleep(Duration::from_millis(5000));
// 22) Retrieve some messages about updater status
let req = serde_json::json!({
"jsonrpc": "2.0",
"id": 1,
"method": "get_updater_messages",
"params": {
"count": 1000,
}
});
let res = send_request_enc::<String>(
1,
1,
"http://127.0.0.1:43420/v3/owner",
&req.to_string(),
&shared_key,
)?;
assert!(res.is_ok());
println!("RES 22: {:?}", res);
// 23) Stop Updater
let req = serde_json::json!({
"jsonrpc": "2.0",
"id": 1,
"method": "stop_updater",
"params": null
});
let res = send_request_enc::<String>(
1,
1,
"http://127.0.0.1:43420/v3/owner",
&req.to_string(),
&shared_key,
)?;
assert!(res.is_ok());
println!("RES 23: {:?}", res);
// 24) Delete the wallet (close first)
let req = include_str!("data/v3_reqs/close_wallet.req.json");
let res =
send_request_enc::<String>(1, 1, "http://127.0.0.1:43420/v3/owner", &req, &shared_key)?;
@ -447,14 +507,14 @@ fn owner_v3_lifecycle() -> Result<(), grin_wallet_controller::Error> {
let req = include_str!("data/v3_reqs/delete_wallet.req.json");
let res =
send_request_enc::<String>(1, 1, "http://127.0.0.1:43420/v3/owner", &req, &shared_key)?;
println!("RES 21: {:?}", res);
println!("RES 24: {:?}", res);
assert!(res.is_ok());
// 22) Wallet should be gone
// 25) Wallet should be gone
let req = include_str!("data/v3_reqs/open_wallet.req.json");
let res =
send_request_enc::<String>(1, 1, "http://127.0.0.1:43420/v3/owner", &req, &shared_key)?;
println!("RES 22: {:?}", res);
println!("RES 25: {:?}", res);
assert!(res.is_err());
clean_output_dir(test_dir);