stratum: start/stop refactoring, add ability to disable stratum server without node restart
This commit is contained in:
parent
31addbc157
commit
eb898da201
7 changed files with 234 additions and 153 deletions
|
@ -56,8 +56,8 @@ network_metrics:
|
||||||
difficulty_window: 'Difficulty window %{size}'
|
difficulty_window: 'Difficulty window %{size}'
|
||||||
network_mining:
|
network_mining:
|
||||||
loading: Mining will be available after the synchronization
|
loading: Mining will be available after the synchronization
|
||||||
server_setup: Stratum server setup
|
|
||||||
enable_server: Enable server
|
enable_server: Enable server
|
||||||
|
disable_server: Disable server
|
||||||
info: 'Mining server is enabled, you can change its settings by selecting %{settings} at the bottom of the screen. Data is updating when devices are connected.'
|
info: 'Mining server is enabled, you can change its settings by selecting %{settings} at the bottom of the screen. Data is updating when devices are connected.'
|
||||||
info_settings: To change the settings of enabled server, you will need to restart the node.
|
info_settings: To change the settings of enabled server, you will need to restart the node.
|
||||||
rewards_wallet: Wallet for rewards
|
rewards_wallet: Wallet for rewards
|
||||||
|
|
|
@ -56,12 +56,12 @@ network_metrics:
|
||||||
difficulty_window: 'Окно сложности %{size}'
|
difficulty_window: 'Окно сложности %{size}'
|
||||||
network_mining:
|
network_mining:
|
||||||
loading: Майнинг будет доступен после синхронизации
|
loading: Майнинг будет доступен после синхронизации
|
||||||
server_setup: Настройка stratum-сервера
|
|
||||||
enable_server: Включить сервер
|
enable_server: Включить сервер
|
||||||
|
disable_server: Выключить сервер
|
||||||
info: 'Сервер майнинга запущен, вы можете изменить его настройки, выбрав %{settings} внизу экрана. Данные обновляются, когда устройства подключены.'
|
info: 'Сервер майнинга запущен, вы можете изменить его настройки, выбрав %{settings} внизу экрана. Данные обновляются, когда устройства подключены.'
|
||||||
info_settings: Для изменения настроек запущенного сервера потребуется перезапуск узла.
|
info_settings: Для изменения настроек запущенного сервера потребуется перезапуск узла.
|
||||||
rewards_wallet: Кошелёк для наград
|
rewards_wallet: Кошелёк для наград
|
||||||
server: Stratum-сервер
|
server: Stratum сервер
|
||||||
address: Адрес
|
address: Адрес
|
||||||
miners: Майнеры
|
miners: Майнеры
|
||||||
devices: Устройства
|
devices: Устройства
|
||||||
|
|
|
@ -65,19 +65,32 @@ impl StratumSetup {
|
||||||
pub const MIN_SHARE_DIFF_MODAL: &'static str = "stratum_min_share_diff";
|
pub const MIN_SHARE_DIFF_MODAL: &'static str = "stratum_min_share_diff";
|
||||||
|
|
||||||
pub fn ui(&mut self, ui: &mut Ui, cb: &dyn PlatformCallbacks) {
|
pub fn ui(&mut self, ui: &mut Ui, cb: &dyn PlatformCallbacks) {
|
||||||
View::sub_title(ui, format!("{} {}", HARD_DRIVES, t!("network_mining.server_setup")));
|
View::sub_title(ui, format!("{} {}", HARD_DRIVES, t!("network_mining.server")));
|
||||||
View::horizontal_line(ui, Colors::STROKE);
|
View::horizontal_line(ui, Colors::STROKE);
|
||||||
ui.add_space(6.0);
|
ui.add_space(6.0);
|
||||||
|
|
||||||
ui.vertical_centered(|ui| {
|
ui.vertical_centered(|ui| {
|
||||||
// Show button to enable stratum server if port is available and server is not running.
|
// Show loading indicator or controls to start/stop stratum server if port is available.
|
||||||
if self.is_port_available && !Node::is_stratum_server_starting() && Node::is_running()
|
if self.is_port_available {
|
||||||
&& !Node::get_stratum_stats().is_running {
|
if Node::is_stratum_starting() || Node::is_stratum_stopping() {
|
||||||
ui.add_space(6.0);
|
ui.vertical_centered(|ui| {
|
||||||
View::button(ui, t!("network_mining.enable_server"), Colors::GOLD, || {
|
ui.add_space(8.0);
|
||||||
Node::start_stratum_server();
|
View::small_loading_spinner(ui);
|
||||||
});
|
ui.add_space(8.0);
|
||||||
ui.add_space(6.0);
|
});
|
||||||
|
} else if Node::get_stratum_stats().is_running {
|
||||||
|
ui.add_space(6.0);
|
||||||
|
View::button(ui, t!("network_mining.disable_server"), Colors::GOLD, || {
|
||||||
|
Node::stop_stratum();
|
||||||
|
});
|
||||||
|
ui.add_space(6.0);
|
||||||
|
} else {
|
||||||
|
ui.add_space(6.0);
|
||||||
|
View::button(ui, t!("network_mining.enable_server"), Colors::GOLD, || {
|
||||||
|
Node::start_stratum();
|
||||||
|
});
|
||||||
|
ui.add_space(6.0);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Show stratum server autorun checkbox.
|
// Show stratum server autorun checkbox.
|
||||||
|
|
|
@ -18,7 +18,7 @@ use grin_chain::SyncStatus;
|
||||||
use grin_servers::WorkerStats;
|
use grin_servers::WorkerStats;
|
||||||
|
|
||||||
use crate::gui::Colors;
|
use crate::gui::Colors;
|
||||||
use crate::gui::icons::{BARBELL, CLOCK_AFTERNOON, COMPUTER_TOWER, CPU, CUBE, FADERS, FOLDER_DASHED, FOLDER_NOTCH_MINUS, FOLDER_NOTCH_PLUS, PLUGS, PLUGS_CONNECTED, POLYGON};
|
use crate::gui::icons::{BARBELL, CLOCK_AFTERNOON, CPU, CUBE, FADERS, FOLDER_DASHED, FOLDER_NOTCH_MINUS, FOLDER_NOTCH_PLUS, HARD_DRIVES, PLUGS, PLUGS_CONNECTED, POLYGON};
|
||||||
use crate::gui::platform::PlatformCallbacks;
|
use crate::gui::platform::PlatformCallbacks;
|
||||||
use crate::gui::views::{Modal, NetworkContainer, View};
|
use crate::gui::views::{Modal, NetworkContainer, View};
|
||||||
use crate::gui::views::network::{NetworkTab, NetworkTabType};
|
use crate::gui::views::network::{NetworkTab, NetworkTabType};
|
||||||
|
@ -45,7 +45,7 @@ impl NetworkTab for NetworkMining {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Show loading spinner when node is stopping or stratum server is starting.
|
// Show loading spinner when node is stopping or stratum server is starting.
|
||||||
if Node::is_stopping() || Node::is_stratum_server_starting() {
|
if Node::is_stopping() || Node::is_stratum_starting() {
|
||||||
ui.centered_and_justified(|ui| {
|
ui.centered_and_justified(|ui| {
|
||||||
View::big_loading_spinner(ui);
|
View::big_loading_spinner(ui);
|
||||||
});
|
});
|
||||||
|
@ -80,7 +80,7 @@ impl NetworkTab for NetworkMining {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Show stratum mining server info.
|
// Show stratum mining server info.
|
||||||
View::sub_title(ui, format!("{} {}", COMPUTER_TOWER, t!("network_mining.server")));
|
View::sub_title(ui, format!("{} {}", HARD_DRIVES, t!("network_mining.server")));
|
||||||
ui.columns(2, |columns| {
|
ui.columns(2, |columns| {
|
||||||
columns[0].vertical_centered(|ui| {
|
columns[0].vertical_centered(|ui| {
|
||||||
let (stratum_addr, stratum_port) = NodeConfig::get_stratum_address();
|
let (stratum_addr, stratum_port) = NodeConfig::get_stratum_address();
|
||||||
|
|
|
@ -15,6 +15,7 @@
|
||||||
//! Build a block to mine: gathers transactions from the pool, assembles
|
//! Build a block to mine: gathers transactions from the pool, assembles
|
||||||
//! them into a block and returns it.
|
//! them into a block and returns it.
|
||||||
|
|
||||||
|
use std::panic::panic_any;
|
||||||
use chrono::prelude::{DateTime, NaiveDateTime, Utc};
|
use chrono::prelude::{DateTime, NaiveDateTime, Utc};
|
||||||
use rand::{thread_rng, Rng};
|
use rand::{thread_rng, Rng};
|
||||||
use serde_json::{json, Value};
|
use serde_json::{json, Value};
|
||||||
|
@ -33,6 +34,7 @@ use grin_keychain::{ExtKeychain, Identifier, Keychain};
|
||||||
use grin_servers::ServerTxPool;
|
use grin_servers::ServerTxPool;
|
||||||
use log::{debug, error, trace, warn};
|
use log::{debug, error, trace, warn};
|
||||||
use serde_derive::{Deserialize, Serialize};
|
use serde_derive::{Deserialize, Serialize};
|
||||||
|
use crate::node::stratum::StratumStopState;
|
||||||
|
|
||||||
/// Fees in block to use for coinbase amount calculation
|
/// Fees in block to use for coinbase amount calculation
|
||||||
/// (Duplicated from Grin wallet project)
|
/// (Duplicated from Grin wallet project)
|
||||||
|
@ -74,6 +76,7 @@ pub fn get_block(
|
||||||
tx_pool: &ServerTxPool,
|
tx_pool: &ServerTxPool,
|
||||||
key_id: Option<Identifier>,
|
key_id: Option<Identifier>,
|
||||||
wallet_listener_url: Option<String>,
|
wallet_listener_url: Option<String>,
|
||||||
|
stop_state: &Arc<StratumStopState>
|
||||||
) -> (core::Block, BlockFees) {
|
) -> (core::Block, BlockFees) {
|
||||||
let wallet_retry_interval = 5;
|
let wallet_retry_interval = 5;
|
||||||
// get the latest chain state and build a block on top of it
|
// get the latest chain state and build a block on top of it
|
||||||
|
@ -111,6 +114,11 @@ pub fn get_block(
|
||||||
thread::sleep(Duration::from_millis(100));
|
thread::sleep(Duration::from_millis(100));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Stop attempts to build a block on stop.
|
||||||
|
if stop_state.is_stopped() {
|
||||||
|
panic_any("Stopped");
|
||||||
|
}
|
||||||
|
|
||||||
result = build_block(chain, tx_pool, new_key_id, wallet_listener_url.clone());
|
result = build_block(chain, tx_pool, new_key_id, wallet_listener_url.clone());
|
||||||
}
|
}
|
||||||
return result.unwrap();
|
return result.unwrap();
|
||||||
|
@ -141,7 +149,6 @@ fn build_block(
|
||||||
// If this fails for *any* reason then fallback to an empty vec of txs.
|
// If this fails for *any* reason then fallback to an empty vec of txs.
|
||||||
// This will allow us to mine an "empty" block if the txpool is in an
|
// This will allow us to mine an "empty" block if the txpool is in an
|
||||||
// invalid (and unexpected) state.
|
// invalid (and unexpected) state.
|
||||||
println!("12345 prepare_mineable_transactions");
|
|
||||||
let txs = match tx_pool.read().prepare_mineable_transactions() {
|
let txs = match tx_pool.read().prepare_mineable_transactions() {
|
||||||
Ok(txs) => txs,
|
Ok(txs) => txs,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
|
@ -153,7 +160,6 @@ fn build_block(
|
||||||
vec![]
|
vec![]
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
println!("12345 after prepare_mineable_transactions");
|
|
||||||
|
|
||||||
// build the coinbase and the block itself
|
// build the coinbase and the block itself
|
||||||
let fees = txs.iter().map(|tx| tx.fee()).sum();
|
let fees = txs.iter().map(|tx| tx.fee()).sum();
|
||||||
|
|
|
@ -27,7 +27,7 @@ use grin_servers::common::types::Error;
|
||||||
use jni::sys::{jboolean, jstring};
|
use jni::sys::{jboolean, jstring};
|
||||||
use lazy_static::lazy_static;
|
use lazy_static::lazy_static;
|
||||||
use crate::node::NodeConfig;
|
use crate::node::NodeConfig;
|
||||||
use crate::node::stratum::StratumServer;
|
use crate::node::stratum::{StratumStopState, StratumServer};
|
||||||
|
|
||||||
lazy_static! {
|
lazy_static! {
|
||||||
/// Static thread-aware state of [`Node`] to be updated from another thread.
|
/// Static thread-aware state of [`Node`] to be updated from another thread.
|
||||||
|
@ -40,6 +40,8 @@ pub struct Node {
|
||||||
stats: Arc<RwLock<Option<ServerStats>>>,
|
stats: Arc<RwLock<Option<ServerStats>>>,
|
||||||
/// Stratum server statistics.
|
/// Stratum server statistics.
|
||||||
stratum_stats: Arc<grin_util::RwLock<StratumStats>>,
|
stratum_stats: Arc<grin_util::RwLock<StratumStats>>,
|
||||||
|
/// Stratum server statistics.
|
||||||
|
stratum_stop_state: Arc<StratumStopState>,
|
||||||
/// Running API server address.
|
/// Running API server address.
|
||||||
api_addr: Arc<RwLock<Option<String>>>,
|
api_addr: Arc<RwLock<Option<String>>>,
|
||||||
/// Running P2P server port.
|
/// Running P2P server port.
|
||||||
|
@ -63,6 +65,7 @@ impl Default for Node {
|
||||||
Self {
|
Self {
|
||||||
stats: Arc::new(RwLock::new(None)),
|
stats: Arc::new(RwLock::new(None)),
|
||||||
stratum_stats: Arc::new(grin_util::RwLock::new(StratumStats::default())),
|
stratum_stats: Arc::new(grin_util::RwLock::new(StratumStats::default())),
|
||||||
|
stratum_stop_state: Arc::new(StratumStopState::default()),
|
||||||
api_addr: Arc::new(RwLock::new(None)),
|
api_addr: Arc::new(RwLock::new(None)),
|
||||||
p2p_port: Arc::new(RwLock::new(None)),
|
p2p_port: Arc::new(RwLock::new(None)),
|
||||||
starting: AtomicBool::new(false),
|
starting: AtomicBool::new(false),
|
||||||
|
@ -119,15 +122,30 @@ impl Node {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Request to start stratum server.
|
/// Request to start stratum server.
|
||||||
pub fn start_stratum_server() {
|
pub fn start_stratum() {
|
||||||
NODE_STATE.start_stratum_needed.store(true, Ordering::Relaxed);
|
NODE_STATE.start_stratum_needed.store(true, Ordering::Relaxed);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Check if stratum server is starting.
|
/// Check if stratum server is starting.
|
||||||
pub fn is_stratum_server_starting() -> bool {
|
pub fn is_stratum_starting() -> bool {
|
||||||
NODE_STATE.start_stratum_needed.load(Ordering::Relaxed)
|
NODE_STATE.start_stratum_needed.load(Ordering::Relaxed)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Get stratum server statistics.
|
||||||
|
pub fn get_stratum_stats() -> grin_util::RwLockReadGuard<'static, StratumStats> {
|
||||||
|
NODE_STATE.stratum_stats.read()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Stop stratum server.
|
||||||
|
pub fn stop_stratum() {
|
||||||
|
NODE_STATE.stratum_stop_state.stop()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Check if stratum server is stopping.
|
||||||
|
pub fn is_stratum_stopping() -> bool {
|
||||||
|
NODE_STATE.stratum_stop_state.is_stopped()
|
||||||
|
}
|
||||||
|
|
||||||
/// Check if node is starting.
|
/// Check if node is starting.
|
||||||
pub fn is_starting() -> bool {
|
pub fn is_starting() -> bool {
|
||||||
NODE_STATE.starting.load(Ordering::Relaxed)
|
NODE_STATE.starting.load(Ordering::Relaxed)
|
||||||
|
@ -153,11 +171,6 @@ impl Node {
|
||||||
NODE_STATE.stats.read().unwrap()
|
NODE_STATE.stats.read().unwrap()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get stratum server [`Server`] statistics.
|
|
||||||
pub fn get_stratum_stats() -> grin_util::RwLockReadGuard<'static, StratumStats> {
|
|
||||||
NODE_STATE.stratum_stats.read()
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Get synchronization status, empty when [`Server`] is not running.
|
/// Get synchronization status, empty when [`Server`] is not running.
|
||||||
pub fn get_sync_status() -> Option<SyncStatus> {
|
pub fn get_sync_status() -> Option<SyncStatus> {
|
||||||
// Return Shutdown status when node is stopping.
|
// Return Shutdown status when node is stopping.
|
||||||
|
@ -219,7 +232,7 @@ impl Node {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start stratum mining server if requested.
|
// Start stratum mining server if requested.
|
||||||
let stratum_start_requested = Self::is_stratum_server_starting();
|
let stratum_start_requested = Self::is_stratum_starting();
|
||||||
if stratum_start_requested {
|
if stratum_start_requested {
|
||||||
let (s_ip, s_port) = NodeConfig::get_stratum_address();
|
let (s_ip, s_port) = NodeConfig::get_stratum_address();
|
||||||
if NodeConfig::is_stratum_port_available(&s_ip, &s_port) {
|
if NodeConfig::is_stratum_port_available(&s_ip, &s_port) {
|
||||||
|
@ -581,7 +594,8 @@ pub fn start_stratum_mining_server(server: &Server, config: StratumServerConfig)
|
||||||
server.tx_pool.clone(),
|
server.tx_pool.clone(),
|
||||||
NODE_STATE.stratum_stats.clone(),
|
NODE_STATE.stratum_stats.clone(),
|
||||||
);
|
);
|
||||||
let stop_state = server.stop_state.clone();
|
let stop_state = NODE_STATE.stratum_stop_state.clone();
|
||||||
|
stop_state.reset();
|
||||||
let _ = thread::Builder::new()
|
let _ = thread::Builder::new()
|
||||||
.name("stratum_server".to_string())
|
.name("stratum_server".to_string())
|
||||||
.spawn(move || {
|
.spawn(move || {
|
||||||
|
|
|
@ -23,13 +23,14 @@ use tokio::net::TcpListener;
|
||||||
use tokio::runtime::Runtime;
|
use tokio::runtime::Runtime;
|
||||||
use tokio_util::codec::{Framed, LinesCodec};
|
use tokio_util::codec::{Framed, LinesCodec};
|
||||||
|
|
||||||
use grin_util::{RwLock, StopState};
|
use grin_util::RwLock;
|
||||||
use chrono::prelude::Utc;
|
use chrono::prelude::Utc;
|
||||||
use serde_json::Value;
|
use serde_json::Value;
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::net::{SocketAddr, TcpStream};
|
use std::net::{SocketAddr, TcpStream};
|
||||||
use std::panic::panic_any;
|
use std::panic::panic_any;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
use std::sync::atomic::{AtomicBool, Ordering};
|
||||||
use std::thread;
|
use std::thread;
|
||||||
use std::time::{Duration, SystemTime};
|
use std::time::{Duration, SystemTime};
|
||||||
|
|
||||||
|
@ -204,6 +205,36 @@ impl State {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Stratum server stop state shared between to stop stratum from node thread.
|
||||||
|
pub struct StratumStopState {
|
||||||
|
stopping: AtomicBool,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Default for StratumStopState {
|
||||||
|
fn default() -> Self {
|
||||||
|
Self {
|
||||||
|
stopping: AtomicBool::new(false)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl StratumStopState {
|
||||||
|
/// Check if stratum server should be stopped.
|
||||||
|
pub fn is_stopped(&self) -> bool {
|
||||||
|
self.stopping.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Called to stop stratum server from node thread.
|
||||||
|
pub fn stop(&self) {
|
||||||
|
self.stopping.store(true, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Called before stratum server start to reset state.
|
||||||
|
pub fn reset(&self) {
|
||||||
|
self.stopping.store(false, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
struct Handler {
|
struct Handler {
|
||||||
id: String,
|
id: String,
|
||||||
workers: Arc<WorkersList>,
|
workers: Arc<WorkersList>,
|
||||||
|
@ -331,7 +362,6 @@ impl Handler {
|
||||||
|
|
||||||
// Build and return a JobTemplate for mining the current block
|
// Build and return a JobTemplate for mining the current block
|
||||||
fn build_block_template(&self) -> JobTemplate {
|
fn build_block_template(&self) -> JobTemplate {
|
||||||
println!("1 build template 12345");
|
|
||||||
let bh = self
|
let bh = self
|
||||||
.current_state
|
.current_state
|
||||||
.read()
|
.read()
|
||||||
|
@ -340,7 +370,6 @@ impl Handler {
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.header
|
.header
|
||||||
.clone();
|
.clone();
|
||||||
println!("2 build template 12345");
|
|
||||||
// Serialize the block header into pre and post nonce strings
|
// Serialize the block header into pre and post nonce strings
|
||||||
let mut header_buf = vec![];
|
let mut header_buf = vec![];
|
||||||
{
|
{
|
||||||
|
@ -501,7 +530,7 @@ impl Handler {
|
||||||
self.workers
|
self.workers
|
||||||
.update_stats(worker_id, |worker_stats| worker_stats.num_accepted += 1);
|
.update_stats(worker_id, |worker_stats| worker_stats.num_accepted += 1);
|
||||||
let submit_response = if share_is_block {
|
let submit_response = if share_is_block {
|
||||||
format!("blockfound - {}", b.hash().to_hex())
|
format!("block found - {}", b.hash().to_hex())
|
||||||
} else {
|
} else {
|
||||||
"ok".to_string()
|
"ok".to_string()
|
||||||
};
|
};
|
||||||
|
@ -512,7 +541,6 @@ impl Handler {
|
||||||
} // handle submit a solution
|
} // handle submit a solution
|
||||||
|
|
||||||
fn broadcast_job(&self) {
|
fn broadcast_job(&self) {
|
||||||
println!("12345 broadcast job");
|
|
||||||
// Package new block into RpcRequest
|
// Package new block into RpcRequest
|
||||||
let job_template = self.build_block_template();
|
let job_template = self.build_block_template();
|
||||||
let job_template_json = serde_json::to_string(&job_template).unwrap();
|
let job_template_json = serde_json::to_string(&job_template).unwrap();
|
||||||
|
@ -532,31 +560,18 @@ impl Handler {
|
||||||
self.workers.broadcast(job_request_json);
|
self.workers.broadcast(job_request_json);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn run(&self, config: &StratumServerConfig, tx_pool: &ServerTxPool, stop_state: Arc<StopState>) {
|
pub fn run(&self, config: &StratumServerConfig,
|
||||||
|
tx_pool: &ServerTxPool,
|
||||||
|
stop_state: Arc<StratumStopState>) {
|
||||||
debug!("Run main loop");
|
debug!("Run main loop");
|
||||||
let mut deadline: i64 = 0;
|
let mut deadline: i64 = 0;
|
||||||
let mut head = self.chain.head().unwrap();
|
let mut head = self.chain.head().unwrap();
|
||||||
let mut current_hash = head.prev_block_h;
|
let mut current_hash = head.prev_block_h;
|
||||||
loop {
|
loop {
|
||||||
println!("12345 looping");
|
// Stop main loop on stratum stop.
|
||||||
// Ping stratum socket on stop to handle TcpListener unbind.
|
|
||||||
if stop_state.is_stopped() {
|
if stop_state.is_stopped() {
|
||||||
println!("12345 prepare ping to stop");
|
panic_any("Stopped");
|
||||||
let listen_addr: SocketAddr = config
|
|
||||||
.stratum_server_addr
|
|
||||||
.clone()
|
|
||||||
.unwrap()
|
|
||||||
.parse()
|
|
||||||
.expect("Stratum: Incorrect address ");
|
|
||||||
thread::spawn(move || {
|
|
||||||
println!("12345 ping start");
|
|
||||||
let _ = TcpStream::connect(listen_addr).unwrap();
|
|
||||||
println!("12345 ping end");
|
|
||||||
});
|
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
println!("12345 looping2");
|
|
||||||
|
|
||||||
|
|
||||||
// get the latest chain state
|
// get the latest chain state
|
||||||
head = self.chain.head().unwrap();
|
head = self.chain.head().unwrap();
|
||||||
|
@ -569,9 +584,7 @@ impl Handler {
|
||||||
&& self.workers.count() > 0
|
&& self.workers.count() > 0
|
||||||
{
|
{
|
||||||
{
|
{
|
||||||
println!("12345 resend updated block");
|
|
||||||
let mut state = self.current_state.write();
|
let mut state = self.current_state.write();
|
||||||
println!("12345 after resend updated block");
|
|
||||||
let wallet_listener_url = if !config.burn_reward {
|
let wallet_listener_url = if !config.burn_reward {
|
||||||
Some(config.wallet_listener_url.clone())
|
Some(config.wallet_listener_url.clone())
|
||||||
} else {
|
} else {
|
||||||
|
@ -581,14 +594,13 @@ impl Handler {
|
||||||
let clear_blocks = current_hash != latest_hash;
|
let clear_blocks = current_hash != latest_hash;
|
||||||
|
|
||||||
// Build the new block (version)
|
// Build the new block (version)
|
||||||
println!("12345 get_block");
|
|
||||||
let (new_block, block_fees) = mine_block::get_block(
|
let (new_block, block_fees) = mine_block::get_block(
|
||||||
&self.chain,
|
&self.chain,
|
||||||
tx_pool,
|
tx_pool,
|
||||||
state.current_key_id.clone(),
|
state.current_key_id.clone(),
|
||||||
wallet_listener_url,
|
wallet_listener_url,
|
||||||
|
&stop_state
|
||||||
);
|
);
|
||||||
println!("12345 after get_block");
|
|
||||||
|
|
||||||
// scaled difficulty
|
// scaled difficulty
|
||||||
state.current_difficulty =
|
state.current_difficulty =
|
||||||
|
@ -609,20 +621,14 @@ impl Handler {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Update the mining stats
|
// Update the mining stats
|
||||||
println!("12345 update_block_height");
|
|
||||||
self.workers.update_block_height(new_block.header.height);
|
self.workers.update_block_height(new_block.header.height);
|
||||||
println!("12345 after update_block_height");
|
|
||||||
let difficulty = new_block.header.total_difficulty() - head.total_difficulty;
|
let difficulty = new_block.header.total_difficulty() - head.total_difficulty;
|
||||||
println!("12345 update_network_difficulty");
|
|
||||||
self.workers.update_network_difficulty(difficulty.to_num());
|
self.workers.update_network_difficulty(difficulty.to_num());
|
||||||
println!("12345 after update_network_difficulty");
|
|
||||||
self.workers.update_network_hashrate();
|
self.workers.update_network_hashrate();
|
||||||
println!("12345 after update_network_hashrate");
|
|
||||||
|
|
||||||
// Add this new block candidate onto our list of block versions for this height
|
// Add this new block candidate onto our list of block versions for this height
|
||||||
state.current_block_versions.push(new_block);
|
state.current_block_versions.push(new_block);
|
||||||
}
|
}
|
||||||
println!("12345 resend updated block exit");
|
|
||||||
// Send this job to all connected workers
|
// Send this job to all connected workers
|
||||||
self.broadcast_job();
|
self.broadcast_job();
|
||||||
}
|
}
|
||||||
|
@ -635,93 +641,111 @@ impl Handler {
|
||||||
|
|
||||||
// ----------------------------------------
|
// ----------------------------------------
|
||||||
// Worker Factory Thread Function
|
// Worker Factory Thread Function
|
||||||
fn accept_connections(listen_addr: SocketAddr, handler: Arc<Handler>, stop_state: Arc<StopState>) {
|
async fn accept_connections(listen_addr: SocketAddr,
|
||||||
|
handler: Arc<Handler>,
|
||||||
|
stop_state: Arc<StratumStopState>) {
|
||||||
info!("Start tokio stratum server");
|
info!("Start tokio stratum server");
|
||||||
|
|
||||||
let task = async move {
|
let state_check = stop_state.clone();
|
||||||
let mut listener = TcpListener::bind(&listen_addr).await.unwrap_or_else(|_| {
|
|
||||||
panic!("Stratum: Failed to bind to listen address {}", listen_addr)
|
// let task = async move {
|
||||||
});
|
//
|
||||||
let state_socket = &stop_state.clone();
|
// };
|
||||||
println!("12345 bind complete");
|
|
||||||
loop {
|
let listener = TcpListener::bind(&listen_addr).await.unwrap_or_else(|_| {
|
||||||
println!("12345 socket starting");
|
panic!("Stratum: Failed to bind to listen address {}", listen_addr)
|
||||||
let (socket, _) = listener.accept().await.unwrap();
|
});
|
||||||
// Stop listener on node server stop.
|
let stop_socket = &stop_state.clone();
|
||||||
{
|
loop {
|
||||||
if state_socket.is_stopped() {
|
let (socket, _) = listener.accept().await.unwrap();
|
||||||
panic_any("Stopped");
|
// Stop listener on stratum stop.
|
||||||
}
|
{
|
||||||
|
if stop_socket.is_stopped() {
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
println!("12345 socket");
|
let handler = handler.clone();
|
||||||
let handler = handler.clone();
|
|
||||||
|
|
||||||
let process = || async move {
|
let process = || async move {
|
||||||
// Spawn a task to process the connection
|
// Spawn a task to process the connection
|
||||||
let (tx, mut rx) = mpsc::unbounded();
|
let (tx, mut rx) = mpsc::unbounded();
|
||||||
|
|
||||||
let worker_id = handler.workers.add_worker(tx);
|
let worker_id = handler.workers.add_worker(tx);
|
||||||
info!("Worker {} connected", worker_id);
|
info!("Worker {} connected", worker_id);
|
||||||
|
|
||||||
let framed = Framed::new(socket, LinesCodec::new());
|
let framed = Framed::new(socket, LinesCodec::new());
|
||||||
let (mut writer, mut reader) = framed.split();
|
let (mut writer, mut reader) = framed.split();
|
||||||
|
|
||||||
let h = handler.clone();
|
let h = handler.clone();
|
||||||
let read = async move {
|
let stop_read = stop_socket.clone();
|
||||||
println!("12345 r: 1");
|
let read = async move {
|
||||||
while let Some(line) = reader
|
while let Some(line) = reader
|
||||||
.try_next()
|
.try_next()
|
||||||
.await
|
.await
|
||||||
.map_err(|e| error!("error reading line: {}", e))?
|
.map_err(|e| error!("error reading line: {}", e))?
|
||||||
|
{
|
||||||
|
// Stop read on stratum stop.
|
||||||
{
|
{
|
||||||
println!("12345 r: 2: {}", line);
|
if stop_read.is_stopped() {
|
||||||
let request = serde_json::from_str(&line)
|
break;
|
||||||
.map_err(|e| println!("error serializing line: {}", e))?;
|
}
|
||||||
let resp = h.handle_rpc_requests(request, worker_id);
|
|
||||||
h.workers.send_to(worker_id, resp);
|
|
||||||
}
|
}
|
||||||
println!("12345 r: 3");
|
let request = serde_json::from_str(&line)
|
||||||
|
.map_err(|e| error!("error serializing line: {}", e))?;
|
||||||
Result::<_, ()>::Ok(())
|
let resp = h.handle_rpc_requests(request, worker_id);
|
||||||
};
|
h.workers.send_to(worker_id, resp);
|
||||||
|
}
|
||||||
let write = async move {
|
|
||||||
while let Some(line) = rx.next().await {
|
|
||||||
println!("12345 w: 1: {}", line);
|
|
||||||
writer
|
|
||||||
.send(line)
|
|
||||||
.await
|
|
||||||
.map_err(|e| println!("error writing line: {}", e))?;
|
|
||||||
}
|
|
||||||
println!("12345 w: 2");
|
|
||||||
Result::<_, ()>::Ok(())
|
|
||||||
};
|
|
||||||
|
|
||||||
let task = async move {
|
|
||||||
println!("12345 t: 1");
|
|
||||||
pin_mut!(read, write);
|
|
||||||
println!("12345 t: 2");
|
|
||||||
futures::future::select(read, write).await;
|
|
||||||
println!("12345 t: 3");
|
|
||||||
handler.workers.remove_worker(worker_id);
|
|
||||||
println!("12345 t: 4");
|
|
||||||
info!("Worker {} disconnected", worker_id);
|
|
||||||
};
|
|
||||||
tokio::spawn(task);
|
|
||||||
|
|
||||||
Result::<_, ()>::Ok(())
|
Result::<_, ()>::Ok(())
|
||||||
};
|
};
|
||||||
println!("12345 run process");
|
|
||||||
|
|
||||||
let _ = (process)().await;
|
let stop_write = stop_socket.clone();
|
||||||
println!("12345 after process");
|
let write = async move {
|
||||||
}
|
while let Some(line) = rx.next().await {
|
||||||
};
|
// Stop write on stratum stop.
|
||||||
let rt = Runtime::new().unwrap();
|
{
|
||||||
rt.block_on(task);
|
if stop_write.is_stopped() {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
writer
|
||||||
|
.send(line)
|
||||||
|
.await
|
||||||
|
.map_err(|e| error!("error writing line: {}", e))?;
|
||||||
|
}
|
||||||
|
Result::<_, ()>::Ok(())
|
||||||
|
};
|
||||||
|
|
||||||
|
let task = async move {
|
||||||
|
pin_mut!(read, write);
|
||||||
|
futures::future::select(read, write).await;
|
||||||
|
handler.workers.remove_worker(worker_id);
|
||||||
|
info!("Worker {} disconnected", worker_id);
|
||||||
|
};
|
||||||
|
tokio::spawn(task);
|
||||||
|
|
||||||
|
Result::<_, ()>::Ok(())
|
||||||
|
};
|
||||||
|
|
||||||
|
let _ = (process)().await;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn check_stop_state(stop_state: Arc<StratumStopState>, listen_addr: SocketAddr) {
|
||||||
|
loop {
|
||||||
|
// Ping stratum socket on stop to handle TcpListener unbind.
|
||||||
|
if stop_state.is_stopped() {
|
||||||
|
thread::spawn(move || {
|
||||||
|
let _ = TcpStream::connect(listen_addr).unwrap();
|
||||||
|
});
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
thread::sleep(Duration::from_millis(1000));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
// ----------------------------------------
|
// ----------------------------------------
|
||||||
// Worker Object - a connected stratum client - a miner, pool, proxy, etc...
|
// Worker Object - a connected stratum client - a miner, pool, proxy, etc...
|
||||||
|
|
||||||
|
@ -910,7 +934,9 @@ impl StratumServer {
|
||||||
/// existing chain anytime required and sending that to the connected
|
/// existing chain anytime required and sending that to the connected
|
||||||
/// stratum miner, proxy, or pool, and accepts full solutions to
|
/// stratum miner, proxy, or pool, and accepts full solutions to
|
||||||
/// be submitted.
|
/// be submitted.
|
||||||
pub fn run_loop(&mut self, proof_size: usize, sync_state: Arc<SyncState>, stop_state: Arc<StopState>) {
|
pub fn run_loop(&mut self, proof_size: usize,
|
||||||
|
sync_state: Arc<SyncState>,
|
||||||
|
stop_state: Arc<StratumStopState>) {
|
||||||
info!(
|
info!(
|
||||||
"(Server ID: {}) Starting stratum server with proof_size = {}",
|
"(Server ID: {}) Starting stratum server with proof_size = {}",
|
||||||
self.id, proof_size
|
self.id, proof_size
|
||||||
|
@ -927,33 +953,55 @@ impl StratumServer {
|
||||||
.expect("Stratum: Incorrect address ");
|
.expect("Stratum: Incorrect address ");
|
||||||
|
|
||||||
let handler = Arc::new(Handler::from_stratum(&self));
|
let handler = Arc::new(Handler::from_stratum(&self));
|
||||||
let h = handler.clone();
|
|
||||||
|
|
||||||
let s_state = stop_state.clone();
|
|
||||||
|
|
||||||
let _listener_th = thread::spawn(move || {
|
|
||||||
accept_connections(listen_addr, h, s_state);
|
|
||||||
});
|
|
||||||
|
|
||||||
// We have started
|
|
||||||
{
|
|
||||||
let mut stratum_stats = self.stratum_stats.write();
|
|
||||||
stratum_stats.is_running = true;
|
|
||||||
stratum_stats.edge_bits = (global::min_edge_bits() + 1) as u16;
|
|
||||||
stratum_stats.minimum_share_difficulty = self.config.minimum_share_difficulty;
|
|
||||||
}
|
|
||||||
|
|
||||||
warn!(
|
|
||||||
"Stratum server started on {}",
|
|
||||||
self.config.stratum_server_addr.clone().unwrap()
|
|
||||||
);
|
|
||||||
|
|
||||||
// Initial Loop. Waiting node complete syncing
|
// Initial Loop. Waiting node complete syncing
|
||||||
while self.sync_state.is_syncing() {
|
while self.sync_state.is_syncing() {
|
||||||
thread::sleep(Duration::from_millis(50));
|
thread::sleep(Duration::from_millis(50));
|
||||||
}
|
}
|
||||||
|
|
||||||
handler.run(&self.config, &self.tx_pool, stop_state.clone());
|
let h = handler.clone();
|
||||||
|
|
||||||
|
let task_stop_state = stop_state.clone();
|
||||||
|
|
||||||
|
let task_config = self.config.clone();
|
||||||
|
let task_tx_pool = self.tx_pool.clone();
|
||||||
|
let task_stats = self.stratum_stats.clone();
|
||||||
|
let _ = thread::spawn(move || {
|
||||||
|
let rt = Runtime::new().unwrap();
|
||||||
|
|
||||||
|
let main_stop_state = task_stop_state.clone();
|
||||||
|
let main_task = async move {
|
||||||
|
handler.run(&task_config, &task_tx_pool, main_stop_state);
|
||||||
|
};
|
||||||
|
// Run main loop.
|
||||||
|
rt.spawn(main_task);
|
||||||
|
|
||||||
|
// Run task to periodically check stop state.
|
||||||
|
rt.spawn(check_stop_state(task_stop_state.clone(), listen_addr));
|
||||||
|
// Run connections listener and block thread till it will exit on stop.
|
||||||
|
rt.block_on(accept_connections(listen_addr, h, task_stop_state.clone()));
|
||||||
|
|
||||||
|
// We have stopped.
|
||||||
|
{
|
||||||
|
let mut stratum_stats = task_stats.write();
|
||||||
|
stratum_stats.is_running = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
task_stop_state.reset();
|
||||||
|
});
|
||||||
|
|
||||||
|
warn!(
|
||||||
|
"Stratum server started on {}",
|
||||||
|
self.config.stratum_server_addr.clone().unwrap()
|
||||||
|
);
|
||||||
|
|
||||||
|
// We have started.
|
||||||
|
{
|
||||||
|
let mut stratum_stats = self.stratum_stats.write();
|
||||||
|
stratum_stats.is_running = true;
|
||||||
|
stratum_stats.edge_bits = (global::min_edge_bits() + 1) as u16;
|
||||||
|
stratum_stats.minimum_share_difficulty = self.config.minimum_share_difficulty;
|
||||||
|
}
|
||||||
} // fn run_loop()
|
} // fn run_loop()
|
||||||
} // StratumServer
|
} // StratumServer
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue