only hold wallet write lock for write operations on the wallet (#210)

* mount v2 router for flexibility, wallet checker now refreshes multiple outputs via single api call
* add read_wallet so we can read without acquiring the lock
* fix the api router
* read wallet without acquiring or holding lock, only acquire the write lock for wallet when updating or adding outputs
This commit is contained in:
AntiochP 2017-10-25 17:09:34 -04:00 committed by Ignotus Peverell
parent e2e24bc38e
commit d7b94a12f5
5 changed files with 242 additions and 184 deletions

View file

@ -23,6 +23,7 @@ use types::*;
use keychain::{Identifier, Keychain}; use keychain::{Identifier, Keychain};
use secp::pedersen; use secp::pedersen;
use util; use util;
use util::LOGGER;
// Transitions a local wallet output from Unconfirmed -> Unspent. // Transitions a local wallet output from Unconfirmed -> Unspent.
// Also updates the height and lock_height based on latest from the api. // Also updates the height and lock_height based on latest from the api.
@ -30,8 +31,11 @@ fn refresh_output(out: &mut OutputData, api_out: &api::Output) {
out.height = api_out.height; out.height = api_out.height;
out.lock_height = api_out.lock_height; out.lock_height = api_out.lock_height;
if out.status == OutputStatus::Unconfirmed { match out.status {
out.status = OutputStatus::Unspent; OutputStatus::Unconfirmed => {
out.status = OutputStatus::Unspent;
},
_ => (),
} }
} }
@ -39,8 +43,11 @@ fn refresh_output(out: &mut OutputData, api_out: &api::Output) {
// Unspent -> Spent // Unspent -> Spent
// Locked -> Spent // Locked -> Spent
fn mark_spent_output(out: &mut OutputData) { fn mark_spent_output(out: &mut OutputData) {
if vec![OutputStatus::Unspent, OutputStatus::Locked].contains(&out.status) { match out.status {
out.status = OutputStatus::Spent; OutputStatus::Unspent | OutputStatus::Locked => {
out.status = OutputStatus::Spent
},
_ => (),
} }
} }
@ -50,53 +57,58 @@ pub fn refresh_outputs(
config: &WalletConfig, config: &WalletConfig,
keychain: &Keychain, keychain: &Keychain,
) -> Result<(), Error> { ) -> Result<(), Error> {
WalletData::with_wallet(&config.data_file_dir, |wallet_data| { debug!(LOGGER, "Refreshing wallet outputs");
let mut wallet_outputs: HashMap<pedersen::Commitment, Identifier> = HashMap::new(); let mut wallet_outputs: HashMap<pedersen::Commitment, Identifier> = HashMap::new();
let mut commits: Vec<pedersen::Commitment> = vec![]; let mut commits: Vec<pedersen::Commitment> = vec![];
// build a local map of wallet outputs by commits // build a local map of wallet outputs by commits
// and a list of outputs we wantot query the node for // and a list of outputs we wantot query the node for
let _ = WalletData::read_wallet(&config.data_file_dir, |wallet_data| {
for out in wallet_data.outputs for out in wallet_data.outputs
.values_mut() .values()
.filter(|out| out.root_key_id == keychain.root_key_id()) .filter(|out| out.root_key_id == keychain.root_key_id())
.filter(|out| out.status != OutputStatus::Spent) { .filter(|out| out.status != OutputStatus::Spent)
let key_id = keychain.derive_key_id(out.n_child).unwrap(); {
let commit = keychain.commit(out.value, &key_id).unwrap(); let key_id = keychain.derive_key_id(out.n_child).unwrap();
commits.push(commit); let commit = keychain.commit(out.value, &key_id).unwrap();
wallet_outputs.insert(commit, out.key_id.clone()); commits.push(commit);
wallet_outputs.insert(commit, out.key_id.clone());
}
});
// build the necessary query params -
// ?id=xxx&id=yyy&id=zzz
let query_params: Vec<String> = commits
.iter()
.map(|commit| {
let id = util::to_hex(commit.as_ref().to_vec());
format!("id={}", id)
})
.collect();
let query_string = query_params.join("&");
let url = format!(
"{}/v2/chain/utxos?{}",
config.check_node_api_http_addr,
query_string,
);
// build a map of api outputs by commit so we can look them up efficiently
let mut api_outputs: HashMap<pedersen::Commitment, api::Output> = HashMap::new();
match api::client::get::<Vec<api::Output>>(url.as_str()) {
Ok(outputs) => {
for out in outputs {
api_outputs.insert(out.commit, out);
} }
},
Err(_) => {},
};
// build the necessary query params - // now for each commit, find the output in the wallet and
// ?id=xxx&id=yyy&id=zzz // the corresponding api output (if it exists)
let query_params: Vec<String> = commits // and refresh it in-place in the wallet.
.iter() // Note: minimizing the time we spend holding the wallet lock.
.map(|commit| { WalletData::with_wallet(&config.data_file_dir, |wallet_data| {
let id = util::to_hex(commit.as_ref().to_vec());
format!("id={}", id)
})
.collect();
let query_string = query_params.join("&");
let url = format!(
"{}/v2/chain/utxos?{}",
config.check_node_api_http_addr,
query_string,
);
// build a map of api outputs by commit so we can look them up efficiently
let mut api_outputs: HashMap<pedersen::Commitment, api::Output> = HashMap::new();
match api::client::get::<Vec<api::Output>>(url.as_str()) {
Ok(outputs) => {
for out in outputs {
api_outputs.insert(out.commit, out);
}
},
Err(_) => {},
};
// now for each commit we want to refresh the output for
// find the corresponding api output (if it exists)
// and refresh it in-place in the wallet
for commit in commits { for commit in commits {
let id = wallet_outputs.get(&commit).unwrap(); let id = wallet_outputs.get(&commit).unwrap();
if let Entry::Occupied(mut output) = wallet_data.outputs.entry(id.to_hex()) { if let Entry::Occupied(mut output) = wallet_data.outputs.entry(id.to_hex()) {

View file

@ -20,8 +20,8 @@ pub fn show_info(config: &WalletConfig, keychain: &Keychain) {
let root_key_id = keychain.root_key_id(); let root_key_id = keychain.root_key_id();
let _ = checker::refresh_outputs(&config, &keychain); let _ = checker::refresh_outputs(&config, &keychain);
// operate within a lock on wallet data // just read the wallet here, no need for a write lock
let _ = WalletData::with_wallet(&config.data_file_dir, |wallet_data| { let _ = WalletData::read_wallet(&config.data_file_dir, |wallet_data| {
// get the current height via the api // get the current height via the api
// if we cannot get the current height use the max height known to the wallet // if we cannot get the current height use the max height known to the wallet
@ -35,11 +35,8 @@ pub fn show_info(config: &WalletConfig, keychain: &Keychain) {
} }
}; };
// need to specify a default value here somehow
let minimum_confirmations = 1;
println!("Outputs - "); println!("Outputs - ");
println!("key_id, height, lock_height, status, spendable?, coinbase?, value"); println!("key_id, height, lock_height, status, coinbase?, num_confs, value");
println!("----------------------------------"); println!("----------------------------------");
let mut outputs = wallet_data let mut outputs = wallet_data
@ -55,8 +52,8 @@ pub fn show_info(config: &WalletConfig, keychain: &Keychain) {
out.height, out.height,
out.lock_height, out.lock_height,
out.status, out.status,
out.eligible_to_spend(current_height, minimum_confirmations),
out.is_coinbase, out.is_coinbase,
out.num_confirmations(current_height),
out.value, out.value,
); );
} }

View file

@ -1,4 +1,4 @@
// Copyright 2016 The Grin Developers // Copyright 2017 The Grin Developers
// //
// Licensed under the Apache License, Version 2.0 (the "License"); // Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License. // you may not use this file except in compliance with the License.
@ -53,7 +53,7 @@ use core::consensus::reward;
use core::core::{Block, Transaction, TxKernel, Output, build}; use core::core::{Block, Transaction, TxKernel, Output, build};
use core::ser; use core::ser;
use api::{self, ApiEndpoint, Operation, ApiResult}; use api::{self, ApiEndpoint, Operation, ApiResult};
use keychain::{BlindingFactor, Keychain}; use keychain::{BlindingFactor, Identifier, Keychain};
use types::*; use types::*;
use util; use util;
use util::LOGGER; use util::LOGGER;
@ -171,6 +171,36 @@ impl ApiEndpoint for WalletReceiver {
} }
} }
// Read wallet data without acquiring the write lock.
fn retrieve_existing_key(
config: &WalletConfig,
key_id: Identifier,
) -> Result<(Identifier, u32), Error> {
let res = WalletData::read_wallet(&config.data_file_dir, |wallet_data| {
if let Some(existing) = wallet_data.get_output(&key_id) {
let key_id = existing.key_id.clone();
let derivation = existing.n_child;
(key_id, derivation)
} else {
panic!("should never happen");
}
})?;
Ok(res)
}
fn next_available_key(
config: &WalletConfig,
keychain: &Keychain,
) -> Result<(Identifier, u32), Error> {
let res = WalletData::read_wallet(&config.data_file_dir, |wallet_data| {
let root_key_id = keychain.root_key_id();
let derivation = wallet_data.next_child(root_key_id.clone());
let key_id = keychain.derive_key_id(derivation).unwrap();
(key_id, derivation)
})?;
Ok(res)
}
/// Build a coinbase output and the corresponding kernel /// Build a coinbase output and the corresponding kernel
fn receive_coinbase( fn receive_coinbase(
config: &WalletConfig, config: &WalletConfig,
@ -178,25 +208,15 @@ fn receive_coinbase(
block_fees: &BlockFees block_fees: &BlockFees
) -> Result<(Output, TxKernel, BlockFees), Error> { ) -> Result<(Output, TxKernel, BlockFees), Error> {
let root_key_id = keychain.root_key_id(); let root_key_id = keychain.root_key_id();
let key_id = block_fees.key_id();
// operate within a lock on wallet data let (key_id, derivation) = match key_id {
Some(key_id) => retrieve_existing_key(config, key_id)?,
None => next_available_key(config, keychain)?,
};
// Now acquire the wallet lock and write the new output.
WalletData::with_wallet(&config.data_file_dir, |wallet_data| { WalletData::with_wallet(&config.data_file_dir, |wallet_data| {
let key_id = block_fees.key_id();
let (key_id, derivation) = match key_id {
Some(key_id) => {
if let Some(existing) = wallet_data.get_output(&key_id) {
(existing.key_id.clone(), existing.n_child)
} else {
panic!("should never happen");
}
},
None => {
let derivation = wallet_data.next_child(root_key_id.clone());
let key_id = keychain.derive_key_id(derivation)?;
(key_id, derivation)
}
};
// track the new output and return the stuff needed for reward // track the new output and return the stuff needed for reward
wallet_data.add_output(OutputData { wallet_data.add_output(OutputData {
root_key_id: root_key_id.clone(), root_key_id: root_key_id.clone(),
@ -208,29 +228,29 @@ fn receive_coinbase(
lock_height: 0, lock_height: 0,
is_coinbase: true, is_coinbase: true,
}); });
})?;
debug!( debug!(
LOGGER, LOGGER,
"Received coinbase and built candidate output - {:?}, {:?}, {}", "Received coinbase and built candidate output - {:?}, {:?}, {}",
root_key_id.clone(), root_key_id.clone(),
key_id.clone(), key_id.clone(),
derivation, derivation,
); );
debug!(LOGGER, "block_fees - {:?}", block_fees); debug!(LOGGER, "block_fees - {:?}", block_fees);
let mut block_fees = block_fees.clone(); let mut block_fees = block_fees.clone();
block_fees.key_id = Some(key_id.clone()); block_fees.key_id = Some(key_id.clone());
debug!(LOGGER, "block_fees updated - {:?}", block_fees); debug!(LOGGER, "block_fees updated - {:?}", block_fees);
let (out, kern) = Block::reward_output( let (out, kern) = Block::reward_output(
&keychain, &keychain,
&key_id, &key_id,
block_fees.fees, block_fees.fees,
)?; )?;
Ok((out, kern, block_fees)) Ok((out, kern, block_fees))
})?
} }
/// Builds a full transaction from the partial one sent to us for transfer /// Builds a full transaction from the partial one sent to us for transfer
@ -243,36 +263,33 @@ fn receive_transaction(
) -> Result<Transaction, Error> { ) -> Result<Transaction, Error> {
let root_key_id = keychain.root_key_id(); let root_key_id = keychain.root_key_id();
let (key_id, derivation) = next_available_key(config, keychain)?;
// double check the fee amount included in the partial tx
// we don't necessarily want to just trust the sender
// we could just overwrite the fee here (but we won't) due to the ecdsa sig
let fee = tx_fee(partial.inputs.len(), partial.outputs.len() + 1, None);
if fee != partial.fee {
return Err(Error::FeeDispute {
sender_fee: partial.fee,
recipient_fee: fee,
});
}
let out_amount = amount - fee;
let (tx_final, _) = build::transaction(vec![
build::initial_tx(partial),
build::with_excess(blinding),
build::output(out_amount, key_id.clone()),
// build::with_fee(fee_amount),
], keychain)?;
// make sure the resulting transaction is valid (could have been lied to on excess).
tx_final.validate(&keychain.secp())?;
// operate within a lock on wallet data // operate within a lock on wallet data
WalletData::with_wallet(&config.data_file_dir, |wallet_data| { WalletData::with_wallet(&config.data_file_dir, |wallet_data| {
let derivation = wallet_data.next_child(root_key_id.clone());
let key_id = keychain.derive_key_id(derivation)?;
// double check the fee amount included in the partial tx
// we don't necessarily want to just trust the sender
// we could just overwrite the fee here (but we won't) due to the ecdsa sig
let fee = tx_fee(partial.inputs.len(), partial.outputs.len() + 1, None);
if fee != partial.fee {
return Err(Error::FeeDispute {
sender_fee: partial.fee,
recipient_fee: fee,
});
}
let out_amount = amount - fee;
let (tx_final, _) = build::transaction(vec![
build::initial_tx(partial),
build::with_excess(blinding),
build::output(out_amount, key_id.clone()),
// build::with_fee(fee_amount),
], keychain)?;
// make sure the resulting transaction is valid (could have been lied to on
// excess)
tx_final.validate(&keychain.secp())?;
// track the new output and return the finalized transaction to broadcast
wallet_data.add_output(OutputData { wallet_data.add_output(OutputData {
root_key_id: root_key_id.clone(), root_key_id: root_key_id.clone(),
key_id: key_id.clone(), key_id: key_id.clone(),
@ -283,14 +300,15 @@ fn receive_transaction(
lock_height: 0, lock_height: 0,
is_coinbase: false, is_coinbase: false,
}); });
debug!( })?;
LOGGER,
"Received txn and built output - {:?}, {:?}, {}",
root_key_id.clone(),
key_id.clone(),
derivation,
);
Ok(tx_final) debug!(
})? LOGGER,
"Received txn and built output - {:?}, {:?}, {}",
root_key_id.clone(),
key_id.clone(),
derivation,
);
Ok(tx_final)
} }

View file

@ -81,25 +81,21 @@ fn build_send_tx(
) -> Result<(Transaction, BlindingFactor), Error> { ) -> Result<(Transaction, BlindingFactor), Error> {
let key_id = keychain.clone().root_key_id(); let key_id = keychain.clone().root_key_id();
// operate within a lock on wallet data // select some spendable coins from the wallet
WalletData::with_wallet(&config.data_file_dir, |wallet_data| { let coins = WalletData::read_wallet(&config.data_file_dir, |wallet_data| {
wallet_data.select(key_id.clone(), current_height, minimum_confirmations)
})?;
// select some spendable coins from our local wallet // build transaction skeleton with inputs and change
let coins = wallet_data.select(key_id.clone(), current_height, minimum_confirmations); let mut parts = inputs_and_change(&coins, config, keychain, key_id, amount)?;
// build transaction skeleton with inputs and change // This is more proof of concept than anything but here we set lock_height
// TODO - should probably also check we are sending enough to cover the fees + non-zero output // on tx being sent (based on current chain height via api).
let mut parts = inputs_and_change(&coins, keychain, key_id, wallet_data, amount)?; parts.push(build::with_lock_height(lock_height));
// This is more proof of concept than anything but here we set a let (tx, blind) = build::transaction(parts, &keychain)?;
// lock_height on the transaction being sent (based on current chain height via
// api).
parts.push(build::with_lock_height(lock_height));
let (tx, blind) = build::transaction(parts, &keychain)?; Ok((tx, blind))
Ok((tx, blind))
})?
} }
pub fn issue_burn_tx( pub fn issue_burn_tx(
@ -117,39 +113,48 @@ pub fn issue_burn_tx(
let key_id = keychain.root_key_id(); let key_id = keychain.root_key_id();
// operate within a lock on wallet data // select some spendable coins from the wallet
WalletData::with_wallet(&config.data_file_dir, |mut wallet_data| { let coins = WalletData::read_wallet(&config.data_file_dir, |wallet_data| {
wallet_data.select(key_id.clone(), current_height, minimum_confirmations)
})?;
// select some spendable coins from the wallet let mut parts = inputs_and_change(&coins, config, keychain, key_id, amount)?;
let coins = wallet_data.select(key_id.clone(), current_height, minimum_confirmations);
// build transaction skeleton with inputs and change // add burn output and fees
let mut parts = inputs_and_change(&coins, keychain, key_id, &mut wallet_data, amount)?; let fee = tx_fee(coins.len(), 2, None);
parts.push(build::output(amount - fee, Identifier::zero()));
// add burn output and fees // finalize the burn transaction and send
let fee = tx_fee(coins.len(), 2, None); let (tx_burn, _) = build::transaction(parts, &keychain)?;
parts.push(build::output(amount - fee, Identifier::zero())); tx_burn.validate(&keychain.secp())?;
// finalize the burn transaction and send let tx_hex = util::to_hex(ser::ser_vec(&tx_burn).unwrap());
let (tx_burn, _) = build::transaction(parts, &keychain)?; let url = format!("{}/v1/pool/push", config.check_node_api_http_addr.as_str());
tx_burn.validate(&keychain.secp())?; let _: () = api::client::post(url.as_str(), &TxWrapper { tx_hex: tx_hex })
.map_err(|e| Error::Node(e))?;
Ok(())
}
let tx_hex = util::to_hex(ser::ser_vec(&tx_burn).unwrap()); fn next_available_key(
let url = format!("{}/v1/pool/push", config.check_node_api_http_addr.as_str()); config: &WalletConfig,
let _: () = api::client::post(url.as_str(), &TxWrapper { tx_hex: tx_hex }) keychain: &Keychain,
.map_err(|e| Error::Node(e))?; ) -> Result<(Identifier, u32), Error> {
Ok(()) let res = WalletData::read_wallet(&config.data_file_dir, |wallet_data| {
})? let root_key_id = keychain.root_key_id();
let derivation = wallet_data.next_child(root_key_id.clone());
let key_id = keychain.derive_key_id(derivation).unwrap();
(key_id, derivation)
})?;
Ok(res)
} }
fn inputs_and_change( fn inputs_and_change(
coins: &Vec<OutputData>, coins: &Vec<OutputData>,
config: &WalletConfig,
keychain: &Keychain, keychain: &Keychain,
root_key_id: Identifier, root_key_id: Identifier,
wallet_data: &mut WalletData,
amount: u64, amount: u64,
) -> Result<Vec<Box<build::Append>>, Error> { ) -> Result<Vec<Box<build::Append>>, Error> {
let mut parts = vec![]; let mut parts = vec![];
// calculate the total across all inputs, and how much is left // calculate the total across all inputs, and how much is left
@ -177,27 +182,29 @@ fn inputs_and_change(
parts.push(build::input(coin.value, key_id)); parts.push(build::input(coin.value, key_id));
} }
// derive an additional pubkey for change and build the change output let (change_key, change_derivation) = next_available_key(config, keychain)?;
let change_derivation = wallet_data.next_child(root_key_id.clone());
let change_key = keychain.derive_key_id(change_derivation)?;
parts.push(build::output(change, change_key.clone())); parts.push(build::output(change, change_key.clone()));
// we got that far, time to start tracking the output representing our change // Acquire wallet lock, add the new change output and lock coins being spent.
wallet_data.add_output(OutputData { WalletData::with_wallet(&config.data_file_dir, |wallet_data| {
root_key_id: root_key_id.clone(), // we got that far, time to start tracking the output representing our change
key_id: change_key.clone(), wallet_data.add_output(OutputData {
n_child: change_derivation, root_key_id: root_key_id.clone(),
value: change as u64, key_id: change_key.clone(),
status: OutputStatus::Unconfirmed, n_child: change_derivation,
height: 0, value: change as u64,
lock_height: 0, status: OutputStatus::Unconfirmed,
is_coinbase: false, height: 0,
}); lock_height: 0,
is_coinbase: false,
});
// now lock the ouputs we're spending so we avoid accidental double spend attempt // now lock the ouputs we're spending so we avoid accidental double spend attempt
for coin in coins { for coin in coins {
wallet_data.lock_output(coin); wallet_data.lock_output(coin);
} }
})?;
Ok(parts) Ok(parts)
} }

View file

@ -188,6 +188,16 @@ impl OutputData {
self.status = OutputStatus::Locked; self.status = OutputStatus::Locked;
} }
/// How many confirmations has this output received?
pub fn num_confirmations(&self, current_height: u64) -> u64 {
if self.status == OutputStatus::Unconfirmed {
0
} else {
current_height - self.height
}
}
/// Check if output is eligible for spending based on state and height.
pub fn eligible_to_spend( pub fn eligible_to_spend(
&self, &self,
current_height: u64, current_height: u64,
@ -318,6 +328,18 @@ pub struct WalletData {
} }
impl WalletData { impl WalletData {
/// Allows for reading wallet data (without needing to acquire the write lock).
pub fn read_wallet<T, F>(data_file_dir: &str, f: F) -> Result<T, Error>
where F: FnOnce(&WalletData) -> T
{
// open the wallet readonly and do what needs to be done with it
let data_file_path = &format!("{}{}{}", data_file_dir, MAIN_SEPARATOR, DAT_FILE);
let wdat = WalletData::read_or_create(data_file_path)?;
let res = f(&wdat);
Ok(res)
}
/// Allows the reading and writing of the wallet data within a file lock. /// Allows the reading and writing of the wallet data within a file lock.
/// Just provide a closure taking a mutable WalletData. The lock should /// Just provide a closure taking a mutable WalletData. The lock should
/// be held for as short a period as possible to avoid contention. /// be held for as short a period as possible to avoid contention.
@ -335,7 +357,7 @@ impl WalletData {
let data_file_path = &format!("{}{}{}", data_file_dir, MAIN_SEPARATOR, DAT_FILE); let data_file_path = &format!("{}{}{}", data_file_dir, MAIN_SEPARATOR, DAT_FILE);
let lock_file_path = &format!("{}{}{}", data_file_dir, MAIN_SEPARATOR, LOCK_FILE); let lock_file_path = &format!("{}{}{}", data_file_dir, MAIN_SEPARATOR, LOCK_FILE);
// create the lock files, if it already exists, will produce an error // create the lock file, if it already exists, will produce an error
// sleep and retry a few times if we cannot get it the first time // sleep and retry a few times if we cannot get it the first time
let mut retries = 0; let mut retries = 0;
loop { loop {
@ -351,10 +373,11 @@ impl WalletData {
}); });
match result { match result {
Ok(_) => { Ok(_) => {
info!(LOGGER, "acquired wallet lock ...");
break; break;
} }
Err(e) => { Err(e) => {
if retries >= 6 { if retries >= 10 {
info!( info!(
LOGGER, LOGGER,
"failed to obtain wallet.lock after {} retries, \ "failed to obtain wallet.lock after {} retries, \
@ -369,12 +392,11 @@ impl WalletData {
retries retries
); );
retries += 1; retries += 1;
thread::sleep(time::Duration::from_millis(1000)); thread::sleep(time::Duration::from_millis(250));
} }
} }
} }
// do what needs to be done // do what needs to be done
let mut wdat = WalletData::read_or_create(data_file_path)?; let mut wdat = WalletData::read_or_create(data_file_path)?;
let res = f(&mut wdat); let res = f(&mut wdat);
@ -387,6 +409,8 @@ impl WalletData {
)) ))
})?; })?;
info!(LOGGER, "... released wallet lock");
Ok(res) Ok(res)
} }