add retry logic to miner when hitting wallet coinbase API (#213)

* mount v2 router for flexibility, wallet checker now refreshes multiple outputs via single api call
* fix the api router
* wallet api handlers, miner uses wallet_client
* retry logic via tokio_retry, miner creates new coinbase output via wallet API (retries several times)
* move wallet client into wallet crateand rework the lock acquisition logic to use tokio_retry
This commit is contained in:
AntiochP 2017-10-27 13:36:03 -04:00 committed by Ignotus Peverell
parent efe414bf07
commit 8b324f7429
12 changed files with 308 additions and 115 deletions

View file

@ -1,4 +1,4 @@
// Copyright 2016 The Grin Developers
// Copyright 2017 The Grin Developers
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.

View file

@ -19,13 +19,14 @@ secp256k1zkp = { git = "https://github.com/mimblewimble/rust-secp256k1-zkp" }
futures = "^0.1.15"
futures-cpupool = "^0.1.3"
hyper = { git = "https://github.com/hyperium/hyper" }
hyper = "~0.11.4"
slog = { version = "^2.0.12", features = ["max_level_trace", "release_max_level_trace"] }
time = "^0.1"
serde = "~1.0.8"
serde_derive = "~1.0.8"
tokio-core="^0.1.1"
tokio-timer="^0.1.0"
serde_json = "~1.0.2"
tokio-core="~0.1.1"
tokio-timer="~0.1.0"
rand = "^0.3"
itertools = "~0.6.0"

View file

@ -30,6 +30,7 @@ extern crate rand;
extern crate serde;
#[macro_use]
extern crate serde_derive;
extern crate serde_json;
extern crate time;
extern crate tokio_core;
extern crate tokio_timer;

View file

@ -17,13 +17,11 @@
use rand::{self, Rng};
use std::sync::{Arc, RwLock};
use std::thread;
use std::{thread, str};
use std;
use std::str;
use time;
use adapters::PoolToChainAdapter;
use api;
use core::consensus;
use core::core;
use core::core::Proof;
@ -36,15 +34,15 @@ use pow::types::MinerConfig;
use core::ser;
use core::ser::AsFixedBytes;
use util::LOGGER;
// use core::genesis;
use types::Error;
use chain;
use secp;
use pool;
use util;
use keychain::{Identifier, Keychain};
use wallet::{BlockFees, WalletReceiveRequest, CbData};
use wallet;
use wallet::BlockFees;
use pow::plugin::PluginMiner;
@ -566,8 +564,10 @@ impl Miner {
height,
};
let (output, kernel, block_fees) = self.get_coinbase(block_fees);
// TODO - error handling, things can go wrong with get_coinbase (wallet api down etc.)
let (output, kernel, block_fees) = self.get_coinbase(block_fees).unwrap();
let mut b = core::Block::with_reward(head, txs, output, kernel).unwrap();
debug!(
LOGGER,
"(Server ID: {}) Built new block with {} inputs and {} outputs, difficulty: {}",
@ -591,26 +591,33 @@ impl Miner {
(b, block_fees)
}
fn get_coinbase(&self, block_fees: BlockFees) -> (core::Output, core::TxKernel, BlockFees) {
if self.config.burn_reward {
///
/// Probably only want to do this when testing.
///
fn burn_reward(
&self,
block_fees: BlockFees,
) -> Result<(core::Output, core::TxKernel, BlockFees), Error> {
let keychain = Keychain::from_random_seed().unwrap();
let key_id = keychain.derive_key_id(1).unwrap();
let (out, kern) = core::Block::reward_output(&keychain, &key_id, block_fees.fees)
let (out, kernel) = core::Block::reward_output(&keychain, &key_id, block_fees.fees)
.unwrap();
(out, kern, block_fees)
Ok((out, kernel, block_fees))
}
fn get_coinbase(
&self,
block_fees: BlockFees,
) -> Result<(core::Output, core::TxKernel, BlockFees), Error> {
if self.config.burn_reward {
self.burn_reward(block_fees)
} else {
let url = format!(
"{}/v1/receive/coinbase",
self.config.wallet_receiver_url.as_str()
);
let request = WalletReceiveRequest::Coinbase(block_fees.clone());
let res: CbData = api::client::post(url.as_str(), &request).expect(
format!(
"(Server ID: {}) Wallet receiver unreachable, could not claim reward. Is it running?",
self.debug_output_id
.as_str()
).as_str(),
);
"{}/v2/receive/coinbase",
self.config.wallet_receiver_url.as_str());
let res = wallet::client::create_coinbase(&url, &block_fees)?;
let out_bin = util::from_hex(res.output).unwrap();
let kern_bin = util::from_hex(res.kernel).unwrap();
let key_id_bin = util::from_hex(res.key_id).unwrap();
@ -624,7 +631,7 @@ impl Miner {
debug!(LOGGER, "block_fees here: {:?}", block_fees);
(output, kernel, block_fees)
Ok((output, kernel, block_fees))
}
}
}

View file

@ -20,6 +20,7 @@ use p2p;
use pool;
use store;
use pow;
use wallet;
use core::global::MiningParameterMode;
/// Error type wrapping underlying module errors.
@ -31,8 +32,10 @@ pub enum Error {
Chain(chain::Error),
/// Error originating from the peer-to-peer network.
P2P(p2p::Error),
/// Error originating from HTTP API calls
/// Error originating from HTTP API calls.
API(api::Error),
/// Error originating from wallet API.
Wallet(wallet::Error),
}
impl From<chain::Error> for Error {
@ -59,6 +62,12 @@ impl From<api::Error> for Error {
}
}
impl From<wallet::Error> for Error {
fn from(e: wallet::Error) -> Error {
Error::Wallet(e)
}
}
/// Type of seeding the server will use to find other peers on the network.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum Seeding {

View file

@ -16,7 +16,12 @@ blake2-rfc = "~0.2.17"
serde = "~1.0.8"
serde_derive = "~1.0.8"
serde_json = "~1.0.2"
bodyparser = "~0.7.0"
futures = "^0.1.15"
iron = "~0.5.1"
hyper = "~0.11.4"
tokio-core="~0.1.1"
tokio-retry="~0.1.0"
router = "~0.5.1"
grin_api = { path = "../api" }
grin_core = { path = "../core" }

74
wallet/src/client.rs Normal file
View file

@ -0,0 +1,74 @@
// Copyright 2017 The Grin Developers
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::{io, time};
use std::ops::FnMut;
use futures::{Future, Stream};
use hyper;
use hyper::{Method, Request};
use hyper::header::ContentType;
use tokio_core::reactor;
use tokio_retry::Retry;
use tokio_retry::strategy::FibonacciBackoff;
use serde_json;
use types::*;
use util::LOGGER;
/// Call the wallet API to create a coinbase output for the given block_fees.
/// Will retry based on default "retry forever with backoff" behavior.
pub fn create_coinbase(url: &str, block_fees: &BlockFees) -> Result<CbData, Error> {
retry_backoff_forever(|| {
let res = single_create_coinbase(&url, &block_fees);
if let Err(_) = res {
error!(LOGGER, "Failed to get coinbase via wallet API (will retry)...");
}
res
})
}
/// Runs the specified function wrapped in some basic retry logic.
fn retry_backoff_forever<F, R>(f: F) -> Result<R, Error>
where F: (FnMut() -> Result<R, Error>)
{
let mut core = reactor::Core::new()?;
let retry_strategy = FibonacciBackoff::from_millis(100)
.max_delay(time::Duration::from_secs(10));
let retry_future = Retry::spawn(core.handle(), retry_strategy, f);
let res = core.run(retry_future).unwrap();
Ok(res)
}
/// Makes a single request to the wallet API to create a new coinbase output.
fn single_create_coinbase(url: &str, block_fees: &BlockFees) -> Result<CbData, Error> {
let mut core = reactor::Core::new()?;
let client = hyper::Client::new(&core.handle());
let mut req = Request::new(Method::Post, url.parse()?);
req.headers_mut().set(ContentType::json());
let json = serde_json::to_string(&block_fees)?;
req.set_body(json);
let work = client.request(req).and_then(|res| {
res.body().concat2().and_then(move |body| {
let coinbase: CbData = serde_json::from_slice(&body)
.map_err(|e| {io::Error::new(io::ErrorKind::Other, e)})?;
Ok(coinbase)
})
});
let res = core.run(work)?;
Ok(res)
}

88
wallet/src/handlers.rs Normal file
View file

@ -0,0 +1,88 @@
// Copyright 2017 The Grin Developers
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use iron::prelude::*;
use iron::Handler;
use iron::status;
use serde_json;
use bodyparser;
use receiver::receive_coinbase;
use core::ser;
use api;
use keychain::Keychain;
use types::*;
use util;
pub struct CoinbaseHandler {
pub config: WalletConfig,
pub keychain: Keychain,
}
impl CoinbaseHandler {
fn build_coinbase(&self, block_fees: &BlockFees) -> Result<CbData, Error> {
let (out, kern, block_fees) = receive_coinbase(
&self.config,
&self.keychain,
block_fees,
).map_err(|e| {
api::Error::Internal(format!("Error building coinbase: {:?}", e))
})?;
let out_bin = ser::ser_vec(&out).map_err(|e| {
api::Error::Internal(format!("Error serializing output: {:?}", e))
})?;
let kern_bin = ser::ser_vec(&kern).map_err(|e| {
api::Error::Internal(format!("Error serializing kernel: {:?}", e))
})?;
let key_id_bin = match block_fees.key_id {
Some(key_id) => {
ser::ser_vec(&key_id).map_err(|e| {
api::Error::Internal(
format!("Error serializing kernel: {:?}", e),
)
})?
}
None => vec![],
};
Ok(CbData {
output: util::to_hex(out_bin),
kernel: util::to_hex(kern_bin),
key_id: util::to_hex(key_id_bin),
})
}
}
// TODO - error handling - what to return if we fail to get the wallet lock for some reason...
impl Handler for CoinbaseHandler {
fn handle(&self, req: &mut Request) -> IronResult<Response> {
let struct_body = req.get::<bodyparser::Struct<BlockFees>>();
if let Ok(Some(block_fees)) = struct_body {
let coinbase = self.build_coinbase(&block_fees)
.map_err(|e| IronError::new(e, status::BadRequest))?;
if let Ok(json) = serde_json::to_string(&coinbase) {
Ok(Response::with((status::Ok, json)))
} else {
Ok(Response::with((status::BadRequest, "")))
}
} else {
Ok(Response::with((status::BadRequest, "")))
}
}
}

View file

@ -1,4 +1,4 @@
// Copyright 2016 The Grin Developers
// Copyright 2017 The Grin Developers
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@ -24,7 +24,13 @@ extern crate serde;
extern crate serde_derive;
extern crate serde_json;
extern crate bodyparser;
extern crate futures;
extern crate tokio_core;
extern crate tokio_retry;
extern crate hyper;
extern crate iron;
#[macro_use]
extern crate router;
extern crate grin_api as api;
@ -34,13 +40,15 @@ extern crate grin_util as util;
extern crate secp256k1zkp as secp;
mod checker;
mod handlers;
mod info;
mod receiver;
mod sender;
mod types;
pub mod client;
pub mod server;
pub use info::show_info;
pub use receiver::{WalletReceiver, receive_json_tx};
pub use sender::{issue_send_tx, issue_burn_tx};
pub use types::{BlockFees, CbData, WalletConfig, WalletReceiveRequest, WalletSeed};
pub use types::{BlockFees, CbData, Error, WalletConfig, WalletReceiveRequest, WalletSeed};

View file

@ -67,10 +67,11 @@ pub struct TxWrapper {
/// Receive an already well formed JSON transaction issuance and finalize the
/// transaction, adding our receiving output, to broadcast to the rest of the
/// network.
pub fn receive_json_tx(config: &WalletConfig,
pub fn receive_json_tx(
config: &WalletConfig,
keychain: &Keychain,
partial_tx_str: &str)
-> Result<(), Error> {
partial_tx_str: &str,
) -> Result<(), Error> {
let (amount, blinding, partial_tx) = partial_tx_from_json(keychain, partial_tx_str)?;
let final_tx = receive_transaction(config, keychain, amount, blinding, partial_tx)?;
let tx_hex = util::to_hex(ser::ser_vec(&final_tx).unwrap());
@ -96,49 +97,11 @@ impl ApiEndpoint for WalletReceiver {
type OP_OUT = CbData;
fn operations(&self) -> Vec<Operation> {
vec![Operation::Custom("coinbase".to_string()),
Operation::Custom("receive_json_tx".to_string())]
vec![Operation::Custom("receive_json_tx".to_string())]
}
fn operation(&self, op: String, input: WalletReceiveRequest) -> ApiResult<CbData> {
match op.as_str() {
"coinbase" => {
match input {
WalletReceiveRequest::Coinbase(cb_fees) => {
debug!(LOGGER, "Operation {} with fees {:?}", op, cb_fees);
let (out, kern, block_fees) = receive_coinbase(
&self.config,
&self.keychain,
&cb_fees,
).map_err(|e| {
api::Error::Internal(format!("Error building coinbase: {:?}", e))
})?;
let out_bin = ser::ser_vec(&out).map_err(|e| {
api::Error::Internal(format!("Error serializing output: {:?}", e))
})?;
let kern_bin = ser::ser_vec(&kern).map_err(|e| {
api::Error::Internal(format!("Error serializing kernel: {:?}", e))
})?;
let key_id_bin = match block_fees.key_id {
Some(key_id) => {
ser::ser_vec(&key_id).map_err(|e| {
api::Error::Internal(
format!("Error serializing kernel: {:?}", e),
)
})?
}
None => vec![],
};
Ok(CbData {
output: util::to_hex(out_bin),
kernel: util::to_hex(kern_bin),
key_id: util::to_hex(key_id_bin),
})
}
_ => Err(api::Error::Argument(format!("Incorrect request data: {}", op))),
}
}
"receive_json_tx" => {
match input {
WalletReceiveRequest::PartialTransaction(partial_tx_str) => {
@ -202,7 +165,7 @@ fn next_available_key(
}
/// Build a coinbase output and the corresponding kernel
fn receive_coinbase(
pub fn receive_coinbase(
config: &WalletConfig,
keychain: &Keychain,
block_fees: &BlockFees

View file

@ -1,4 +1,4 @@
// Copyright 2016 The Grin Developers
// Copyright 2017 The Grin Developers
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@ -15,6 +15,7 @@
use api::ApiServer;
use keychain::Keychain;
use handlers::CoinbaseHandler;
use receiver::WalletReceiver;
use types::WalletConfig;
use util::LOGGER;
@ -31,11 +32,23 @@ pub fn start_rest_apis(wallet_config: WalletConfig, keychain: Keychain) {
apis.register_endpoint(
"/receive".to_string(),
WalletReceiver {
keychain: keychain,
config: wallet_config.clone(),
keychain: keychain.clone(),
},
);
let coinbase_handler = CoinbaseHandler {
config: wallet_config.clone(),
keychain: keychain.clone(),
};
// let tx_handler = TxHandler{};
let router = router!(
receive_coinbase: post "/receive/coinbase" => coinbase_handler,
// receive_tx: post "/receive/tx" => tx_handler,
);
apis.register_handler("/v2", router);
apis.start(wallet_config.api_http_addr).unwrap_or_else(|e| {
error!(LOGGER, "Failed to start Grin wallet receiver: {}.", e);
});

View file

@ -1,4 +1,4 @@
// Copyright 2016 The Grin Developers
// Copyright 2017 The Grin Developers
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@ -14,7 +14,7 @@
use blake2;
use rand::{thread_rng, Rng};
use std::{fmt, num, thread, time};
use std::{fmt, num, error};
use std::convert::From;
use std::fs::{self, File, OpenOptions};
use std::io::{self, Read, Write};
@ -23,8 +23,13 @@ use std::path::MAIN_SEPARATOR;
use std::collections::HashMap;
use std::cmp::min;
use hyper;
use serde_json;
use secp;
use tokio_core::reactor;
use tokio_retry::Retry;
use tokio_retry::strategy::FibonacciBackoff;
use api;
use core::core::{Transaction, transaction};
@ -68,6 +73,26 @@ pub enum Error {
IOError(io::Error),
/// Error when contacting a node through its API
Node(api::Error),
/// Error originating from hyper.
Hyper(hyper::Error),
/// Error originating from hyper uri parsing.
Uri(hyper::error::UriError),
}
impl error::Error for Error {
fn description(&self) -> &str {
match *self {
_ => "some kind of wallet error",
}
}
}
impl fmt::Display for Error {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
_ => write!(f, "some kind of wallet error"),
}
}
}
impl From<keychain::Error> for Error {
@ -112,6 +137,18 @@ impl From<io::Error> for Error {
}
}
impl From<hyper::Error> for Error {
fn from(e: hyper::Error) -> Error {
Error::Hyper(e)
}
}
impl From<hyper::error::UriError> for Error {
fn from(e: hyper::error::UriError) -> Error {
Error::Uri(e)
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WalletConfig {
// Whether to run a wallet
@ -361,47 +398,34 @@ impl WalletData {
let data_file_path = &format!("{}{}{}", data_file_dir, MAIN_SEPARATOR, DAT_FILE);
let lock_file_path = &format!("{}{}{}", data_file_dir, MAIN_SEPARATOR, LOCK_FILE);
// create the lock file, if it already exists, will produce an error
// sleep and retry a few times if we cannot get it the first time
let mut retries = 0;
loop {
let result = OpenOptions::new()
info!(LOGGER, "Acquiring wallet lock ...");
let action = || {
debug!(LOGGER, "Attempting to acquire wallet lock");
OpenOptions::new()
.write(true)
.create_new(true)
.open(lock_file_path)
.map_err(|_| {
Error::WalletData(format!(
"Could not create wallet lock file. Either \
some other process is using the wallet or there is a write access issue."
))
});
match result {
Ok(_) => {
info!(LOGGER, "acquired wallet lock ...");
break;
}
Err(e) => {
if retries >= 10 {
info!(
};
// use tokio_retry to cleanly define some retry logic
let mut core = reactor::Core::new().unwrap();
let retry_strategy = FibonacciBackoff::from_millis(10).take(10);
let retry_future = Retry::spawn(core.handle(), retry_strategy, action);
let retry_result = core.run(retry_future);
match retry_result {
Ok(_) => {},
Err(_) => {
error!(
LOGGER,
"failed to obtain wallet.lock after {} retries, \
unable to successfully open the wallet",
retries
"Failed to acquire wallet lock file (multiple retries)",
);
return Err(e);
}
debug!(
LOGGER,
"failed to obtain wallet.lock, retries - {}, sleeping and retrying",
retries
);
retries += 1;
thread::sleep(time::Duration::from_millis(250));
}
return Err(Error::WalletData(format!("Failed to acquire lock file")));
}
}
// do what needs to be done
// We successfully acquired the lock - so do what needs to be done.
let mut wdat = WalletData::read_or_create(data_file_path)?;
let res = f(&mut wdat);
wdat.write(data_file_path)?;