stratum: fix mining

This commit is contained in:
ardocrat 2024-08-01 13:08:37 +03:00
parent 355e4173db
commit 03d2336363
6 changed files with 129 additions and 171 deletions

3
Cargo.lock generated
View file

@ -3856,8 +3856,9 @@ dependencies = [
"tls-api",
"tls-api-native-tls",
"tls-api-openssl",
"tokio 0.2.25",
"tokio 1.38.0",
"tokio-util 0.7.11",
"tokio-util 0.2.0",
"toml 0.8.14",
"tor-config",
"tor-hscrypto",

View file

@ -94,7 +94,8 @@ tls-api = "0.9.0"
tls-api-native-tls = "0.9.0"
## stratum server
tokio-util = { version = "0.7.8", features = ["codec"] }
tokio-old = {version = "0.2", features = ["full"], package = "tokio" }
tokio-util-old = { version = "0.2", features = ["codec"], package = "tokio-util" }
[target.'cfg(all(not(target_os = "windows"), not(target_os = "android")))'.dependencies]
eye = { version = "0.5.0", default-features = false }

View file

@ -134,7 +134,7 @@ impl StratumSetup {
ui.vertical_centered(|ui| {
// Show loading indicator or controls to start/stop stratum server.
if Node::get_sync_status().unwrap() == SyncStatus::NoSync &&
if Node::get_sync_status().unwrap_or(SyncStatus::Initial) == SyncStatus::NoSync &&
self.is_port_available && self.wallet_name.is_some() {
if Node::is_stratum_starting() || Node::is_stratum_stopping() {
ui.vertical_centered(|ui| {
@ -147,6 +147,8 @@ impl StratumSetup {
let disable_text = format!("{} {}", POWER, t!("network_settings.disable"));
View::action_button(ui, disable_text, || {
Node::stop_stratum();
let (ip, port) = NodeConfig::get_stratum_address();
self.is_port_available = NodeConfig::is_stratum_port_available(&ip, &port);
});
ui.add_space(6.0);
} else {

View file

@ -84,7 +84,7 @@ pub fn get_block(
while let Err(e) = result {
let mut new_key_id = key_id.to_owned();
match e {
self::Error::Chain(c) => match c {
Error::Chain(c) => match c {
grin_chain::Error::DuplicateCommitment(_) => {
debug!(
"Duplicate commit for potential coinbase detected. Trying next derivation."
@ -93,10 +93,10 @@ pub fn get_block(
new_key_id = None;
}
_ => {
error!("Chain Error: {}", c);
error!("Chain Error: {:?}", c);
}
},
self::Error::WalletComm(_) => {
Error::WalletComm(_) => {
error!(
"Error building new block: Can't connect to wallet listener at {:?}; will retry",
wallet_listener_url.as_ref().unwrap()
@ -233,11 +233,11 @@ fn burn_reward(block_fees: BlockFees) -> Result<(Output, TxKernel, BlockFees), E
fn get_coinbase(
wallet_listener_url: Option<String>,
block_fees: BlockFees,
) -> Result<(core::Output, core::TxKernel, BlockFees), Error> {
match wallet_listener_url {
) -> Result<(Output, TxKernel, BlockFees), Error> {
return match wallet_listener_url {
None => {
// Burn it
return burn_reward(block_fees);
burn_reward(block_fees)
}
Some(wallet_listener_url) => {
let res = create_coinbase(&wallet_listener_url, &block_fees)?;
@ -245,12 +245,12 @@ fn get_coinbase(
let kernel = res.kernel;
let key_id = res.key_id;
let block_fees = BlockFees {
key_id: key_id,
key_id,
..block_fees
};
debug!("get_coinbase: {:?}", block_fees);
return Ok((output, kernel, block_fees));
Ok((output, kernel, block_fees))
}
}
}
@ -273,7 +273,7 @@ fn create_coinbase(dest: &str, block_fees: &BlockFees) -> Result<CbData, Error>
let timeout = grin_api::client::TimeOut::default();
let res: String = grin_api::client::send_request(req, timeout).map_err(|e| {
let report = format!(
"Failed to get coinbase from {}. Is the wallet listening? {}",
"Failed to get coinbase from {}. Is the wallet listening? {:?}",
dest, e
);
error!("{}", report);

View file

@ -616,10 +616,15 @@ pub fn start_stratum_mining_server(server: &Server, config: StratumServerConfig)
);
let stop_state = NODE_STATE.stratum_stop_state.clone();
stop_state.reset();
let _ = thread::Builder::new()
.name("stratum_server".to_string())
.spawn(move || {
let server_state = stop_state.clone();
thread::spawn(move || {
stratum_server.run_loop(proof_size, sync_state, stop_state);
server_state.reset();
// Reset stratum stats.
{
let mut w_stratum_stats = NODE_STATE.stratum_stats.write();
*w_stratum_stats = StratumStats::default();
}
});
}

View file

@ -14,25 +14,23 @@
//! Mining Stratum Server
use futures::channel::mpsc;
use futures::pin_mut;
use futures::{SinkExt, StreamExt, TryStreamExt};
use tokio::net::TcpListener;
use tokio::runtime::Runtime;
use tokio_util::codec::{Framed, LinesCodec};
use tokio_old::net::TcpListener;
use tokio_old::runtime::Runtime;
use tokio_util_old::codec::{Framed, LinesCodec};
use grin_util::RwLock;
use chrono::prelude::Utc;
use serde_json::Value;
use std::collections::HashMap;
use std::net::{SocketAddr, TcpStream};
use std::panic::panic_any;
use std::net::SocketAddr;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
use std::time::{Duration, SystemTime};
use futures::future::{abortable, AbortHandle};
use grin_chain::{self, SyncState};
use grin_servers::common::stats::{StratumStats, WorkerStats};
@ -40,18 +38,18 @@ use grin_servers::common::types::StratumServerConfig;
use grin_core::consensus::graph_weight;
use grin_core::core::hash::Hashed;
use grin_core::core::Block;
use grin_core::global;
use grin_core::global::min_edge_bits;
use grin_core::{pow, ser};
use crate::node::mine_block;
use grin_util::ToHex;
use grin_servers::ServerTxPool;
use log::{debug, error, info, warn};
use log::{debug, error};
use serde_derive::{Deserialize, Serialize};
use crate::node::mine_block::get_block;
use crate::wallet::WalletConfig;
type Tx = mpsc::UnboundedSender<String>;
// ----------------------------------------
// http://www.jsonrpc.org/specification
// RPC Methods
@ -144,7 +142,7 @@ impl<T> From<T> for RpcError
T: std::error::Error,
{
fn from(e: T) -> Self {
error!("Received unhandled error: {}", e);
println!("Received unhandled error: {}", e);
RpcError::internal_error()
}
}
@ -354,7 +352,7 @@ impl Handler {
// Build a JobTemplate from a BlockHeader and return JSON
let job_template = self.build_block_template();
let response = serde_json::to_value(&job_template).unwrap();
debug!(
println!(
"(Server ID: {}) sending block {} with id {} to single worker",
self.id, job_template.height, job_template.job_id,
);
@ -408,7 +406,7 @@ impl Handler {
|| b.is_none()
{
// Return error status
error!(
println!(
"(Server ID: {}) Share at height {}, edge_bits {}, nonce {}, job_id {} submitted too late",
self.id, params.height, params.edge_bits, params.nonce, params.job_id,
);
@ -428,7 +426,7 @@ impl Handler {
if !b.header.pow.is_primary() && !b.header.pow.is_secondary() {
// Return error status
error!(
println!(
"(Server ID: {}) Failed to validate solution at height {}, hash {}, edge_bits {}, nonce {}, job_id {}: cuckoo size too small",
self.id, params.height, b.hash(), params.edge_bits, params.nonce, params.job_id,
);
@ -445,7 +443,7 @@ impl Handler {
// If the difficulty is too low its an error
if unscaled_share_difficulty < state.minimum_share_difficulty {
// Return error status
error!(
println!(
"(Server ID: {}) Share at height {}, hash {}, edge_bits {}, nonce {}, job_id {} rejected due to low difficulty: {}/{}",
self.id, params.height, b.hash(), params.edge_bits, params.nonce, params.job_id, unscaled_share_difficulty, state.minimum_share_difficulty,
);
@ -460,8 +458,8 @@ impl Handler {
let res = self.chain.process_block(b.clone(), grin_chain::Options::MINE);
if let Err(e) = res {
// Return error status
error!(
"(Server ID: {}) Failed to validate solution at height {}, hash {}, edge_bits {}, nonce {}, job_id {}, {}",
println!(
"(Server ID: {}) Failed to validate solution at height {}, hash {}, edge_bits {}, nonce {}, job_id {}, {:?}",
self.id,
params.height,
b.hash(),
@ -480,7 +478,7 @@ impl Handler {
self.workers.stratum_stats.write().blocks_found += 1;
// Log message to make it obvious we found a block
let stats = self.workers.get_stats(worker_id)?;
warn!(
println!(
"(Server ID: {}) Solution Found for block {}, hash {} - Yay!!! Worker ID: {}, blocks found: {}, shares: {}",
self.id, params.height,
b.hash(),
@ -493,7 +491,7 @@ impl Handler {
let res = pow::verify_size(&b.header);
if res.is_err() {
// Return error status
error!(
println!(
"(Server ID: {}) Failed to validate share at height {}, hash {}, edge_bits {}, nonce {}, job_id {}. {:?}",
self.id,
params.height,
@ -516,7 +514,7 @@ impl Handler {
Some(login) => login,
};
info!(
println!(
"(Server ID: {}) Got share at height {}, hash {}, edge_bits {}, nonce {}, job_id {}, difficulty {}/{}, submitted by {}",
self.id,
b.header.height,
@ -531,7 +529,7 @@ impl Handler {
self.workers
.update_stats(worker_id, |worker_stats| worker_stats.num_accepted += 1);
let submit_response = if share_is_block {
format!("block found - {}", b.hash().to_hex())
format!("blockfound - {}", b.hash().to_hex())
} else {
"ok".to_string()
};
@ -542,6 +540,7 @@ impl Handler {
} // handle submit a solution
fn broadcast_job(&self) {
debug!("broadcast job");
// Package new block into RpcRequest
let job_template = self.build_block_template();
let job_template_json = serde_json::to_string(&job_template).unwrap();
@ -561,7 +560,8 @@ impl Handler {
self.workers.broadcast(job_request_json);
}
pub fn run(&self, config: &StratumServerConfig,
pub fn run(&self,
config: &StratumServerConfig,
tx_pool: &ServerTxPool,
stop_state: Arc<StratumStopState>) {
debug!("Run main loop");
@ -569,11 +569,10 @@ impl Handler {
let mut head = self.chain.head().unwrap();
let mut current_hash = head.prev_block_h;
loop {
// Stop main loop on stratum stop.
if stop_state.is_stopped() {
panic_any("Stopped");
thread::sleep(Duration::from_millis(1500));
break;
}
// get the latest chain state
head = self.chain.head().unwrap();
let latest_hash = head.last_block_h;
@ -585,6 +584,7 @@ impl Handler {
&& self.workers.count() > 0
{
{
debug!("resend updated block");
let mut state = self.current_state.write();
let wallet_listener_url = if !config.burn_reward {
if let Ok(id) = config.wallet_listener_url.parse::<i64>() {
@ -604,7 +604,7 @@ impl Handler {
let clear_blocks = current_hash != latest_hash;
// Build the new block (version)
let (new_block, block_fees) = mine_block::get_block(
let (new_block, block_fees) = get_block(
&self.chain,
tx_pool,
state.current_key_id.clone(),
@ -651,104 +651,79 @@ impl Handler {
// ----------------------------------------
// Worker Factory Thread Function
async fn accept_connections(listen_addr: SocketAddr,
handler: Arc<Handler>,
stop_state: Arc<StratumStopState>) {
info!("Start tokio stratum server");
fn accept_connections(listen_addr: SocketAddr,
handler: Arc<Handler>,
stop_state: Arc<StratumStopState>) {
debug!("Start tokio stratum server");
let task = async move {
let mut listener = TcpListener::bind(&listen_addr).await.unwrap_or_else(|_| {
panic!("Stratum: Failed to bind to listen address {}", listen_addr)
});
let server = listener
.incoming()
.filter_map(|s| async { s.map_err(|e| error!("accept error = {:?}", e)).ok() })
.for_each(move |socket| {
let handler = handler.clone();
async move {
// Spawn a task to process the connection
let (tx, mut rx) = mpsc::unbounded();
let _state_check = stop_state.clone();
let worker_id = handler.workers.add_worker(tx);
debug!("Worker {} connected", worker_id);
// let task = async move {
//
// };
let framed = Framed::new(socket, LinesCodec::new());
let (mut writer, mut reader) = framed.split();
let listener = TcpListener::bind(&listen_addr).await.unwrap_or_else(|_| {
panic!("Stratum: Failed to bind to listen address {}", listen_addr)
});
let stop_socket = &stop_state.clone();
loop {
let (socket, _) = listener.accept().await.unwrap();
// Stop listener on stratum stop.
{
if stop_socket.is_stopped() {
break;
}
}
let handler = handler.clone();
let process = || async move {
// Spawn a task to process the connection
let (tx, mut rx) = mpsc::unbounded();
let worker_id = handler.workers.add_worker(tx);
info!("Worker {} connected", worker_id);
let framed = Framed::new(socket, LinesCodec::new());
let (mut writer, mut reader) = framed.split();
let h = handler.clone();
let stop_read = stop_socket.clone();
let read = async move {
while let Some(line) = reader
.try_next()
.await
.map_err(|e| error!("error reading line: {}", e))?
{
// Stop read on stratum stop.
{
if stop_read.is_stopped() {
break;
let h = handler.clone();
let read = async move {
while let Some(line) = reader
.try_next()
.await
.map_err(|e| debug!("error reading line: {}", e))?
{
let request = serde_json::from_str(&line)
.map_err(|e| debug!("error serializing line: {}", e))?;
let resp = h.handle_rpc_requests(request, worker_id);
h.workers.send_to(worker_id, resp);
}
}
let request = serde_json::from_str(&line)
.map_err(|e| error!("error serializing line: {}", e))?;
let resp = h.handle_rpc_requests(request, worker_id);
h.workers.send_to(worker_id, resp);
}
Result::<_, ()>::Ok(())
};
Result::<_, ()>::Ok(())
};
let stop_write = stop_socket.clone();
let write = async move {
while let Some(line) = rx.next().await {
// Stop write on stratum stop.
{
if stop_write.is_stopped() {
break;
let write = async move {
while let Some(line) = rx.next().await {
writer
.send(line)
.await
.map_err(|e| debug!("error writing line: {}", e))?;
}
}
writer
.send(line)
.await
.map_err(|e| error!("error writing line: {}", e))?;
Result::<_, ()>::Ok(())
};
let task = async move {
pin_mut!(read, write);
futures::future::select(read, write).await;
handler.workers.remove_worker(worker_id);
debug!("Worker {} disconnected", worker_id);
};
tokio_old::spawn(task);
}
Result::<_, ()>::Ok(())
};
});
server.await
};
let task = async move {
pin_mut!(read, write);
futures::future::select(read, write).await;
handler.workers.remove_worker(worker_id);
info!("Worker {} disconnected", worker_id);
};
tokio::spawn(task);
Result::<_, ()>::Ok(())
};
let _ = (process)().await;
}
let mut rt = Runtime::new().unwrap();
let (task, handle) = abortable(task);
rt.spawn(check_stop_state(stop_state, handle));
rt.block_on(task).unwrap();
}
async fn check_stop_state(stop_state: Arc<StratumStopState>, listen_addr: SocketAddr) {
async fn check_stop_state(stop_state: Arc<StratumStopState>, handle: AbortHandle) {
loop {
// Ping stratum socket on stop to handle TcpListener unbind.
if stop_state.is_stopped() {
thread::spawn(move || {
let _ = TcpStream::connect(listen_addr).unwrap();
});
handle.abort();
break;
}
thread::sleep(Duration::from_millis(1000));
@ -944,10 +919,11 @@ impl StratumServer {
/// existing chain anytime required and sending that to the connected
/// stratum miner, proxy, or pool, and accepts full solutions to
/// be submitted.
pub fn run_loop(&mut self, proof_size: usize,
pub fn run_loop(&mut self,
proof_size: usize,
sync_state: Arc<SyncState>,
stop_state: Arc<StratumStopState>) {
info!(
debug!(
"(Server ID: {}) Starting stratum server with proof_size = {}",
self.id, proof_size
);
@ -963,55 +939,28 @@ impl StratumServer {
.expect("Stratum: Incorrect address ");
let handler = Arc::new(Handler::from_stratum(&self));
let h = handler.clone();
let stop_socket = stop_state.clone();
let check_state = stop_socket.clone();
let _listener_th = thread::spawn(move || {
accept_connections(listen_addr, h, stop_socket);
});
// We have started
{
let mut stratum_stats = self.stratum_stats.write();
stratum_stats.is_running = true;
stratum_stats.edge_bits = (min_edge_bits() + 1) as u16;
stratum_stats.minimum_share_difficulty = self.config.minimum_share_difficulty;
}
// Initial Loop. Waiting node complete syncing
while self.sync_state.is_syncing() {
thread::sleep(Duration::from_millis(50));
}
let h = handler.clone();
let task_stop_state = stop_state.clone();
let task_config = self.config.clone();
let task_tx_pool = self.tx_pool.clone();
let task_stats = self.stratum_stats.clone();
let _ = thread::spawn(move || {
let rt = Runtime::new().unwrap();
let main_stop_state = task_stop_state.clone();
let main_task = async move {
handler.run(&task_config, &task_tx_pool, main_stop_state);
};
// Run main loop.
rt.spawn(main_task);
// Run task to periodically check stop state.
rt.spawn(check_stop_state(task_stop_state.clone(), listen_addr));
// Run connections listener and block thread till it will exit on stop.
rt.block_on(accept_connections(listen_addr, h, task_stop_state.clone()));
// We have stopped.
{
let mut stratum_stats = task_stats.write();
stratum_stats.is_running = false;
}
task_stop_state.reset();
});
warn!(
"Stratum server started on {}",
self.config.stratum_server_addr.clone().unwrap()
);
// We have started.
{
let mut stratum_stats = self.stratum_stats.write();
stratum_stats.is_running = true;
stratum_stats.edge_bits = (global::min_edge_bits() + 1) as u16;
stratum_stats.minimum_share_difficulty = self.config.minimum_share_difficulty;
}
handler.run(&self.config, &self.tx_pool, check_state);
} // fn run_loop()
} // StratumServer