mirror of
https://github.com/mimblewimble/grin.git
synced 2025-01-21 03:21:08 +03:00
minimal stratum server (#937)
* Fix issue where we have no metadata for a block (#938)
when restarting node before initial sync completed
* Avoid double-locking on add eviction. Fixes #936
* Fix 33c5a983
* Add support for DNS Seed (#940)
* Add support for DNS Seed
* Add port
* Add seed.grin-tech.org
* Remove duplicate IPs
* minimal stratum server
* Modifications for review comments. Move stratum test into its own file, move get_block() into its own rust module, use pool and chain only rather than the entire Miner object
* rustfmt
* cleanup
* cleanup
* Introduce extending_readonly to simplify a forcing and cancelling rollbacks (#945)
readonly views of the txhashset
* Add DNS Seed and make DNSSeed default (#942)
* Add dns seed seeding type
* Add grin-seed.owncrypto.de and make DNSSeed default
* Add email address for each DNS Seed
* [WIP] Core PMMR and API updates to support wallet restore (#950)
* update pmmr to get batch of elements by insertion position
* update pmmr to get batch of elements by insertion position
* add api + chain calls to get traversed outputs back out
* add api + chain calls to get traversed outputs back out
* first pass getting wallet restore to work again with updated utxo-walking api
* Update simulation.md
* Fix Bus Error (core dumped) when validating fast sync txhashset (#956)
This PR fixes #953 by introducing a lock for txhashet_write. It's not enough
to synchronize access to in memory data, files also needs to be protected, so
a general txhashset lock was introduced.
* refactor grin crate into separate modules (#955)
* Add total kernel offset to block api (#954)
* minimal stratum server
* Modifications for review comments. Move stratum test into its own file, move get_block() into its own rust module, use pool and chain only rather than the entire Miner object
* rustfmt
* cleanup
* cleanup
* Merge with grin_grin -> servers code reorg
* Merge with grin_grin -> servers code reorg
* add stratum server stats
This commit is contained in:
parent
8f3fbe632f
commit
bcc8f68f52
13 changed files with 1124 additions and 195 deletions
10
grin.toml
10
grin.toml
|
@ -174,6 +174,16 @@ burn_reward = false
|
|||
#slow_down_in_millis = 30
|
||||
|
||||
|
||||
####################################
|
||||
### STRATUM SERVER CONFIGURATION ###
|
||||
####################################
|
||||
|
||||
#flag whether stratum server is enabled
|
||||
enable_stratum_server = false
|
||||
|
||||
#what port and address for the stratum server to listen on
|
||||
stratum_server_addr = "127.0.0.1:13416"
|
||||
|
||||
#########################################
|
||||
### CUCKOO MINER PLUGIN CONFIGURATION ###
|
||||
#########################################
|
||||
|
|
|
@ -63,6 +63,12 @@ pub struct MinerConfig {
|
|||
/// a testing attribute for the time being that artifically slows down the
|
||||
/// mining loop by adding a sleep to the thread
|
||||
pub slow_down_in_millis: Option<u64>,
|
||||
|
||||
/// Run a stratum mining server rather than mining locally in-process
|
||||
pub enable_stratum_server: bool,
|
||||
|
||||
/// If enabled, the address and port to listen on
|
||||
pub stratum_server_addr: Option<String>,
|
||||
}
|
||||
|
||||
impl Default for MinerConfig {
|
||||
|
@ -76,6 +82,8 @@ impl Default for MinerConfig {
|
|||
burn_reward: false,
|
||||
slow_down_in_millis: Some(0),
|
||||
attempt_time_per_block: 2,
|
||||
enable_stratum_server: false,
|
||||
stratum_server_addr: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -15,6 +15,8 @@ serde = "1"
|
|||
serde_derive = "1"
|
||||
serde_json = "1"
|
||||
time = "0.1"
|
||||
bufstream = "~0.1"
|
||||
jsonrpc-core = "~4.0"
|
||||
|
||||
grin_api = { path = "../api" }
|
||||
grin_chain = { path = "../chain" }
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
|
||||
use std::sync::{Arc, RwLock};
|
||||
use std::sync::atomic::AtomicBool;
|
||||
use std::time::SystemTime;
|
||||
|
||||
use chain;
|
||||
use p2p;
|
||||
|
@ -30,6 +31,8 @@ pub struct ServerStateInfo {
|
|||
pub awaiting_peers: Arc<AtomicBool>,
|
||||
/// Mining stats
|
||||
pub mining_stats: Arc<RwLock<MiningStats>>,
|
||||
/// Stratum stats
|
||||
pub stratum_stats: Arc<RwLock<StratumStats>>,
|
||||
}
|
||||
|
||||
impl Default for ServerStateInfo {
|
||||
|
@ -37,6 +40,7 @@ impl Default for ServerStateInfo {
|
|||
ServerStateInfo {
|
||||
awaiting_peers: Arc::new(AtomicBool::new(false)),
|
||||
mining_stats: Arc::new(RwLock::new(MiningStats::default())),
|
||||
stratum_stats: Arc::new(RwLock::new(StratumStats::default())),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -56,6 +60,8 @@ pub struct ServerStats {
|
|||
pub awaiting_peers: bool,
|
||||
/// Handle to current mining stats
|
||||
pub mining_stats: MiningStats,
|
||||
/// Handle to current stratum server stats
|
||||
pub stratum_stats: StratumStats,
|
||||
/// Peer stats
|
||||
pub peer_stats: Vec<PeerStats>,
|
||||
/// Difficulty calculation statistics
|
||||
|
@ -82,6 +88,44 @@ pub struct MiningStats {
|
|||
pub device_stats: Option<Vec<Vec<pow::cuckoo_miner::CuckooMinerDeviceStats>>>,
|
||||
}
|
||||
|
||||
/// Struct to return relevant information about stratum workers
|
||||
#[derive(Clone, Serialize, Debug)]
|
||||
pub struct WorkerStats {
|
||||
/// Unique ID for this worker
|
||||
pub id: String,
|
||||
/// whether stratum worker is currently connected
|
||||
pub is_connected: bool,
|
||||
/// Timestamp of most recent communication with this worker
|
||||
pub last_seen: SystemTime,
|
||||
/// pow difficulty this worker is using
|
||||
pub pow_difficulty: u64,
|
||||
/// number of valid shares submitted
|
||||
pub num_accepted: u64,
|
||||
/// number of invalid shares submitted
|
||||
pub num_rejected: u64,
|
||||
/// number of shares submitted too late
|
||||
pub num_stale: u64,
|
||||
}
|
||||
|
||||
/// Struct to return relevant information about the stratum server
|
||||
#[derive(Clone, Serialize, Debug)]
|
||||
pub struct StratumStats {
|
||||
/// whether stratum server is enabled
|
||||
pub is_enabled: bool,
|
||||
/// whether stratum server is running
|
||||
pub is_running: bool,
|
||||
/// Number of connected workers
|
||||
pub num_workers: usize,
|
||||
/// what block height we're mining at
|
||||
pub block_height: u64,
|
||||
/// current network difficulty we're working on
|
||||
pub network_difficulty: u64,
|
||||
/// cuckoo size used for mining
|
||||
pub cuckoo_size: u16,
|
||||
/// Individual worker status
|
||||
pub worker_stats: Vec<WorkerStats>,
|
||||
}
|
||||
|
||||
/// Stats on the last WINDOW blocks and the difficulty calculation
|
||||
#[derive(Clone)]
|
||||
pub struct DiffStats {
|
||||
|
@ -167,3 +211,31 @@ impl Default for MiningStats {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for WorkerStats {
|
||||
fn default() -> WorkerStats {
|
||||
WorkerStats {
|
||||
id: String::from("unknown"),
|
||||
is_connected: false,
|
||||
last_seen: SystemTime::now(),
|
||||
pow_difficulty: 0,
|
||||
num_accepted: 0,
|
||||
num_rejected: 0,
|
||||
num_stale: 0,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for StratumStats {
|
||||
fn default() -> StratumStats {
|
||||
StratumStats {
|
||||
is_enabled: false,
|
||||
is_running: false,
|
||||
num_workers: 0,
|
||||
block_height: 0,
|
||||
network_difficulty: 0,
|
||||
cuckoo_size: 0,
|
||||
worker_stats: Vec::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -30,6 +30,7 @@ use core::core::target::Difficulty;
|
|||
use core::core::hash::Hashed;
|
||||
use grin::dandelion_monitor;
|
||||
use mining::miner;
|
||||
use mining::stratumserver;
|
||||
use p2p;
|
||||
use pool;
|
||||
use grin::seed;
|
||||
|
@ -67,12 +68,20 @@ impl Server {
|
|||
{
|
||||
let mut mining_config = config.mining_config.clone();
|
||||
let serv = Arc::new(Server::new(config)?);
|
||||
|
||||
if mining_config.as_mut().unwrap().enable_mining {
|
||||
{
|
||||
let mut mining_stats = serv.state_info.mining_stats.write().unwrap();
|
||||
mining_stats.is_enabled = true;
|
||||
}
|
||||
serv.start_miner(mining_config.unwrap());
|
||||
serv.start_miner(mining_config.clone().unwrap());
|
||||
}
|
||||
if mining_config.as_mut().unwrap().enable_stratum_server {
|
||||
{
|
||||
let mut stratum_stats = serv.state_info.stratum_stats.write().unwrap();
|
||||
stratum_stats.is_enabled = true;
|
||||
}
|
||||
serv.start_stratum_server(mining_config.clone().unwrap());
|
||||
}
|
||||
|
||||
info_callback(serv.clone());
|
||||
|
@ -275,6 +284,29 @@ impl Server {
|
|||
});
|
||||
}
|
||||
|
||||
/// Start a minimal "stratum" mining service on a separate thread
|
||||
pub fn start_stratum_server(&self, config: pow::types::MinerConfig) {
|
||||
let cuckoo_size = global::sizeshift();
|
||||
let proof_size = global::proofsize();
|
||||
let currently_syncing = self.currently_syncing.clone();
|
||||
|
||||
let mut stratum_server = stratumserver::StratumServer::new(
|
||||
config.clone(),
|
||||
self.chain.clone(),
|
||||
self.tx_pool.clone(),
|
||||
);
|
||||
let stratum_stats = self.state_info.stratum_stats.clone();
|
||||
let _ = thread::Builder::new()
|
||||
.name("stratum_server".to_string())
|
||||
.spawn(move || {
|
||||
let secs_5 = time::Duration::from_secs(5);
|
||||
while currently_syncing.load(Ordering::Relaxed) {
|
||||
thread::sleep(secs_5);
|
||||
}
|
||||
stratum_server.run_loop(config.clone(), stratum_stats, cuckoo_size as u32, proof_size);
|
||||
});
|
||||
}
|
||||
|
||||
/// The chain head
|
||||
pub fn head(&self) -> chain::Tip {
|
||||
self.chain.head().unwrap()
|
||||
|
@ -292,6 +324,7 @@ impl Server {
|
|||
/// consumers
|
||||
pub fn get_server_stats(&self) -> Result<ServerStats, Error> {
|
||||
let mining_stats = self.state_info.mining_stats.read().unwrap().clone();
|
||||
let stratum_stats = self.state_info.stratum_stats.read().unwrap().clone();
|
||||
let awaiting_peers = self.state_info.awaiting_peers.load(Ordering::Relaxed);
|
||||
|
||||
// Fill out stats on our current difficulty calculation
|
||||
|
@ -359,6 +392,7 @@ impl Server {
|
|||
is_syncing: self.currently_syncing.load(Ordering::Relaxed),
|
||||
awaiting_peers: awaiting_peers,
|
||||
mining_stats: mining_stats,
|
||||
stratum_stats: stratum_stats,
|
||||
peer_stats: peer_stats,
|
||||
diff_stats: diff_stats,
|
||||
})
|
||||
|
|
|
@ -21,8 +21,10 @@
|
|||
#![deny(unused_mut)]
|
||||
#![warn(missing_docs)]
|
||||
|
||||
extern crate bufstream;
|
||||
extern crate hyper;
|
||||
extern crate itertools;
|
||||
extern crate jsonrpc_core;
|
||||
extern crate rand;
|
||||
extern crate serde;
|
||||
#[macro_use]
|
||||
|
|
206
servers/src/mining/mine_block.rs
Normal file
206
servers/src/mining/mine_block.rs
Normal file
|
@ -0,0 +1,206 @@
|
|||
// Copyright 2018 The Grin Developers
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
//! Build a block to mine: gathers transactions from the pool, assembles
|
||||
//! them into a block and returns it.
|
||||
|
||||
use std::thread;
|
||||
use std::sync::{Arc, RwLock};
|
||||
use time;
|
||||
use std::time::Duration;
|
||||
use rand::{self, Rng};
|
||||
|
||||
use chain;
|
||||
use pool;
|
||||
use core::consensus;
|
||||
use core::core;
|
||||
use core::core::Transaction;
|
||||
use core::ser;
|
||||
use keychain::{Identifier, Keychain};
|
||||
use wallet;
|
||||
use wallet::BlockFees;
|
||||
use util;
|
||||
use util::LOGGER;
|
||||
use common::types::Error;
|
||||
use common::adapters::PoolToChainAdapter;
|
||||
|
||||
// Ensure a block suitable for mining is built and returned
|
||||
// If a wallet listener URL is not provided the reward will be "burnt"
|
||||
// Warning: This call does not return until/unless a new block can be built
|
||||
pub fn get_block(
|
||||
chain: &Arc<chain::Chain>,
|
||||
tx_pool: &Arc<RwLock<pool::TransactionPool<PoolToChainAdapter>>>,
|
||||
key_id: Option<Identifier>,
|
||||
max_tx: u32,
|
||||
wallet_listener_url: Option<String>,
|
||||
) -> (core::Block, BlockFees) {
|
||||
// get the latest chain state and build a block on top of it
|
||||
let mut result = build_block(
|
||||
chain,
|
||||
tx_pool,
|
||||
key_id.clone(),
|
||||
max_tx,
|
||||
wallet_listener_url.clone(),
|
||||
);
|
||||
while let Err(e) = result {
|
||||
match e {
|
||||
self::Error::Chain(chain::Error::DuplicateCommitment(_)) => {
|
||||
debug!(
|
||||
LOGGER,
|
||||
"Duplicate commit for potential coinbase detected. Trying next derivation."
|
||||
);
|
||||
}
|
||||
ae => {
|
||||
warn!(LOGGER, "Error building new block: {:?}. Retrying.", ae);
|
||||
}
|
||||
}
|
||||
thread::sleep(Duration::from_millis(100));
|
||||
result = build_block(
|
||||
chain,
|
||||
tx_pool,
|
||||
key_id.clone(),
|
||||
max_tx,
|
||||
wallet_listener_url.clone(),
|
||||
);
|
||||
}
|
||||
return result.unwrap();
|
||||
}
|
||||
|
||||
/// Builds a new block with the chain head as previous and eligible
|
||||
/// transactions from the pool.
|
||||
fn build_block(
|
||||
chain: &Arc<chain::Chain>,
|
||||
tx_pool: &Arc<RwLock<pool::TransactionPool<PoolToChainAdapter>>>,
|
||||
key_id: Option<Identifier>,
|
||||
max_tx: u32,
|
||||
wallet_listener_url: Option<String>,
|
||||
) -> Result<(core::Block, BlockFees), Error> {
|
||||
// prepare the block header timestamp
|
||||
let head = chain.head_header().unwrap();
|
||||
let mut now_sec = time::get_time().sec;
|
||||
let head_sec = head.timestamp.to_timespec().sec;
|
||||
if now_sec <= head_sec {
|
||||
now_sec = head_sec + 1;
|
||||
}
|
||||
|
||||
// get the difficulty our block should be at
|
||||
let diff_iter = chain.difficulty_iter();
|
||||
let difficulty = consensus::next_difficulty(diff_iter).unwrap();
|
||||
|
||||
// extract current transaction from the pool
|
||||
let txs_box = tx_pool
|
||||
.read()
|
||||
.unwrap()
|
||||
.prepare_mineable_transactions(max_tx);
|
||||
let txs: Vec<&Transaction> = txs_box.iter().map(|tx| tx.as_ref()).collect();
|
||||
|
||||
// build the coinbase and the block itself
|
||||
let fees = txs.iter().map(|tx| tx.fee()).sum();
|
||||
let height = head.height + 1;
|
||||
let block_fees = BlockFees {
|
||||
fees,
|
||||
key_id,
|
||||
height,
|
||||
};
|
||||
|
||||
let (output, kernel, block_fees) = get_coinbase(wallet_listener_url, block_fees)?;
|
||||
let mut b = core::Block::with_reward(&head, txs, output, kernel, difficulty.clone())?;
|
||||
|
||||
// making sure we're not spending time mining a useless block
|
||||
b.validate(&head)?;
|
||||
|
||||
let mut rng = rand::OsRng::new().unwrap();
|
||||
b.header.nonce = rng.gen();
|
||||
b.header.timestamp = time::at_utc(time::Timespec::new(now_sec, 0));
|
||||
|
||||
let b_difficulty =
|
||||
(b.header.total_difficulty.clone() - head.total_difficulty.clone()).into_num();
|
||||
debug!(
|
||||
LOGGER,
|
||||
"Built new block with {} inputs and {} outputs, network difficulty: {}, cumulative difficulty {}",
|
||||
b.inputs.len(),
|
||||
b.outputs.len(),
|
||||
b_difficulty,
|
||||
b.header.clone().total_difficulty.clone().into_num(),
|
||||
);
|
||||
|
||||
let roots_result = chain.set_txhashset_roots(&mut b, false);
|
||||
|
||||
match roots_result {
|
||||
Ok(_) => Ok((b, block_fees)),
|
||||
|
||||
// If it's a duplicate commitment, it's likely trying to use
|
||||
// a key that's already been derived but not in the wallet
|
||||
// for some reason, allow caller to retry
|
||||
Err(chain::Error::DuplicateCommitment(e)) => {
|
||||
Err(Error::Chain(chain::Error::DuplicateCommitment(e)))
|
||||
}
|
||||
|
||||
//Some other issue, possibly duplicate kernel
|
||||
Err(e) => {
|
||||
error!(
|
||||
LOGGER,
|
||||
"Error setting txhashset root to build a block: {:?}", e
|
||||
);
|
||||
Err(Error::Chain(chain::Error::Other(format!("{:?}", e))))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
///
|
||||
/// Probably only want to do this when testing.
|
||||
///
|
||||
fn burn_reward(block_fees: BlockFees) -> Result<(core::Output, core::TxKernel, BlockFees), Error> {
|
||||
warn!(LOGGER, "Burning block fees: {:?}", block_fees);
|
||||
let keychain = Keychain::from_random_seed().unwrap();
|
||||
let key_id = keychain.derive_key_id(1).unwrap();
|
||||
let (out, kernel) =
|
||||
core::Block::reward_output(&keychain, &key_id, block_fees.fees, block_fees.height).unwrap();
|
||||
Ok((out, kernel, block_fees))
|
||||
}
|
||||
|
||||
// Connect to the wallet listener and get coinbase.
|
||||
// Warning: If a wallet listener URL is not provided the reward will be "burnt"
|
||||
fn get_coinbase(
|
||||
wallet_listener_url: Option<String>,
|
||||
block_fees: BlockFees,
|
||||
) -> Result<(core::Output, core::TxKernel, BlockFees), Error> {
|
||||
match wallet_listener_url {
|
||||
None => {
|
||||
// Burn it
|
||||
return burn_reward(block_fees);
|
||||
}
|
||||
Some(wallet_listener_url) => {
|
||||
// Get the wallet coinbase
|
||||
let url = format!("{}/v1/receive/coinbase", wallet_listener_url.as_str());
|
||||
|
||||
let res = wallet::client::create_coinbase(&url, &block_fees)?;
|
||||
|
||||
let out_bin = util::from_hex(res.output).unwrap();
|
||||
let kern_bin = util::from_hex(res.kernel).unwrap();
|
||||
let key_id_bin = util::from_hex(res.key_id).unwrap();
|
||||
let output = ser::deserialize(&mut &out_bin[..]).unwrap();
|
||||
let kernel = ser::deserialize(&mut &kern_bin[..]).unwrap();
|
||||
let key_id = ser::deserialize(&mut &key_id_bin[..]).unwrap();
|
||||
let block_fees = BlockFees {
|
||||
key_id: Some(key_id),
|
||||
..block_fees
|
||||
};
|
||||
|
||||
debug!(LOGGER, "get_coinbase: {:?}", block_fees);
|
||||
|
||||
return Ok((output, kernel, block_fees));
|
||||
}
|
||||
}
|
||||
}
|
|
@ -12,41 +12,34 @@
|
|||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
//! Mining service, gathers transactions from the pool, assemble them in a
|
||||
//! block and mine the block to produce a valid header with its proof-of-work.
|
||||
//! Mining service, gets a block to mine, and based on mining configuration chooses
|
||||
//! a version of the cuckoo miner to mine the block and produce a valid header with
|
||||
//! its proof-of-work. Any valid mined blocks are submitted to the network.
|
||||
|
||||
use rand::{self, Rng};
|
||||
use std::sync::{Arc, RwLock};
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::thread;
|
||||
use std::time::Duration;
|
||||
use time;
|
||||
use itertools::Itertools;
|
||||
|
||||
use common::adapters::PoolToChainAdapter;
|
||||
use core::consensus;
|
||||
use core::core;
|
||||
use core::core::Proof;
|
||||
use core::core::{Block, BlockHeader, Transaction};
|
||||
use core::core::{Block, BlockHeader};
|
||||
use core::core::hash::{Hash, Hashed};
|
||||
use pow::{cuckoo, MiningWorker};
|
||||
use pow::types::MinerConfig;
|
||||
use pow::plugin::PluginMiner;
|
||||
use core::ser;
|
||||
use core::global;
|
||||
use core::ser::AsFixedBytes;
|
||||
use util::LOGGER;
|
||||
use common::types::Error;
|
||||
use common::stats::MiningStats;
|
||||
|
||||
use chain;
|
||||
use pool;
|
||||
use util;
|
||||
use keychain::{Identifier, Keychain};
|
||||
use wallet;
|
||||
use wallet::BlockFees;
|
||||
|
||||
use pow::plugin::PluginMiner;
|
||||
|
||||
use itertools::Itertools;
|
||||
use mining::mine_block;
|
||||
|
||||
// Max number of transactions this miner will assemble in a block
|
||||
const MAX_TX: u32 = 5000;
|
||||
|
@ -223,9 +216,8 @@ impl Miner {
|
|||
}
|
||||
info!(
|
||||
LOGGER,
|
||||
"Mining: Cuckoo{} at {} gps (graphs per second)",
|
||||
cuckoo_size,
|
||||
sps_total);
|
||||
"Mining: Cuckoo{} at {} gps (graphs per second)", cuckoo_size, sps_total
|
||||
);
|
||||
if sps_total.is_finite() {
|
||||
let mut mining_stats = mining_stats.write().unwrap();
|
||||
mining_stats.combined_gps = sps_total;
|
||||
|
@ -483,7 +475,7 @@ impl Miner {
|
|||
}
|
||||
|
||||
// iteration, we keep the returned derivation to provide it back when
|
||||
// nothing has changed
|
||||
// nothing has changed. We only want to create a new key_id for each new block.
|
||||
let mut key_id = None;
|
||||
|
||||
{
|
||||
|
@ -498,22 +490,18 @@ impl Miner {
|
|||
// get the latest chain state and build a block on top of it
|
||||
let head = self.chain.head_header().unwrap();
|
||||
let mut latest_hash = self.chain.head().unwrap().last_block_h;
|
||||
|
||||
let mut result = self.build_block(&head, key_id.clone());
|
||||
while let Err(e) = result {
|
||||
match e {
|
||||
self::Error::Chain(chain::Error::DuplicateCommitment(_)) => {
|
||||
debug!(LOGGER, "Duplicate commit for potential coinbase detected. Trying next derivation.");
|
||||
}
|
||||
ae => {
|
||||
warn!(LOGGER, "Error building new block: {:?}. Retrying.", ae);
|
||||
}
|
||||
}
|
||||
thread::sleep(Duration::from_millis(100));
|
||||
result = self.build_block(&head, key_id.clone());
|
||||
let mut wallet_listener_url: Option<String> = None;
|
||||
if !self.config.burn_reward {
|
||||
wallet_listener_url = Some(self.config.wallet_listener_url.clone());
|
||||
}
|
||||
|
||||
let (mut b, block_fees) = result.unwrap();
|
||||
let (mut b, block_fees) = mine_block::get_block(
|
||||
&self.chain,
|
||||
&self.tx_pool,
|
||||
key_id.clone(),
|
||||
MAX_TX.clone(),
|
||||
wallet_listener_url,
|
||||
);
|
||||
{
|
||||
let mut mining_stats = mining_stats.write().unwrap();
|
||||
mining_stats.block_height = b.header.height;
|
||||
|
@ -595,127 +583,4 @@ impl Miner {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Builds a new block with the chain head as previous and eligible
|
||||
/// transactions from the pool.
|
||||
fn build_block(
|
||||
&self,
|
||||
head: &core::BlockHeader,
|
||||
key_id: Option<Identifier>,
|
||||
) -> Result<(core::Block, BlockFees), Error> {
|
||||
// prepare the block header timestamp
|
||||
let mut now_sec = time::get_time().sec;
|
||||
let head_sec = head.timestamp.to_timespec().sec;
|
||||
if now_sec <= head_sec {
|
||||
now_sec = head_sec + 1;
|
||||
}
|
||||
|
||||
// get the difficulty our block should be at
|
||||
let diff_iter = self.chain.difficulty_iter();
|
||||
let difficulty = consensus::next_difficulty(diff_iter).unwrap();
|
||||
|
||||
// extract current transaction from the pool
|
||||
let txs_box = self.tx_pool
|
||||
.read()
|
||||
.unwrap()
|
||||
.prepare_mineable_transactions(MAX_TX);
|
||||
let txs: Vec<&Transaction> = txs_box.iter().map(|tx| tx.as_ref()).collect();
|
||||
|
||||
// build the coinbase and the block itself
|
||||
let fees = txs.iter().map(|tx| tx.fee()).sum();
|
||||
let height = head.height + 1;
|
||||
let block_fees = BlockFees {
|
||||
fees,
|
||||
key_id,
|
||||
height,
|
||||
};
|
||||
|
||||
let (output, kernel, block_fees) = self.get_coinbase(block_fees)?;
|
||||
let mut b = core::Block::with_reward(head, txs, output, kernel, difficulty.clone())?;
|
||||
|
||||
debug!(
|
||||
LOGGER,
|
||||
"(Server ID: {}) Built new block with {} inputs and {} outputs, network difficulty: {}, cumulative difficulty {}",
|
||||
self.debug_output_id,
|
||||
b.inputs.len(),
|
||||
b.outputs.len(),
|
||||
difficulty.clone().into_num(),
|
||||
b.header.clone().total_difficulty.clone().into_num(),
|
||||
);
|
||||
|
||||
// making sure we're not spending time mining a useless block
|
||||
b.validate(&head)?;
|
||||
|
||||
let mut rng = rand::OsRng::new().unwrap();
|
||||
b.header.nonce = rng.gen();
|
||||
b.header.timestamp = time::at_utc(time::Timespec::new(now_sec, 0));
|
||||
|
||||
let roots_result = self.chain.set_txhashset_roots(&mut b, false);
|
||||
|
||||
match roots_result {
|
||||
Ok(_) => Ok((b, block_fees)),
|
||||
|
||||
// If it's a duplicate commitment, it's likely trying to use
|
||||
// a key that's already been derived but not in the wallet
|
||||
// for some reason, allow caller to retry
|
||||
Err(chain::Error::DuplicateCommitment(e)) => {
|
||||
Err(Error::Chain(chain::Error::DuplicateCommitment(e)))
|
||||
}
|
||||
|
||||
//Some other issue, possibly duplicate kernel
|
||||
Err(e) => {
|
||||
error!(
|
||||
LOGGER,
|
||||
"Error setting txhashset root to build a block: {:?}", e
|
||||
);
|
||||
Err(Error::Chain(chain::Error::Other(format!("{:?}", e))))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
///
|
||||
/// Probably only want to do this when testing.
|
||||
///
|
||||
fn burn_reward(
|
||||
&self,
|
||||
block_fees: BlockFees,
|
||||
) -> Result<(core::Output, core::TxKernel, BlockFees), Error> {
|
||||
let keychain = Keychain::from_random_seed().unwrap();
|
||||
let key_id = keychain.derive_key_id(1).unwrap();
|
||||
let (out, kernel) =
|
||||
core::Block::reward_output(&keychain, &key_id, block_fees.fees, block_fees.height)
|
||||
.unwrap();
|
||||
Ok((out, kernel, block_fees))
|
||||
}
|
||||
|
||||
fn get_coinbase(
|
||||
&self,
|
||||
block_fees: BlockFees,
|
||||
) -> Result<(core::Output, core::TxKernel, BlockFees), Error> {
|
||||
if self.config.burn_reward {
|
||||
self.burn_reward(block_fees)
|
||||
} else {
|
||||
let url = format!(
|
||||
"{}/v1/receive/coinbase",
|
||||
self.config.wallet_listener_url.as_str()
|
||||
);
|
||||
|
||||
let res = wallet::client::create_coinbase(&url, &block_fees)?;
|
||||
|
||||
let out_bin = util::from_hex(res.output).unwrap();
|
||||
let kern_bin = util::from_hex(res.kernel).unwrap();
|
||||
let key_id_bin = util::from_hex(res.key_id).unwrap();
|
||||
let output = ser::deserialize(&mut &out_bin[..]).unwrap();
|
||||
let kernel = ser::deserialize(&mut &kern_bin[..]).unwrap();
|
||||
let key_id = ser::deserialize(&mut &key_id_bin[..]).unwrap();
|
||||
let block_fees = BlockFees {
|
||||
key_id: Some(key_id),
|
||||
..block_fees
|
||||
};
|
||||
|
||||
debug!(LOGGER, "get_coinbase: {:?}", block_fees);
|
||||
|
||||
Ok((output, kernel, block_fees))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -15,3 +15,5 @@
|
|||
//! Mining + Mining server
|
||||
|
||||
pub mod miner;
|
||||
pub mod stratumserver;
|
||||
mod mine_block;
|
||||
|
|
574
servers/src/mining/stratumserver.rs
Normal file
574
servers/src/mining/stratumserver.rs
Normal file
|
@ -0,0 +1,574 @@
|
|||
// Copyright 2018 The Grin Developers
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
//! Mining Stratum Server
|
||||
use std::thread;
|
||||
use std::time::Duration;
|
||||
use std::net::{TcpListener, TcpStream};
|
||||
use std::io::{ErrorKind, Write};
|
||||
use std::error::Error;
|
||||
use time;
|
||||
use util::LOGGER;
|
||||
use std::io::BufRead;
|
||||
use bufstream::BufStream;
|
||||
use std::sync::{Arc, Mutex, RwLock};
|
||||
use serde_json;
|
||||
use std::time::SystemTime;
|
||||
|
||||
use common::adapters::PoolToChainAdapter;
|
||||
use core::core::{Block, BlockHeader};
|
||||
use pow::types::MinerConfig;
|
||||
use mining::miner::*;
|
||||
use mining::mine_block;
|
||||
use chain;
|
||||
use pool;
|
||||
use common::stats::{StratumStats, WorkerStats};
|
||||
|
||||
// Max number of transactions this miner will assemble in a block
|
||||
const MAX_TX: u32 = 5000;
|
||||
|
||||
// ----------------------------------------
|
||||
// http://www.jsonrpc.org/specification
|
||||
// RPC Methods
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
struct RpcRequest {
|
||||
id: String,
|
||||
jsonrpc: String,
|
||||
method: String,
|
||||
params: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
struct RpcResponse {
|
||||
id: String,
|
||||
jsonrpc: String,
|
||||
result: Option<String>,
|
||||
error: Option<RpcError>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
struct RpcError {
|
||||
code: i32,
|
||||
message: String,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
struct LoginParams {
|
||||
login: String,
|
||||
pass: String,
|
||||
agent: String,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
struct SubmitParams {
|
||||
height: u64,
|
||||
nonce: u64,
|
||||
pow: Vec<u32>,
|
||||
}
|
||||
|
||||
// ----------------------------------------
|
||||
// Worker Factory Thread Function
|
||||
|
||||
// Run in a thread. Adds new connections to the workers list
|
||||
fn accept_workers(id: String, address: String, workers: &mut Arc<Mutex<Vec<Worker>>>, stratum_stats: &mut Arc<RwLock<StratumStats>>) {
|
||||
let listener = TcpListener::bind(address).expect("Failed to bind to listen address");
|
||||
let mut worker_id: u32 = 0;
|
||||
for stream in listener.incoming() {
|
||||
match stream {
|
||||
Ok(stream) => {
|
||||
warn!(
|
||||
LOGGER,
|
||||
"(Server ID: {}) New connection: {}",
|
||||
id,
|
||||
stream.peer_addr().unwrap()
|
||||
);
|
||||
stream
|
||||
.set_nonblocking(true)
|
||||
.expect("set_nonblocking call failed");
|
||||
let mut worker = Worker::new(worker_id.to_string(), BufStream::new(stream));
|
||||
workers.lock().unwrap().push(worker);
|
||||
// stats for this worker (worker stat objects are added and updated but never removed)
|
||||
let mut worker_stats = WorkerStats::default();
|
||||
worker_stats.is_connected = true;
|
||||
worker_stats.id = worker_id.to_string();
|
||||
worker_stats.pow_difficulty = 1; // XXX TODO
|
||||
let mut stratum_stats = stratum_stats.write().unwrap();
|
||||
stratum_stats.worker_stats.push(worker_stats);
|
||||
worker_id = worker_id + 1;
|
||||
}
|
||||
Err(e) => {
|
||||
warn!(
|
||||
LOGGER,
|
||||
"(Server ID: {}) Error accepting connection: {:?}", id, e
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
// close the socket server
|
||||
drop(listener);
|
||||
}
|
||||
|
||||
// ----------------------------------------
|
||||
// Worker Object - a connected stratum client - a miner, pool, proxy, etc...
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
pub struct JobTemplate {
|
||||
pre_pow: String,
|
||||
}
|
||||
|
||||
pub struct Worker {
|
||||
id: String,
|
||||
stream: BufStream<TcpStream>,
|
||||
error: bool,
|
||||
authenticated: bool,
|
||||
}
|
||||
|
||||
impl Worker {
|
||||
/// Creates a new Stratum Worker.
|
||||
pub fn new(id: String, stream: BufStream<TcpStream>) -> Worker {
|
||||
Worker {
|
||||
id: id,
|
||||
stream: stream,
|
||||
error: false,
|
||||
authenticated: false,
|
||||
}
|
||||
}
|
||||
|
||||
// Get Message from the worker
|
||||
fn read_message(&mut self) -> Option<String> {
|
||||
// Read and return a single message or None
|
||||
let mut line = String::new();
|
||||
match self.stream.read_line(&mut line) {
|
||||
Ok(_) => {
|
||||
return Some(line);
|
||||
}
|
||||
Err(ref e) if e.kind() == ErrorKind::WouldBlock => {
|
||||
// Not an error, just no messages ready
|
||||
return None;
|
||||
}
|
||||
Err(e) => {
|
||||
warn!(
|
||||
LOGGER,
|
||||
"(Server ID: {}) Error in connection with stratum client: {}", self.id, e
|
||||
);
|
||||
self.error = true;
|
||||
return None;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Send Message to the worker
|
||||
fn send_message(&mut self, message_in: String) {
|
||||
// Write and Flush the message
|
||||
let mut message = message_in.clone();
|
||||
if !message.ends_with("\n") {
|
||||
message += "\n";
|
||||
}
|
||||
match self.stream.write(message.as_bytes()) {
|
||||
Ok(_) => match self.stream.flush() {
|
||||
Ok(_) => {}
|
||||
Err(e) => {
|
||||
warn!(
|
||||
LOGGER,
|
||||
"(Server ID: {}) Error in connection with stratum client: {}", self.id, e
|
||||
);
|
||||
self.error = true;
|
||||
}
|
||||
},
|
||||
Err(e) => {
|
||||
warn!(
|
||||
LOGGER,
|
||||
"(Server ID: {}) Error in connection with stratum client: {}", self.id, e
|
||||
);
|
||||
self.error = true;
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
} // impl Worker
|
||||
|
||||
// ----------------------------------------
|
||||
// Grin Stratum Server
|
||||
|
||||
pub struct StratumServer {
|
||||
id: String,
|
||||
config: MinerConfig,
|
||||
chain: Arc<chain::Chain>,
|
||||
tx_pool: Arc<RwLock<pool::TransactionPool<PoolToChainAdapter>>>,
|
||||
current_block: Block,
|
||||
workers: Arc<Mutex<Vec<Worker>>>,
|
||||
}
|
||||
|
||||
impl StratumServer {
|
||||
/// Creates a new Stratum Server.
|
||||
pub fn new(
|
||||
config: MinerConfig,
|
||||
chain_ref: Arc<chain::Chain>,
|
||||
tx_pool: Arc<RwLock<pool::TransactionPool<PoolToChainAdapter>>>,
|
||||
) -> StratumServer {
|
||||
StratumServer {
|
||||
id: String::from("StratumServer"),
|
||||
config: config,
|
||||
chain: chain_ref,
|
||||
tx_pool: tx_pool,
|
||||
current_block: Block::default(),
|
||||
workers: Arc::new(Mutex::new(Vec::new())),
|
||||
}
|
||||
}
|
||||
|
||||
// Build and return a JobTemplate for mining the current block
|
||||
fn build_block_template(&self, bh: BlockHeader) -> JobTemplate {
|
||||
// Serialize the block header into pre and post nonce strings
|
||||
let mut pre_pow_writer = HeaderPrePowWriter::default();
|
||||
bh.write_pre_pow(&mut pre_pow_writer).unwrap();
|
||||
let pre = pre_pow_writer.as_hex_string(false);
|
||||
let job_template = JobTemplate { pre_pow: pre };
|
||||
return job_template;
|
||||
}
|
||||
|
||||
// Handle an RPC request message from the worker(s)
|
||||
fn handle_rpc_requests(&mut self, stratum_stats: &mut Arc<RwLock<StratumStats>>) {
|
||||
let mut workers_l = self.workers.lock().unwrap();
|
||||
for num in 0..workers_l.len() {
|
||||
match workers_l[num].read_message() {
|
||||
Some(the_message) => {
|
||||
// Decompose the request from the JSONRpc wrapper
|
||||
let request: RpcRequest = match serde_json::from_str(&the_message) {
|
||||
Ok(request) => request,
|
||||
Err(e) => {
|
||||
// not a valid JSON RpcRequest - disconnect the worker
|
||||
warn!(
|
||||
LOGGER,
|
||||
"(Server ID: {}) Failed to parse JSONRpc: {} - {:?}",
|
||||
self.id,
|
||||
e.description(),
|
||||
the_message.as_bytes(),
|
||||
);
|
||||
workers_l[num].error = true;
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
let mut stratum_stats = stratum_stats.write().unwrap();
|
||||
let worker_stats_id = stratum_stats.worker_stats.iter().position(|r| r.id == workers_l[num].id).unwrap();
|
||||
stratum_stats.worker_stats[worker_stats_id].last_seen = SystemTime::now();
|
||||
|
||||
// Call the handler function for requested method
|
||||
let (response, err) = match request.method.as_str() {
|
||||
"login" => {
|
||||
let (response, err) = self.handle_login(request.params);
|
||||
// XXX TODO Future? - Validate username and password
|
||||
if err == false {
|
||||
workers_l[num].authenticated = true;
|
||||
}
|
||||
(response, err)
|
||||
}
|
||||
"submit" => self.handle_submit(request.params, &mut stratum_stats.worker_stats[worker_stats_id]),
|
||||
"keepalive" => self.handle_keepalive(),
|
||||
"getjobtemplate" => {
|
||||
let b = self.current_block.header.clone();
|
||||
self.handle_getjobtemplate(b)
|
||||
}
|
||||
"status" => self.handle_status(&stratum_stats.worker_stats[worker_stats_id]),
|
||||
_ => {
|
||||
// Called undefined method
|
||||
let e = r#"{"code": -32601, "message": "Method not found"}"#;
|
||||
let err = e.to_string();
|
||||
(err, true)
|
||||
}
|
||||
};
|
||||
|
||||
// Package the reply as RpcResponse json
|
||||
let rpc_response: String;
|
||||
if err == true {
|
||||
let rpc_err: RpcError = serde_json::from_str(&response).unwrap();
|
||||
let resp = RpcResponse {
|
||||
id: workers_l[num].id.clone(),
|
||||
jsonrpc: String::from("2.0"),
|
||||
result: None,
|
||||
error: Some(rpc_err),
|
||||
};
|
||||
rpc_response = serde_json::to_string(&resp).unwrap();
|
||||
} else {
|
||||
let resp = RpcResponse {
|
||||
id: workers_l[num].id.clone(),
|
||||
jsonrpc: String::from("2.0"),
|
||||
result: Some(response),
|
||||
error: None,
|
||||
};
|
||||
rpc_response = serde_json::to_string(&resp).unwrap();
|
||||
}
|
||||
|
||||
// Send the reply
|
||||
workers_l[num].send_message(rpc_response);
|
||||
}
|
||||
None => {} // No message for us from this worker
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Handle STATUS message
|
||||
fn handle_status(&self, worker_stats: &WorkerStats) -> (String, bool) {
|
||||
// Return stratum and worker stats in json for use by a dashboard or healthcheck.
|
||||
let response = serde_json::to_string(&worker_stats).unwrap();
|
||||
return (response, false);
|
||||
}
|
||||
|
||||
// Handle GETJOBTEMPLATE message
|
||||
fn handle_getjobtemplate(&self, bh: BlockHeader) -> (String, bool) {
|
||||
// Build a JobTemplate from a BlockHeader and return JSON
|
||||
let job_template = self.build_block_template(bh);
|
||||
let job_template_json = serde_json::to_string(&job_template).unwrap();
|
||||
return (job_template_json, false);
|
||||
}
|
||||
|
||||
// Handle KEEPALIVE message
|
||||
fn handle_keepalive(&self) -> (String, bool) {
|
||||
return (String::from("ok"), false);
|
||||
}
|
||||
|
||||
// Handle LOGIN message
|
||||
fn handle_login(&self, params: Option<String>) -> (String, bool) {
|
||||
// Extract the params string into a LoginParams struct
|
||||
let params_str = match params {
|
||||
Some(val) => val,
|
||||
None => String::from("{}"),
|
||||
};
|
||||
let _login_params: LoginParams = match serde_json::from_str(¶ms_str) {
|
||||
Ok(val) => val,
|
||||
Err(_e) => {
|
||||
let r = r#"{"code": -32600, "message": "Invalid Request"}"#;
|
||||
return (String::from(r), true);
|
||||
}
|
||||
};
|
||||
return (String::from("ok"), false);
|
||||
}
|
||||
|
||||
// Handle SUBMIT message
|
||||
// params contains a solved block header
|
||||
// we are expecting real solutions at the full difficulty.
|
||||
fn handle_submit(&self, params: Option<String>, worker_stats: &mut WorkerStats) -> (String, bool) {
|
||||
// Extract the params string into a SubmitParams struct
|
||||
let params_str = match params {
|
||||
Some(val) => val,
|
||||
None => String::from("{}"),
|
||||
};
|
||||
let submit_params: SubmitParams = match serde_json::from_str(¶ms_str) {
|
||||
Ok(val) => val,
|
||||
Err(_e) => {
|
||||
let r = r#"{"code": -32600, "message": "Invalid Request"}"#;
|
||||
return (String::from(r), true);
|
||||
}
|
||||
};
|
||||
|
||||
let mut b: Block;
|
||||
if submit_params.height == self.current_block.header.height {
|
||||
// Reconstruct the block header with this nonce and pow added
|
||||
b = self.current_block.clone();
|
||||
b.header.nonce = submit_params.nonce;
|
||||
b.header.pow.proof_size = submit_params.pow.len();
|
||||
b.header.pow.nonces = submit_params.pow;
|
||||
info!(
|
||||
LOGGER,
|
||||
"(Server ID: {}) Found proof of work, adding block {}",
|
||||
self.id,
|
||||
b.hash()
|
||||
);
|
||||
// Submit the block to grin server (known here as "self.miner")
|
||||
let res = self.chain.process_block(b.clone(), chain::Options::MINE);
|
||||
if let Err(e) = res {
|
||||
error!(
|
||||
LOGGER,
|
||||
"(Server ID: {}) Error validating mined block: {:?}", self.id, e
|
||||
);
|
||||
worker_stats.num_rejected += 1;
|
||||
let e = r#"{"code": -1, "message": "Solution validation failed"}"#;
|
||||
let err = e.to_string();
|
||||
return (err, true);
|
||||
}
|
||||
} else {
|
||||
warn!(
|
||||
LOGGER,
|
||||
"(Server ID: {}) Found POW for block at height: {} - but too late",
|
||||
self.id,
|
||||
submit_params.height
|
||||
);
|
||||
worker_stats.num_stale += 1;
|
||||
let e = r#"{"code": -1, "message": "Solution submitted too late"}"#;
|
||||
let err = e.to_string();
|
||||
return (err, true);
|
||||
}
|
||||
worker_stats.num_accepted += 1;
|
||||
return (String::from("ok"), false);
|
||||
} // handle submit a solution
|
||||
|
||||
// Purge dead/sick workers - remove all workers marked in error state
|
||||
fn clean_workers(&mut self, stratum_stats: &mut Arc<RwLock<StratumStats>>) {
|
||||
let mut start = 0;
|
||||
let mut workers_l = self.workers.lock().unwrap();
|
||||
loop {
|
||||
for num in start..workers_l.len() {
|
||||
if workers_l[num].error == true {
|
||||
warn!(
|
||||
LOGGER,
|
||||
"(Server ID: {}) Dropping worker: {}",
|
||||
self.id,
|
||||
workers_l[num].id;
|
||||
);
|
||||
// Update worker stats
|
||||
let mut stratum_stats = stratum_stats.write().unwrap();
|
||||
let worker_stats_id = stratum_stats.worker_stats.iter().position(|r| r.id == workers_l[num].id).unwrap();
|
||||
stratum_stats.worker_stats[worker_stats_id].is_connected = false;
|
||||
// Remove the dead worker
|
||||
workers_l.remove(num);
|
||||
break;
|
||||
}
|
||||
start = num + 1;
|
||||
}
|
||||
if start >= workers_l.len() {
|
||||
let mut stratum_stats = stratum_stats.write().unwrap();
|
||||
stratum_stats.num_workers = workers_l.len();
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Broadcast a jobtemplate RpcRequest to all connected workers
|
||||
fn broadcast_job(&mut self) {
|
||||
debug!(
|
||||
LOGGER,
|
||||
"(Server ID: {}) sending block {} to stratum clients",
|
||||
self.id,
|
||||
self.current_block.header.height
|
||||
);
|
||||
|
||||
// Package new block into RpcRequest
|
||||
let job_template = self.build_block_template(self.current_block.header.clone());
|
||||
let job_template_json = serde_json::to_string(&job_template).unwrap();
|
||||
let job_request = RpcRequest {
|
||||
id: String::from("Stratum"),
|
||||
jsonrpc: String::from("2.0"),
|
||||
method: String::from("job"),
|
||||
params: Some(job_template_json),
|
||||
};
|
||||
let job_request_json = serde_json::to_string(&job_request).unwrap();
|
||||
|
||||
// Push the new block to all connected clients
|
||||
let mut workers_l = self.workers.lock().unwrap();
|
||||
for num in 0..workers_l.len() {
|
||||
workers_l[num].send_message(job_request_json.clone());
|
||||
}
|
||||
}
|
||||
|
||||
/// "main()" - Starts the stratum-server. Listens for a connection, then enters a
|
||||
/// loop, building a new block on top of the existing chain anytime required and sending that to
|
||||
/// the connected stratum miner, proxy, or pool, and accepts full solutions to be submitted.
|
||||
pub fn run_loop(&mut self, miner_config: MinerConfig, stratum_stats: Arc<RwLock<StratumStats>>, cuckoo_size: u32, proof_size: usize) {
|
||||
info!(
|
||||
LOGGER,
|
||||
"(Server ID: {}) Starting stratum server with cuckoo_size = {}, proof_size = {}",
|
||||
self.id,
|
||||
cuckoo_size,
|
||||
proof_size
|
||||
);
|
||||
|
||||
// "globals" for this function
|
||||
let attempt_time_per_block = miner_config.attempt_time_per_block;
|
||||
let mut deadline: i64 = 0;
|
||||
// to prevent the wallet from generating a new HD key derivation for each
|
||||
// iteration, we keep the returned derivation to provide it back when
|
||||
// nothing has changed. We only want to create a key_id for each new block,
|
||||
// and reuse it when we rebuild the current block to add new tx.
|
||||
let mut key_id = None;
|
||||
let mut head = self.chain.head().unwrap();
|
||||
let mut current_hash = head.prev_block_h;
|
||||
let mut latest_hash;
|
||||
let listen_addr = miner_config.stratum_server_addr.clone().unwrap();
|
||||
|
||||
// Start a thread to accept new worker connections
|
||||
let mut workers_th = self.workers.clone();
|
||||
let id_th = self.id.clone();
|
||||
let mut stats_th = stratum_stats.clone();
|
||||
let _listener_th = thread::spawn(move || {
|
||||
accept_workers(id_th, listen_addr, &mut workers_th, &mut stats_th);
|
||||
});
|
||||
|
||||
// We have started
|
||||
{
|
||||
let mut stratum_stats = stratum_stats.write().unwrap();
|
||||
stratum_stats.is_running = true;
|
||||
stratum_stats.cuckoo_size = cuckoo_size as u16;
|
||||
}
|
||||
|
||||
warn!(
|
||||
LOGGER,
|
||||
"Stratum server started on {}",
|
||||
miner_config.stratum_server_addr.unwrap()
|
||||
);
|
||||
|
||||
// Main Loop
|
||||
loop {
|
||||
// Remove workers with failed connections
|
||||
self.clean_workers(&mut stratum_stats.clone());
|
||||
|
||||
// get the latest chain state
|
||||
head = self.chain.head().unwrap();
|
||||
latest_hash = head.last_block_h;
|
||||
|
||||
// Build a new block if:
|
||||
// There is a new block on the chain
|
||||
// or We are rebuilding the current one to include new transactions
|
||||
if current_hash != latest_hash || time::get_time().sec >= deadline {
|
||||
if current_hash != latest_hash {
|
||||
// A brand new block, so we will generate a new key_id
|
||||
key_id = None;
|
||||
}
|
||||
let mut wallet_listener_url: Option<String> = None;
|
||||
if !self.config.burn_reward {
|
||||
wallet_listener_url = Some(self.config.wallet_listener_url.clone());
|
||||
}
|
||||
|
||||
let (new_block, block_fees) = mine_block::get_block(
|
||||
&self.chain,
|
||||
&self.tx_pool,
|
||||
key_id.clone(),
|
||||
MAX_TX.clone(),
|
||||
wallet_listener_url,
|
||||
);
|
||||
self.current_block = new_block;
|
||||
key_id = block_fees.key_id();
|
||||
current_hash = latest_hash;
|
||||
// set a new deadline for rebuilding with fresh transactions
|
||||
deadline = time::get_time().sec + attempt_time_per_block as i64;
|
||||
|
||||
{
|
||||
let mut stratum_stats = stratum_stats.write().unwrap();
|
||||
stratum_stats.block_height = self.current_block.header.height;
|
||||
stratum_stats.network_difficulty =
|
||||
(self.current_block.header.total_difficulty.clone() - head.total_difficulty.clone()).into_num();
|
||||
}
|
||||
|
||||
// Send this job to all connected workers
|
||||
self.broadcast_job();
|
||||
}
|
||||
|
||||
// Handle any messages from the workers
|
||||
self.handle_rpc_requests(&mut stratum_stats.clone());
|
||||
|
||||
// sleep before restarting loop
|
||||
thread::sleep(Duration::from_millis(500));
|
||||
} // Main Loop
|
||||
} // fn run_loop()
|
||||
} // StratumServer
|
|
@ -557,3 +557,38 @@ impl LocalServerContainerPool {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Create and return a ServerConfig
|
||||
pub fn config(n: u16, test_name_dir: &str, seed_n: u16) -> servers::ServerConfig {
|
||||
servers::ServerConfig {
|
||||
api_http_addr: format!("127.0.0.1:{}", 20000 + n),
|
||||
db_root: format!("target/tmp/{}/grin-sync-{}", test_name_dir, n),
|
||||
p2p_config: p2p::P2PConfig {
|
||||
port: 10000 + n,
|
||||
..p2p::P2PConfig::default()
|
||||
},
|
||||
seeding_type: servers::Seeding::List,
|
||||
seeds: Some(vec![format!("127.0.0.1:{}", 10000 + seed_n)]),
|
||||
chain_type: core::global::ChainTypes::AutomatedTesting,
|
||||
archive_mode: Some(true),
|
||||
skip_sync_wait: Some(true),
|
||||
..Default::default()
|
||||
}
|
||||
}
|
||||
|
||||
/// Create and return a MinerConfig
|
||||
pub fn miner_config() -> pow::types::MinerConfig {
|
||||
let mut plugin_config = pow::types::CuckooMinerPluginConfig::default();
|
||||
let mut plugin_config_vec: Vec<pow::types::CuckooMinerPluginConfig> = Vec::new();
|
||||
plugin_config.type_filter = String::from("mean_cpu");
|
||||
plugin_config_vec.push(plugin_config);
|
||||
|
||||
pow::types::MinerConfig {
|
||||
enable_mining: true,
|
||||
burn_reward: true,
|
||||
miner_async_mode: Some(false),
|
||||
miner_plugin_dir: None,
|
||||
miner_plugin_config: Some(plugin_config_vec),
|
||||
..Default::default()
|
||||
}
|
||||
}
|
||||
|
|
|
@ -23,6 +23,8 @@ extern crate grin_wallet as wallet;
|
|||
|
||||
mod framework;
|
||||
|
||||
use std::fs;
|
||||
use std::sync::Arc;
|
||||
use std::thread;
|
||||
use std::time;
|
||||
use std::default::Default;
|
||||
|
@ -31,7 +33,7 @@ use core::global;
|
|||
use core::global::ChainTypes;
|
||||
|
||||
use framework::{LocalServerContainerConfig, LocalServerContainerPool,
|
||||
LocalServerContainerPoolConfig};
|
||||
LocalServerContainerPoolConfig, config, miner_config};
|
||||
|
||||
/// Testing the frameworks by starting a fresh server, creating a genesis
|
||||
/// Block and mining into a wallet for a bit
|
||||
|
@ -186,7 +188,7 @@ fn simulate_block_propagation() {
|
|||
// instantiates 5 servers on different ports
|
||||
let mut servers = vec![];
|
||||
for n in 0..5 {
|
||||
let s = servers::Server::new(config(10 * n, test_name_dir, 0)).unwrap();
|
||||
let s = servers::Server::new(framework::config(10 * n, test_name_dir, 0)).unwrap();
|
||||
servers.push(s);
|
||||
thread::sleep(time::Duration::from_millis(100));
|
||||
}
|
||||
|
@ -226,13 +228,13 @@ fn simulate_full_sync() {
|
|||
let test_name_dir = "grin-sync";
|
||||
framework::clean_all_output(test_name_dir);
|
||||
|
||||
let s1 = servers::Server::new(config(1000, "grin-sync", 1000)).unwrap();
|
||||
let s1 = servers::Server::new(framework::config(1000, "grin-sync", 1000)).unwrap();
|
||||
// mine a few blocks on server 1
|
||||
s1.start_miner(miner_config());
|
||||
thread::sleep(time::Duration::from_secs(8));
|
||||
|
||||
#[ignore(unused_mut)] // mut needed?
|
||||
let mut conf = config(1001, "grin-sync", 1000);
|
||||
let mut conf = framework::config(1001, "grin-sync", 1000);
|
||||
let s2 = servers::Server::new(conf).unwrap();
|
||||
while s2.head().height < 4 {
|
||||
thread::sleep(time::Duration::from_millis(100));
|
||||
|
@ -253,7 +255,7 @@ fn simulate_fast_sync() {
|
|||
let test_name_dir = "grin-fast";
|
||||
framework::clean_all_output(test_name_dir);
|
||||
|
||||
let s1 = servers::Server::new(config(2000, "grin-fast", 2000)).unwrap();
|
||||
let s1 = servers::Server::new(framework::config(2000, "grin-fast", 2000)).unwrap();
|
||||
// mine a few blocks on server 1
|
||||
s1.start_miner(miner_config());
|
||||
thread::sleep(time::Duration::from_secs(8));
|
||||
|
@ -280,7 +282,7 @@ fn simulate_fast_sync_double() {
|
|||
framework::clean_all_output("grin-double-fast1");
|
||||
framework::clean_all_output("grin-double-fast2");
|
||||
|
||||
let s1 = servers::Server::new(config(3000, "grin-double-fast1", 3000)).unwrap();
|
||||
let s1 = servers::Server::new(framework::config(3000, "grin-double-fast1", 3000)).unwrap();
|
||||
// mine a few blocks on server 1
|
||||
s1.start_miner(miner_config());
|
||||
thread::sleep(time::Duration::from_secs(8));
|
||||
|
@ -308,36 +310,3 @@ fn simulate_fast_sync_double() {
|
|||
s1.stop();
|
||||
s2.stop();
|
||||
}
|
||||
|
||||
fn config(n: u16, test_name_dir: &str, seed_n: u16) -> servers::ServerConfig {
|
||||
servers::ServerConfig {
|
||||
api_http_addr: format!("127.0.0.1:{}", 20000 + n),
|
||||
db_root: format!("target/tmp/{}/grin-sync-{}", test_name_dir, n),
|
||||
p2p_config: p2p::P2PConfig {
|
||||
port: 10000 + n,
|
||||
..p2p::P2PConfig::default()
|
||||
},
|
||||
seeding_type: servers::Seeding::List,
|
||||
seeds: Some(vec![format!("127.0.0.1:{}", 10000 + seed_n)]),
|
||||
chain_type: core::global::ChainTypes::AutomatedTesting,
|
||||
archive_mode: Some(true),
|
||||
skip_sync_wait: Some(true),
|
||||
..Default::default()
|
||||
}
|
||||
}
|
||||
|
||||
fn miner_config() -> pow::types::MinerConfig {
|
||||
let mut plugin_config = pow::types::CuckooMinerPluginConfig::default();
|
||||
let mut plugin_config_vec: Vec<pow::types::CuckooMinerPluginConfig> = Vec::new();
|
||||
plugin_config.type_filter = String::from("mean_cpu");
|
||||
plugin_config_vec.push(plugin_config);
|
||||
|
||||
pow::types::MinerConfig {
|
||||
enable_mining: true,
|
||||
burn_reward: true,
|
||||
miner_async_mode: Some(false),
|
||||
miner_plugin_dir: None,
|
||||
miner_plugin_config: Some(plugin_config_vec),
|
||||
..Default::default()
|
||||
}
|
||||
}
|
||||
|
|
150
servers/tests/stratum.rs
Normal file
150
servers/tests/stratum.rs
Normal file
|
@ -0,0 +1,150 @@
|
|||
// Copyright 2018 The Grin Developers
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
extern crate grin_api as api;
|
||||
extern crate grin_chain as chain;
|
||||
extern crate grin_core as core;
|
||||
extern crate grin_servers as servers;
|
||||
extern crate grin_p2p as p2p;
|
||||
extern crate grin_pow as pow;
|
||||
extern crate grin_util as util;
|
||||
extern crate grin_wallet as wallet;
|
||||
|
||||
extern crate bufstream;
|
||||
extern crate serde_json;
|
||||
|
||||
mod framework;
|
||||
|
||||
use std::io::prelude::*;
|
||||
use std::net::TcpStream;
|
||||
use bufstream::BufStream;
|
||||
use serde_json::Value;
|
||||
|
||||
use std::thread;
|
||||
use std::time;
|
||||
|
||||
use core::global;
|
||||
use core::global::ChainTypes;
|
||||
|
||||
use framework::{config, miner_config};
|
||||
|
||||
// Create a grin server, and a stratum server.
|
||||
// Simulate a few JSONRpc requests and verify the results.
|
||||
// Validate disconnected workers
|
||||
// Validate broadcasting new jobs
|
||||
#[test]
|
||||
fn basic_stratum_server() {
|
||||
util::init_test_logger();
|
||||
global::set_mining_mode(ChainTypes::AutomatedTesting);
|
||||
|
||||
let test_name_dir = "stratum_server";
|
||||
framework::clean_all_output(test_name_dir);
|
||||
|
||||
// Create a server
|
||||
let s = servers::Server::new(config(4000, test_name_dir, 0)).unwrap();
|
||||
|
||||
// Get mining config with stratumserver enabled
|
||||
let mut miner_cfg = miner_config();
|
||||
miner_cfg.enable_mining = false;
|
||||
miner_cfg.attempt_time_per_block = 999;
|
||||
miner_cfg.enable_stratum_server = true;
|
||||
miner_cfg.stratum_server_addr = Some(String::from("127.0.0.1:11101"));
|
||||
|
||||
// Start stratum server
|
||||
s.start_stratum_server(miner_cfg);
|
||||
|
||||
// Wait for stratum server to start and
|
||||
// Verify stratum server accepts connections
|
||||
loop {
|
||||
if let Ok(_stream) = TcpStream::connect("127.0.0.1:11101") {
|
||||
break;
|
||||
} else {
|
||||
thread::sleep(time::Duration::from_millis(500));
|
||||
}
|
||||
// As this stream falls out of scope it will be disconnected
|
||||
}
|
||||
|
||||
// Create a few new worker connections
|
||||
let mut workers = vec![];
|
||||
for _n in 0..5 {
|
||||
let w = TcpStream::connect("127.0.0.1:11101").unwrap();
|
||||
w.set_nonblocking(true)
|
||||
.expect("Failed to set TcpStream to non-blocking");
|
||||
let stream = BufStream::new(w);
|
||||
workers.push(stream);
|
||||
}
|
||||
assert!(workers.len() == 5);
|
||||
|
||||
// Simulate a worker lost connection
|
||||
workers.remove(4);
|
||||
|
||||
// Swallow the genesis block
|
||||
thread::sleep(time::Duration::from_secs(1)); // Wait for the server to broadcast
|
||||
let mut response = String::new();
|
||||
for n in 0..workers.len() {
|
||||
let _result = workers[n].read_line(&mut response);
|
||||
}
|
||||
|
||||
// Verify a few stratum JSONRpc commands
|
||||
// getjobtemplate - expected block template result
|
||||
let mut response = String::new();
|
||||
let job_req = "{\"id\": \"Stratum\", \"jsonrpc\": \"2.0\", \"method\": \"getjobtemplate\"}\n";
|
||||
workers[2].write(job_req.as_bytes()).unwrap();
|
||||
workers[2].flush().unwrap();
|
||||
thread::sleep(time::Duration::from_secs(1)); // Wait for the server to reply
|
||||
match workers[2].read_line(&mut response) {
|
||||
Ok(_) => {
|
||||
let r: Value = serde_json::from_str(&response).unwrap();
|
||||
assert_eq!(r["error"], serde_json::Value::Null);
|
||||
assert_ne!(r["result"], serde_json::Value::Null);
|
||||
}
|
||||
Err(_e) => {
|
||||
assert!(false);
|
||||
}
|
||||
}
|
||||
|
||||
// keepalive - expected "ok" result
|
||||
let mut response = String::new();
|
||||
let job_req = "{\"id\":\"3\",\"jsonrpc\":\"2.0\",\"method\":\"keepalive\"}\n";
|
||||
let ok_resp = "{\"id\":\"3\",\"jsonrpc\":\"2.0\",\"result\":\"ok\",\"error\":null}\n";
|
||||
workers[2].write(job_req.as_bytes()).unwrap();
|
||||
workers[2].flush().unwrap();
|
||||
thread::sleep(time::Duration::from_secs(1)); // Wait for the server to reply
|
||||
let _st = workers[2].read_line(&mut response);
|
||||
assert_eq!(response.as_str(), ok_resp);
|
||||
|
||||
// "doesnotexist" - error expected
|
||||
let mut response = String::new();
|
||||
let job_req = "{\"id\":\"4\",\"jsonrpc\":\"2.0\",\"method\":\"doesnotexist\"}\n";
|
||||
let ok_resp = "{\"id\":\"4\",\"jsonrpc\":\"2.0\",\"result\":null,\"error\":{\"code\":-32601,\"message\":\"Method not found\"}}\n";
|
||||
workers[3].write(job_req.as_bytes()).unwrap();
|
||||
workers[3].flush().unwrap();
|
||||
thread::sleep(time::Duration::from_secs(1)); // Wait for the server to reply
|
||||
let _st = workers[3].read_line(&mut response);
|
||||
assert_eq!(response.as_str(), ok_resp);
|
||||
|
||||
// Simulate a worker lost connection
|
||||
workers.remove(1);
|
||||
|
||||
// Start mining blocks
|
||||
s.start_miner(miner_config());
|
||||
|
||||
// Verify blocks are being broadcast to workers
|
||||
let expected = String::from("job");
|
||||
thread::sleep(time::Duration::from_secs(3)); // Wait for a few mined blocks
|
||||
let mut jobtemplate = String::new();
|
||||
let _st = workers[2].read_line(&mut jobtemplate);
|
||||
let job_template: Value = serde_json::from_str(&jobtemplate).unwrap();
|
||||
assert_eq!(job_template["method"], expected);
|
||||
}
|
Loading…
Reference in a new issue