node: refactor sync state to avoid locks at main thread
This commit is contained in:
parent
ba5cd82f4b
commit
06f32dcb77
2 changed files with 77 additions and 79 deletions
|
@ -14,7 +14,9 @@
|
||||||
|
|
||||||
use std::borrow::Cow;
|
use std::borrow::Cow;
|
||||||
use std::collections::hash_map::DefaultHasher;
|
use std::collections::hash_map::DefaultHasher;
|
||||||
|
use std::sync::atomic::Ordering;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
use chrono::Utc;
|
||||||
use eframe::epaint::{Color32, FontId, Stroke};
|
use eframe::epaint::{Color32, FontId, Stroke};
|
||||||
use eframe::epaint::text::{LayoutJob, TextFormat, TextWrapping};
|
use eframe::epaint::text::{LayoutJob, TextFormat, TextWrapping};
|
||||||
use egui::{Response, RichText, Sense, Spinner, Widget};
|
use egui::{Response, RichText, Sense, Spinner, Widget};
|
||||||
|
@ -160,6 +162,8 @@ impl Network {
|
||||||
}
|
}
|
||||||
|
|
||||||
fn draw_title_text(&self, mut builder: StripBuilder) {
|
fn draw_title_text(&self, mut builder: StripBuilder) {
|
||||||
|
let Self { node, ..} = self;
|
||||||
|
|
||||||
let title_text = match &self.current_mode {
|
let title_text = match &self.current_mode {
|
||||||
Mode::Node => {
|
Mode::Node => {
|
||||||
self.node_view.title()
|
self.node_view.title()
|
||||||
|
@ -172,10 +176,9 @@ impl Network {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
let r_stats = node.state.read_stats();
|
||||||
let state = self.node.acquire_state();
|
let syncing = r_stats.is_some() &&
|
||||||
let syncing = state.stats.is_some() &&
|
r_stats.as_ref().unwrap().sync_status != SyncStatus::NoSync;
|
||||||
state.stats.as_ref().unwrap().sync_status != SyncStatus::NoSync;
|
|
||||||
|
|
||||||
let mut b = builder.size(Size::remainder());
|
let mut b = builder.size(Size::remainder());
|
||||||
if syncing {
|
if syncing {
|
||||||
|
@ -188,15 +191,14 @@ impl Network {
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
if syncing {
|
if syncing {
|
||||||
let stats = state.stats.as_ref().unwrap();
|
|
||||||
strip.cell(|ui| {
|
strip.cell(|ui| {
|
||||||
ui.centered_and_justified(|ui| {
|
ui.centered_and_justified(|ui| {
|
||||||
let status_text = if state.is_stopping() {
|
let status_text = if node.state.is_stopping() {
|
||||||
get_sync_status(SyncStatus::Shutdown).to_string()
|
get_sync_status(SyncStatus::Shutdown).to_string()
|
||||||
} else if state.is_restarting() {
|
} else if node.state.is_restarting() {
|
||||||
"Restarting".to_string()
|
"Restarting".to_string()
|
||||||
} else {
|
} else {
|
||||||
get_sync_status(stats.sync_status).to_string()
|
get_sync_status(r_stats.as_ref().unwrap().sync_status).to_string()
|
||||||
};
|
};
|
||||||
let mut job = LayoutJob::single_section(status_text, TextFormat {
|
let mut job = LayoutJob::single_section(status_text, TextFormat {
|
||||||
font_id: FontId::proportional(15.0),
|
font_id: FontId::proportional(15.0),
|
||||||
|
@ -228,17 +230,22 @@ fn get_sync_status(sync_status: SyncStatus) -> Cow<'static, str> {
|
||||||
..
|
..
|
||||||
} => {
|
} => {
|
||||||
if highest_height == 0 {
|
if highest_height == 0 {
|
||||||
Cow::Borrowed("Downloading headers data")
|
Cow::Borrowed("Downloading headers")
|
||||||
} else {
|
} else {
|
||||||
let percent = sync_head.height * 100 / highest_height;
|
let percent = sync_head.height * 100 / highest_height;
|
||||||
Cow::Owned(format!("Downloading headers: {}%", percent))
|
Cow::Owned(format!("Downloading headers: {}%", percent))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
SyncStatus::TxHashsetDownload(stat) => {
|
SyncStatus::TxHashsetDownload(stat) => {
|
||||||
Cow::Borrowed("Downloading chain state")
|
if stat.total_size > 0 {
|
||||||
|
let percent = stat.downloaded_size * 100 / stat.total_size;
|
||||||
|
Cow::Owned(format!("Downloading chain state: {}%", percent))
|
||||||
|
} else {
|
||||||
|
Cow::Borrowed("Downloading chain state")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
SyncStatus::TxHashsetSetup => {
|
SyncStatus::TxHashsetSetup => {
|
||||||
Cow::Borrowed("Preparing chain state for validation")
|
Cow::Borrowed("Preparing state for validation")
|
||||||
}
|
}
|
||||||
SyncStatus::TxHashsetRangeProofsValidation {
|
SyncStatus::TxHashsetRangeProofsValidation {
|
||||||
rproofs,
|
rproofs,
|
||||||
|
|
127
src/node/node.rs
127
src/node/node.rs
|
@ -16,8 +16,8 @@ use std::{fs, thread};
|
||||||
use std::fmt::format;
|
use std::fmt::format;
|
||||||
use std::ops::Deref;
|
use std::ops::Deref;
|
||||||
use std::path::{Path, PathBuf};
|
use std::path::{Path, PathBuf};
|
||||||
use std::sync::{Arc, LockResult, mpsc, Mutex, MutexGuard};
|
use std::sync::{Arc, LockResult, RwLock, RwLockReadGuard};
|
||||||
use std::sync::atomic::{AtomicBool, Ordering};
|
use std::sync::atomic::{AtomicBool, AtomicPtr, Ordering};
|
||||||
use std::thread::JoinHandle;
|
use std::thread::JoinHandle;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
|
@ -34,101 +34,89 @@ use log::info;
|
||||||
|
|
||||||
pub struct Node {
|
pub struct Node {
|
||||||
/// Node state updated from the separate thread
|
/// Node state updated from the separate thread
|
||||||
node_state: Arc<Mutex<NodeState>>,
|
pub(crate) state: Arc<NodeState>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Node {
|
impl Node {
|
||||||
/// Instantiate new node with provided chain type, start server if needed
|
/// Instantiate new node with provided chain type, start server if needed
|
||||||
pub fn new(chain_type: ChainTypes, start: bool) -> Self {
|
pub fn new(chain_type: ChainTypes, start: bool) -> Self {
|
||||||
let stop_state = Arc::new(StopState::new());
|
let state = Arc::new(NodeState::new(chain_type));
|
||||||
let mut state = NodeState::new(chain_type, stop_state.clone());
|
|
||||||
let node_state = Arc::new(Mutex::new(state));
|
|
||||||
if start {
|
if start {
|
||||||
let server = start_server(&chain_type, stop_state);
|
Self::start_server(state.clone(), chain_type);
|
||||||
start_server_thread(node_state.clone(), server);
|
|
||||||
} else {
|
|
||||||
stop_state.stop();
|
|
||||||
}
|
}
|
||||||
Self { node_state }
|
Self { state }
|
||||||
}
|
|
||||||
|
|
||||||
/// Acquire node state to be used by a current thread
|
|
||||||
pub fn acquire_state(&self) -> MutexGuard<'_, NodeState> {
|
|
||||||
self.node_state.lock().unwrap()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Stop server
|
/// Stop server
|
||||||
pub fn stop(&self) {
|
pub fn stop(&self) {
|
||||||
self.acquire_state().stop_needed = true;
|
self.state.stop_needed.store(true, Ordering::Relaxed);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Start server with provided chain type
|
/// Start server with provided chain type
|
||||||
pub fn start(&self, chain_type: ChainTypes) {
|
pub fn start(&self, chain_type: ChainTypes) {
|
||||||
let mut state = self.node_state.lock().unwrap();
|
if !self.state.is_restarting() && !self.state.is_running() {
|
||||||
if state.stop_state.is_stopped() {
|
Self::start_server(self.state.clone(), chain_type);
|
||||||
self.start_with_acquired_state(state, chain_type);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn start_server(state: Arc<NodeState>, chain_type: ChainTypes) {
|
||||||
|
let server = start_server(&chain_type);
|
||||||
|
start_server_thread(state, server);
|
||||||
|
}
|
||||||
|
|
||||||
/// Restart server with provided chain type
|
/// Restart server with provided chain type
|
||||||
pub fn restart(&mut self, chain_type: ChainTypes) {
|
pub fn restart(&mut self, chain_type: ChainTypes) {
|
||||||
let mut state = self.acquire_state();
|
if self.state.is_running() {
|
||||||
if !state.stop_state.is_stopped() {
|
self.state.restart_needed.store(true, Ordering::Relaxed);
|
||||||
state.chain_type = chain_type;
|
|
||||||
state.restart_needed = true;
|
|
||||||
} else {
|
} else {
|
||||||
self.start_with_acquired_state(state, chain_type);
|
self.start(chain_type);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Start server with provided acquired state
|
|
||||||
fn start_with_acquired_state(&self, mut state: MutexGuard<NodeState>, chain_type: ChainTypes) {
|
|
||||||
state.chain_type = chain_type;
|
|
||||||
state.stop_state = Arc::new(StopState::new());
|
|
||||||
|
|
||||||
let server = start_server(&chain_type, state.stop_state.clone());
|
|
||||||
start_server_thread(self.node_state.clone(), server);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct NodeState {
|
pub struct NodeState {
|
||||||
/// To check server state
|
/// Data for UI, None means server is not running
|
||||||
stop_state: Arc<StopState>,
|
stats: Arc<RwLock<Option<ServerStats>>>,
|
||||||
/// Data for UI, None means server is not started
|
|
||||||
pub(crate) stats: Option<ServerStats>,
|
|
||||||
/// Chain type of launched server
|
/// Chain type of launched server
|
||||||
chain_type: ChainTypes,
|
chain_type: Arc<ChainTypes>,
|
||||||
/// Thread flag to stop the server and start it again
|
/// Thread flag to stop the server and start it again
|
||||||
restart_needed: bool,
|
restart_needed: AtomicBool,
|
||||||
/// Thread flag to stop the server
|
/// Thread flag to stop the server
|
||||||
stop_needed: bool,
|
stop_needed: AtomicBool,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl NodeState {
|
impl NodeState {
|
||||||
/// Instantiate new node state with provided chain type and server state
|
/// Instantiate new node state with provided chain type and server state
|
||||||
pub fn new(chain_type: ChainTypes, stop_state: Arc<StopState>) -> Self {
|
pub fn new(chain_type: ChainTypes) -> Self {
|
||||||
Self {
|
Self {
|
||||||
stop_state,
|
stats: Arc::new(RwLock::new(None)),
|
||||||
stats: None,
|
chain_type: Arc::new(chain_type),
|
||||||
chain_type,
|
restart_needed: AtomicBool::new(false),
|
||||||
restart_needed: false,
|
stop_needed: AtomicBool::new(false),
|
||||||
stop_needed: false,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Check if server is stopping at separate thread
|
/// Check if server is stopping
|
||||||
pub fn is_stopping(&self) -> bool {
|
pub fn is_stopping(&self) -> bool {
|
||||||
return self.stop_needed
|
return self.stop_needed.load(Ordering::Relaxed)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Check if server is restarting at separate thread
|
/// Check if server is restarting
|
||||||
pub fn is_restarting(&self) -> bool {
|
pub fn is_restarting(&self) -> bool {
|
||||||
return self.restart_needed
|
return self.restart_needed.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn is_running(&self) -> bool {
|
||||||
|
self.read_stats().is_some()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn read_stats(&self) -> RwLockReadGuard<'_, Option<ServerStats>> {
|
||||||
|
self.stats.read().unwrap()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Start server with provided chain type and node state
|
/// Start server with provided chain type
|
||||||
fn start_server(chain_type: &ChainTypes, stop_state: Arc<StopState>) -> Server {
|
fn start_server(chain_type: &ChainTypes) -> Server {
|
||||||
let mut node_config_result = config::initial_setup_server(chain_type);
|
let mut node_config_result = config::initial_setup_server(chain_type);
|
||||||
if node_config_result.is_err() {
|
if node_config_result.is_err() {
|
||||||
// Remove config file on init error
|
// Remove config file on init error
|
||||||
|
@ -146,6 +134,7 @@ fn start_server(chain_type: &ChainTypes, stop_state: Arc<StopState>) -> Server {
|
||||||
let config = node_config.clone().unwrap();
|
let config = node_config.clone().unwrap();
|
||||||
let server_config = config.members.as_ref().unwrap().server.clone();
|
let server_config = config.members.as_ref().unwrap().server.clone();
|
||||||
|
|
||||||
|
// Remove lock file (in case if we have running node from another app)
|
||||||
let mut db_path = PathBuf::from(&server_config.db_root);
|
let mut db_path = PathBuf::from(&server_config.db_root);
|
||||||
db_path.push("grin.lock");
|
db_path.push("grin.lock");
|
||||||
fs::remove_file(db_path).unwrap();
|
fs::remove_file(db_path).unwrap();
|
||||||
|
@ -188,7 +177,7 @@ fn start_server(chain_type: &ChainTypes, stop_state: Arc<StopState>) -> Server {
|
||||||
|
|
||||||
let api_chan: &'static mut (oneshot::Sender<()>, oneshot::Receiver<()>) =
|
let api_chan: &'static mut (oneshot::Sender<()>, oneshot::Receiver<()>) =
|
||||||
Box::leak(Box::new(oneshot::channel::<()>()));
|
Box::leak(Box::new(oneshot::channel::<()>()));
|
||||||
let mut server_result = Server::new(server_config.clone(), Some(stop_state.clone()), api_chan);
|
let mut server_result = Server::new(server_config.clone(), None, api_chan);
|
||||||
if server_result.is_err() {
|
if server_result.is_err() {
|
||||||
let mut db_path = PathBuf::from(&server_config.db_root);
|
let mut db_path = PathBuf::from(&server_config.db_root);
|
||||||
db_path.push("grin.lock");
|
db_path.push("grin.lock");
|
||||||
|
@ -207,35 +196,37 @@ fn start_server(chain_type: &ChainTypes, stop_state: Arc<StopState>) -> Server {
|
||||||
let server_config = config.members.as_ref().unwrap().server.clone();
|
let server_config = config.members.as_ref().unwrap().server.clone();
|
||||||
let api_chan: &'static mut (oneshot::Sender<()>, oneshot::Receiver<()>) =
|
let api_chan: &'static mut (oneshot::Sender<()>, oneshot::Receiver<()>) =
|
||||||
Box::leak(Box::new(oneshot::channel::<()>()));
|
Box::leak(Box::new(oneshot::channel::<()>()));
|
||||||
server_result = Server::new(server_config.clone(), Some(stop_state.clone()), api_chan);
|
server_result = Server::new(server_config.clone(), None, api_chan);
|
||||||
}
|
}
|
||||||
|
|
||||||
server_result.unwrap()
|
server_result.unwrap()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Start a thread to launch server and update node state with server stats
|
/// Start a thread to launch server and update node state with server stats
|
||||||
fn start_server_thread(node_state: Arc<Mutex<NodeState>>, mut server: Server) -> JoinHandle<()> {
|
fn start_server_thread(state: Arc<NodeState>, mut server: Server) -> JoinHandle<()> {
|
||||||
thread::spawn(move || loop {
|
thread::spawn(move || loop {
|
||||||
thread::sleep(Duration::from_millis(500));
|
thread::sleep(Duration::from_millis(500));
|
||||||
let mut state = node_state.lock().unwrap();
|
|
||||||
if state.restart_needed {
|
if state.is_restarting() {
|
||||||
server.stop();
|
server.stop();
|
||||||
|
|
||||||
// Create new server with new stop state
|
// Create new server with current chain type
|
||||||
state.stop_state = Arc::new(StopState::new());
|
server = start_server(&state.chain_type);
|
||||||
server = start_server(&state.chain_type, state.stop_state.clone());
|
|
||||||
|
|
||||||
state.restart_needed = false;
|
state.restart_needed.store(false, Ordering::Relaxed);
|
||||||
} else if state.stop_needed {
|
} else if state.is_stopping() {
|
||||||
server.stop();
|
server.stop();
|
||||||
state.stats = None;
|
|
||||||
state.stop_needed = false;
|
let mut w_stats = state.stats.write().unwrap();
|
||||||
|
*w_stats = None;
|
||||||
|
|
||||||
|
state.stop_needed.store(false, Ordering::Relaxed);
|
||||||
break;
|
break;
|
||||||
}
|
} else {
|
||||||
if !state.stop_state.is_stopped() {
|
|
||||||
let stats = server.get_server_stats();
|
let stats = server.get_server_stats();
|
||||||
if stats.is_ok() {
|
if stats.is_ok() {
|
||||||
state.stats = Some(stats.as_ref().ok().unwrap().clone());
|
let mut w_stats = state.stats.write().unwrap();
|
||||||
|
*w_stats = Some(stats.as_ref().ok().unwrap().clone());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
Loading…
Reference in a new issue