diff --git a/src/gui/views/network.rs b/src/gui/views/network.rs index afd7846..0e8cb95 100644 --- a/src/gui/views/network.rs +++ b/src/gui/views/network.rs @@ -14,7 +14,9 @@ use std::borrow::Cow; use std::collections::hash_map::DefaultHasher; +use std::sync::atomic::Ordering; use std::time::Duration; +use chrono::Utc; use eframe::epaint::{Color32, FontId, Stroke}; use eframe::epaint::text::{LayoutJob, TextFormat, TextWrapping}; use egui::{Response, RichText, Sense, Spinner, Widget}; @@ -160,6 +162,8 @@ impl Network { } fn draw_title_text(&self, mut builder: StripBuilder) { + let Self { node, ..} = self; + let title_text = match &self.current_mode { Mode::Node => { self.node_view.title() @@ -172,10 +176,9 @@ impl Network { } }; - - let state = self.node.acquire_state(); - let syncing = state.stats.is_some() && - state.stats.as_ref().unwrap().sync_status != SyncStatus::NoSync; + let r_stats = node.state.read_stats(); + let syncing = r_stats.is_some() && + r_stats.as_ref().unwrap().sync_status != SyncStatus::NoSync; let mut b = builder.size(Size::remainder()); if syncing { @@ -188,15 +191,14 @@ impl Network { }); }); if syncing { - let stats = state.stats.as_ref().unwrap(); strip.cell(|ui| { ui.centered_and_justified(|ui| { - let status_text = if state.is_stopping() { + let status_text = if node.state.is_stopping() { get_sync_status(SyncStatus::Shutdown).to_string() - } else if state.is_restarting() { + } else if node.state.is_restarting() { "Restarting".to_string() } else { - get_sync_status(stats.sync_status).to_string() + get_sync_status(r_stats.as_ref().unwrap().sync_status).to_string() }; let mut job = LayoutJob::single_section(status_text, TextFormat { font_id: FontId::proportional(15.0), @@ -228,17 +230,22 @@ fn get_sync_status(sync_status: SyncStatus) -> Cow<'static, str> { .. } => { if highest_height == 0 { - Cow::Borrowed("Downloading headers data") + Cow::Borrowed("Downloading headers") } else { let percent = sync_head.height * 100 / highest_height; Cow::Owned(format!("Downloading headers: {}%", percent)) } } SyncStatus::TxHashsetDownload(stat) => { - Cow::Borrowed("Downloading chain state") + if stat.total_size > 0 { + let percent = stat.downloaded_size * 100 / stat.total_size; + Cow::Owned(format!("Downloading chain state: {}%", percent)) + } else { + Cow::Borrowed("Downloading chain state") + } } SyncStatus::TxHashsetSetup => { - Cow::Borrowed("Preparing chain state for validation") + Cow::Borrowed("Preparing state for validation") } SyncStatus::TxHashsetRangeProofsValidation { rproofs, diff --git a/src/node/node.rs b/src/node/node.rs index 2b6a6c5..cd0c76b 100644 --- a/src/node/node.rs +++ b/src/node/node.rs @@ -16,8 +16,8 @@ use std::{fs, thread}; use std::fmt::format; use std::ops::Deref; use std::path::{Path, PathBuf}; -use std::sync::{Arc, LockResult, mpsc, Mutex, MutexGuard}; -use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::{Arc, LockResult, RwLock, RwLockReadGuard}; +use std::sync::atomic::{AtomicBool, AtomicPtr, Ordering}; use std::thread::JoinHandle; use std::time::Duration; @@ -34,101 +34,89 @@ use log::info; pub struct Node { /// Node state updated from the separate thread - node_state: Arc>, + pub(crate) state: Arc, } impl Node { /// Instantiate new node with provided chain type, start server if needed pub fn new(chain_type: ChainTypes, start: bool) -> Self { - let stop_state = Arc::new(StopState::new()); - let mut state = NodeState::new(chain_type, stop_state.clone()); - let node_state = Arc::new(Mutex::new(state)); + let state = Arc::new(NodeState::new(chain_type)); if start { - let server = start_server(&chain_type, stop_state); - start_server_thread(node_state.clone(), server); - } else { - stop_state.stop(); + Self::start_server(state.clone(), chain_type); } - Self { node_state } - } - - /// Acquire node state to be used by a current thread - pub fn acquire_state(&self) -> MutexGuard<'_, NodeState> { - self.node_state.lock().unwrap() + Self { state } } /// Stop server pub fn stop(&self) { - self.acquire_state().stop_needed = true; + self.state.stop_needed.store(true, Ordering::Relaxed); } /// Start server with provided chain type pub fn start(&self, chain_type: ChainTypes) { - let mut state = self.node_state.lock().unwrap(); - if state.stop_state.is_stopped() { - self.start_with_acquired_state(state, chain_type); + if !self.state.is_restarting() && !self.state.is_running() { + Self::start_server(self.state.clone(), chain_type); } } + fn start_server(state: Arc, chain_type: ChainTypes) { + let server = start_server(&chain_type); + start_server_thread(state, server); + } + /// Restart server with provided chain type pub fn restart(&mut self, chain_type: ChainTypes) { - let mut state = self.acquire_state(); - if !state.stop_state.is_stopped() { - state.chain_type = chain_type; - state.restart_needed = true; + if self.state.is_running() { + self.state.restart_needed.store(true, Ordering::Relaxed); } else { - self.start_with_acquired_state(state, chain_type); + self.start(chain_type); } } - - /// Start server with provided acquired state - fn start_with_acquired_state(&self, mut state: MutexGuard, chain_type: ChainTypes) { - state.chain_type = chain_type; - state.stop_state = Arc::new(StopState::new()); - - let server = start_server(&chain_type, state.stop_state.clone()); - start_server_thread(self.node_state.clone(), server); - } } pub struct NodeState { - /// To check server state - stop_state: Arc, - /// Data for UI, None means server is not started - pub(crate) stats: Option, + /// Data for UI, None means server is not running + stats: Arc>>, /// Chain type of launched server - chain_type: ChainTypes, + chain_type: Arc, /// Thread flag to stop the server and start it again - restart_needed: bool, + restart_needed: AtomicBool, /// Thread flag to stop the server - stop_needed: bool, + stop_needed: AtomicBool, } impl NodeState { /// Instantiate new node state with provided chain type and server state - pub fn new(chain_type: ChainTypes, stop_state: Arc) -> Self { + pub fn new(chain_type: ChainTypes) -> Self { Self { - stop_state, - stats: None, - chain_type, - restart_needed: false, - stop_needed: false, + stats: Arc::new(RwLock::new(None)), + chain_type: Arc::new(chain_type), + restart_needed: AtomicBool::new(false), + stop_needed: AtomicBool::new(false), } } - /// Check if server is stopping at separate thread + /// Check if server is stopping pub fn is_stopping(&self) -> bool { - return self.stop_needed + return self.stop_needed.load(Ordering::Relaxed) } - /// Check if server is restarting at separate thread + /// Check if server is restarting pub fn is_restarting(&self) -> bool { - return self.restart_needed + return self.restart_needed.load(Ordering::Relaxed) + } + + pub fn is_running(&self) -> bool { + self.read_stats().is_some() + } + + pub fn read_stats(&self) -> RwLockReadGuard<'_, Option> { + self.stats.read().unwrap() } } -/// Start server with provided chain type and node state -fn start_server(chain_type: &ChainTypes, stop_state: Arc) -> Server { +/// Start server with provided chain type +fn start_server(chain_type: &ChainTypes) -> Server { let mut node_config_result = config::initial_setup_server(chain_type); if node_config_result.is_err() { // Remove config file on init error @@ -146,6 +134,7 @@ fn start_server(chain_type: &ChainTypes, stop_state: Arc) -> Server { let config = node_config.clone().unwrap(); let server_config = config.members.as_ref().unwrap().server.clone(); + // Remove lock file (in case if we have running node from another app) let mut db_path = PathBuf::from(&server_config.db_root); db_path.push("grin.lock"); fs::remove_file(db_path).unwrap(); @@ -188,7 +177,7 @@ fn start_server(chain_type: &ChainTypes, stop_state: Arc) -> Server { let api_chan: &'static mut (oneshot::Sender<()>, oneshot::Receiver<()>) = Box::leak(Box::new(oneshot::channel::<()>())); - let mut server_result = Server::new(server_config.clone(), Some(stop_state.clone()), api_chan); + let mut server_result = Server::new(server_config.clone(), None, api_chan); if server_result.is_err() { let mut db_path = PathBuf::from(&server_config.db_root); db_path.push("grin.lock"); @@ -207,35 +196,37 @@ fn start_server(chain_type: &ChainTypes, stop_state: Arc) -> Server { let server_config = config.members.as_ref().unwrap().server.clone(); let api_chan: &'static mut (oneshot::Sender<()>, oneshot::Receiver<()>) = Box::leak(Box::new(oneshot::channel::<()>())); - server_result = Server::new(server_config.clone(), Some(stop_state.clone()), api_chan); + server_result = Server::new(server_config.clone(), None, api_chan); } server_result.unwrap() } /// Start a thread to launch server and update node state with server stats -fn start_server_thread(node_state: Arc>, mut server: Server) -> JoinHandle<()> { +fn start_server_thread(state: Arc, mut server: Server) -> JoinHandle<()> { thread::spawn(move || loop { thread::sleep(Duration::from_millis(500)); - let mut state = node_state.lock().unwrap(); - if state.restart_needed { + + if state.is_restarting() { server.stop(); - // Create new server with new stop state - state.stop_state = Arc::new(StopState::new()); - server = start_server(&state.chain_type, state.stop_state.clone()); + // Create new server with current chain type + server = start_server(&state.chain_type); - state.restart_needed = false; - } else if state.stop_needed { + state.restart_needed.store(false, Ordering::Relaxed); + } else if state.is_stopping() { server.stop(); - state.stats = None; - state.stop_needed = false; + + let mut w_stats = state.stats.write().unwrap(); + *w_stats = None; + + state.stop_needed.store(false, Ordering::Relaxed); break; - } - if !state.stop_state.is_stopped() { + } else { let stats = server.get_server_stats(); if stats.is_ok() { - state.stats = Some(stats.as_ref().ok().unwrap().clone()); + let mut w_stats = state.stats.write().unwrap(); + *w_stats = Some(stats.as_ref().ok().unwrap().clone()); } } })