diff --git a/Cargo.lock b/Cargo.lock index 039a622..443d99a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -922,7 +922,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "13aea89a5c93364a98e9b37b2fa237effbb694d5cfe01c5b70941f7eb087d5e3" dependencies = [ "cfg-if 0.1.10", - "dirs-sys", + "dirs-sys 0.3.7", +] + +[[package]] +name = "dirs" +version = "5.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "44c45a9d03d6676652bcb5e724c7e988de1acad23a711b5217ab9cbecbec2225" +dependencies = [ + "dirs-sys 0.4.1", ] [[package]] @@ -936,6 +945,18 @@ dependencies = [ "winapi 0.3.9", ] +[[package]] +name = "dirs-sys" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "520f05a5cbd335fae5a99ff7a6ab8627577660ee5cfd6a94a6a929b52ff0321c" +dependencies = [ + "libc", + "option-ext", + "redox_users", + "windows-sys 0.48.0", +] + [[package]] name = "dispatch" version = "0.2.0" @@ -1073,19 +1094,6 @@ dependencies = [ "winit", ] -[[package]] -name = "egui_demo_lib" -version = "0.20.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b191e5870e6cff885cf56d4e6c3a10ce0c66d9d855a92c1eaca96b0f1111c0aa" -dependencies = [ - "egui", - "egui_extras", - "enum-map", - "tracing", - "unicode_names2", -] - [[package]] name = "egui_extras" version = "0.20.0" @@ -1126,27 +1134,6 @@ dependencies = [ "bytemuck", ] -[[package]] -name = "enum-map" -version = "2.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "988f0d17a0fa38291e5f41f71ea8d46a5d5497b9054d5a759fae2cbb819f2356" -dependencies = [ - "enum-map-derive", - "serde", -] - -[[package]] -name = "enum-map-derive" -version = "0.11.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2a4da76b3b6116d758c7ba93f7ec6a35d2e2cf24feda76c6e38a375f4d5c59f2" -dependencies = [ - "proc-macro2 1.0.60", - "quote 1.0.28", - "syn 1.0.109", -] - [[package]] name = "enum_primitive" version = "0.1.1" @@ -1710,10 +1697,9 @@ dependencies = [ "android_logger", "built", "chrono", - "dirs", + "dirs 5.0.1", "eframe", "egui", - "egui_demo_lib", "egui_extras", "env_logger 0.10.0", "futures 0.3.28", @@ -1732,14 +1718,14 @@ dependencies = [ "openssl-sys", "pnet", "pollster 0.3.0", - "rand 0.6.5", + "rand 0.8.5", "rust-i18n", "serde", "serde_derive", "serde_json", "sys-locale", - "tokio", - "tokio-util 0.2.0", + "tokio 1.29.1", + "tokio-util 0.7.8", "toml 0.7.4", "wgpu", "winit", @@ -1771,7 +1757,7 @@ dependencies = [ "serde_derive", "serde_json", "thiserror", - "tokio", + "tokio 0.2.25", "tokio-rustls", "url", ] @@ -1802,7 +1788,7 @@ dependencies = [ name = "grin_config" version = "5.2.0-beta.1" dependencies = [ - "dirs", + "dirs 2.0.2", "grin_core", "grin_p2p", "grin_servers", @@ -1936,7 +1922,7 @@ dependencies = [ "serde", "serde_derive", "serde_json", - "tokio", + "tokio 0.2.25", "tokio-util 0.2.0", "walkdir", ] @@ -1993,7 +1979,7 @@ dependencies = [ "http", "indexmap", "slab", - "tokio", + "tokio 0.2.25", "tokio-util 0.3.1", "tracing", "tracing-futures", @@ -2131,8 +2117,8 @@ dependencies = [ "httpdate", "itoa 0.4.8", "pin-project", - "socket2", - "tokio", + "socket2 0.3.19", + "tokio 0.2.25", "tower-service", "tracing", "want", @@ -2151,7 +2137,7 @@ dependencies = [ "log", "rustls", "rustls-native-certs", - "tokio", + "tokio 0.2.25", "tokio-rustls", "webpki", ] @@ -2164,7 +2150,7 @@ checksum = "0d1f9b0b8258e3ef8f45928021d3ef14096c2b93b99e4b8cfcabf1f58ec84b0a" dependencies = [ "bytes 0.5.6", "hyper", - "tokio", + "tokio 0.2.25", "tokio-io-timeout", ] @@ -3078,6 +3064,12 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "option-ext" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "04744f49eae99ab78e0d5c0b603ab218f515ea8cfe5a456d7629ad883a3b6e7d" + [[package]] name = "ordered-float" version = "1.1.1" @@ -3345,6 +3337,12 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "22686f4785f02a4fcc856d3b3bb19bf6c8160d103f7a99cc258bddd0251dc7f2" +[[package]] +name = "ppv-lite86" +version = "0.2.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" + [[package]] name = "proc-macro-crate" version = "1.3.1" @@ -3424,7 +3422,7 @@ checksum = "6d71dacdc3c88c1fde3885a3be3fbab9f35724e6ce99467f7d9c5026132184ca" dependencies = [ "autocfg 0.1.8", "libc", - "rand_chacha", + "rand_chacha 0.1.1", "rand_core 0.4.2", "rand_hc", "rand_isaac", @@ -3435,6 +3433,17 @@ dependencies = [ "winapi 0.3.9", ] +[[package]] +name = "rand" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" +dependencies = [ + "libc", + "rand_chacha 0.3.1", + "rand_core 0.6.4", +] + [[package]] name = "rand_chacha" version = "0.1.1" @@ -3445,6 +3454,16 @@ dependencies = [ "rand_core 0.3.1", ] +[[package]] +name = "rand_chacha" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" +dependencies = [ + "ppv-lite86", + "rand_core 0.6.4", +] + [[package]] name = "rand_core" version = "0.3.1" @@ -3465,6 +3484,9 @@ name = "rand_core" version = "0.6.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" +dependencies = [ + "getrandom", +] [[package]] name = "rand_hc" @@ -4056,6 +4078,16 @@ dependencies = [ "winapi 0.3.9", ] +[[package]] +name = "socket2" +version = "0.4.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "64a4a911eed85daf18834cfaa86a79b7d266ff93ff5ba14005426219480ed662" +dependencies = [ + "libc", + "winapi 0.3.9", +] + [[package]] name = "spin" version = "0.5.2" @@ -4289,10 +4321,30 @@ dependencies = [ "pin-project-lite 0.1.12", "signal-hook-registry", "slab", - "tokio-macros", + "tokio-macros 0.2.6", "winapi 0.3.9", ] +[[package]] +name = "tokio" +version = "1.29.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "532826ff75199d5833b9d2c5fe410f29235e25704ee5f0ef599fb51c21f4a4da" +dependencies = [ + "autocfg 1.1.0", + "backtrace", + "bytes 1.4.0", + "libc", + "mio 0.8.8", + "num_cpus", + "parking_lot 0.12.1", + "pin-project-lite 0.2.9", + "signal-hook-registry", + "socket2 0.4.9", + "tokio-macros 2.1.0", + "windows-sys 0.48.0", +] + [[package]] name = "tokio-io-timeout" version = "0.4.0" @@ -4300,7 +4352,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9390a43272c8a6ac912ed1d1e2b6abeafd5047e05530a2fa304deee041a06215" dependencies = [ "bytes 0.5.6", - "tokio", + "tokio 0.2.25", ] [[package]] @@ -4314,6 +4366,17 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "tokio-macros" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e" +dependencies = [ + "proc-macro2 1.0.60", + "quote 1.0.28", + "syn 2.0.18", +] + [[package]] name = "tokio-rustls" version = "0.13.1" @@ -4322,7 +4385,7 @@ checksum = "15cb62a0d2770787abc96e99c1cd98fcf17f94959f3af63ca85bdfb203f051b4" dependencies = [ "futures-core", "rustls", - "tokio", + "tokio 0.2.25", "webpki", ] @@ -4337,7 +4400,7 @@ dependencies = [ "futures-sink", "log", "pin-project-lite 0.1.12", - "tokio", + "tokio 0.2.25", ] [[package]] @@ -4351,7 +4414,21 @@ dependencies = [ "futures-sink", "log", "pin-project-lite 0.1.12", - "tokio", + "tokio 0.2.25", +] + +[[package]] +name = "tokio-util" +version = "0.7.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "806fe8c2c87eccc8b3267cbae29ed3ab2d0bd37fca70ab622e46aaa9375ddb7d" +dependencies = [ + "bytes 1.4.0", + "futures-core", + "futures-sink", + "pin-project-lite 0.2.9", + "tokio 1.29.1", + "tracing", ] [[package]] @@ -4521,12 +4598,6 @@ version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f962df74c8c05a667b5ee8bcf162993134c104e96440b663c8daa176dc772d8c" -[[package]] -name = "unicode_names2" -version = "0.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "446c96c6dd42604779487f0a981060717156648c1706aa1f464677f03c6cc059" - [[package]] name = "unsafe-any" version = "0.4.2" diff --git a/Cargo.toml b/Cargo.toml index c33fabc..a86f4d1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -35,11 +35,10 @@ wgpu = "0.14.0" egui = { version = "0.20.1", default-features = false } egui_extras = { version = "0.20.0" } eframe = { version = "0.20.1", features = [ "wgpu" ] } -egui_demo_lib = "0.20.0" ## grin_servers futures = "0.3" -dirs = "2.0" +dirs = "5.0.1" ## other once_cell = "1.10.0" @@ -48,15 +47,15 @@ sys-locale = "0.3.0" chrono = "0.4.23" lazy_static = "1.4.0" toml = "0.7.4" -serde = "1.0.164" +serde = "1" pnet = "0.33.0" # stratum server serde_derive = "1" serde_json = "1" -tokio = {version = "0.2", features = ["full"] } -tokio-util = { version = "0.2", features = ["codec"] } -rand = "0.6" +tokio = {version = "1.29.1", features = ["full"] } +tokio-util = { version = "0.7.8", features = ["codec"] } +rand = "0.8.5" [patch.crates-io] winit = { git = "https://github.com/rib/winit", branch = "android-activity" } diff --git a/locales/en.yml b/locales/en.yml index 17ca6bc..112f54b 100644 --- a/locales/en.yml +++ b/locales/en.yml @@ -56,8 +56,8 @@ network_metrics: difficulty_window: 'Difficulty window %{size}' network_mining: loading: Mining will be available after the synchronization - server_setup: Stratum server setup enable_server: Enable server + disable_server: Disable server info: 'Mining server is enabled, you can change its settings by selecting %{settings} at the bottom of the screen. Data is updating when devices are connected.' info_settings: To change the settings of enabled server, you will need to restart the node. rewards_wallet: Wallet for rewards diff --git a/locales/ru.yml b/locales/ru.yml index 6cd1b20..2f91287 100644 --- a/locales/ru.yml +++ b/locales/ru.yml @@ -56,12 +56,12 @@ network_metrics: difficulty_window: 'Окно сложности %{size}' network_mining: loading: Майнинг будет доступен после синхронизации - server_setup: Настройка stratum-сервера enable_server: Включить сервер + disable_server: Выключить сервер info: 'Сервер майнинга запущен, вы можете изменить его настройки, выбрав %{settings} внизу экрана. Данные обновляются, когда устройства подключены.' info_settings: Для изменения настроек запущенного сервера потребуется перезапуск узла. rewards_wallet: Кошелёк для наград - server: Stratum-сервер + server: Stratum сервер address: Адрес miners: Майнеры devices: Устройства diff --git a/src/gui/views/network/configs/stratum.rs b/src/gui/views/network/configs/stratum.rs index c4537ca..d551131 100644 --- a/src/gui/views/network/configs/stratum.rs +++ b/src/gui/views/network/configs/stratum.rs @@ -65,19 +65,32 @@ impl StratumSetup { pub const MIN_SHARE_DIFF_MODAL: &'static str = "stratum_min_share_diff"; pub fn ui(&mut self, ui: &mut Ui, cb: &dyn PlatformCallbacks) { - View::sub_title(ui, format!("{} {}", HARD_DRIVES, t!("network_mining.server_setup"))); + View::sub_title(ui, format!("{} {}", HARD_DRIVES, t!("network_mining.server"))); View::horizontal_line(ui, Colors::STROKE); ui.add_space(6.0); ui.vertical_centered(|ui| { - // Show button to enable stratum server if port is available and server is not running. - if self.is_port_available && !Node::is_stratum_server_starting() && Node::is_running() - && !Node::get_stratum_stats().is_running { - ui.add_space(6.0); - View::button(ui, t!("network_mining.enable_server"), Colors::GOLD, || { - Node::start_stratum_server(); - }); - ui.add_space(6.0); + // Show loading indicator or controls to start/stop stratum server if port is available. + if self.is_port_available { + if Node::is_stratum_starting() || Node::is_stratum_stopping() { + ui.vertical_centered(|ui| { + ui.add_space(8.0); + View::small_loading_spinner(ui); + ui.add_space(8.0); + }); + } else if Node::get_stratum_stats().is_running { + ui.add_space(6.0); + View::button(ui, t!("network_mining.disable_server"), Colors::GOLD, || { + Node::stop_stratum(); + }); + ui.add_space(6.0); + } else { + ui.add_space(6.0); + View::button(ui, t!("network_mining.enable_server"), Colors::GOLD, || { + Node::start_stratum(); + }); + ui.add_space(6.0); + } } // Show stratum server autorun checkbox. diff --git a/src/gui/views/network/mining.rs b/src/gui/views/network/mining.rs index 8874093..65b7954 100644 --- a/src/gui/views/network/mining.rs +++ b/src/gui/views/network/mining.rs @@ -18,7 +18,7 @@ use grin_chain::SyncStatus; use grin_servers::WorkerStats; use crate::gui::Colors; -use crate::gui::icons::{BARBELL, CLOCK_AFTERNOON, COMPUTER_TOWER, CPU, CUBE, FADERS, FOLDER_DASHED, FOLDER_NOTCH_MINUS, FOLDER_NOTCH_PLUS, PLUGS, PLUGS_CONNECTED, POLYGON}; +use crate::gui::icons::{BARBELL, CLOCK_AFTERNOON, CPU, CUBE, FADERS, FOLDER_DASHED, FOLDER_NOTCH_MINUS, FOLDER_NOTCH_PLUS, HARD_DRIVES, PLUGS, PLUGS_CONNECTED, POLYGON}; use crate::gui::platform::PlatformCallbacks; use crate::gui::views::{Modal, NetworkContainer, View}; use crate::gui::views::network::{NetworkTab, NetworkTabType}; @@ -45,7 +45,7 @@ impl NetworkTab for NetworkMining { } // Show loading spinner when node is stopping or stratum server is starting. - if Node::is_stopping() || Node::is_stratum_server_starting() { + if Node::is_stopping() || Node::is_stratum_starting() { ui.centered_and_justified(|ui| { View::big_loading_spinner(ui); }); @@ -80,7 +80,7 @@ impl NetworkTab for NetworkMining { } // Show stratum mining server info. - View::sub_title(ui, format!("{} {}", COMPUTER_TOWER, t!("network_mining.server"))); + View::sub_title(ui, format!("{} {}", HARD_DRIVES, t!("network_mining.server"))); ui.columns(2, |columns| { columns[0].vertical_centered(|ui| { let (stratum_addr, stratum_port) = NodeConfig::get_stratum_address(); diff --git a/src/node/mine_block.rs b/src/node/mine_block.rs index 01ed940..e41b267 100644 --- a/src/node/mine_block.rs +++ b/src/node/mine_block.rs @@ -15,6 +15,7 @@ //! Build a block to mine: gathers transactions from the pool, assembles //! them into a block and returns it. +use std::panic::panic_any; use chrono::prelude::{DateTime, NaiveDateTime, Utc}; use rand::{thread_rng, Rng}; use serde_json::{json, Value}; @@ -33,6 +34,7 @@ use grin_keychain::{ExtKeychain, Identifier, Keychain}; use grin_servers::ServerTxPool; use log::{debug, error, trace, warn}; use serde_derive::{Deserialize, Serialize}; +use crate::node::stratum::StratumStopState; /// Fees in block to use for coinbase amount calculation /// (Duplicated from Grin wallet project) @@ -74,6 +76,7 @@ pub fn get_block( tx_pool: &ServerTxPool, key_id: Option, wallet_listener_url: Option, + stop_state: &Arc ) -> (core::Block, BlockFees) { let wallet_retry_interval = 5; // get the latest chain state and build a block on top of it @@ -111,6 +114,11 @@ pub fn get_block( thread::sleep(Duration::from_millis(100)); } + // Stop attempts to build a block on stop. + if stop_state.is_stopped() { + panic_any("Stopped"); + } + result = build_block(chain, tx_pool, new_key_id, wallet_listener_url.clone()); } return result.unwrap(); diff --git a/src/node/node.rs b/src/node/node.rs index 34f5ad7..ce147ac 100644 --- a/src/node/node.rs +++ b/src/node/node.rs @@ -27,7 +27,7 @@ use grin_servers::common::types::Error; use jni::sys::{jboolean, jstring}; use lazy_static::lazy_static; use crate::node::NodeConfig; -use crate::node::stratum::StratumServer; +use crate::node::stratum::{StratumStopState, StratumServer}; lazy_static! { /// Static thread-aware state of [`Node`] to be updated from another thread. @@ -40,6 +40,8 @@ pub struct Node { stats: Arc>>, /// Stratum server statistics. stratum_stats: Arc>, + /// Stratum server statistics. + stratum_stop_state: Arc, /// Running API server address. api_addr: Arc>>, /// Running P2P server port. @@ -63,6 +65,7 @@ impl Default for Node { Self { stats: Arc::new(RwLock::new(None)), stratum_stats: Arc::new(grin_util::RwLock::new(StratumStats::default())), + stratum_stop_state: Arc::new(StratumStopState::default()), api_addr: Arc::new(RwLock::new(None)), p2p_port: Arc::new(RwLock::new(None)), starting: AtomicBool::new(false), @@ -119,15 +122,30 @@ impl Node { } /// Request to start stratum server. - pub fn start_stratum_server() { + pub fn start_stratum() { NODE_STATE.start_stratum_needed.store(true, Ordering::Relaxed); } /// Check if stratum server is starting. - pub fn is_stratum_server_starting() -> bool { + pub fn is_stratum_starting() -> bool { NODE_STATE.start_stratum_needed.load(Ordering::Relaxed) } + /// Get stratum server statistics. + pub fn get_stratum_stats() -> grin_util::RwLockReadGuard<'static, StratumStats> { + NODE_STATE.stratum_stats.read() + } + + /// Stop stratum server. + pub fn stop_stratum() { + NODE_STATE.stratum_stop_state.stop() + } + + /// Check if stratum server is stopping. + pub fn is_stratum_stopping() -> bool { + NODE_STATE.stratum_stop_state.is_stopped() + } + /// Check if node is starting. pub fn is_starting() -> bool { NODE_STATE.starting.load(Ordering::Relaxed) @@ -153,11 +171,6 @@ impl Node { NODE_STATE.stats.read().unwrap() } - /// Get stratum server [`Server`] statistics. - pub fn get_stratum_stats() -> grin_util::RwLockReadGuard<'static, StratumStats> { - NODE_STATE.stratum_stats.read() - } - /// Get synchronization status, empty when [`Server`] is not running. pub fn get_sync_status() -> Option { // Return Shutdown status when node is stopping. @@ -219,7 +232,7 @@ impl Node { } // Start stratum mining server if requested. - let stratum_start_requested = Self::is_stratum_server_starting(); + let stratum_start_requested = Self::is_stratum_starting(); if stratum_start_requested { let (s_ip, s_port) = NodeConfig::get_stratum_address(); if NodeConfig::is_stratum_port_available(&s_ip, &s_port) { @@ -581,7 +594,8 @@ pub fn start_stratum_mining_server(server: &Server, config: StratumServerConfig) server.tx_pool.clone(), NODE_STATE.stratum_stats.clone(), ); - let stop_state = server.stop_state.clone(); + let stop_state = NODE_STATE.stratum_stop_state.clone(); + stop_state.reset(); let _ = thread::Builder::new() .name("stratum_server".to_string()) .spawn(move || { diff --git a/src/node/stratum.rs b/src/node/stratum.rs index f9f4a86..6c48a30 100644 --- a/src/node/stratum.rs +++ b/src/node/stratum.rs @@ -23,13 +23,14 @@ use tokio::net::TcpListener; use tokio::runtime::Runtime; use tokio_util::codec::{Framed, LinesCodec}; -use grin_util::{RwLock, StopState}; +use grin_util::RwLock; use chrono::prelude::Utc; use serde_json::Value; use std::collections::HashMap; use std::net::{SocketAddr, TcpStream}; use std::panic::panic_any; use std::sync::Arc; +use std::sync::atomic::{AtomicBool, Ordering}; use std::thread; use std::time::{Duration, SystemTime}; @@ -204,6 +205,36 @@ impl State { } } +/// Stratum server stop state shared between to stop stratum from node thread. +pub struct StratumStopState { + stopping: AtomicBool, +} + +impl Default for StratumStopState { + fn default() -> Self { + Self { + stopping: AtomicBool::new(false) + } + } +} + +impl StratumStopState { + /// Check if stratum server should be stopped. + pub fn is_stopped(&self) -> bool { + self.stopping.load(Ordering::Relaxed) + } + + /// Called to stop stratum server from node thread. + pub fn stop(&self) { + self.stopping.store(true, Ordering::Relaxed); + } + + /// Called before stratum server start to reset state. + pub fn reset(&self) { + self.stopping.store(false, Ordering::Relaxed); + } +} + struct Handler { id: String, workers: Arc, @@ -499,7 +530,7 @@ impl Handler { self.workers .update_stats(worker_id, |worker_stats| worker_stats.num_accepted += 1); let submit_response = if share_is_block { - format!("blockfound - {}", b.hash().to_hex()) + format!("block found - {}", b.hash().to_hex()) } else { "ok".to_string() }; @@ -510,7 +541,6 @@ impl Handler { } // handle submit a solution fn broadcast_job(&self) { - debug!("broadcast job"); // Package new block into RpcRequest let job_template = self.build_block_template(); let job_template_json = serde_json::to_string(&job_template).unwrap(); @@ -530,12 +560,19 @@ impl Handler { self.workers.broadcast(job_request_json); } - pub fn run(&self, config: &StratumServerConfig, tx_pool: &ServerTxPool, stop_state: Arc) { + pub fn run(&self, config: &StratumServerConfig, + tx_pool: &ServerTxPool, + stop_state: Arc) { debug!("Run main loop"); let mut deadline: i64 = 0; let mut head = self.chain.head().unwrap(); let mut current_hash = head.prev_block_h; loop { + // Stop main loop on stratum stop. + if stop_state.is_stopped() { + panic_any("Stopped"); + } + // get the latest chain state head = self.chain.head().unwrap(); let latest_hash = head.last_block_h; @@ -547,7 +584,6 @@ impl Handler { && self.workers.count() > 0 { { - debug!("resend updated block"); let mut state = self.current_state.write(); let wallet_listener_url = if !config.burn_reward { Some(config.wallet_listener_url.clone()) @@ -563,6 +599,7 @@ impl Handler { tx_pool, state.current_key_id.clone(), wallet_listener_url, + &stop_state ); // scaled difficulty @@ -604,80 +641,98 @@ impl Handler { // ---------------------------------------- // Worker Factory Thread Function -fn accept_connections(listen_addr: SocketAddr, handler: Arc, stop_state: Arc) { +async fn accept_connections(listen_addr: SocketAddr, + handler: Arc, + stop_state: Arc) { info!("Start tokio stratum server"); - let state_to_check = stop_state.clone(); + let state_check = stop_state.clone(); - let task = async move { - let mut listener = TcpListener::bind(&listen_addr).await.unwrap_or_else(|_| { - panic!("Stratum: Failed to bind to listen address {}", listen_addr) - }); - let state_socket = &stop_state.clone(); - let server = listener - .incoming() - .filter_map(|s| async { s.map_err(|e| error!("accept error = {:?}", e)).ok() }) - .for_each(move |socket| { - let handler = handler.clone(); - async move { - // Stop listener on node server stop. - if state_socket.is_stopped() { - panic_any("Stopped"); + // let task = async move { + // + // }; + + let listener = TcpListener::bind(&listen_addr).await.unwrap_or_else(|_| { + panic!("Stratum: Failed to bind to listen address {}", listen_addr) + }); + let stop_socket = &stop_state.clone(); + loop { + let (socket, _) = listener.accept().await.unwrap(); + // Stop listener on stratum stop. + { + if stop_socket.is_stopped() { + break; + } + } + + let handler = handler.clone(); + + let process = || async move { + // Spawn a task to process the connection + let (tx, mut rx) = mpsc::unbounded(); + + let worker_id = handler.workers.add_worker(tx); + info!("Worker {} connected", worker_id); + + let framed = Framed::new(socket, LinesCodec::new()); + let (mut writer, mut reader) = framed.split(); + + let h = handler.clone(); + let stop_read = stop_socket.clone(); + let read = async move { + while let Some(line) = reader + .try_next() + .await + .map_err(|e| error!("error reading line: {}", e))? + { + // Stop read on stratum stop. + { + if stop_read.is_stopped() { + break; + } } - // Spawn a task to process the connection - let (tx, mut rx) = mpsc::unbounded(); - - let worker_id = handler.workers.add_worker(tx); - info!("Worker {} connected", worker_id); - - let framed = Framed::new(socket, LinesCodec::new()); - let (mut writer, mut reader) = framed.split(); - - let h = handler.clone(); - let read = async move { - while let Some(line) = reader - .try_next() - .await - .map_err(|e| error!("error reading line: {}", e))? - { - let request = serde_json::from_str(&line) - .map_err(|e| error!("error serializing line: {}", e))?; - let resp = h.handle_rpc_requests(request, worker_id); - h.workers.send_to(worker_id, resp); - } - - Result::<_, ()>::Ok(()) - }; - - let write = async move { - while let Some(line) = rx.next().await { - writer - .send(line) - .await - .map_err(|e| error!("error writing line: {}", e))?; - } - - Result::<_, ()>::Ok(()) - }; - - let task = async move { - pin_mut!(read, write); - futures::future::select(read, write).await; - handler.workers.remove_worker(worker_id); - info!("Worker {} disconnected", worker_id); - }; - tokio::spawn(task); + let request = serde_json::from_str(&line) + .map_err(|e| error!("error serializing line: {}", e))?; + let resp = h.handle_rpc_requests(request, worker_id); + h.workers.send_to(worker_id, resp); } - }); - server.await - }; - let mut rt = Runtime::new().unwrap(); - rt.spawn(check_stop_state(state_to_check, listen_addr)); - rt.block_on(task); + Result::<_, ()>::Ok(()) + }; + + let stop_write = stop_socket.clone(); + let write = async move { + while let Some(line) = rx.next().await { + // Stop write on stratum stop. + { + if stop_write.is_stopped() { + break; + } + } + writer + .send(line) + .await + .map_err(|e| error!("error writing line: {}", e))?; + } + Result::<_, ()>::Ok(()) + }; + + let task = async move { + pin_mut!(read, write); + futures::future::select(read, write).await; + handler.workers.remove_worker(worker_id); + info!("Worker {} disconnected", worker_id); + }; + tokio::spawn(task); + + Result::<_, ()>::Ok(()) + }; + + let _ = (process)().await; + } } -async fn check_stop_state(stop_state: Arc, listen_addr: SocketAddr) { +async fn check_stop_state(stop_state: Arc, listen_addr: SocketAddr) { loop { // Ping stratum socket on stop to handle TcpListener unbind. if stop_state.is_stopped() { @@ -690,6 +745,7 @@ async fn check_stop_state(stop_state: Arc, listen_addr: SocketAddr) { } } + // ---------------------------------------- // Worker Object - a connected stratum client - a miner, pool, proxy, etc... @@ -878,7 +934,9 @@ impl StratumServer { /// existing chain anytime required and sending that to the connected /// stratum miner, proxy, or pool, and accepts full solutions to /// be submitted. - pub fn run_loop(&mut self, proof_size: usize, sync_state: Arc, stop_state: Arc) { + pub fn run_loop(&mut self, proof_size: usize, + sync_state: Arc, + stop_state: Arc) { info!( "(Server ID: {}) Starting stratum server with proof_size = {}", self.id, proof_size @@ -895,33 +953,55 @@ impl StratumServer { .expect("Stratum: Incorrect address "); let handler = Arc::new(Handler::from_stratum(&self)); - let h = handler.clone(); - - let s_state = stop_state.clone(); - - let _listener_th = thread::spawn(move || { - accept_connections(listen_addr, h, s_state); - }); - - // We have started - { - let mut stratum_stats = self.stratum_stats.write(); - stratum_stats.is_running = true; - stratum_stats.edge_bits = (global::min_edge_bits() + 1) as u16; - stratum_stats.minimum_share_difficulty = self.config.minimum_share_difficulty; - } - - warn!( - "Stratum server started on {}", - self.config.stratum_server_addr.clone().unwrap() - ); // Initial Loop. Waiting node complete syncing while self.sync_state.is_syncing() { thread::sleep(Duration::from_millis(50)); } - handler.run(&self.config, &self.tx_pool, stop_state.clone()); + let h = handler.clone(); + + let task_stop_state = stop_state.clone(); + + let task_config = self.config.clone(); + let task_tx_pool = self.tx_pool.clone(); + let task_stats = self.stratum_stats.clone(); + let _ = thread::spawn(move || { + let rt = Runtime::new().unwrap(); + + let main_stop_state = task_stop_state.clone(); + let main_task = async move { + handler.run(&task_config, &task_tx_pool, main_stop_state); + }; + // Run main loop. + rt.spawn(main_task); + + // Run task to periodically check stop state. + rt.spawn(check_stop_state(task_stop_state.clone(), listen_addr)); + // Run connections listener and block thread till it will exit on stop. + rt.block_on(accept_connections(listen_addr, h, task_stop_state.clone())); + + // We have stopped. + { + let mut stratum_stats = task_stats.write(); + stratum_stats.is_running = false; + } + + task_stop_state.reset(); + }); + + warn!( + "Stratum server started on {}", + self.config.stratum_server_addr.clone().unwrap() + ); + + // We have started. + { + let mut stratum_stats = self.stratum_stats.write(); + stratum_stats.is_running = true; + stratum_stats.edge_bits = (global::min_edge_bits() + 1) as u16; + stratum_stats.minimum_share_difficulty = self.config.minimum_share_difficulty; + } } // fn run_loop() } // StratumServer