From 8b324f7429ff0b00290fcb453f4f310bcf267490 Mon Sep 17 00:00:00 2001 From: AntiochP <30642645+antiochp@users.noreply.github.com> Date: Fri, 27 Oct 2017 13:36:03 -0400 Subject: [PATCH] add retry logic to miner when hitting wallet coinbase API (#213) * mount v2 router for flexibility, wallet checker now refreshes multiple outputs via single api call * fix the api router * wallet api handlers, miner uses wallet_client * retry logic via tokio_retry, miner creates new coinbase output via wallet API (retries several times) * move wallet client into wallet crateand rework the lock acquisition logic to use tokio_retry --- api/src/handlers.rs | 2 +- grin/Cargo.toml | 7 +-- grin/src/lib.rs | 1 + grin/src/miner.rs | 57 +++++++++++++----------- grin/src/types.rs | 11 ++++- wallet/Cargo.toml | 5 +++ wallet/src/client.rs | 74 +++++++++++++++++++++++++++++++ wallet/src/handlers.rs | 88 +++++++++++++++++++++++++++++++++++++ wallet/src/lib.rs | 12 +++++- wallet/src/receiver.rs | 51 +++------------------- wallet/src/server.rs | 17 +++++++- wallet/src/types.rs | 98 ++++++++++++++++++++++++++---------------- 12 files changed, 308 insertions(+), 115 deletions(-) create mode 100644 wallet/src/client.rs create mode 100644 wallet/src/handlers.rs diff --git a/api/src/handlers.rs b/api/src/handlers.rs index bb3a7ff9f..adf71e877 100644 --- a/api/src/handlers.rs +++ b/api/src/handlers.rs @@ -1,4 +1,4 @@ -// Copyright 2016 The Grin Developers +// Copyright 2017 The Grin Developers // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/grin/Cargo.toml b/grin/Cargo.toml index 1a2eb6fad..3b182a717 100644 --- a/grin/Cargo.toml +++ b/grin/Cargo.toml @@ -19,13 +19,14 @@ secp256k1zkp = { git = "https://github.com/mimblewimble/rust-secp256k1-zkp" } futures = "^0.1.15" futures-cpupool = "^0.1.3" -hyper = { git = "https://github.com/hyperium/hyper" } +hyper = "~0.11.4" slog = { version = "^2.0.12", features = ["max_level_trace", "release_max_level_trace"] } time = "^0.1" serde = "~1.0.8" serde_derive = "~1.0.8" -tokio-core="^0.1.1" -tokio-timer="^0.1.0" +serde_json = "~1.0.2" +tokio-core="~0.1.1" +tokio-timer="~0.1.0" rand = "^0.3" itertools = "~0.6.0" diff --git a/grin/src/lib.rs b/grin/src/lib.rs index a87e7fc94..298ec499e 100644 --- a/grin/src/lib.rs +++ b/grin/src/lib.rs @@ -30,6 +30,7 @@ extern crate rand; extern crate serde; #[macro_use] extern crate serde_derive; +extern crate serde_json; extern crate time; extern crate tokio_core; extern crate tokio_timer; diff --git a/grin/src/miner.rs b/grin/src/miner.rs index 5e7db383c..699590f64 100644 --- a/grin/src/miner.rs +++ b/grin/src/miner.rs @@ -17,13 +17,11 @@ use rand::{self, Rng}; use std::sync::{Arc, RwLock}; -use std::thread; +use std::{thread, str}; use std; -use std::str; use time; use adapters::PoolToChainAdapter; -use api; use core::consensus; use core::core; use core::core::Proof; @@ -36,15 +34,15 @@ use pow::types::MinerConfig; use core::ser; use core::ser::AsFixedBytes; use util::LOGGER; - -// use core::genesis; +use types::Error; use chain; use secp; use pool; use util; use keychain::{Identifier, Keychain}; -use wallet::{BlockFees, WalletReceiveRequest, CbData}; +use wallet; +use wallet::BlockFees; use pow::plugin::PluginMiner; @@ -566,8 +564,10 @@ impl Miner { height, }; - let (output, kernel, block_fees) = self.get_coinbase(block_fees); + // TODO - error handling, things can go wrong with get_coinbase (wallet api down etc.) + let (output, kernel, block_fees) = self.get_coinbase(block_fees).unwrap(); let mut b = core::Block::with_reward(head, txs, output, kernel).unwrap(); + debug!( LOGGER, "(Server ID: {}) Built new block with {} inputs and {} outputs, difficulty: {}", @@ -591,26 +591,33 @@ impl Miner { (b, block_fees) } - fn get_coinbase(&self, block_fees: BlockFees) -> (core::Output, core::TxKernel, BlockFees) { + /// + /// Probably only want to do this when testing. + /// + fn burn_reward( + &self, + block_fees: BlockFees, + ) -> Result<(core::Output, core::TxKernel, BlockFees), Error> { + let keychain = Keychain::from_random_seed().unwrap(); + let key_id = keychain.derive_key_id(1).unwrap(); + let (out, kernel) = core::Block::reward_output(&keychain, &key_id, block_fees.fees) + .unwrap(); + Ok((out, kernel, block_fees)) + } + + fn get_coinbase( + &self, + block_fees: BlockFees, + ) -> Result<(core::Output, core::TxKernel, BlockFees), Error> { if self.config.burn_reward { - let keychain = Keychain::from_random_seed().unwrap(); - let key_id = keychain.derive_key_id(1).unwrap(); - let (out, kern) = core::Block::reward_output(&keychain, &key_id, block_fees.fees) - .unwrap(); - (out, kern, block_fees) + self.burn_reward(block_fees) } else { let url = format!( - "{}/v1/receive/coinbase", - self.config.wallet_receiver_url.as_str() - ); - let request = WalletReceiveRequest::Coinbase(block_fees.clone()); - let res: CbData = api::client::post(url.as_str(), &request).expect( - format!( - "(Server ID: {}) Wallet receiver unreachable, could not claim reward. Is it running?", - self.debug_output_id - .as_str() - ).as_str(), - ); + "{}/v2/receive/coinbase", + self.config.wallet_receiver_url.as_str()); + + let res = wallet::client::create_coinbase(&url, &block_fees)?; + let out_bin = util::from_hex(res.output).unwrap(); let kern_bin = util::from_hex(res.kernel).unwrap(); let key_id_bin = util::from_hex(res.key_id).unwrap(); @@ -624,7 +631,7 @@ impl Miner { debug!(LOGGER, "block_fees here: {:?}", block_fees); - (output, kernel, block_fees) + Ok((output, kernel, block_fees)) } } } diff --git a/grin/src/types.rs b/grin/src/types.rs index 22889c5e2..1c91a7a24 100644 --- a/grin/src/types.rs +++ b/grin/src/types.rs @@ -20,6 +20,7 @@ use p2p; use pool; use store; use pow; +use wallet; use core::global::MiningParameterMode; /// Error type wrapping underlying module errors. @@ -31,8 +32,10 @@ pub enum Error { Chain(chain::Error), /// Error originating from the peer-to-peer network. P2P(p2p::Error), - /// Error originating from HTTP API calls + /// Error originating from HTTP API calls. API(api::Error), + /// Error originating from wallet API. + Wallet(wallet::Error), } impl From for Error { @@ -59,6 +62,12 @@ impl From for Error { } } +impl From for Error { + fn from(e: wallet::Error) -> Error { + Error::Wallet(e) + } +} + /// Type of seeding the server will use to find other peers on the network. #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] pub enum Seeding { diff --git a/wallet/Cargo.toml b/wallet/Cargo.toml index 94f0a3a6e..a608f1f61 100644 --- a/wallet/Cargo.toml +++ b/wallet/Cargo.toml @@ -16,7 +16,12 @@ blake2-rfc = "~0.2.17" serde = "~1.0.8" serde_derive = "~1.0.8" serde_json = "~1.0.2" +bodyparser = "~0.7.0" +futures = "^0.1.15" iron = "~0.5.1" +hyper = "~0.11.4" +tokio-core="~0.1.1" +tokio-retry="~0.1.0" router = "~0.5.1" grin_api = { path = "../api" } grin_core = { path = "../core" } diff --git a/wallet/src/client.rs b/wallet/src/client.rs new file mode 100644 index 000000000..e0c92db78 --- /dev/null +++ b/wallet/src/client.rs @@ -0,0 +1,74 @@ +// Copyright 2017 The Grin Developers +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::{io, time}; +use std::ops::FnMut; + +use futures::{Future, Stream}; +use hyper; +use hyper::{Method, Request}; +use hyper::header::ContentType; +use tokio_core::reactor; +use tokio_retry::Retry; +use tokio_retry::strategy::FibonacciBackoff; +use serde_json; + +use types::*; +use util::LOGGER; + +/// Call the wallet API to create a coinbase output for the given block_fees. +/// Will retry based on default "retry forever with backoff" behavior. +pub fn create_coinbase(url: &str, block_fees: &BlockFees) -> Result { + retry_backoff_forever(|| { + let res = single_create_coinbase(&url, &block_fees); + if let Err(_) = res { + error!(LOGGER, "Failed to get coinbase via wallet API (will retry)..."); + } + res + }) +} + +/// Runs the specified function wrapped in some basic retry logic. +fn retry_backoff_forever(f: F) -> Result + where F: (FnMut() -> Result) +{ + let mut core = reactor::Core::new()?; + let retry_strategy = FibonacciBackoff::from_millis(100) + .max_delay(time::Duration::from_secs(10)); + let retry_future = Retry::spawn(core.handle(), retry_strategy, f); + let res = core.run(retry_future).unwrap(); + Ok(res) +} + +/// Makes a single request to the wallet API to create a new coinbase output. +fn single_create_coinbase(url: &str, block_fees: &BlockFees) -> Result { + let mut core = reactor::Core::new()?; + let client = hyper::Client::new(&core.handle()); + + let mut req = Request::new(Method::Post, url.parse()?); + req.headers_mut().set(ContentType::json()); + let json = serde_json::to_string(&block_fees)?; + req.set_body(json); + + let work = client.request(req).and_then(|res| { + res.body().concat2().and_then(move |body| { + let coinbase: CbData = serde_json::from_slice(&body) + .map_err(|e| {io::Error::new(io::ErrorKind::Other, e)})?; + Ok(coinbase) + }) + }); + + let res = core.run(work)?; + Ok(res) +} diff --git a/wallet/src/handlers.rs b/wallet/src/handlers.rs new file mode 100644 index 000000000..66290cbd3 --- /dev/null +++ b/wallet/src/handlers.rs @@ -0,0 +1,88 @@ +// Copyright 2017 The Grin Developers +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use iron::prelude::*; +use iron::Handler; +use iron::status; +use serde_json; +use bodyparser; + +use receiver::receive_coinbase; +use core::ser; +use api; +use keychain::Keychain; +use types::*; +use util; + + +pub struct CoinbaseHandler { + pub config: WalletConfig, + pub keychain: Keychain, +} + +impl CoinbaseHandler { + fn build_coinbase(&self, block_fees: &BlockFees) -> Result { + let (out, kern, block_fees) = receive_coinbase( + &self.config, + &self.keychain, + block_fees, + ).map_err(|e| { + api::Error::Internal(format!("Error building coinbase: {:?}", e)) + })?; + + let out_bin = ser::ser_vec(&out).map_err(|e| { + api::Error::Internal(format!("Error serializing output: {:?}", e)) + })?; + + let kern_bin = ser::ser_vec(&kern).map_err(|e| { + api::Error::Internal(format!("Error serializing kernel: {:?}", e)) + })?; + + let key_id_bin = match block_fees.key_id { + Some(key_id) => { + ser::ser_vec(&key_id).map_err(|e| { + api::Error::Internal( + format!("Error serializing kernel: {:?}", e), + ) + })? + } + None => vec![], + }; + + Ok(CbData { + output: util::to_hex(out_bin), + kernel: util::to_hex(kern_bin), + key_id: util::to_hex(key_id_bin), + }) + } +} + +// TODO - error handling - what to return if we fail to get the wallet lock for some reason... +impl Handler for CoinbaseHandler { + fn handle(&self, req: &mut Request) -> IronResult { + let struct_body = req.get::>(); + + if let Ok(Some(block_fees)) = struct_body { + let coinbase = self.build_coinbase(&block_fees) + .map_err(|e| IronError::new(e, status::BadRequest))?; + if let Ok(json) = serde_json::to_string(&coinbase) { + Ok(Response::with((status::Ok, json))) + } else { + Ok(Response::with((status::BadRequest, ""))) + } + } else { + Ok(Response::with((status::BadRequest, ""))) + } + } +} diff --git a/wallet/src/lib.rs b/wallet/src/lib.rs index 088fda892..e2190dbf5 100644 --- a/wallet/src/lib.rs +++ b/wallet/src/lib.rs @@ -1,4 +1,4 @@ -// Copyright 2016 The Grin Developers +// Copyright 2017 The Grin Developers // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -24,7 +24,13 @@ extern crate serde; extern crate serde_derive; extern crate serde_json; +extern crate bodyparser; +extern crate futures; +extern crate tokio_core; +extern crate tokio_retry; +extern crate hyper; extern crate iron; +#[macro_use] extern crate router; extern crate grin_api as api; @@ -34,13 +40,15 @@ extern crate grin_util as util; extern crate secp256k1zkp as secp; mod checker; +mod handlers; mod info; mod receiver; mod sender; mod types; +pub mod client; pub mod server; pub use info::show_info; pub use receiver::{WalletReceiver, receive_json_tx}; pub use sender::{issue_send_tx, issue_burn_tx}; -pub use types::{BlockFees, CbData, WalletConfig, WalletReceiveRequest, WalletSeed}; +pub use types::{BlockFees, CbData, Error, WalletConfig, WalletReceiveRequest, WalletSeed}; diff --git a/wallet/src/receiver.rs b/wallet/src/receiver.rs index 44a83ce22..bb6a33de0 100644 --- a/wallet/src/receiver.rs +++ b/wallet/src/receiver.rs @@ -67,10 +67,11 @@ pub struct TxWrapper { /// Receive an already well formed JSON transaction issuance and finalize the /// transaction, adding our receiving output, to broadcast to the rest of the /// network. -pub fn receive_json_tx(config: &WalletConfig, - keychain: &Keychain, - partial_tx_str: &str) - -> Result<(), Error> { +pub fn receive_json_tx( + config: &WalletConfig, + keychain: &Keychain, + partial_tx_str: &str, +) -> Result<(), Error> { let (amount, blinding, partial_tx) = partial_tx_from_json(keychain, partial_tx_str)?; let final_tx = receive_transaction(config, keychain, amount, blinding, partial_tx)?; let tx_hex = util::to_hex(ser::ser_vec(&final_tx).unwrap()); @@ -96,49 +97,11 @@ impl ApiEndpoint for WalletReceiver { type OP_OUT = CbData; fn operations(&self) -> Vec { - vec![Operation::Custom("coinbase".to_string()), - Operation::Custom("receive_json_tx".to_string())] + vec![Operation::Custom("receive_json_tx".to_string())] } fn operation(&self, op: String, input: WalletReceiveRequest) -> ApiResult { match op.as_str() { - "coinbase" => { - match input { - WalletReceiveRequest::Coinbase(cb_fees) => { - debug!(LOGGER, "Operation {} with fees {:?}", op, cb_fees); - let (out, kern, block_fees) = receive_coinbase( - &self.config, - &self.keychain, - &cb_fees, - ).map_err(|e| { - api::Error::Internal(format!("Error building coinbase: {:?}", e)) - })?; - let out_bin = ser::ser_vec(&out).map_err(|e| { - api::Error::Internal(format!("Error serializing output: {:?}", e)) - })?; - let kern_bin = ser::ser_vec(&kern).map_err(|e| { - api::Error::Internal(format!("Error serializing kernel: {:?}", e)) - })?; - let key_id_bin = match block_fees.key_id { - Some(key_id) => { - ser::ser_vec(&key_id).map_err(|e| { - api::Error::Internal( - format!("Error serializing kernel: {:?}", e), - ) - })? - } - None => vec![], - }; - - Ok(CbData { - output: util::to_hex(out_bin), - kernel: util::to_hex(kern_bin), - key_id: util::to_hex(key_id_bin), - }) - } - _ => Err(api::Error::Argument(format!("Incorrect request data: {}", op))), - } - } "receive_json_tx" => { match input { WalletReceiveRequest::PartialTransaction(partial_tx_str) => { @@ -202,7 +165,7 @@ fn next_available_key( } /// Build a coinbase output and the corresponding kernel -fn receive_coinbase( +pub fn receive_coinbase( config: &WalletConfig, keychain: &Keychain, block_fees: &BlockFees diff --git a/wallet/src/server.rs b/wallet/src/server.rs index c7c2ca79f..8ae3fc457 100644 --- a/wallet/src/server.rs +++ b/wallet/src/server.rs @@ -1,4 +1,4 @@ -// Copyright 2016 The Grin Developers +// Copyright 2017 The Grin Developers // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -15,6 +15,7 @@ use api::ApiServer; use keychain::Keychain; +use handlers::CoinbaseHandler; use receiver::WalletReceiver; use types::WalletConfig; use util::LOGGER; @@ -31,11 +32,23 @@ pub fn start_rest_apis(wallet_config: WalletConfig, keychain: Keychain) { apis.register_endpoint( "/receive".to_string(), WalletReceiver { - keychain: keychain, config: wallet_config.clone(), + keychain: keychain.clone(), }, ); + let coinbase_handler = CoinbaseHandler { + config: wallet_config.clone(), + keychain: keychain.clone(), + }; + // let tx_handler = TxHandler{}; + + let router = router!( + receive_coinbase: post "/receive/coinbase" => coinbase_handler, + // receive_tx: post "/receive/tx" => tx_handler, + ); + apis.register_handler("/v2", router); + apis.start(wallet_config.api_http_addr).unwrap_or_else(|e| { error!(LOGGER, "Failed to start Grin wallet receiver: {}.", e); }); diff --git a/wallet/src/types.rs b/wallet/src/types.rs index 379df9239..741274752 100644 --- a/wallet/src/types.rs +++ b/wallet/src/types.rs @@ -1,4 +1,4 @@ -// Copyright 2016 The Grin Developers +// Copyright 2017 The Grin Developers // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -14,7 +14,7 @@ use blake2; use rand::{thread_rng, Rng}; -use std::{fmt, num, thread, time}; +use std::{fmt, num, error}; use std::convert::From; use std::fs::{self, File, OpenOptions}; use std::io::{self, Read, Write}; @@ -23,8 +23,13 @@ use std::path::MAIN_SEPARATOR; use std::collections::HashMap; use std::cmp::min; +use hyper; use serde_json; use secp; +use tokio_core::reactor; +use tokio_retry::Retry; +use tokio_retry::strategy::FibonacciBackoff; + use api; use core::core::{Transaction, transaction}; @@ -68,6 +73,26 @@ pub enum Error { IOError(io::Error), /// Error when contacting a node through its API Node(api::Error), + /// Error originating from hyper. + Hyper(hyper::Error), + /// Error originating from hyper uri parsing. + Uri(hyper::error::UriError), +} + +impl error::Error for Error { + fn description(&self) -> &str { + match *self { + _ => "some kind of wallet error", + } + } +} + +impl fmt::Display for Error { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match *self { + _ => write!(f, "some kind of wallet error"), + } + } } impl From for Error { @@ -112,6 +137,18 @@ impl From for Error { } } +impl From for Error { + fn from(e: hyper::Error) -> Error { + Error::Hyper(e) + } +} + +impl From for Error { + fn from(e: hyper::error::UriError) -> Error { + Error::Uri(e) + } +} + #[derive(Debug, Clone, Serialize, Deserialize)] pub struct WalletConfig { // Whether to run a wallet @@ -361,47 +398,34 @@ impl WalletData { let data_file_path = &format!("{}{}{}", data_file_dir, MAIN_SEPARATOR, DAT_FILE); let lock_file_path = &format!("{}{}{}", data_file_dir, MAIN_SEPARATOR, LOCK_FILE); - // create the lock file, if it already exists, will produce an error - // sleep and retry a few times if we cannot get it the first time - let mut retries = 0; - loop { - let result = OpenOptions::new() + info!(LOGGER, "Acquiring wallet lock ..."); + + let action = || { + debug!(LOGGER, "Attempting to acquire wallet lock"); + OpenOptions::new() .write(true) .create_new(true) .open(lock_file_path) - .map_err(|_| { - Error::WalletData(format!( - "Could not create wallet lock file. Either \ - some other process is using the wallet or there is a write access issue." - )) - }); - match result { - Ok(_) => { - info!(LOGGER, "acquired wallet lock ..."); - break; - } - Err(e) => { - if retries >= 10 { - info!( - LOGGER, - "failed to obtain wallet.lock after {} retries, \ - unable to successfully open the wallet", - retries - ); - return Err(e); - } - debug!( - LOGGER, - "failed to obtain wallet.lock, retries - {}, sleeping and retrying", - retries - ); - retries += 1; - thread::sleep(time::Duration::from_millis(250)); - } + }; + + // use tokio_retry to cleanly define some retry logic + let mut core = reactor::Core::new().unwrap(); + let retry_strategy = FibonacciBackoff::from_millis(10).take(10); + let retry_future = Retry::spawn(core.handle(), retry_strategy, action); + let retry_result = core.run(retry_future); + + match retry_result { + Ok(_) => {}, + Err(_) => { + error!( + LOGGER, + "Failed to acquire wallet lock file (multiple retries)", + ); + return Err(Error::WalletData(format!("Failed to acquire lock file"))); } } - // do what needs to be done + // We successfully acquired the lock - so do what needs to be done. let mut wdat = WalletData::read_or_create(data_file_path)?; let res = f(&mut wdat); wdat.write(data_file_path)?;