diff --git a/Cargo.lock b/Cargo.lock index c2c433a27..6c98bd9ef 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -768,9 +768,9 @@ checksum = "1b980f2816d6ee8673b6517b52cb0e808a180efc92e5c19d02cdda79066703ef" [[package]] name = "futures" -version = "0.3.5" +version = "0.3.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1e05b85ec287aac0dc34db7d4a569323df697f9c55b99b15d6b4ef8cde49f613" +checksum = "28560757fe2bb34e79f907794bb6b22ae8b0e5c669b638a1132f2592b19035b4" dependencies = [ "futures-channel", "futures-core", @@ -783,9 +783,9 @@ dependencies = [ [[package]] name = "futures-channel" -version = "0.3.5" +version = "0.3.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f366ad74c28cca6ba456d95e6422883cfb4b252a83bed929c83abfdbbf2967d5" +checksum = "ba3dda0b6588335f360afc675d0564c17a77a2bda81ca178a4b6081bd86c7f0b" dependencies = [ "futures-core", "futures-sink", @@ -793,15 +793,15 @@ dependencies = [ [[package]] name = "futures-core" -version = "0.3.5" +version = "0.3.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "59f5fff90fd5d971f936ad674802482ba441b6f09ba5e15fd8b39145582ca399" +checksum = "d0c8ff0461b82559810cdccfde3215c3f373807f5e5232b71479bff7bb2583d7" [[package]] name = "futures-executor" -version = "0.3.5" +version = "0.3.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "10d6bb888be1153d3abeb9006b11b02cf5e9b209fda28693c31ae1e4e012e314" +checksum = "29d6d2ff5bb10fb95c85b8ce46538a2e5f5e7fdc755623a7d4529ab8a4ed9d2a" dependencies = [ "futures-core", "futures-task", @@ -810,17 +810,16 @@ dependencies = [ [[package]] name = "futures-io" -version = "0.3.5" +version = "0.3.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "de27142b013a8e869c14957e6d2edeef89e97c289e69d042ee3a49acd8b51789" +checksum = "b1f9d34af5a1aac6fb380f735fe510746c38067c5bf16c7fd250280503c971b2" [[package]] name = "futures-macro" -version = "0.3.5" +version = "0.3.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d0b5a30a4328ab5473878237c447333c093297bded83a4983d10f4deea240d39" +checksum = "6dbd947adfffb0efc70599b3ddcf7b5597bb5fa9e245eb99f62b3a5f7bb8bd3c" dependencies = [ - "proc-macro-hack", "proc-macro2 1.0.24", "quote 1.0.7", "syn 1.0.60", @@ -828,24 +827,21 @@ dependencies = [ [[package]] name = "futures-sink" -version = "0.3.5" +version = "0.3.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3f2032893cb734c7a05d85ce0cc8b8c4075278e93b24b66f9de99d6eb0fa8acc" +checksum = "e3055baccb68d74ff6480350f8d6eb8fcfa3aa11bdc1a1ae3afdd0514617d508" [[package]] name = "futures-task" -version = "0.3.5" +version = "0.3.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bdb66b5f09e22019b1ab0830f7785bcea8e7a42148683f99214f73f8ec21a626" -dependencies = [ - "once_cell", -] +checksum = "6ee7c6485c30167ce4dfb83ac568a849fe53274c831081476ee13e0dce1aad72" [[package]] name = "futures-util" -version = "0.3.5" +version = "0.3.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8764574ff08b701a084482c3c7031349104b07ac897393010494beaa18ce32c6" +checksum = "d9b5cf40b47a271f77a8b1bec03ca09044d99d2372c0de244e66430761127164" dependencies = [ "futures-channel", "futures-core", @@ -854,10 +850,8 @@ dependencies = [ "futures-sink", "futures-task", "memchr", - "pin-project 0.4.20", + "pin-project-lite 0.2.7", "pin-utils", - "proc-macro-hack", - "proc-macro-nested", "slab", ] @@ -937,6 +931,7 @@ dependencies = [ "cursive_table_view", "failure", "failure_derive", + "futures 0.3.19", "grin_api", "grin_chain", "grin_config", @@ -961,7 +956,7 @@ dependencies = [ "easy-jsonrpc-mw", "failure", "failure_derive", - "futures 0.3.5", + "futures 0.3.19", "grin_chain", "grin_core", "grin_p2p", @@ -1136,7 +1131,7 @@ version = "5.2.0-alpha.1" dependencies = [ "chrono", "fs2", - "futures 0.3.5", + "futures 0.3.19", "grin_api", "grin_chain", "grin_core", @@ -1319,7 +1314,7 @@ dependencies = [ "httparse", "httpdate", "itoa", - "pin-project 1.0.2", + "pin-project", "socket2", "tokio", "tower-service", @@ -1999,33 +1994,13 @@ version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e" -[[package]] -name = "pin-project" -version = "0.4.20" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e75373ff9037d112bb19bc61333a06a159eaeb217660dcfbea7d88e1db823919" -dependencies = [ - "pin-project-internal 0.4.20", -] - [[package]] name = "pin-project" version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9ccc2237c2c489783abd8c4c80e5450fc0e98644555b1364da68cc29aa151ca7" dependencies = [ - "pin-project-internal 1.0.2", -] - -[[package]] -name = "pin-project-internal" -version = "0.4.20" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "10b4b44893d3c370407a1d6a5cfde7c41ae0478e31c516c85f67eb3adc51be6d" -dependencies = [ - "proc-macro2 1.0.24", - "quote 1.0.7", - "syn 1.0.60", + "pin-project-internal", ] [[package]] @@ -2047,9 +2022,9 @@ checksum = "282adbf10f2698a7a77f8e983a74b2d18176c19a7fd32a45446139ae7b02b715" [[package]] name = "pin-project-lite" -version = "0.2.0" +version = "0.2.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6b063f57ec186e6140e2b8b6921e5f1bd89c7356dda5b33acc5401203ca6131c" +checksum = "8d31d11c69a6b52a174b42bdc0c30e5e11670f90788b2c471c31c1d17d449443" [[package]] name = "pin-utils" @@ -2081,18 +2056,6 @@ dependencies = [ "output_vt100", ] -[[package]] -name = "proc-macro-hack" -version = "0.5.16" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7e0456befd48169b9f13ef0f0ad46d492cf9d2dbb918bcf38e01eed4ce3ec5e4" - -[[package]] -name = "proc-macro-nested" -version = "0.1.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0afe1bd463b9e9ed51d0e0f0b50b6b146aec855c56fd182bb242388710a9b6de" - [[package]] name = "proc-macro2" version = "0.4.30" @@ -2928,7 +2891,7 @@ checksum = "9f47026cdc4080c07e49b37087de021820269d996f581aac150ef9e5583eefe3" dependencies = [ "cfg-if 1.0.0", "log", - "pin-project-lite 0.2.0", + "pin-project-lite 0.2.7", "tracing-core", ] diff --git a/Cargo.toml b/Cargo.toml index 1ff7566cd..06d9b42ee 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,6 +27,7 @@ ctrlc = { version = "3.1", features = ["termination"] } cursive_table_view = "0.13.2" humansize = "1.1.0" serde = "1" +futures = "0.3.19" serde_json = "1" log = "0.4" term = "0.6" diff --git a/api/src/handlers.rs b/api/src/handlers.rs index 9f0b6ac4a..c020f4746 100644 --- a/api/src/handlers.rs +++ b/api/src/handlers.rs @@ -38,12 +38,15 @@ use crate::router::ResponseFuture; use crate::router::Router; use crate::util::to_base64; use crate::util::RwLock; +use crate::util::StopState; use crate::web::*; use easy_jsonrpc_mw::{Handler, MaybeReply}; +use futures::channel::oneshot; use hyper::{Body, Request, Response, StatusCode}; use serde::Serialize; use std::net::SocketAddr; use std::sync::{Arc, Weak}; +use std::thread; /// Listener version, providing same API but listening for requests on a /// port and wrapping the calls @@ -56,6 +59,8 @@ pub fn node_apis( api_secret: Option, foreign_api_secret: Option, tls_config: Option, + api_chan: &'static mut (oneshot::Sender<()>, oneshot::Receiver<()>), + stop_state: Arc, ) -> Result<(), Error> where B: BlockChain + 'static, @@ -104,10 +109,24 @@ where let mut apis = ApiServer::new(); warn!("Starting HTTP Node APIs server at {}.", addr); let socket_addr: SocketAddr = addr.parse().expect("unable to parse socket address"); - let api_thread = apis.start(socket_addr, router, tls_config); + let api_thread = apis.start(socket_addr, router, tls_config, api_chan); warn!("HTTP Node listener started."); + thread::Builder::new() + .name("api_monitor".to_string()) + .spawn(move || { + // monitor for stop state is_stopped + loop { + std::thread::sleep(std::time::Duration::from_millis(100)); + if stop_state.is_stopped() { + apis.stop(); + break; + } + } + }) + .ok(); + match api_thread { Ok(_) => Ok(()), Err(e) => { diff --git a/api/src/rest.rs b/api/src/rest.rs index 7139cd442..f90d7e4b8 100644 --- a/api/src/rest.rs +++ b/api/src/rest.rs @@ -186,10 +186,11 @@ impl ApiServer { addr: SocketAddr, router: Router, conf: Option, + api_chan: &'static mut (oneshot::Sender<()>, oneshot::Receiver<()>), ) -> Result, Error> { match conf { - Some(conf) => self.start_tls(addr, router, conf), - None => self.start_no_tls(addr, router), + Some(conf) => self.start_tls(addr, router, conf, api_chan), + None => self.start_no_tls(addr, router, api_chan), } } @@ -198,6 +199,7 @@ impl ApiServer { &mut self, addr: SocketAddr, router: Router, + api_chan: &'static mut (oneshot::Sender<()>, oneshot::Receiver<()>), ) -> Result, Error> { if self.shutdown_sender.is_some() { return Err(ErrorKind::Internal( @@ -205,18 +207,26 @@ impl ApiServer { ) .into()); } - let (tx, _rx) = oneshot::channel::<()>(); + let rx = &mut api_chan.1; + let tx = &mut api_chan.0; + + // Jones's trick to update memory + let m = oneshot::channel::<()>(); + let tx = std::mem::replace(tx, m.0); self.shutdown_sender = Some(tx); + thread::Builder::new() .name("apis".to_string()) .spawn(move || { let server = async move { - let server = Server::bind(&addr).serve(make_service_fn(move |_| { - let router = router.clone(); - async move { Ok::<_, Infallible>(router) } - })); - // TODO graceful shutdown is unstable, investigate - //.with_graceful_shutdown(rx) + let server = Server::bind(&addr) + .serve(make_service_fn(move |_| { + let router = router.clone(); + async move { Ok::<_, Infallible>(router) } + })) + .with_graceful_shutdown(async { + rx.await.ok(); + }); server.await }; @@ -238,6 +248,7 @@ impl ApiServer { addr: SocketAddr, router: Router, conf: TLSConfig, + api_chan: &'static mut (oneshot::Sender<()>, oneshot::Receiver<()>), ) -> Result, Error> { if self.shutdown_sender.is_some() { return Err(ErrorKind::Internal( @@ -246,6 +257,14 @@ impl ApiServer { .into()); } + let rx = &mut api_chan.1; + let tx = &mut api_chan.0; + + // Jones's trick to update memory + let m = oneshot::channel::<()>(); + let tx = std::mem::replace(tx, m.0); + self.shutdown_sender = Some(tx); + let acceptor = TlsAcceptor::from(conf.build_server_config()?); thread::Builder::new() @@ -258,12 +277,14 @@ impl ApiServer { .and_then(move |s| acceptor.accept(s)) .filter(|r| r.is_ok()); - let server = Server::builder(accept::from_stream(listener)).serve( - make_service_fn(move |_| { + let server = Server::builder(accept::from_stream(listener)) + .serve(make_service_fn(move |_| { let router = router.clone(); async move { Ok::<_, Infallible>(router) } - }), - ); + })) + .with_graceful_shutdown(async { + rx.await.ok(); + }); server.await }; @@ -281,9 +302,10 @@ impl ApiServer { /// Stops the API server, it panics in case of error pub fn stop(&mut self) -> bool { if self.shutdown_sender.is_some() { - // TODO re-enable stop after investigation - //let tx = mem::replace(&mut self.shutdown_sender, None).unwrap(); - //tx.send(()).expect("Failed to stop API server"); + let tx = self.shutdown_sender.as_mut().unwrap(); + let m = oneshot::channel::<()>(); + let tx = std::mem::replace(tx, m.0); + tx.send(()).expect("Failed to stop API server"); info!("API server has been stopped"); true } else { diff --git a/api/tests/rest.rs b/api/tests/rest.rs index 4f583ab2b..1d88dcf2a 100644 --- a/api/tests/rest.rs +++ b/api/tests/rest.rs @@ -2,6 +2,7 @@ use grin_api as api; use grin_util as util; use crate::api::*; +use futures::channel::oneshot; use hyper::{Body, Request, StatusCode}; use std::net::SocketAddr; use std::sync::atomic::{AtomicUsize, Ordering}; @@ -71,7 +72,9 @@ fn test_start_api() { router.add_middleware(counter.clone()); let server_addr = "127.0.0.1:14434"; let addr: SocketAddr = server_addr.parse().expect("unable to parse server address"); - assert!(server.start(addr, router, None).is_ok()); + let api_chan: &'static mut (oneshot::Sender<()>, oneshot::Receiver<()>) = + Box::leak(Box::new(oneshot::channel::<()>())); + assert!(server.start(addr, router, None, api_chan).is_ok()); let url = format!("http://{}/v1/", server_addr); let index = request_with_retry(url.as_str()).unwrap(); assert_eq!(index.len(), 2); @@ -96,7 +99,9 @@ fn test_start_api_tls() { let router = build_router(); let server_addr = "0.0.0.0:14444"; let addr: SocketAddr = server_addr.parse().expect("unable to parse server address"); - assert!(server.start(addr, router, Some(tls_conf)).is_ok()); + let api_chan: &'static mut (oneshot::Sender<()>, oneshot::Receiver<()>) = + Box::leak(Box::new(oneshot::channel::<()>())); + assert!(server.start(addr, router, Some(tls_conf), api_chan).is_ok()); let index = request_with_retry("https://yourdomain.com:14444/v1/").unwrap(); assert_eq!(index.len(), 2); assert!(!server.stop()); diff --git a/servers/src/grin/server.rs b/servers/src/grin/server.rs index c66190776..89ba2a8da 100644 --- a/servers/src/grin/server.rs +++ b/servers/src/grin/server.rs @@ -51,6 +51,7 @@ use crate::p2p::types::{Capabilities, PeerAddr}; use crate::pool; use crate::util::file::get_first_line; use crate::util::{RwLock, StopState}; +use futures::channel::oneshot; use grin_util::logger::LogEntry; /// Arcified thread-safe TransactionPool with type parameters used by server components @@ -87,6 +88,8 @@ impl Server { config: ServerConfig, logs_rx: Option>, mut info_callback: F, + stop_state: Option>, + api_chan: &'static mut (oneshot::Sender<()>, oneshot::Receiver<()>), ) -> Result<(), Error> where F: FnMut(Server, Option>), @@ -94,7 +97,7 @@ impl Server { let mining_config = config.stratum_mining_config.clone(); let enable_test_miner = config.run_test_miner; let test_miner_wallet_url = config.test_miner_wallet_url.clone(); - let serv = Server::new(config)?; + let serv = Server::new(config, stop_state, api_chan)?; if let Some(c) = mining_config { let enable_stratum_server = c.enable_stratum_server; @@ -145,7 +148,11 @@ impl Server { } /// Instantiates a new server associated with the provided future reactor. - pub fn new(config: ServerConfig) -> Result { + pub fn new( + config: ServerConfig, + stop_state: Option>, + api_chan: &'static mut (oneshot::Sender<()>, oneshot::Receiver<()>), + ) -> Result { // Obtain our lock_file or fail immediately with an error. let lock_file = Server::one_grin_at_a_time(&config)?; @@ -156,7 +163,11 @@ impl Server { Some(b) => b, }; - let stop_state = Arc::new(StopState::new()); + let stop_state = if stop_state.is_some() { + stop_state.unwrap() + } else { + Arc::new(StopState::new()) + }; let pool_adapter = Arc::new(PoolToChainAdapter::new()); let pool_net_adapter = Arc::new(PoolToNetAdapter::new(config.dandelion_config.clone())); @@ -289,7 +300,6 @@ impl Server { } }; - // TODO fix API shutdown and join this thread api::node_apis( &config.api_http_addr, shared_chain.clone(), @@ -299,6 +309,8 @@ impl Server { api_secret, foreign_api_secret, tls_conf, + api_chan, + stop_state.clone(), )?; info!("Starting dandelion monitor: {}", &config.api_http_addr); diff --git a/src/bin/cmd/server.rs b/src/bin/cmd/server.rs index 233e4f4af..6eb60f9f7 100644 --- a/src/bin/cmd/server.rs +++ b/src/bin/cmd/server.rs @@ -25,21 +25,27 @@ use crate::config::GlobalConfig; use crate::p2p::Seeding; use crate::servers; use crate::tui::ui; +use futures::channel::oneshot; use grin_p2p::msg::PeerAddrs; use grin_p2p::PeerAddr; use grin_util::logger::LogEntry; use std::sync::mpsc; /// wrap below to allow UI to clean up on stop -pub fn start_server(config: servers::ServerConfig, logs_rx: Option>) { - start_server_tui(config, logs_rx); - // Just kill process for now, otherwise the process - // hangs around until sigint because the API server - // currently has no shutdown facility +pub fn start_server( + config: servers::ServerConfig, + logs_rx: Option>, + api_chan: &'static mut (oneshot::Sender<()>, oneshot::Receiver<()>), +) { + start_server_tui(config, logs_rx, api_chan); exit(0); } -fn start_server_tui(config: servers::ServerConfig, logs_rx: Option>) { +fn start_server_tui( + config: servers::ServerConfig, + logs_rx: Option>, + api_chan: &'static mut (oneshot::Sender<()>, oneshot::Receiver<()>), +) { // Run the UI controller.. here for now for simplicity to access // everything it might need if config.run_tui.unwrap_or(false) { @@ -53,6 +59,8 @@ fn start_server_tui(config: servers::ServerConfig, logs_rx: Option>, global_config: GlobalConfig, logs_rx: Option>, + api_chan: &'static mut (oneshot::Sender<()>, oneshot::Receiver<()>), ) -> i32 { // just get defaults from the global config let mut server_config = global_config.members.as_ref().unwrap().server.clone(); @@ -118,7 +129,7 @@ pub fn server_command( if let Some(a) = server_args { match a.subcommand() { ("run", _) => { - start_server(server_config, logs_rx); + start_server(server_config, logs_rx, api_chan); } ("", _) => { println!("Subcommand required, use 'grin help server' for details"); @@ -132,7 +143,7 @@ pub fn server_command( } } } else { - start_server(server_config, logs_rx); + start_server(server_config, logs_rx, api_chan); } 0 } diff --git a/src/bin/grin.rs b/src/bin/grin.rs index 84ad6d58b..e50afbb7c 100644 --- a/src/bin/grin.rs +++ b/src/bin/grin.rs @@ -23,6 +23,7 @@ use crate::config::config::SERVER_CONFIG_FILE_NAME; use crate::core::global; use crate::util::init_logger; use clap::App; +use futures::channel::oneshot; use grin_api as api; use grin_chain as chain; use grin_config as config; @@ -127,6 +128,9 @@ fn real_main() -> i32 { let mut logging_config = config.members.as_ref().unwrap().logging.clone().unwrap(); logging_config.tui_running = config.members.as_ref().unwrap().server.run_tui; + let api_chan: &'static mut (oneshot::Sender<()>, oneshot::Receiver<()>) = + Box::leak(Box::new(oneshot::channel::<()>())); + let (logs_tx, logs_rx) = if logging_config.tui_running.unwrap() { let (logs_tx, logs_rx) = mpsc::sync_channel::(200); (Some(logs_tx), Some(logs_rx)) @@ -177,7 +181,7 @@ fn real_main() -> i32 { match args.subcommand() { // server commands and options ("server", Some(server_args)) => { - cmd::server_command(Some(server_args), node_config.unwrap(), logs_rx) + cmd::server_command(Some(server_args), node_config.unwrap(), logs_rx, api_chan) } // client commands and options @@ -196,6 +200,6 @@ fn real_main() -> i32 { // If nothing is specified, try to just use the config file instead // this could possibly become the way to configure most things // with most command line options being phased out - _ => cmd::server_command(None, node_config.unwrap(), logs_rx), + _ => cmd::server_command(None, node_config.unwrap(), logs_rx, api_chan), } }