fix deadlock between get_status and main loop (#2556)

* fix deadlock between get_status and main loop

* isolate all shared state in one struct in order to avoid deadlocks
This commit is contained in:
e-max 2019-02-15 22:22:53 +01:00 committed by hashmap
parent 2f5fbb3ce8
commit cbac14c135

View file

@ -169,48 +169,59 @@ pub struct WorkerStatus {
stale: u64, stale: u64,
} }
struct State {
current_block_versions: Vec<Block>,
// to prevent the wallet from generating a new HD key derivation for each
// iteration, we keep the returned derivation to provide it back when
// nothing has changed. We only want to create a key_id for each new block,
// and reuse it when we rebuild the current block to add new tx.
current_key_id: Option<keychain::Identifier>,
current_difficulty: u64,
minimum_share_difficulty: u64,
}
impl State {
pub fn new(minimum_share_difficulty: u64) -> Self {
let blocks = vec![Block::default()];
State {
current_block_versions: blocks,
current_key_id: None,
current_difficulty: <u64>::max_value(),
minimum_share_difficulty: minimum_share_difficulty,
}
}
}
struct Handler { struct Handler {
id: String, id: String,
workers: Arc<WorkersList>, workers: Arc<WorkersList>,
current_block_versions: Arc<RwLock<Vec<Block>>>,
sync_state: Arc<SyncState>, sync_state: Arc<SyncState>,
minimum_share_difficulty: Arc<RwLock<u64>>,
current_key_id: Arc<RwLock<Option<keychain::Identifier>>>,
current_difficulty: Arc<RwLock<u64>>,
chain: Arc<chain::Chain>, chain: Arc<chain::Chain>,
current_state: Arc<RwLock<State>>,
} }
impl Handler { impl Handler {
pub fn new( pub fn new(
id: String, id: String,
workers: Arc<WorkersList>, stratum_stats: Arc<RwLock<StratumStats>>,
current_block_versions: Arc<RwLock<Vec<Block>>>,
sync_state: Arc<SyncState>, sync_state: Arc<SyncState>,
minimum_share_difficulty: Arc<RwLock<u64>>, minimum_share_difficulty: u64,
current_key_id: Arc<RwLock<Option<keychain::Identifier>>>,
current_difficulty: Arc<RwLock<u64>>,
chain: Arc<chain::Chain>, chain: Arc<chain::Chain>,
) -> Self { ) -> Self {
Handler { Handler {
id, id: id,
workers, workers: Arc::new(WorkersList::new(stratum_stats.clone())),
current_block_versions, sync_state: sync_state,
sync_state, chain: chain,
minimum_share_difficulty, current_state: Arc::new(RwLock::new(State::new(minimum_share_difficulty))),
current_key_id,
current_difficulty,
chain,
} }
} }
pub fn from_stratum(stratum: &StratumServer) -> Self { pub fn from_stratum(stratum: &StratumServer) -> Self {
Handler::new( Handler::new(
stratum.id.clone(), stratum.id.clone(),
stratum.workers.clone(), stratum.stratum_stats.clone(),
stratum.current_block_versions.clone(),
stratum.sync_state.clone(), stratum.sync_state.clone(),
stratum.minimum_share_difficulty.clone(), stratum.config.minimum_share_difficulty,
stratum.current_key_id.clone(),
stratum.current_difficulty.clone(),
stratum.chain.clone(), stratum.chain.clone(),
) )
} }
@ -224,8 +235,7 @@ impl Handler {
let res = self.handle_submit(request.params, worker_id); let res = self.handle_submit(request.params, worker_id);
// this key_id has been used now, reset // this key_id has been used now, reset
if let Ok((_, true)) = res { if let Ok((_, true)) = res {
let mut current_key_id = self.current_key_id.write(); self.current_state.write().current_key_id = None;
*current_key_id = None;
} }
res.map(|(v, _)| v) res.map(|(v, _)| v)
} }
@ -265,14 +275,7 @@ impl Handler {
} }
fn handle_login(&self, params: Option<Value>, worker_id: usize) -> Result<Value, RpcError> { fn handle_login(&self, params: Option<Value>, worker_id: usize) -> Result<Value, RpcError> {
let params: LoginParams = parse_params(params)?; let params: LoginParams = parse_params(params)?;
let mut workers = self.workers.workers_list.write(); self.workers.login(worker_id, params.login, params.agent)?;
let worker = workers
.get_mut(&worker_id)
.ok_or(RpcError::internal_error())?;
worker.login = Some(params.login);
// XXX TODO Future - Validate password?
worker.agent = params.agent;
worker.authenticated = true;
return Ok("ok".into()); return Ok("ok".into());
} }
@ -283,21 +286,21 @@ impl Handler {
fn handle_status(&self, worker_id: usize) -> Result<Value, RpcError> { fn handle_status(&self, worker_id: usize) -> Result<Value, RpcError> {
// Return worker status in json for use by a dashboard or healthcheck. // Return worker status in json for use by a dashboard or healthcheck.
let stats = self.workers.get_stats(worker_id)?;
let status = WorkerStatus { let status = WorkerStatus {
id: self.workers.stratum_stats.read().worker_stats[worker_id] id: stats.id.clone(),
.id
.clone(),
height: self height: self
.current_block_versions .current_state
.read() .read()
.current_block_versions
.last() .last()
.unwrap() .unwrap()
.header .header
.height, .height,
difficulty: self.workers.stratum_stats.read().worker_stats[worker_id].pow_difficulty, difficulty: stats.pow_difficulty,
accepted: self.workers.stratum_stats.read().worker_stats[worker_id].num_accepted, accepted: stats.num_accepted,
rejected: self.workers.stratum_stats.read().worker_stats[worker_id].num_rejected, rejected: stats.num_rejected,
stale: self.workers.stratum_stats.read().worker_stats[worker_id].num_stale, stale: stats.num_stale,
}; };
let response = serde_json::to_value(&status).unwrap(); let response = serde_json::to_value(&status).unwrap();
return Ok(response); return Ok(response);
@ -317,8 +320,9 @@ impl Handler {
// Build and return a JobTemplate for mining the current block // Build and return a JobTemplate for mining the current block
fn build_block_template(&self) -> JobTemplate { fn build_block_template(&self) -> JobTemplate {
let bh = self let bh = self
.current_block_versions .current_state
.read() .read()
.current_block_versions
.last() .last()
.unwrap() .unwrap()
.header .header
@ -333,8 +337,8 @@ impl Handler {
let pre_pow = util::to_hex(header_buf); let pre_pow = util::to_hex(header_buf);
let job_template = JobTemplate { let job_template = JobTemplate {
height: bh.height, height: bh.height,
job_id: (self.current_block_versions.read().len() - 1) as u64, job_id: (self.current_state.read().current_block_versions.len() - 1) as u64,
difficulty: *self.minimum_share_difficulty.read(), difficulty: self.current_state.read().minimum_share_difficulty,
pre_pow, pre_pow,
}; };
return job_template; return job_template;
@ -352,17 +356,11 @@ impl Handler {
// Validate parameters // Validate parameters
let params: SubmitParams = parse_params(params)?; let params: SubmitParams = parse_params(params)?;
let current_block_versions = self.current_block_versions.read(); let state = self.current_state.read();
// Find the correct version of the block to match this header // Find the correct version of the block to match this header
let b: Option<&Block> = current_block_versions.get(params.job_id as usize); let b: Option<&Block> = state.current_block_versions.get(params.job_id as usize);
if params.height if params.height != state.current_block_versions.last().unwrap().header.height
!= self || b.is_none()
.current_block_versions
.read()
.last()
.unwrap()
.header
.height || b.is_none()
{ {
// Return error status // Return error status
error!( error!(
@ -396,11 +394,11 @@ impl Handler {
// Get share difficulty // Get share difficulty
share_difficulty = b.header.pow.to_difficulty(b.header.height).to_num(); share_difficulty = b.header.pow.to_difficulty(b.header.height).to_num();
// If the difficulty is too low its an error // If the difficulty is too low its an error
if share_difficulty < *self.minimum_share_difficulty.read() { if share_difficulty < state.minimum_share_difficulty {
// Return error status // Return error status
error!( error!(
"(Server ID: {}) Share at height {}, hash {}, edge_bits {}, nonce {}, job_id {} rejected due to low difficulty: {}/{}", "(Server ID: {}) Share at height {}, hash {}, edge_bits {}, nonce {}, job_id {} rejected due to low difficulty: {}/{}",
self.id, params.height, b.hash(), params.edge_bits, params.nonce, params.job_id, share_difficulty, *self.minimum_share_difficulty.read(), self.id, params.height, b.hash(), params.edge_bits, params.nonce, params.job_id, share_difficulty, state.minimum_share_difficulty,
); );
self.workers self.workers
.update_stats(worker_id, |worker_stats| worker_stats.num_rejected += 1); .update_stats(worker_id, |worker_stats| worker_stats.num_rejected += 1);
@ -408,7 +406,7 @@ impl Handler {
} }
// If the difficulty is high enough, submit it (which also validates it) // If the difficulty is high enough, submit it (which also validates it)
if share_difficulty >= *self.current_difficulty.read() { if share_difficulty >= state.current_difficulty {
// This is a full solution, submit it to the network // This is a full solution, submit it to the network
let res = self.chain.process_block(b.clone(), chain::Options::MINE); let res = self.chain.process_block(b.clone(), chain::Options::MINE);
if let Err(e) = res { if let Err(e) = res {
@ -432,13 +430,14 @@ impl Handler {
self.workers self.workers
.update_stats(worker_id, |worker_stats| worker_stats.num_blocks_found += 1); .update_stats(worker_id, |worker_stats| worker_stats.num_blocks_found += 1);
// Log message to make it obvious we found a block // Log message to make it obvious we found a block
let stats = self.workers.get_stats(worker_id)?;
warn!( warn!(
"(Server ID: {}) Solution Found for block {}, hash {} - Yay!!! Worker ID: {}, blocks found: {}, shares: {}", "(Server ID: {}) Solution Found for block {}, hash {} - Yay!!! Worker ID: {}, blocks found: {}, shares: {}",
self.id, params.height, self.id, params.height,
b.hash(), b.hash(),
self.workers.stratum_stats.read().worker_stats[worker_id].id, stats.id,
self.workers.stratum_stats.read().worker_stats[worker_id].num_blocks_found, stats.num_blocks_found,
self.workers.stratum_stats.read().worker_stats[worker_id].num_accepted, stats.num_accepted,
); );
} else { } else {
// Do some validation but dont submit // Do some validation but dont submit
@ -461,25 +460,12 @@ impl Handler {
} }
} }
// Log this as a valid share // Log this as a valid share
let submitted_by = match self let worker = self.workers.get_worker(worker_id)?;
.workers let submitted_by = match worker.login {
.workers_list None => worker.id.to_string(),
.read()
.get(&worker_id)
.unwrap()
.login
.clone()
{
None => self
.workers
.workers_list
.read()
.get(&worker_id)
.unwrap()
.id
.to_string(),
Some(login) => login.clone(), Some(login) => login.clone(),
}; };
info!( info!(
"(Server ID: {}) Got share at height {}, hash {}, edge_bits {}, nonce {}, job_id {}, difficulty {}/{}, submitted by {}", "(Server ID: {}) Got share at height {}, hash {}, edge_bits {}, nonce {}, job_id {}, difficulty {}/{}, submitted by {}",
self.id, self.id,
@ -489,7 +475,7 @@ impl Handler {
b.header.pow.nonce, b.header.pow.nonce,
params.job_id, params.job_id,
share_difficulty, share_difficulty,
*self.current_difficulty.read(), state.current_difficulty,
submitted_by, submitted_by,
); );
self.workers self.workers
@ -505,17 +491,109 @@ impl Handler {
share_is_block, share_is_block,
)); ));
} // handle submit a solution } // handle submit a solution
fn broadcast_job(&self) {
debug!("broadcast job");
// Package new block into RpcRequest
let job_template = self.build_block_template();
let job_template_json = serde_json::to_string(&job_template).unwrap();
// Issue #1159 - use a serde_json Value type to avoid extra quoting
let job_template_value: Value = serde_json::from_str(&job_template_json).unwrap();
let job_request = RpcRequest {
id: String::from("Stratum"),
jsonrpc: String::from("2.0"),
method: String::from("job"),
params: Some(job_template_value),
};
let job_request_json = serde_json::to_string(&job_request).unwrap();
debug!(
"(Server ID: {}) sending block {} with id {} to stratum clients",
self.id, job_template.height, job_template.job_id,
);
self.workers.broadcast(job_request_json.clone());
}
pub fn run(
&self,
config: &StratumServerConfig,
tx_pool: &Arc<RwLock<pool::TransactionPool>>,
verifier_cache: Arc<RwLock<dyn VerifierCache>>,
) {
debug!("Run main loop");
let mut deadline: i64 = 0;
let mut head = self.chain.head().unwrap();
let mut current_hash = head.prev_block_h;
loop {
// get the latest chain state
head = self.chain.head().unwrap();
let latest_hash = head.last_block_h;
// Build a new block if:
// There is a new block on the chain
// or We are rebuilding the current one to include new transactions
// and there is at least one worker connected
if (current_hash != latest_hash || Utc::now().timestamp() >= deadline)
&& self.workers.count() > 0
{
{
debug!("resend updated block");
let mut state = self.current_state.write();
let mut wallet_listener_url: Option<String> = None;
if !config.burn_reward {
wallet_listener_url = Some(config.wallet_listener_url.clone());
}
// If this is a new block, clear the current_block version history
let clear_blocks = current_hash != latest_hash;
// Build the new block (version)
let (new_block, block_fees) = mine_block::get_block(
&self.chain,
tx_pool,
verifier_cache.clone(),
state.current_key_id.clone(),
wallet_listener_url,
);
state.current_difficulty =
(new_block.header.total_difficulty() - head.total_difficulty).to_num();
state.current_key_id = block_fees.key_id();
current_hash = latest_hash;
// set the minimum acceptable share difficulty for this block
state.minimum_share_difficulty =
cmp::min(config.minimum_share_difficulty, state.current_difficulty);
// set a new deadline for rebuilding with fresh transactions
deadline = Utc::now().timestamp() + config.attempt_time_per_block as i64;
self.workers.update_block_height(new_block.header.height);
self.workers
.update_network_difficulty(state.current_difficulty);
if clear_blocks {
state.current_block_versions.clear();
}
state.current_block_versions.push(new_block);
// Send this job to all connected workers
}
self.broadcast_job();
}
// sleep before restarting loop
thread::sleep(Duration::from_millis(5));
} // Main Loop
}
} }
// ---------------------------------------- // ----------------------------------------
// Worker Factory Thread Function // Worker Factory Thread Function
fn accept_connections(listen_addr: SocketAddr, handler: Handler) { fn accept_connections(listen_addr: SocketAddr, handler: Arc<Handler>) {
info!("Start tokio stratum server"); info!("Start tokio stratum server");
let listener = TcpListener::bind(&listen_addr).expect(&format!( let listener = TcpListener::bind(&listen_addr).expect(&format!(
"Stratum: Failed to bind to listen address {}", "Stratum: Failed to bind to listen address {}",
listen_addr listen_addr
)); ));
let handler = Arc::new(handler);
let server = listener let server = listener
.incoming() .incoming()
.for_each(move |socket| { .for_each(move |socket| {
@ -564,6 +642,7 @@ fn accept_connections(listen_addr: SocketAddr, handler: Handler) {
// ---------------------------------------- // ----------------------------------------
// Worker Object - a connected stratum client - a miner, pool, proxy, etc... // Worker Object - a connected stratum client - a miner, pool, proxy, etc...
#[derive(Clone)]
pub struct Worker { pub struct Worker {
id: usize, id: usize,
agent: String, agent: String,
@ -622,6 +701,36 @@ impl WorkersList {
self.stratum_stats.write().num_workers = self.workers_list.read().len(); self.stratum_stats.write().num_workers = self.workers_list.read().len();
} }
pub fn login(&self, worker_id: usize, login: String, agent: String) -> Result<(), RpcError> {
let mut wl = self.workers_list.write();
let mut worker = wl.get_mut(&worker_id).ok_or(RpcError::internal_error())?;
worker.login = Some(login);
// XXX TODO Future - Validate password?
worker.agent = agent;
worker.authenticated = true;
Ok(())
}
pub fn get_worker(&self, worker_id: usize) -> Result<Worker, RpcError> {
self.workers_list
.read()
.get(&worker_id)
.ok_or_else(|| {
error!("Worker {} not found", worker_id);
RpcError::internal_error()
})
.map(|w| w.clone())
}
pub fn get_stats(&self, worker_id: usize) -> Result<WorkerStats, RpcError> {
self.stratum_stats
.read()
.worker_stats
.get(worker_id)
.ok_or(RpcError::internal_error())
.map(|ws| ws.clone())
}
pub fn last_seen(&self, worker_id: usize) { pub fn last_seen(&self, worker_id: usize) {
//self.stratum_stats.write().worker_stats[worker_id].last_seen = SystemTime::now(); //self.stratum_stats.write().worker_stats[worker_id].last_seen = SystemTime::now();
self.update_stats(worker_id, |ws| ws.last_seen = SystemTime::now()); self.update_stats(worker_id, |ws| ws.last_seen = SystemTime::now());
@ -640,9 +749,26 @@ impl WorkersList {
.tx .tx
.unbounded_send(msg); .unbounded_send(msg);
} }
pub fn broadcast(&self, msg: String) {
for worker in self.workers_list.read().values() {
worker.tx.unbounded_send(msg.clone());
}
}
pub fn count(&self) -> usize { pub fn count(&self) -> usize {
self.workers_list.read().len() self.workers_list.read().len()
} }
pub fn update_block_height(&self, height: u64) {
let mut stratum_stats = self.stratum_stats.write();
stratum_stats.block_height = height;
}
pub fn update_network_difficulty(&self, difficulty: u64) {
let mut stratum_stats = self.stratum_stats.write();
stratum_stats.network_difficulty = difficulty;
}
} }
// ---------------------------------------- // ----------------------------------------
@ -654,11 +780,6 @@ pub struct StratumServer {
chain: Arc<chain::Chain>, chain: Arc<chain::Chain>,
tx_pool: Arc<RwLock<pool::TransactionPool>>, tx_pool: Arc<RwLock<pool::TransactionPool>>,
verifier_cache: Arc<RwLock<dyn VerifierCache>>, verifier_cache: Arc<RwLock<dyn VerifierCache>>,
current_block_versions: Arc<RwLock<Vec<Block>>>,
current_difficulty: Arc<RwLock<u64>>,
minimum_share_difficulty: Arc<RwLock<u64>>,
current_key_id: Arc<RwLock<Option<keychain::Identifier>>>,
workers: Arc<WorkersList>,
sync_state: Arc<SyncState>, sync_state: Arc<SyncState>,
stratum_stats: Arc<RwLock<StratumStats>>, stratum_stats: Arc<RwLock<StratumStats>>,
} }
@ -674,70 +795,15 @@ impl StratumServer {
) -> StratumServer { ) -> StratumServer {
StratumServer { StratumServer {
id: String::from("0"), id: String::from("0"),
minimum_share_difficulty: Arc::new(RwLock::new(config.minimum_share_difficulty)),
config, config,
chain, chain,
tx_pool, tx_pool,
verifier_cache, verifier_cache,
current_block_versions: Arc::new(RwLock::new(Vec::new())),
current_difficulty: Arc::new(RwLock::new(<u64>::max_value())),
current_key_id: Arc::new(RwLock::new(None)),
workers: Arc::new(WorkersList::new(stratum_stats.clone())),
sync_state: Arc::new(SyncState::new()), sync_state: Arc::new(SyncState::new()),
stratum_stats: stratum_stats, stratum_stats: stratum_stats,
} }
} }
// Build and return a JobTemplate for mining the current block
fn build_block_template(&self) -> JobTemplate {
let bh = self
.current_block_versions
.read()
.last()
.unwrap()
.header
.clone();
// Serialize the block header into pre and post nonce strings
let mut header_buf = vec![];
{
let mut writer = ser::BinWriter::new(&mut header_buf);
bh.write_pre_pow(&mut writer).unwrap();
bh.pow.write_pre_pow(bh.version, &mut writer).unwrap();
}
let pre_pow = util::to_hex(header_buf);
let job_template = JobTemplate {
height: bh.height,
job_id: (self.current_block_versions.read().len() - 1) as u64,
difficulty: *self.minimum_share_difficulty.read(),
pre_pow,
};
return job_template;
}
// Broadcast a jobtemplate RpcRequest to all connected workers - no response
// expected
fn broadcast_job(&mut self) {
// Package new block into RpcRequest
let job_template = self.build_block_template();
let job_template_json = serde_json::to_string(&job_template).unwrap();
// Issue #1159 - use a serde_json Value type to avoid extra quoting
let job_template_value: Value = serde_json::from_str(&job_template_json).unwrap();
let job_request = RpcRequest {
id: String::from("Stratum"),
jsonrpc: String::from("2.0"),
method: String::from("job"),
params: Some(job_template_value),
};
let job_request_json = serde_json::to_string(&job_request).unwrap();
debug!(
"(Server ID: {}) sending block {} with id {} to stratum clients",
self.id, job_template.height, job_template.job_id,
);
for worker in self.workers.workers_list.read().values() {
worker.tx.unbounded_send(job_request_json.clone());
}
}
/// "main()" - Starts the stratum-server. Creates a thread to Listens for /// "main()" - Starts the stratum-server. Creates a thread to Listens for
/// a connection, then enters a loop, building a new block on top of the /// a connection, then enters a loop, building a new block on top of the
/// existing chain anytime required and sending that to the connected /// existing chain anytime required and sending that to the connected
@ -751,16 +817,6 @@ impl StratumServer {
self.sync_state = sync_state; self.sync_state = sync_state;
// "globals" for this function
let attempt_time_per_block = self.config.attempt_time_per_block;
let mut deadline: i64 = 0;
// to prevent the wallet from generating a new HD key derivation for each
// iteration, we keep the returned derivation to provide it back when
// nothing has changed. We only want to create a key_id for each new block,
// and reuse it when we rebuild the current block to add new tx.
let mut head = self.chain.head().unwrap();
let mut current_hash = head.prev_block_h;
let mut latest_hash;
let listen_addr = self let listen_addr = self
.config .config
.stratum_server_addr .stratum_server_addr
@ -768,14 +824,12 @@ impl StratumServer {
.unwrap() .unwrap()
.parse() .parse()
.expect("Stratum: Incorrect address "); .expect("Stratum: Incorrect address ");
{
self.current_block_versions.write().push(Block::default());
}
let handler = Handler::from_stratum(&self); let handler = Arc::new(Handler::from_stratum(&self));
let h = handler.clone();
let _listener_th = thread::spawn(move || { let _listener_th = thread::spawn(move || {
accept_connections(listen_addr, handler); accept_connections(listen_addr, h);
}); });
// We have started // We have started
@ -795,72 +849,7 @@ impl StratumServer {
thread::sleep(Duration::from_millis(50)); thread::sleep(Duration::from_millis(50));
} }
// Main Loop handler.run(&self.config, &self.tx_pool, self.verifier_cache.clone());
loop {
// get the latest chain state
head = self.chain.head().unwrap();
latest_hash = head.last_block_h;
// Build a new block if:
// There is a new block on the chain
// or We are rebuilding the current one to include new transactions
// and there is at least one worker connected
if (current_hash != latest_hash || Utc::now().timestamp() >= deadline)
&& self.workers.count() > 0
{
{
let mut current_block_versions = self.current_block_versions.write();
let mut wallet_listener_url: Option<String> = None;
if !self.config.burn_reward {
wallet_listener_url = Some(self.config.wallet_listener_url.clone());
}
// If this is a new block, clear the current_block version history
if current_hash != latest_hash {
current_block_versions.clear();
}
// Build the new block (version)
let (new_block, block_fees) = mine_block::get_block(
&self.chain,
&self.tx_pool,
self.verifier_cache.clone(),
self.current_key_id.read().clone(),
wallet_listener_url,
);
{
let mut current_difficulty = self.current_difficulty.write();
*current_difficulty =
(new_block.header.total_difficulty() - head.total_difficulty).to_num();
}
{
let mut current_key_id = self.current_key_id.write();
*current_key_id = block_fees.key_id();
}
current_hash = latest_hash;
{
// set the minimum acceptable share difficulty for this block
let mut minimum_share_difficulty = self.minimum_share_difficulty.write();
*minimum_share_difficulty = cmp::min(
self.config.minimum_share_difficulty,
*self.current_difficulty.read(),
);
}
// set a new deadline for rebuilding with fresh transactions
deadline = Utc::now().timestamp() + attempt_time_per_block as i64;
let mut stratum_stats = self.stratum_stats.write();
stratum_stats.block_height = new_block.header.height;
stratum_stats.network_difficulty = *self.current_difficulty.read();
// Add this new block version to our current block map
current_block_versions.push(new_block);
}
// Send this job to all connected workers
self.broadcast_job();
}
// sleep before restarting loop
thread::sleep(Duration::from_millis(5));
} // Main Loop
} // fn run_loop() } // fn run_loop()
} // StratumServer } // StratumServer