diff --git a/Cargo.lock b/Cargo.lock index 9e0e258..7dc537b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3856,8 +3856,9 @@ dependencies = [ "tls-api", "tls-api-native-tls", "tls-api-openssl", + "tokio 0.2.25", "tokio 1.38.0", - "tokio-util 0.7.11", + "tokio-util 0.2.0", "toml 0.8.14", "tor-config", "tor-hscrypto", diff --git a/Cargo.toml b/Cargo.toml index 0b713c5..0710977 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -94,7 +94,8 @@ tls-api = "0.9.0" tls-api-native-tls = "0.9.0" ## stratum server -tokio-util = { version = "0.7.8", features = ["codec"] } +tokio-old = {version = "0.2", features = ["full"], package = "tokio" } +tokio-util-old = { version = "0.2", features = ["codec"], package = "tokio-util" } [target.'cfg(all(not(target_os = "windows"), not(target_os = "android")))'.dependencies] eye = { version = "0.5.0", default-features = false } diff --git a/src/gui/views/network/setup/stratum.rs b/src/gui/views/network/setup/stratum.rs index 0627ef0..046c412 100644 --- a/src/gui/views/network/setup/stratum.rs +++ b/src/gui/views/network/setup/stratum.rs @@ -134,7 +134,7 @@ impl StratumSetup { ui.vertical_centered(|ui| { // Show loading indicator or controls to start/stop stratum server. - if Node::get_sync_status().unwrap() == SyncStatus::NoSync && + if Node::get_sync_status().unwrap_or(SyncStatus::Initial) == SyncStatus::NoSync && self.is_port_available && self.wallet_name.is_some() { if Node::is_stratum_starting() || Node::is_stratum_stopping() { ui.vertical_centered(|ui| { @@ -147,6 +147,8 @@ impl StratumSetup { let disable_text = format!("{} {}", POWER, t!("network_settings.disable")); View::action_button(ui, disable_text, || { Node::stop_stratum(); + let (ip, port) = NodeConfig::get_stratum_address(); + self.is_port_available = NodeConfig::is_stratum_port_available(&ip, &port); }); ui.add_space(6.0); } else { diff --git a/src/node/mine_block.rs b/src/node/mine_block.rs index bdef222..d5594ca 100644 --- a/src/node/mine_block.rs +++ b/src/node/mine_block.rs @@ -84,7 +84,7 @@ pub fn get_block( while let Err(e) = result { let mut new_key_id = key_id.to_owned(); match e { - self::Error::Chain(c) => match c { + Error::Chain(c) => match c { grin_chain::Error::DuplicateCommitment(_) => { debug!( "Duplicate commit for potential coinbase detected. Trying next derivation." @@ -93,10 +93,10 @@ pub fn get_block( new_key_id = None; } _ => { - error!("Chain Error: {}", c); + error!("Chain Error: {:?}", c); } }, - self::Error::WalletComm(_) => { + Error::WalletComm(_) => { error!( "Error building new block: Can't connect to wallet listener at {:?}; will retry", wallet_listener_url.as_ref().unwrap() @@ -233,11 +233,11 @@ fn burn_reward(block_fees: BlockFees) -> Result<(Output, TxKernel, BlockFees), E fn get_coinbase( wallet_listener_url: Option, block_fees: BlockFees, -) -> Result<(core::Output, core::TxKernel, BlockFees), Error> { - match wallet_listener_url { +) -> Result<(Output, TxKernel, BlockFees), Error> { + return match wallet_listener_url { None => { // Burn it - return burn_reward(block_fees); + burn_reward(block_fees) } Some(wallet_listener_url) => { let res = create_coinbase(&wallet_listener_url, &block_fees)?; @@ -245,12 +245,12 @@ fn get_coinbase( let kernel = res.kernel; let key_id = res.key_id; let block_fees = BlockFees { - key_id: key_id, + key_id, ..block_fees }; debug!("get_coinbase: {:?}", block_fees); - return Ok((output, kernel, block_fees)); + Ok((output, kernel, block_fees)) } } } @@ -273,7 +273,7 @@ fn create_coinbase(dest: &str, block_fees: &BlockFees) -> Result let timeout = grin_api::client::TimeOut::default(); let res: String = grin_api::client::send_request(req, timeout).map_err(|e| { let report = format!( - "Failed to get coinbase from {}. Is the wallet listening? {}", + "Failed to get coinbase from {}. Is the wallet listening? {:?}", dest, e ); error!("{}", report); diff --git a/src/node/node.rs b/src/node/node.rs index 4f081a6..48f10b2 100644 --- a/src/node/node.rs +++ b/src/node/node.rs @@ -616,10 +616,15 @@ pub fn start_stratum_mining_server(server: &Server, config: StratumServerConfig) ); let stop_state = NODE_STATE.stratum_stop_state.clone(); stop_state.reset(); - let _ = thread::Builder::new() - .name("stratum_server".to_string()) - .spawn(move || { + let server_state = stop_state.clone(); + thread::spawn(move || { stratum_server.run_loop(proof_size, sync_state, stop_state); + server_state.reset(); + // Reset stratum stats. + { + let mut w_stratum_stats = NODE_STATE.stratum_stats.write(); + *w_stratum_stats = StratumStats::default(); + } }); } diff --git a/src/node/stratum.rs b/src/node/stratum.rs index 5ad9c58..c72ee56 100644 --- a/src/node/stratum.rs +++ b/src/node/stratum.rs @@ -14,25 +14,23 @@ //! Mining Stratum Server - - use futures::channel::mpsc; use futures::pin_mut; use futures::{SinkExt, StreamExt, TryStreamExt}; -use tokio::net::TcpListener; -use tokio::runtime::Runtime; -use tokio_util::codec::{Framed, LinesCodec}; +use tokio_old::net::TcpListener; +use tokio_old::runtime::Runtime; +use tokio_util_old::codec::{Framed, LinesCodec}; use grin_util::RwLock; use chrono::prelude::Utc; use serde_json::Value; use std::collections::HashMap; -use std::net::{SocketAddr, TcpStream}; -use std::panic::panic_any; +use std::net::SocketAddr; use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering}; use std::thread; use std::time::{Duration, SystemTime}; +use futures::future::{abortable, AbortHandle}; use grin_chain::{self, SyncState}; use grin_servers::common::stats::{StratumStats, WorkerStats}; @@ -40,18 +38,18 @@ use grin_servers::common::types::StratumServerConfig; use grin_core::consensus::graph_weight; use grin_core::core::hash::Hashed; use grin_core::core::Block; -use grin_core::global; +use grin_core::global::min_edge_bits; use grin_core::{pow, ser}; -use crate::node::mine_block; use grin_util::ToHex; use grin_servers::ServerTxPool; -use log::{debug, error, info, warn}; + +use log::{debug, error}; use serde_derive::{Deserialize, Serialize}; +use crate::node::mine_block::get_block; use crate::wallet::WalletConfig; type Tx = mpsc::UnboundedSender; - // ---------------------------------------- // http://www.jsonrpc.org/specification // RPC Methods @@ -144,7 +142,7 @@ impl From for RpcError T: std::error::Error, { fn from(e: T) -> Self { - error!("Received unhandled error: {}", e); + println!("Received unhandled error: {}", e); RpcError::internal_error() } } @@ -354,7 +352,7 @@ impl Handler { // Build a JobTemplate from a BlockHeader and return JSON let job_template = self.build_block_template(); let response = serde_json::to_value(&job_template).unwrap(); - debug!( + println!( "(Server ID: {}) sending block {} with id {} to single worker", self.id, job_template.height, job_template.job_id, ); @@ -408,7 +406,7 @@ impl Handler { || b.is_none() { // Return error status - error!( + println!( "(Server ID: {}) Share at height {}, edge_bits {}, nonce {}, job_id {} submitted too late", self.id, params.height, params.edge_bits, params.nonce, params.job_id, ); @@ -428,7 +426,7 @@ impl Handler { if !b.header.pow.is_primary() && !b.header.pow.is_secondary() { // Return error status - error!( + println!( "(Server ID: {}) Failed to validate solution at height {}, hash {}, edge_bits {}, nonce {}, job_id {}: cuckoo size too small", self.id, params.height, b.hash(), params.edge_bits, params.nonce, params.job_id, ); @@ -445,7 +443,7 @@ impl Handler { // If the difficulty is too low its an error if unscaled_share_difficulty < state.minimum_share_difficulty { // Return error status - error!( + println!( "(Server ID: {}) Share at height {}, hash {}, edge_bits {}, nonce {}, job_id {} rejected due to low difficulty: {}/{}", self.id, params.height, b.hash(), params.edge_bits, params.nonce, params.job_id, unscaled_share_difficulty, state.minimum_share_difficulty, ); @@ -460,8 +458,8 @@ impl Handler { let res = self.chain.process_block(b.clone(), grin_chain::Options::MINE); if let Err(e) = res { // Return error status - error!( - "(Server ID: {}) Failed to validate solution at height {}, hash {}, edge_bits {}, nonce {}, job_id {}, {}", + println!( + "(Server ID: {}) Failed to validate solution at height {}, hash {}, edge_bits {}, nonce {}, job_id {}, {:?}", self.id, params.height, b.hash(), @@ -480,7 +478,7 @@ impl Handler { self.workers.stratum_stats.write().blocks_found += 1; // Log message to make it obvious we found a block let stats = self.workers.get_stats(worker_id)?; - warn!( + println!( "(Server ID: {}) Solution Found for block {}, hash {} - Yay!!! Worker ID: {}, blocks found: {}, shares: {}", self.id, params.height, b.hash(), @@ -493,7 +491,7 @@ impl Handler { let res = pow::verify_size(&b.header); if res.is_err() { // Return error status - error!( + println!( "(Server ID: {}) Failed to validate share at height {}, hash {}, edge_bits {}, nonce {}, job_id {}. {:?}", self.id, params.height, @@ -516,7 +514,7 @@ impl Handler { Some(login) => login, }; - info!( + println!( "(Server ID: {}) Got share at height {}, hash {}, edge_bits {}, nonce {}, job_id {}, difficulty {}/{}, submitted by {}", self.id, b.header.height, @@ -531,7 +529,7 @@ impl Handler { self.workers .update_stats(worker_id, |worker_stats| worker_stats.num_accepted += 1); let submit_response = if share_is_block { - format!("block found - {}", b.hash().to_hex()) + format!("blockfound - {}", b.hash().to_hex()) } else { "ok".to_string() }; @@ -542,6 +540,7 @@ impl Handler { } // handle submit a solution fn broadcast_job(&self) { + debug!("broadcast job"); // Package new block into RpcRequest let job_template = self.build_block_template(); let job_template_json = serde_json::to_string(&job_template).unwrap(); @@ -561,7 +560,8 @@ impl Handler { self.workers.broadcast(job_request_json); } - pub fn run(&self, config: &StratumServerConfig, + pub fn run(&self, + config: &StratumServerConfig, tx_pool: &ServerTxPool, stop_state: Arc) { debug!("Run main loop"); @@ -569,11 +569,10 @@ impl Handler { let mut head = self.chain.head().unwrap(); let mut current_hash = head.prev_block_h; loop { - // Stop main loop on stratum stop. if stop_state.is_stopped() { - panic_any("Stopped"); + thread::sleep(Duration::from_millis(1500)); + break; } - // get the latest chain state head = self.chain.head().unwrap(); let latest_hash = head.last_block_h; @@ -585,6 +584,7 @@ impl Handler { && self.workers.count() > 0 { { + debug!("resend updated block"); let mut state = self.current_state.write(); let wallet_listener_url = if !config.burn_reward { if let Ok(id) = config.wallet_listener_url.parse::() { @@ -604,7 +604,7 @@ impl Handler { let clear_blocks = current_hash != latest_hash; // Build the new block (version) - let (new_block, block_fees) = mine_block::get_block( + let (new_block, block_fees) = get_block( &self.chain, tx_pool, state.current_key_id.clone(), @@ -651,104 +651,79 @@ impl Handler { // ---------------------------------------- // Worker Factory Thread Function -async fn accept_connections(listen_addr: SocketAddr, - handler: Arc, - stop_state: Arc) { - info!("Start tokio stratum server"); +fn accept_connections(listen_addr: SocketAddr, + handler: Arc, + stop_state: Arc) { + debug!("Start tokio stratum server"); + let task = async move { + let mut listener = TcpListener::bind(&listen_addr).await.unwrap_or_else(|_| { + panic!("Stratum: Failed to bind to listen address {}", listen_addr) + }); + let server = listener + .incoming() + .filter_map(|s| async { s.map_err(|e| error!("accept error = {:?}", e)).ok() }) + .for_each(move |socket| { + let handler = handler.clone(); + async move { + // Spawn a task to process the connection + let (tx, mut rx) = mpsc::unbounded(); - let _state_check = stop_state.clone(); + let worker_id = handler.workers.add_worker(tx); + debug!("Worker {} connected", worker_id); - // let task = async move { - // - // }; + let framed = Framed::new(socket, LinesCodec::new()); + let (mut writer, mut reader) = framed.split(); - let listener = TcpListener::bind(&listen_addr).await.unwrap_or_else(|_| { - panic!("Stratum: Failed to bind to listen address {}", listen_addr) - }); - let stop_socket = &stop_state.clone(); - loop { - let (socket, _) = listener.accept().await.unwrap(); - // Stop listener on stratum stop. - { - if stop_socket.is_stopped() { - break; - } - } - - let handler = handler.clone(); - - let process = || async move { - // Spawn a task to process the connection - let (tx, mut rx) = mpsc::unbounded(); - - let worker_id = handler.workers.add_worker(tx); - info!("Worker {} connected", worker_id); - - let framed = Framed::new(socket, LinesCodec::new()); - let (mut writer, mut reader) = framed.split(); - - let h = handler.clone(); - let stop_read = stop_socket.clone(); - let read = async move { - while let Some(line) = reader - .try_next() - .await - .map_err(|e| error!("error reading line: {}", e))? - { - // Stop read on stratum stop. - { - if stop_read.is_stopped() { - break; + let h = handler.clone(); + let read = async move { + while let Some(line) = reader + .try_next() + .await + .map_err(|e| debug!("error reading line: {}", e))? + { + let request = serde_json::from_str(&line) + .map_err(|e| debug!("error serializing line: {}", e))?; + let resp = h.handle_rpc_requests(request, worker_id); + h.workers.send_to(worker_id, resp); } - } - let request = serde_json::from_str(&line) - .map_err(|e| error!("error serializing line: {}", e))?; - let resp = h.handle_rpc_requests(request, worker_id); - h.workers.send_to(worker_id, resp); - } - Result::<_, ()>::Ok(()) - }; + Result::<_, ()>::Ok(()) + }; - let stop_write = stop_socket.clone(); - let write = async move { - while let Some(line) = rx.next().await { - // Stop write on stratum stop. - { - if stop_write.is_stopped() { - break; + let write = async move { + while let Some(line) = rx.next().await { + writer + .send(line) + .await + .map_err(|e| debug!("error writing line: {}", e))?; } - } - writer - .send(line) - .await - .map_err(|e| error!("error writing line: {}", e))?; + + Result::<_, ()>::Ok(()) + }; + + let task = async move { + pin_mut!(read, write); + futures::future::select(read, write).await; + handler.workers.remove_worker(worker_id); + debug!("Worker {} disconnected", worker_id); + }; + tokio_old::spawn(task); } - Result::<_, ()>::Ok(()) - }; + }); + server.await + }; - let task = async move { - pin_mut!(read, write); - futures::future::select(read, write).await; - handler.workers.remove_worker(worker_id); - info!("Worker {} disconnected", worker_id); - }; - tokio::spawn(task); - - Result::<_, ()>::Ok(()) - }; - - let _ = (process)().await; - } + let mut rt = Runtime::new().unwrap(); + let (task, handle) = abortable(task); + rt.spawn(check_stop_state(stop_state, handle)); + rt.block_on(task).unwrap(); } -async fn check_stop_state(stop_state: Arc, listen_addr: SocketAddr) { +async fn check_stop_state(stop_state: Arc, handle: AbortHandle) { loop { // Ping stratum socket on stop to handle TcpListener unbind. if stop_state.is_stopped() { - thread::spawn(move || { - let _ = TcpStream::connect(listen_addr).unwrap(); - }); + handle.abort(); break; } thread::sleep(Duration::from_millis(1000)); @@ -944,10 +919,11 @@ impl StratumServer { /// existing chain anytime required and sending that to the connected /// stratum miner, proxy, or pool, and accepts full solutions to /// be submitted. - pub fn run_loop(&mut self, proof_size: usize, + pub fn run_loop(&mut self, + proof_size: usize, sync_state: Arc, stop_state: Arc) { - info!( + debug!( "(Server ID: {}) Starting stratum server with proof_size = {}", self.id, proof_size ); @@ -963,55 +939,28 @@ impl StratumServer { .expect("Stratum: Incorrect address "); let handler = Arc::new(Handler::from_stratum(&self)); + let h = handler.clone(); + + let stop_socket = stop_state.clone(); + let check_state = stop_socket.clone(); + let _listener_th = thread::spawn(move || { + accept_connections(listen_addr, h, stop_socket); + }); + + // We have started + { + let mut stratum_stats = self.stratum_stats.write(); + stratum_stats.is_running = true; + stratum_stats.edge_bits = (min_edge_bits() + 1) as u16; + stratum_stats.minimum_share_difficulty = self.config.minimum_share_difficulty; + } // Initial Loop. Waiting node complete syncing while self.sync_state.is_syncing() { thread::sleep(Duration::from_millis(50)); } - let h = handler.clone(); - - let task_stop_state = stop_state.clone(); - - let task_config = self.config.clone(); - let task_tx_pool = self.tx_pool.clone(); - let task_stats = self.stratum_stats.clone(); - let _ = thread::spawn(move || { - let rt = Runtime::new().unwrap(); - - let main_stop_state = task_stop_state.clone(); - let main_task = async move { - handler.run(&task_config, &task_tx_pool, main_stop_state); - }; - // Run main loop. - rt.spawn(main_task); - - // Run task to periodically check stop state. - rt.spawn(check_stop_state(task_stop_state.clone(), listen_addr)); - // Run connections listener and block thread till it will exit on stop. - rt.block_on(accept_connections(listen_addr, h, task_stop_state.clone())); - - // We have stopped. - { - let mut stratum_stats = task_stats.write(); - stratum_stats.is_running = false; - } - - task_stop_state.reset(); - }); - - warn!( - "Stratum server started on {}", - self.config.stratum_server_addr.clone().unwrap() - ); - - // We have started. - { - let mut stratum_stats = self.stratum_stats.write(); - stratum_stats.is_running = true; - stratum_stats.edge_bits = (global::min_edge_bits() + 1) as u16; - stratum_stats.minimum_share_difficulty = self.config.minimum_share_difficulty; - } + handler.run(&self.config, &self.tx_pool, check_state); } // fn run_loop() } // StratumServer