diff --git a/src/node/stratum.rs b/src/node/stratum.rs index 9ff0ccc..b80c51d 100644 --- a/src/node/stratum.rs +++ b/src/node/stratum.rs @@ -331,6 +331,7 @@ impl Handler { // Build and return a JobTemplate for mining the current block fn build_block_template(&self) -> JobTemplate { + println!("1 build template 12345"); let bh = self .current_state .read() @@ -339,6 +340,7 @@ impl Handler { .unwrap() .header .clone(); + println!("2 build template 12345"); // Serialize the block header into pre and post nonce strings let mut header_buf = vec![]; { @@ -510,7 +512,7 @@ impl Handler { } // handle submit a solution fn broadcast_job(&self) { - debug!("broadcast job"); + println!("12345 broadcast job"); // Package new block into RpcRequest let job_template = self.build_block_template(); let job_template_json = serde_json::to_string(&job_template).unwrap(); @@ -536,8 +538,10 @@ impl Handler { let mut head = self.chain.head().unwrap(); let mut current_hash = head.prev_block_h; loop { + println!("12345 looping"); // Ping stratum socket on stop to handle TcpListener unbind. if stop_state.is_stopped() { + println!("12345 prepare ping to stop"); let listen_addr: SocketAddr = config .stratum_server_addr .clone() @@ -545,10 +549,14 @@ impl Handler { .parse() .expect("Stratum: Incorrect address "); thread::spawn(move || { + println!("12345 ping start"); let _ = TcpStream::connect(listen_addr).unwrap(); + println!("12345 ping end"); }); break; } + println!("12345 looping2"); + // get the latest chain state head = self.chain.head().unwrap(); @@ -561,8 +569,9 @@ impl Handler { && self.workers.count() > 0 { { - debug!("resend updated block"); + println!("12345 resend updated block"); let mut state = self.current_state.write(); + println!("12345 after resend updated block"); let wallet_listener_url = if !config.burn_reward { Some(config.wallet_listener_url.clone()) } else { @@ -572,12 +581,14 @@ impl Handler { let clear_blocks = current_hash != latest_hash; // Build the new block (version) + println!("12345 get_block"); let (new_block, block_fees) = mine_block::get_block( &self.chain, tx_pool, state.current_key_id.clone(), wallet_listener_url, ); + println!("12345 after get_block"); // scaled difficulty state.current_difficulty = @@ -598,14 +609,20 @@ impl Handler { } // Update the mining stats + println!("12345 update_block_height"); self.workers.update_block_height(new_block.header.height); + println!("12345 after update_block_height"); let difficulty = new_block.header.total_difficulty() - head.total_difficulty; + println!("12345 update_network_difficulty"); self.workers.update_network_difficulty(difficulty.to_num()); + println!("12345 after update_network_difficulty"); self.workers.update_network_hashrate(); + println!("12345 after update_network_hashrate"); // Add this new block candidate onto our list of block versions for this height state.current_block_versions.push(new_block); } + println!("12345 resend updated block exit"); // Send this job to all connected workers self.broadcast_job(); } @@ -626,64 +643,82 @@ fn accept_connections(listen_addr: SocketAddr, handler: Arc, stop_state panic!("Stratum: Failed to bind to listen address {}", listen_addr) }); let state_socket = &stop_state.clone(); - let server = listener - .incoming() - .filter_map(|s| async { s.map_err(|e| error!("accept error = {:?}", e)).ok() }) - .for_each(move |socket| { - let handler = handler.clone(); - async move { - // Stop listener on node server stop. - if state_socket.is_stopped() { - panic_any("Stopped"); - } - // Spawn a task to process the connection - let (tx, mut rx) = mpsc::unbounded(); - - let worker_id = handler.workers.add_worker(tx); - info!("Worker {} connected", worker_id); - - let framed = Framed::new(socket, LinesCodec::new()); - let (mut writer, mut reader) = framed.split(); - - let h = handler.clone(); - let read = async move { - while let Some(line) = reader - .try_next() - .await - .map_err(|e| error!("error reading line: {}", e))? - { - let request = serde_json::from_str(&line) - .map_err(|e| error!("error serializing line: {}", e))?; - let resp = h.handle_rpc_requests(request, worker_id); - h.workers.send_to(worker_id, resp); - } - - Result::<_, ()>::Ok(()) - }; - - let write = async move { - while let Some(line) = rx.next().await { - writer - .send(line) - .await - .map_err(|e| error!("error writing line: {}", e))?; - } - - Result::<_, ()>::Ok(()) - }; - - let task = async move { - pin_mut!(read, write); - futures::future::select(read, write).await; - handler.workers.remove_worker(worker_id); - info!("Worker {} disconnected", worker_id); - }; - tokio::spawn(task); + println!("12345 bind complete"); + loop { + println!("12345 socket starting"); + let (socket, _) = listener.accept().await.unwrap(); + // Stop listener on node server stop. + { + if state_socket.is_stopped() { + panic_any("Stopped"); } - }); - server.await + } + + println!("12345 socket"); + let handler = handler.clone(); + + let process = || async move { + // Spawn a task to process the connection + let (tx, mut rx) = mpsc::unbounded(); + + let worker_id = handler.workers.add_worker(tx); + info!("Worker {} connected", worker_id); + + let framed = Framed::new(socket, LinesCodec::new()); + let (mut writer, mut reader) = framed.split(); + + let h = handler.clone(); + let read = async move { + println!("12345 r: 1"); + while let Some(line) = reader + .try_next() + .await + .map_err(|e| error!("error reading line: {}", e))? + { + println!("12345 r: 2: {}", line); + let request = serde_json::from_str(&line) + .map_err(|e| println!("error serializing line: {}", e))?; + let resp = h.handle_rpc_requests(request, worker_id); + h.workers.send_to(worker_id, resp); + } + println!("12345 r: 3"); + + Result::<_, ()>::Ok(()) + }; + + let write = async move { + while let Some(line) = rx.next().await { + println!("12345 w: 1: {}", line); + writer + .send(line) + .await + .map_err(|e| println!("error writing line: {}", e))?; + } + println!("12345 w: 2"); + Result::<_, ()>::Ok(()) + }; + + let task = async move { + println!("12345 t: 1"); + pin_mut!(read, write); + println!("12345 t: 2"); + futures::future::select(read, write).await; + println!("12345 t: 3"); + handler.workers.remove_worker(worker_id); + println!("12345 t: 4"); + info!("Worker {} disconnected", worker_id); + }; + tokio::spawn(task); + + Result::<_, ()>::Ok(()) + }; + println!("12345 run process"); + + let _ = (process)().await; + println!("12345 after process"); + } }; - let mut rt = Runtime::new().unwrap(); + let rt = Runtime::new().unwrap(); rt.block_on(task); }