diff --git a/grin.toml b/grin.toml index b0d588def..dcb1c5a35 100644 --- a/grin.toml +++ b/grin.toml @@ -174,6 +174,16 @@ burn_reward = false #slow_down_in_millis = 30 +#################################### +### STRATUM SERVER CONFIGURATION ### +#################################### + +#flag whether stratum server is enabled +enable_stratum_server = false + +#what port and address for the stratum server to listen on +stratum_server_addr = "127.0.0.1:13416" + ######################################### ### CUCKOO MINER PLUGIN CONFIGURATION ### ######################################### diff --git a/pow/src/types.rs b/pow/src/types.rs index 04b41adaf..3ec8dd37d 100644 --- a/pow/src/types.rs +++ b/pow/src/types.rs @@ -63,6 +63,12 @@ pub struct MinerConfig { /// a testing attribute for the time being that artifically slows down the /// mining loop by adding a sleep to the thread pub slow_down_in_millis: Option, + + /// Run a stratum mining server rather than mining locally in-process + pub enable_stratum_server: bool, + + /// If enabled, the address and port to listen on + pub stratum_server_addr: Option, } impl Default for MinerConfig { @@ -76,6 +82,8 @@ impl Default for MinerConfig { burn_reward: false, slow_down_in_millis: Some(0), attempt_time_per_block: 2, + enable_stratum_server: false, + stratum_server_addr: None, } } } diff --git a/servers/Cargo.toml b/servers/Cargo.toml index 7a272f529..35c2767b8 100644 --- a/servers/Cargo.toml +++ b/servers/Cargo.toml @@ -15,6 +15,8 @@ serde = "1" serde_derive = "1" serde_json = "1" time = "0.1" +bufstream = "~0.1" +jsonrpc-core = "~4.0" grin_api = { path = "../api" } grin_chain = { path = "../chain" } diff --git a/servers/src/common/stats.rs b/servers/src/common/stats.rs index 5f682b614..eaeac2849 100644 --- a/servers/src/common/stats.rs +++ b/servers/src/common/stats.rs @@ -17,6 +17,7 @@ use std::sync::{Arc, RwLock}; use std::sync::atomic::AtomicBool; +use std::time::SystemTime; use chain; use p2p; @@ -30,6 +31,8 @@ pub struct ServerStateInfo { pub awaiting_peers: Arc, /// Mining stats pub mining_stats: Arc>, + /// Stratum stats + pub stratum_stats: Arc>, } impl Default for ServerStateInfo { @@ -37,6 +40,7 @@ impl Default for ServerStateInfo { ServerStateInfo { awaiting_peers: Arc::new(AtomicBool::new(false)), mining_stats: Arc::new(RwLock::new(MiningStats::default())), + stratum_stats: Arc::new(RwLock::new(StratumStats::default())), } } } @@ -56,6 +60,8 @@ pub struct ServerStats { pub awaiting_peers: bool, /// Handle to current mining stats pub mining_stats: MiningStats, + /// Handle to current stratum server stats + pub stratum_stats: StratumStats, /// Peer stats pub peer_stats: Vec, /// Difficulty calculation statistics @@ -82,6 +88,44 @@ pub struct MiningStats { pub device_stats: Option>>, } +/// Struct to return relevant information about stratum workers +#[derive(Clone, Serialize, Debug)] +pub struct WorkerStats { + /// Unique ID for this worker + pub id: String, + /// whether stratum worker is currently connected + pub is_connected: bool, + /// Timestamp of most recent communication with this worker + pub last_seen: SystemTime, + /// pow difficulty this worker is using + pub pow_difficulty: u64, + /// number of valid shares submitted + pub num_accepted: u64, + /// number of invalid shares submitted + pub num_rejected: u64, + /// number of shares submitted too late + pub num_stale: u64, +} + +/// Struct to return relevant information about the stratum server +#[derive(Clone, Serialize, Debug)] +pub struct StratumStats { + /// whether stratum server is enabled + pub is_enabled: bool, + /// whether stratum server is running + pub is_running: bool, + /// Number of connected workers + pub num_workers: usize, + /// what block height we're mining at + pub block_height: u64, + /// current network difficulty we're working on + pub network_difficulty: u64, + /// cuckoo size used for mining + pub cuckoo_size: u16, + /// Individual worker status + pub worker_stats: Vec, +} + /// Stats on the last WINDOW blocks and the difficulty calculation #[derive(Clone)] pub struct DiffStats { @@ -167,3 +211,31 @@ impl Default for MiningStats { } } } + +impl Default for WorkerStats { + fn default() -> WorkerStats { + WorkerStats { + id: String::from("unknown"), + is_connected: false, + last_seen: SystemTime::now(), + pow_difficulty: 0, + num_accepted: 0, + num_rejected: 0, + num_stale: 0, + } + } +} + +impl Default for StratumStats { + fn default() -> StratumStats { + StratumStats { + is_enabled: false, + is_running: false, + num_workers: 0, + block_height: 0, + network_difficulty: 0, + cuckoo_size: 0, + worker_stats: Vec::new(), + } + } +} diff --git a/servers/src/grin/server.rs b/servers/src/grin/server.rs index 1681296c2..987300ba8 100644 --- a/servers/src/grin/server.rs +++ b/servers/src/grin/server.rs @@ -30,6 +30,7 @@ use core::core::target::Difficulty; use core::core::hash::Hashed; use grin::dandelion_monitor; use mining::miner; +use mining::stratumserver; use p2p; use pool; use grin::seed; @@ -67,12 +68,20 @@ impl Server { { let mut mining_config = config.mining_config.clone(); let serv = Arc::new(Server::new(config)?); + if mining_config.as_mut().unwrap().enable_mining { { let mut mining_stats = serv.state_info.mining_stats.write().unwrap(); mining_stats.is_enabled = true; } - serv.start_miner(mining_config.unwrap()); + serv.start_miner(mining_config.clone().unwrap()); + } + if mining_config.as_mut().unwrap().enable_stratum_server { + { + let mut stratum_stats = serv.state_info.stratum_stats.write().unwrap(); + stratum_stats.is_enabled = true; + } + serv.start_stratum_server(mining_config.clone().unwrap()); } info_callback(serv.clone()); @@ -275,6 +284,29 @@ impl Server { }); } + /// Start a minimal "stratum" mining service on a separate thread + pub fn start_stratum_server(&self, config: pow::types::MinerConfig) { + let cuckoo_size = global::sizeshift(); + let proof_size = global::proofsize(); + let currently_syncing = self.currently_syncing.clone(); + + let mut stratum_server = stratumserver::StratumServer::new( + config.clone(), + self.chain.clone(), + self.tx_pool.clone(), + ); + let stratum_stats = self.state_info.stratum_stats.clone(); + let _ = thread::Builder::new() + .name("stratum_server".to_string()) + .spawn(move || { + let secs_5 = time::Duration::from_secs(5); + while currently_syncing.load(Ordering::Relaxed) { + thread::sleep(secs_5); + } + stratum_server.run_loop(config.clone(), stratum_stats, cuckoo_size as u32, proof_size); + }); + } + /// The chain head pub fn head(&self) -> chain::Tip { self.chain.head().unwrap() @@ -292,6 +324,7 @@ impl Server { /// consumers pub fn get_server_stats(&self) -> Result { let mining_stats = self.state_info.mining_stats.read().unwrap().clone(); + let stratum_stats = self.state_info.stratum_stats.read().unwrap().clone(); let awaiting_peers = self.state_info.awaiting_peers.load(Ordering::Relaxed); // Fill out stats on our current difficulty calculation @@ -359,6 +392,7 @@ impl Server { is_syncing: self.currently_syncing.load(Ordering::Relaxed), awaiting_peers: awaiting_peers, mining_stats: mining_stats, + stratum_stats: stratum_stats, peer_stats: peer_stats, diff_stats: diff_stats, }) diff --git a/servers/src/lib.rs b/servers/src/lib.rs index a51eb47eb..18adfaa73 100644 --- a/servers/src/lib.rs +++ b/servers/src/lib.rs @@ -21,8 +21,10 @@ #![deny(unused_mut)] #![warn(missing_docs)] +extern crate bufstream; extern crate hyper; extern crate itertools; +extern crate jsonrpc_core; extern crate rand; extern crate serde; #[macro_use] diff --git a/servers/src/mining/mine_block.rs b/servers/src/mining/mine_block.rs new file mode 100644 index 000000000..222fae3c1 --- /dev/null +++ b/servers/src/mining/mine_block.rs @@ -0,0 +1,206 @@ +// Copyright 2018 The Grin Developers +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Build a block to mine: gathers transactions from the pool, assembles +//! them into a block and returns it. + +use std::thread; +use std::sync::{Arc, RwLock}; +use time; +use std::time::Duration; +use rand::{self, Rng}; + +use chain; +use pool; +use core::consensus; +use core::core; +use core::core::Transaction; +use core::ser; +use keychain::{Identifier, Keychain}; +use wallet; +use wallet::BlockFees; +use util; +use util::LOGGER; +use common::types::Error; +use common::adapters::PoolToChainAdapter; + +// Ensure a block suitable for mining is built and returned +// If a wallet listener URL is not provided the reward will be "burnt" +// Warning: This call does not return until/unless a new block can be built +pub fn get_block( + chain: &Arc, + tx_pool: &Arc>>, + key_id: Option, + max_tx: u32, + wallet_listener_url: Option, +) -> (core::Block, BlockFees) { + // get the latest chain state and build a block on top of it + let mut result = build_block( + chain, + tx_pool, + key_id.clone(), + max_tx, + wallet_listener_url.clone(), + ); + while let Err(e) = result { + match e { + self::Error::Chain(chain::Error::DuplicateCommitment(_)) => { + debug!( + LOGGER, + "Duplicate commit for potential coinbase detected. Trying next derivation." + ); + } + ae => { + warn!(LOGGER, "Error building new block: {:?}. Retrying.", ae); + } + } + thread::sleep(Duration::from_millis(100)); + result = build_block( + chain, + tx_pool, + key_id.clone(), + max_tx, + wallet_listener_url.clone(), + ); + } + return result.unwrap(); +} + +/// Builds a new block with the chain head as previous and eligible +/// transactions from the pool. +fn build_block( + chain: &Arc, + tx_pool: &Arc>>, + key_id: Option, + max_tx: u32, + wallet_listener_url: Option, +) -> Result<(core::Block, BlockFees), Error> { + // prepare the block header timestamp + let head = chain.head_header().unwrap(); + let mut now_sec = time::get_time().sec; + let head_sec = head.timestamp.to_timespec().sec; + if now_sec <= head_sec { + now_sec = head_sec + 1; + } + + // get the difficulty our block should be at + let diff_iter = chain.difficulty_iter(); + let difficulty = consensus::next_difficulty(diff_iter).unwrap(); + + // extract current transaction from the pool + let txs_box = tx_pool + .read() + .unwrap() + .prepare_mineable_transactions(max_tx); + let txs: Vec<&Transaction> = txs_box.iter().map(|tx| tx.as_ref()).collect(); + + // build the coinbase and the block itself + let fees = txs.iter().map(|tx| tx.fee()).sum(); + let height = head.height + 1; + let block_fees = BlockFees { + fees, + key_id, + height, + }; + + let (output, kernel, block_fees) = get_coinbase(wallet_listener_url, block_fees)?; + let mut b = core::Block::with_reward(&head, txs, output, kernel, difficulty.clone())?; + + // making sure we're not spending time mining a useless block + b.validate(&head)?; + + let mut rng = rand::OsRng::new().unwrap(); + b.header.nonce = rng.gen(); + b.header.timestamp = time::at_utc(time::Timespec::new(now_sec, 0)); + + let b_difficulty = + (b.header.total_difficulty.clone() - head.total_difficulty.clone()).into_num(); + debug!( + LOGGER, + "Built new block with {} inputs and {} outputs, network difficulty: {}, cumulative difficulty {}", + b.inputs.len(), + b.outputs.len(), + b_difficulty, + b.header.clone().total_difficulty.clone().into_num(), + ); + + let roots_result = chain.set_txhashset_roots(&mut b, false); + + match roots_result { + Ok(_) => Ok((b, block_fees)), + + // If it's a duplicate commitment, it's likely trying to use + // a key that's already been derived but not in the wallet + // for some reason, allow caller to retry + Err(chain::Error::DuplicateCommitment(e)) => { + Err(Error::Chain(chain::Error::DuplicateCommitment(e))) + } + + //Some other issue, possibly duplicate kernel + Err(e) => { + error!( + LOGGER, + "Error setting txhashset root to build a block: {:?}", e + ); + Err(Error::Chain(chain::Error::Other(format!("{:?}", e)))) + } + } +} + +/// +/// Probably only want to do this when testing. +/// +fn burn_reward(block_fees: BlockFees) -> Result<(core::Output, core::TxKernel, BlockFees), Error> { + warn!(LOGGER, "Burning block fees: {:?}", block_fees); + let keychain = Keychain::from_random_seed().unwrap(); + let key_id = keychain.derive_key_id(1).unwrap(); + let (out, kernel) = + core::Block::reward_output(&keychain, &key_id, block_fees.fees, block_fees.height).unwrap(); + Ok((out, kernel, block_fees)) +} + +// Connect to the wallet listener and get coinbase. +// Warning: If a wallet listener URL is not provided the reward will be "burnt" +fn get_coinbase( + wallet_listener_url: Option, + block_fees: BlockFees, +) -> Result<(core::Output, core::TxKernel, BlockFees), Error> { + match wallet_listener_url { + None => { + // Burn it + return burn_reward(block_fees); + } + Some(wallet_listener_url) => { + // Get the wallet coinbase + let url = format!("{}/v1/receive/coinbase", wallet_listener_url.as_str()); + + let res = wallet::client::create_coinbase(&url, &block_fees)?; + + let out_bin = util::from_hex(res.output).unwrap(); + let kern_bin = util::from_hex(res.kernel).unwrap(); + let key_id_bin = util::from_hex(res.key_id).unwrap(); + let output = ser::deserialize(&mut &out_bin[..]).unwrap(); + let kernel = ser::deserialize(&mut &kern_bin[..]).unwrap(); + let key_id = ser::deserialize(&mut &key_id_bin[..]).unwrap(); + let block_fees = BlockFees { + key_id: Some(key_id), + ..block_fees + }; + + debug!(LOGGER, "get_coinbase: {:?}", block_fees); + + return Ok((output, kernel, block_fees)); + } + } +} diff --git a/servers/src/mining/miner.rs b/servers/src/mining/miner.rs index 60be7a866..06b8af2c9 100644 --- a/servers/src/mining/miner.rs +++ b/servers/src/mining/miner.rs @@ -12,41 +12,34 @@ // See the License for the specific language governing permissions and // limitations under the License. -//! Mining service, gathers transactions from the pool, assemble them in a -//! block and mine the block to produce a valid header with its proof-of-work. +//! Mining service, gets a block to mine, and based on mining configuration chooses +//! a version of the cuckoo miner to mine the block and produce a valid header with +//! its proof-of-work. Any valid mined blocks are submitted to the network. -use rand::{self, Rng}; use std::sync::{Arc, RwLock}; use std::sync::atomic::{AtomicBool, Ordering}; use std::thread; use std::time::Duration; use time; +use itertools::Itertools; use common::adapters::PoolToChainAdapter; use core::consensus; -use core::core; use core::core::Proof; -use core::core::{Block, BlockHeader, Transaction}; +use core::core::{Block, BlockHeader}; use core::core::hash::{Hash, Hashed}; use pow::{cuckoo, MiningWorker}; use pow::types::MinerConfig; +use pow::plugin::PluginMiner; use core::ser; use core::global; use core::ser::AsFixedBytes; use util::LOGGER; -use common::types::Error; use common::stats::MiningStats; use chain; use pool; -use util; -use keychain::{Identifier, Keychain}; -use wallet; -use wallet::BlockFees; - -use pow::plugin::PluginMiner; - -use itertools::Itertools; +use mining::mine_block; // Max number of transactions this miner will assemble in a block const MAX_TX: u32 = 5000; @@ -223,9 +216,8 @@ impl Miner { } info!( LOGGER, - "Mining: Cuckoo{} at {} gps (graphs per second)", - cuckoo_size, - sps_total); + "Mining: Cuckoo{} at {} gps (graphs per second)", cuckoo_size, sps_total + ); if sps_total.is_finite() { let mut mining_stats = mining_stats.write().unwrap(); mining_stats.combined_gps = sps_total; @@ -483,7 +475,7 @@ impl Miner { } // iteration, we keep the returned derivation to provide it back when - // nothing has changed + // nothing has changed. We only want to create a new key_id for each new block. let mut key_id = None; { @@ -498,22 +490,18 @@ impl Miner { // get the latest chain state and build a block on top of it let head = self.chain.head_header().unwrap(); let mut latest_hash = self.chain.head().unwrap().last_block_h; - - let mut result = self.build_block(&head, key_id.clone()); - while let Err(e) = result { - match e { - self::Error::Chain(chain::Error::DuplicateCommitment(_)) => { - debug!(LOGGER, "Duplicate commit for potential coinbase detected. Trying next derivation."); - } - ae => { - warn!(LOGGER, "Error building new block: {:?}. Retrying.", ae); - } - } - thread::sleep(Duration::from_millis(100)); - result = self.build_block(&head, key_id.clone()); + let mut wallet_listener_url: Option = None; + if !self.config.burn_reward { + wallet_listener_url = Some(self.config.wallet_listener_url.clone()); } - let (mut b, block_fees) = result.unwrap(); + let (mut b, block_fees) = mine_block::get_block( + &self.chain, + &self.tx_pool, + key_id.clone(), + MAX_TX.clone(), + wallet_listener_url, + ); { let mut mining_stats = mining_stats.write().unwrap(); mining_stats.block_height = b.header.height; @@ -595,127 +583,4 @@ impl Miner { } } } - - /// Builds a new block with the chain head as previous and eligible - /// transactions from the pool. - fn build_block( - &self, - head: &core::BlockHeader, - key_id: Option, - ) -> Result<(core::Block, BlockFees), Error> { - // prepare the block header timestamp - let mut now_sec = time::get_time().sec; - let head_sec = head.timestamp.to_timespec().sec; - if now_sec <= head_sec { - now_sec = head_sec + 1; - } - - // get the difficulty our block should be at - let diff_iter = self.chain.difficulty_iter(); - let difficulty = consensus::next_difficulty(diff_iter).unwrap(); - - // extract current transaction from the pool - let txs_box = self.tx_pool - .read() - .unwrap() - .prepare_mineable_transactions(MAX_TX); - let txs: Vec<&Transaction> = txs_box.iter().map(|tx| tx.as_ref()).collect(); - - // build the coinbase and the block itself - let fees = txs.iter().map(|tx| tx.fee()).sum(); - let height = head.height + 1; - let block_fees = BlockFees { - fees, - key_id, - height, - }; - - let (output, kernel, block_fees) = self.get_coinbase(block_fees)?; - let mut b = core::Block::with_reward(head, txs, output, kernel, difficulty.clone())?; - - debug!( - LOGGER, - "(Server ID: {}) Built new block with {} inputs and {} outputs, network difficulty: {}, cumulative difficulty {}", - self.debug_output_id, - b.inputs.len(), - b.outputs.len(), - difficulty.clone().into_num(), - b.header.clone().total_difficulty.clone().into_num(), - ); - - // making sure we're not spending time mining a useless block - b.validate(&head)?; - - let mut rng = rand::OsRng::new().unwrap(); - b.header.nonce = rng.gen(); - b.header.timestamp = time::at_utc(time::Timespec::new(now_sec, 0)); - - let roots_result = self.chain.set_txhashset_roots(&mut b, false); - - match roots_result { - Ok(_) => Ok((b, block_fees)), - - // If it's a duplicate commitment, it's likely trying to use - // a key that's already been derived but not in the wallet - // for some reason, allow caller to retry - Err(chain::Error::DuplicateCommitment(e)) => { - Err(Error::Chain(chain::Error::DuplicateCommitment(e))) - } - - //Some other issue, possibly duplicate kernel - Err(e) => { - error!( - LOGGER, - "Error setting txhashset root to build a block: {:?}", e - ); - Err(Error::Chain(chain::Error::Other(format!("{:?}", e)))) - } - } - } - - /// - /// Probably only want to do this when testing. - /// - fn burn_reward( - &self, - block_fees: BlockFees, - ) -> Result<(core::Output, core::TxKernel, BlockFees), Error> { - let keychain = Keychain::from_random_seed().unwrap(); - let key_id = keychain.derive_key_id(1).unwrap(); - let (out, kernel) = - core::Block::reward_output(&keychain, &key_id, block_fees.fees, block_fees.height) - .unwrap(); - Ok((out, kernel, block_fees)) - } - - fn get_coinbase( - &self, - block_fees: BlockFees, - ) -> Result<(core::Output, core::TxKernel, BlockFees), Error> { - if self.config.burn_reward { - self.burn_reward(block_fees) - } else { - let url = format!( - "{}/v1/receive/coinbase", - self.config.wallet_listener_url.as_str() - ); - - let res = wallet::client::create_coinbase(&url, &block_fees)?; - - let out_bin = util::from_hex(res.output).unwrap(); - let kern_bin = util::from_hex(res.kernel).unwrap(); - let key_id_bin = util::from_hex(res.key_id).unwrap(); - let output = ser::deserialize(&mut &out_bin[..]).unwrap(); - let kernel = ser::deserialize(&mut &kern_bin[..]).unwrap(); - let key_id = ser::deserialize(&mut &key_id_bin[..]).unwrap(); - let block_fees = BlockFees { - key_id: Some(key_id), - ..block_fees - }; - - debug!(LOGGER, "get_coinbase: {:?}", block_fees); - - Ok((output, kernel, block_fees)) - } - } } diff --git a/servers/src/mining/mod.rs b/servers/src/mining/mod.rs index 451ac09aa..6b367bd98 100644 --- a/servers/src/mining/mod.rs +++ b/servers/src/mining/mod.rs @@ -15,3 +15,5 @@ //! Mining + Mining server pub mod miner; +pub mod stratumserver; +mod mine_block; diff --git a/servers/src/mining/stratumserver.rs b/servers/src/mining/stratumserver.rs new file mode 100644 index 000000000..53c8b6932 --- /dev/null +++ b/servers/src/mining/stratumserver.rs @@ -0,0 +1,574 @@ +// Copyright 2018 The Grin Developers +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Mining Stratum Server +use std::thread; +use std::time::Duration; +use std::net::{TcpListener, TcpStream}; +use std::io::{ErrorKind, Write}; +use std::error::Error; +use time; +use util::LOGGER; +use std::io::BufRead; +use bufstream::BufStream; +use std::sync::{Arc, Mutex, RwLock}; +use serde_json; +use std::time::SystemTime; + +use common::adapters::PoolToChainAdapter; +use core::core::{Block, BlockHeader}; +use pow::types::MinerConfig; +use mining::miner::*; +use mining::mine_block; +use chain; +use pool; +use common::stats::{StratumStats, WorkerStats}; + +// Max number of transactions this miner will assemble in a block +const MAX_TX: u32 = 5000; + +// ---------------------------------------- +// http://www.jsonrpc.org/specification +// RPC Methods + +#[derive(Serialize, Deserialize, Debug)] +struct RpcRequest { + id: String, + jsonrpc: String, + method: String, + params: Option, +} + +#[derive(Serialize, Deserialize, Debug)] +struct RpcResponse { + id: String, + jsonrpc: String, + result: Option, + error: Option, +} + +#[derive(Serialize, Deserialize, Debug)] +struct RpcError { + code: i32, + message: String, +} + +#[derive(Serialize, Deserialize, Debug)] +struct LoginParams { + login: String, + pass: String, + agent: String, +} + +#[derive(Serialize, Deserialize, Debug)] +struct SubmitParams { + height: u64, + nonce: u64, + pow: Vec, +} + +// ---------------------------------------- +// Worker Factory Thread Function + +// Run in a thread. Adds new connections to the workers list +fn accept_workers(id: String, address: String, workers: &mut Arc>>, stratum_stats: &mut Arc>) { + let listener = TcpListener::bind(address).expect("Failed to bind to listen address"); + let mut worker_id: u32 = 0; + for stream in listener.incoming() { + match stream { + Ok(stream) => { + warn!( + LOGGER, + "(Server ID: {}) New connection: {}", + id, + stream.peer_addr().unwrap() + ); + stream + .set_nonblocking(true) + .expect("set_nonblocking call failed"); + let mut worker = Worker::new(worker_id.to_string(), BufStream::new(stream)); + workers.lock().unwrap().push(worker); + // stats for this worker (worker stat objects are added and updated but never removed) + let mut worker_stats = WorkerStats::default(); + worker_stats.is_connected = true; + worker_stats.id = worker_id.to_string(); + worker_stats.pow_difficulty = 1; // XXX TODO + let mut stratum_stats = stratum_stats.write().unwrap(); + stratum_stats.worker_stats.push(worker_stats); + worker_id = worker_id + 1; + } + Err(e) => { + warn!( + LOGGER, + "(Server ID: {}) Error accepting connection: {:?}", id, e + ); + } + } + } + // close the socket server + drop(listener); +} + +// ---------------------------------------- +// Worker Object - a connected stratum client - a miner, pool, proxy, etc... + +#[derive(Serialize, Deserialize, Debug)] +pub struct JobTemplate { + pre_pow: String, +} + +pub struct Worker { + id: String, + stream: BufStream, + error: bool, + authenticated: bool, +} + +impl Worker { + /// Creates a new Stratum Worker. + pub fn new(id: String, stream: BufStream) -> Worker { + Worker { + id: id, + stream: stream, + error: false, + authenticated: false, + } + } + + // Get Message from the worker + fn read_message(&mut self) -> Option { + // Read and return a single message or None + let mut line = String::new(); + match self.stream.read_line(&mut line) { + Ok(_) => { + return Some(line); + } + Err(ref e) if e.kind() == ErrorKind::WouldBlock => { + // Not an error, just no messages ready + return None; + } + Err(e) => { + warn!( + LOGGER, + "(Server ID: {}) Error in connection with stratum client: {}", self.id, e + ); + self.error = true; + return None; + } + } + } + + // Send Message to the worker + fn send_message(&mut self, message_in: String) { + // Write and Flush the message + let mut message = message_in.clone(); + if !message.ends_with("\n") { + message += "\n"; + } + match self.stream.write(message.as_bytes()) { + Ok(_) => match self.stream.flush() { + Ok(_) => {} + Err(e) => { + warn!( + LOGGER, + "(Server ID: {}) Error in connection with stratum client: {}", self.id, e + ); + self.error = true; + } + }, + Err(e) => { + warn!( + LOGGER, + "(Server ID: {}) Error in connection with stratum client: {}", self.id, e + ); + self.error = true; + return; + } + } + } +} // impl Worker + +// ---------------------------------------- +// Grin Stratum Server + +pub struct StratumServer { + id: String, + config: MinerConfig, + chain: Arc, + tx_pool: Arc>>, + current_block: Block, + workers: Arc>>, +} + +impl StratumServer { + /// Creates a new Stratum Server. + pub fn new( + config: MinerConfig, + chain_ref: Arc, + tx_pool: Arc>>, + ) -> StratumServer { + StratumServer { + id: String::from("StratumServer"), + config: config, + chain: chain_ref, + tx_pool: tx_pool, + current_block: Block::default(), + workers: Arc::new(Mutex::new(Vec::new())), + } + } + + // Build and return a JobTemplate for mining the current block + fn build_block_template(&self, bh: BlockHeader) -> JobTemplate { + // Serialize the block header into pre and post nonce strings + let mut pre_pow_writer = HeaderPrePowWriter::default(); + bh.write_pre_pow(&mut pre_pow_writer).unwrap(); + let pre = pre_pow_writer.as_hex_string(false); + let job_template = JobTemplate { pre_pow: pre }; + return job_template; + } + + // Handle an RPC request message from the worker(s) + fn handle_rpc_requests(&mut self, stratum_stats: &mut Arc>) { + let mut workers_l = self.workers.lock().unwrap(); + for num in 0..workers_l.len() { + match workers_l[num].read_message() { + Some(the_message) => { + // Decompose the request from the JSONRpc wrapper + let request: RpcRequest = match serde_json::from_str(&the_message) { + Ok(request) => request, + Err(e) => { + // not a valid JSON RpcRequest - disconnect the worker + warn!( + LOGGER, + "(Server ID: {}) Failed to parse JSONRpc: {} - {:?}", + self.id, + e.description(), + the_message.as_bytes(), + ); + workers_l[num].error = true; + continue; + } + }; + + let mut stratum_stats = stratum_stats.write().unwrap(); + let worker_stats_id = stratum_stats.worker_stats.iter().position(|r| r.id == workers_l[num].id).unwrap(); + stratum_stats.worker_stats[worker_stats_id].last_seen = SystemTime::now(); + + // Call the handler function for requested method + let (response, err) = match request.method.as_str() { + "login" => { + let (response, err) = self.handle_login(request.params); + // XXX TODO Future? - Validate username and password + if err == false { + workers_l[num].authenticated = true; + } + (response, err) + } + "submit" => self.handle_submit(request.params, &mut stratum_stats.worker_stats[worker_stats_id]), + "keepalive" => self.handle_keepalive(), + "getjobtemplate" => { + let b = self.current_block.header.clone(); + self.handle_getjobtemplate(b) + } + "status" => self.handle_status(&stratum_stats.worker_stats[worker_stats_id]), + _ => { + // Called undefined method + let e = r#"{"code": -32601, "message": "Method not found"}"#; + let err = e.to_string(); + (err, true) + } + }; + + // Package the reply as RpcResponse json + let rpc_response: String; + if err == true { + let rpc_err: RpcError = serde_json::from_str(&response).unwrap(); + let resp = RpcResponse { + id: workers_l[num].id.clone(), + jsonrpc: String::from("2.0"), + result: None, + error: Some(rpc_err), + }; + rpc_response = serde_json::to_string(&resp).unwrap(); + } else { + let resp = RpcResponse { + id: workers_l[num].id.clone(), + jsonrpc: String::from("2.0"), + result: Some(response), + error: None, + }; + rpc_response = serde_json::to_string(&resp).unwrap(); + } + + // Send the reply + workers_l[num].send_message(rpc_response); + } + None => {} // No message for us from this worker + } + } + } + + // Handle STATUS message + fn handle_status(&self, worker_stats: &WorkerStats) -> (String, bool) { + // Return stratum and worker stats in json for use by a dashboard or healthcheck. + let response = serde_json::to_string(&worker_stats).unwrap(); + return (response, false); + } + + // Handle GETJOBTEMPLATE message + fn handle_getjobtemplate(&self, bh: BlockHeader) -> (String, bool) { + // Build a JobTemplate from a BlockHeader and return JSON + let job_template = self.build_block_template(bh); + let job_template_json = serde_json::to_string(&job_template).unwrap(); + return (job_template_json, false); + } + + // Handle KEEPALIVE message + fn handle_keepalive(&self) -> (String, bool) { + return (String::from("ok"), false); + } + + // Handle LOGIN message + fn handle_login(&self, params: Option) -> (String, bool) { + // Extract the params string into a LoginParams struct + let params_str = match params { + Some(val) => val, + None => String::from("{}"), + }; + let _login_params: LoginParams = match serde_json::from_str(¶ms_str) { + Ok(val) => val, + Err(_e) => { + let r = r#"{"code": -32600, "message": "Invalid Request"}"#; + return (String::from(r), true); + } + }; + return (String::from("ok"), false); + } + + // Handle SUBMIT message + // params contains a solved block header + // we are expecting real solutions at the full difficulty. + fn handle_submit(&self, params: Option, worker_stats: &mut WorkerStats) -> (String, bool) { + // Extract the params string into a SubmitParams struct + let params_str = match params { + Some(val) => val, + None => String::from("{}"), + }; + let submit_params: SubmitParams = match serde_json::from_str(¶ms_str) { + Ok(val) => val, + Err(_e) => { + let r = r#"{"code": -32600, "message": "Invalid Request"}"#; + return (String::from(r), true); + } + }; + + let mut b: Block; + if submit_params.height == self.current_block.header.height { + // Reconstruct the block header with this nonce and pow added + b = self.current_block.clone(); + b.header.nonce = submit_params.nonce; + b.header.pow.proof_size = submit_params.pow.len(); + b.header.pow.nonces = submit_params.pow; + info!( + LOGGER, + "(Server ID: {}) Found proof of work, adding block {}", + self.id, + b.hash() + ); + // Submit the block to grin server (known here as "self.miner") + let res = self.chain.process_block(b.clone(), chain::Options::MINE); + if let Err(e) = res { + error!( + LOGGER, + "(Server ID: {}) Error validating mined block: {:?}", self.id, e + ); + worker_stats.num_rejected += 1; + let e = r#"{"code": -1, "message": "Solution validation failed"}"#; + let err = e.to_string(); + return (err, true); + } + } else { + warn!( + LOGGER, + "(Server ID: {}) Found POW for block at height: {} - but too late", + self.id, + submit_params.height + ); + worker_stats.num_stale += 1; + let e = r#"{"code": -1, "message": "Solution submitted too late"}"#; + let err = e.to_string(); + return (err, true); + } + worker_stats.num_accepted += 1; + return (String::from("ok"), false); + } // handle submit a solution + + // Purge dead/sick workers - remove all workers marked in error state + fn clean_workers(&mut self, stratum_stats: &mut Arc>) { + let mut start = 0; + let mut workers_l = self.workers.lock().unwrap(); + loop { + for num in start..workers_l.len() { + if workers_l[num].error == true { + warn!( + LOGGER, + "(Server ID: {}) Dropping worker: {}", + self.id, + workers_l[num].id; + ); + // Update worker stats + let mut stratum_stats = stratum_stats.write().unwrap(); + let worker_stats_id = stratum_stats.worker_stats.iter().position(|r| r.id == workers_l[num].id).unwrap(); + stratum_stats.worker_stats[worker_stats_id].is_connected = false; + // Remove the dead worker + workers_l.remove(num); + break; + } + start = num + 1; + } + if start >= workers_l.len() { + let mut stratum_stats = stratum_stats.write().unwrap(); + stratum_stats.num_workers = workers_l.len(); + return; + } + } + } + + // Broadcast a jobtemplate RpcRequest to all connected workers + fn broadcast_job(&mut self) { + debug!( + LOGGER, + "(Server ID: {}) sending block {} to stratum clients", + self.id, + self.current_block.header.height + ); + + // Package new block into RpcRequest + let job_template = self.build_block_template(self.current_block.header.clone()); + let job_template_json = serde_json::to_string(&job_template).unwrap(); + let job_request = RpcRequest { + id: String::from("Stratum"), + jsonrpc: String::from("2.0"), + method: String::from("job"), + params: Some(job_template_json), + }; + let job_request_json = serde_json::to_string(&job_request).unwrap(); + + // Push the new block to all connected clients + let mut workers_l = self.workers.lock().unwrap(); + for num in 0..workers_l.len() { + workers_l[num].send_message(job_request_json.clone()); + } + } + + /// "main()" - Starts the stratum-server. Listens for a connection, then enters a + /// loop, building a new block on top of the existing chain anytime required and sending that to + /// the connected stratum miner, proxy, or pool, and accepts full solutions to be submitted. + pub fn run_loop(&mut self, miner_config: MinerConfig, stratum_stats: Arc>, cuckoo_size: u32, proof_size: usize) { + info!( + LOGGER, + "(Server ID: {}) Starting stratum server with cuckoo_size = {}, proof_size = {}", + self.id, + cuckoo_size, + proof_size + ); + + // "globals" for this function + let attempt_time_per_block = miner_config.attempt_time_per_block; + let mut deadline: i64 = 0; + // to prevent the wallet from generating a new HD key derivation for each + // iteration, we keep the returned derivation to provide it back when + // nothing has changed. We only want to create a key_id for each new block, + // and reuse it when we rebuild the current block to add new tx. + let mut key_id = None; + let mut head = self.chain.head().unwrap(); + let mut current_hash = head.prev_block_h; + let mut latest_hash; + let listen_addr = miner_config.stratum_server_addr.clone().unwrap(); + + // Start a thread to accept new worker connections + let mut workers_th = self.workers.clone(); + let id_th = self.id.clone(); + let mut stats_th = stratum_stats.clone(); + let _listener_th = thread::spawn(move || { + accept_workers(id_th, listen_addr, &mut workers_th, &mut stats_th); + }); + + // We have started + { + let mut stratum_stats = stratum_stats.write().unwrap(); + stratum_stats.is_running = true; + stratum_stats.cuckoo_size = cuckoo_size as u16; + } + + warn!( + LOGGER, + "Stratum server started on {}", + miner_config.stratum_server_addr.unwrap() + ); + + // Main Loop + loop { + // Remove workers with failed connections + self.clean_workers(&mut stratum_stats.clone()); + + // get the latest chain state + head = self.chain.head().unwrap(); + latest_hash = head.last_block_h; + + // Build a new block if: + // There is a new block on the chain + // or We are rebuilding the current one to include new transactions + if current_hash != latest_hash || time::get_time().sec >= deadline { + if current_hash != latest_hash { + // A brand new block, so we will generate a new key_id + key_id = None; + } + let mut wallet_listener_url: Option = None; + if !self.config.burn_reward { + wallet_listener_url = Some(self.config.wallet_listener_url.clone()); + } + + let (new_block, block_fees) = mine_block::get_block( + &self.chain, + &self.tx_pool, + key_id.clone(), + MAX_TX.clone(), + wallet_listener_url, + ); + self.current_block = new_block; + key_id = block_fees.key_id(); + current_hash = latest_hash; + // set a new deadline for rebuilding with fresh transactions + deadline = time::get_time().sec + attempt_time_per_block as i64; + + { + let mut stratum_stats = stratum_stats.write().unwrap(); + stratum_stats.block_height = self.current_block.header.height; + stratum_stats.network_difficulty = + (self.current_block.header.total_difficulty.clone() - head.total_difficulty.clone()).into_num(); + } + + // Send this job to all connected workers + self.broadcast_job(); + } + + // Handle any messages from the workers + self.handle_rpc_requests(&mut stratum_stats.clone()); + + // sleep before restarting loop + thread::sleep(Duration::from_millis(500)); + } // Main Loop + } // fn run_loop() +} // StratumServer diff --git a/servers/tests/framework/mod.rs b/servers/tests/framework/mod.rs index fbf41f626..45bef2a9e 100644 --- a/servers/tests/framework/mod.rs +++ b/servers/tests/framework/mod.rs @@ -557,3 +557,38 @@ impl LocalServerContainerPool { } } } + +/// Create and return a ServerConfig +pub fn config(n: u16, test_name_dir: &str, seed_n: u16) -> servers::ServerConfig { + servers::ServerConfig { + api_http_addr: format!("127.0.0.1:{}", 20000 + n), + db_root: format!("target/tmp/{}/grin-sync-{}", test_name_dir, n), + p2p_config: p2p::P2PConfig { + port: 10000 + n, + ..p2p::P2PConfig::default() + }, + seeding_type: servers::Seeding::List, + seeds: Some(vec![format!("127.0.0.1:{}", 10000 + seed_n)]), + chain_type: core::global::ChainTypes::AutomatedTesting, + archive_mode: Some(true), + skip_sync_wait: Some(true), + ..Default::default() + } +} + +/// Create and return a MinerConfig +pub fn miner_config() -> pow::types::MinerConfig { + let mut plugin_config = pow::types::CuckooMinerPluginConfig::default(); + let mut plugin_config_vec: Vec = Vec::new(); + plugin_config.type_filter = String::from("mean_cpu"); + plugin_config_vec.push(plugin_config); + + pow::types::MinerConfig { + enable_mining: true, + burn_reward: true, + miner_async_mode: Some(false), + miner_plugin_dir: None, + miner_plugin_config: Some(plugin_config_vec), + ..Default::default() + } +} diff --git a/servers/tests/simulnet.rs b/servers/tests/simulnet.rs index daa10e74b..9ed640518 100644 --- a/servers/tests/simulnet.rs +++ b/servers/tests/simulnet.rs @@ -23,6 +23,8 @@ extern crate grin_wallet as wallet; mod framework; +use std::fs; +use std::sync::Arc; use std::thread; use std::time; use std::default::Default; @@ -31,7 +33,7 @@ use core::global; use core::global::ChainTypes; use framework::{LocalServerContainerConfig, LocalServerContainerPool, - LocalServerContainerPoolConfig}; + LocalServerContainerPoolConfig, config, miner_config}; /// Testing the frameworks by starting a fresh server, creating a genesis /// Block and mining into a wallet for a bit @@ -186,7 +188,7 @@ fn simulate_block_propagation() { // instantiates 5 servers on different ports let mut servers = vec![]; for n in 0..5 { - let s = servers::Server::new(config(10 * n, test_name_dir, 0)).unwrap(); + let s = servers::Server::new(framework::config(10 * n, test_name_dir, 0)).unwrap(); servers.push(s); thread::sleep(time::Duration::from_millis(100)); } @@ -226,13 +228,13 @@ fn simulate_full_sync() { let test_name_dir = "grin-sync"; framework::clean_all_output(test_name_dir); - let s1 = servers::Server::new(config(1000, "grin-sync", 1000)).unwrap(); + let s1 = servers::Server::new(framework::config(1000, "grin-sync", 1000)).unwrap(); // mine a few blocks on server 1 s1.start_miner(miner_config()); thread::sleep(time::Duration::from_secs(8)); #[ignore(unused_mut)] // mut needed? - let mut conf = config(1001, "grin-sync", 1000); + let mut conf = framework::config(1001, "grin-sync", 1000); let s2 = servers::Server::new(conf).unwrap(); while s2.head().height < 4 { thread::sleep(time::Duration::from_millis(100)); @@ -253,7 +255,7 @@ fn simulate_fast_sync() { let test_name_dir = "grin-fast"; framework::clean_all_output(test_name_dir); - let s1 = servers::Server::new(config(2000, "grin-fast", 2000)).unwrap(); + let s1 = servers::Server::new(framework::config(2000, "grin-fast", 2000)).unwrap(); // mine a few blocks on server 1 s1.start_miner(miner_config()); thread::sleep(time::Duration::from_secs(8)); @@ -280,7 +282,7 @@ fn simulate_fast_sync_double() { framework::clean_all_output("grin-double-fast1"); framework::clean_all_output("grin-double-fast2"); - let s1 = servers::Server::new(config(3000, "grin-double-fast1", 3000)).unwrap(); + let s1 = servers::Server::new(framework::config(3000, "grin-double-fast1", 3000)).unwrap(); // mine a few blocks on server 1 s1.start_miner(miner_config()); thread::sleep(time::Duration::from_secs(8)); @@ -308,36 +310,3 @@ fn simulate_fast_sync_double() { s1.stop(); s2.stop(); } - -fn config(n: u16, test_name_dir: &str, seed_n: u16) -> servers::ServerConfig { - servers::ServerConfig { - api_http_addr: format!("127.0.0.1:{}", 20000 + n), - db_root: format!("target/tmp/{}/grin-sync-{}", test_name_dir, n), - p2p_config: p2p::P2PConfig { - port: 10000 + n, - ..p2p::P2PConfig::default() - }, - seeding_type: servers::Seeding::List, - seeds: Some(vec![format!("127.0.0.1:{}", 10000 + seed_n)]), - chain_type: core::global::ChainTypes::AutomatedTesting, - archive_mode: Some(true), - skip_sync_wait: Some(true), - ..Default::default() - } -} - -fn miner_config() -> pow::types::MinerConfig { - let mut plugin_config = pow::types::CuckooMinerPluginConfig::default(); - let mut plugin_config_vec: Vec = Vec::new(); - plugin_config.type_filter = String::from("mean_cpu"); - plugin_config_vec.push(plugin_config); - - pow::types::MinerConfig { - enable_mining: true, - burn_reward: true, - miner_async_mode: Some(false), - miner_plugin_dir: None, - miner_plugin_config: Some(plugin_config_vec), - ..Default::default() - } -} diff --git a/servers/tests/stratum.rs b/servers/tests/stratum.rs new file mode 100644 index 000000000..0522e1af6 --- /dev/null +++ b/servers/tests/stratum.rs @@ -0,0 +1,150 @@ +// Copyright 2018 The Grin Developers +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +extern crate grin_api as api; +extern crate grin_chain as chain; +extern crate grin_core as core; +extern crate grin_servers as servers; +extern crate grin_p2p as p2p; +extern crate grin_pow as pow; +extern crate grin_util as util; +extern crate grin_wallet as wallet; + +extern crate bufstream; +extern crate serde_json; + +mod framework; + +use std::io::prelude::*; +use std::net::TcpStream; +use bufstream::BufStream; +use serde_json::Value; + +use std::thread; +use std::time; + +use core::global; +use core::global::ChainTypes; + +use framework::{config, miner_config}; + +// Create a grin server, and a stratum server. +// Simulate a few JSONRpc requests and verify the results. +// Validate disconnected workers +// Validate broadcasting new jobs +#[test] +fn basic_stratum_server() { + util::init_test_logger(); + global::set_mining_mode(ChainTypes::AutomatedTesting); + + let test_name_dir = "stratum_server"; + framework::clean_all_output(test_name_dir); + + // Create a server + let s = servers::Server::new(config(4000, test_name_dir, 0)).unwrap(); + + // Get mining config with stratumserver enabled + let mut miner_cfg = miner_config(); + miner_cfg.enable_mining = false; + miner_cfg.attempt_time_per_block = 999; + miner_cfg.enable_stratum_server = true; + miner_cfg.stratum_server_addr = Some(String::from("127.0.0.1:11101")); + + // Start stratum server + s.start_stratum_server(miner_cfg); + + // Wait for stratum server to start and + // Verify stratum server accepts connections + loop { + if let Ok(_stream) = TcpStream::connect("127.0.0.1:11101") { + break; + } else { + thread::sleep(time::Duration::from_millis(500)); + } + // As this stream falls out of scope it will be disconnected + } + + // Create a few new worker connections + let mut workers = vec![]; + for _n in 0..5 { + let w = TcpStream::connect("127.0.0.1:11101").unwrap(); + w.set_nonblocking(true) + .expect("Failed to set TcpStream to non-blocking"); + let stream = BufStream::new(w); + workers.push(stream); + } + assert!(workers.len() == 5); + + // Simulate a worker lost connection + workers.remove(4); + + // Swallow the genesis block + thread::sleep(time::Duration::from_secs(1)); // Wait for the server to broadcast + let mut response = String::new(); + for n in 0..workers.len() { + let _result = workers[n].read_line(&mut response); + } + + // Verify a few stratum JSONRpc commands + // getjobtemplate - expected block template result + let mut response = String::new(); + let job_req = "{\"id\": \"Stratum\", \"jsonrpc\": \"2.0\", \"method\": \"getjobtemplate\"}\n"; + workers[2].write(job_req.as_bytes()).unwrap(); + workers[2].flush().unwrap(); + thread::sleep(time::Duration::from_secs(1)); // Wait for the server to reply + match workers[2].read_line(&mut response) { + Ok(_) => { + let r: Value = serde_json::from_str(&response).unwrap(); + assert_eq!(r["error"], serde_json::Value::Null); + assert_ne!(r["result"], serde_json::Value::Null); + } + Err(_e) => { + assert!(false); + } + } + + // keepalive - expected "ok" result + let mut response = String::new(); + let job_req = "{\"id\":\"3\",\"jsonrpc\":\"2.0\",\"method\":\"keepalive\"}\n"; + let ok_resp = "{\"id\":\"3\",\"jsonrpc\":\"2.0\",\"result\":\"ok\",\"error\":null}\n"; + workers[2].write(job_req.as_bytes()).unwrap(); + workers[2].flush().unwrap(); + thread::sleep(time::Duration::from_secs(1)); // Wait for the server to reply + let _st = workers[2].read_line(&mut response); + assert_eq!(response.as_str(), ok_resp); + + // "doesnotexist" - error expected + let mut response = String::new(); + let job_req = "{\"id\":\"4\",\"jsonrpc\":\"2.0\",\"method\":\"doesnotexist\"}\n"; + let ok_resp = "{\"id\":\"4\",\"jsonrpc\":\"2.0\",\"result\":null,\"error\":{\"code\":-32601,\"message\":\"Method not found\"}}\n"; + workers[3].write(job_req.as_bytes()).unwrap(); + workers[3].flush().unwrap(); + thread::sleep(time::Duration::from_secs(1)); // Wait for the server to reply + let _st = workers[3].read_line(&mut response); + assert_eq!(response.as_str(), ok_resp); + + // Simulate a worker lost connection + workers.remove(1); + + // Start mining blocks + s.start_miner(miner_config()); + + // Verify blocks are being broadcast to workers + let expected = String::from("job"); + thread::sleep(time::Duration::from_secs(3)); // Wait for a few mined blocks + let mut jobtemplate = String::new(); + let _st = workers[2].read_line(&mut jobtemplate); + let job_template: Value = serde_json::from_str(&jobtemplate).unwrap(); + assert_eq!(job_template["method"], expected); +}