Use global Tokio runtime (#543)

This commit is contained in:
jaspervdm 2020-12-15 21:09:19 +01:00 committed by GitHub
parent 92c5918e42
commit f4f4184283
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 27 additions and 34 deletions

2
Cargo.lock generated
View file

@ -1479,7 +1479,6 @@ dependencies = [
"blake2-rfc", "blake2-rfc",
"byteorder", "byteorder",
"chrono", "chrono",
"crossbeam-utils",
"data-encoding", "data-encoding",
"ed25519-dalek", "ed25519-dalek",
"failure", "failure",
@ -1493,6 +1492,7 @@ dependencies = [
"hyper-rustls", "hyper-rustls",
"hyper-socks2-mw", "hyper-socks2-mw",
"hyper-timeout", "hyper-timeout",
"lazy_static",
"log", "log",
"rand 0.6.5", "rand 0.6.5",
"regex", "regex",

View file

@ -23,7 +23,7 @@ ring = "0.16"
tokio = { version = "0.2", features = ["full"] } tokio = { version = "0.2", features = ["full"] }
uuid = { version = "0.8", features = ["serde", "v4"] } uuid = { version = "0.8", features = ["serde", "v4"] }
chrono = { version = "0.4.11", features = ["serde"] } chrono = { version = "0.4.11", features = ["serde"] }
crossbeam-utils = "0.7" lazy_static = "1.4"
#http client (copied from grin) #http client (copied from grin)
http = "0.2" http = "0.2"

View file

@ -15,19 +15,35 @@
//! High level JSON/HTTP client API //! High level JSON/HTTP client API
use crate::util::to_base64; use crate::util::to_base64;
use crossbeam_utils::thread::scope;
use failure::{Backtrace, Context, Fail, ResultExt}; use failure::{Backtrace, Context, Fail, ResultExt};
use hyper::body; use hyper::body;
use hyper::header::{ACCEPT, AUTHORIZATION, CONTENT_TYPE, USER_AGENT}; use hyper::header::{ACCEPT, AUTHORIZATION, CONTENT_TYPE, USER_AGENT};
use hyper::{self, Body, Client as HyperClient, Request, Uri}; use hyper::{self, Body, Client as HyperClient, Request, Uri};
use hyper_rustls; use hyper_rustls;
use hyper_timeout::TimeoutConnector; use hyper_timeout::TimeoutConnector;
use lazy_static::lazy_static;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use serde_json; use serde_json;
use std::fmt::{self, Display}; use std::fmt::{self, Display};
use std::net::SocketAddr; use std::net::SocketAddr;
use std::sync::{Arc, Mutex};
use std::time::Duration; use std::time::Duration;
use tokio::runtime::Builder; use tokio::runtime::{Builder, Runtime};
// Global Tokio runtime.
// Needs a `Mutex` because `Runtime::block_on` requires mutable access.
// Tokio v0.3 requires immutable self, but we are waiting on upstream
// updates before we can upgrade.
// See: https://github.com/seanmonstar/reqwest/pull/1076
lazy_static! {
pub static ref RUNTIME: Arc<Mutex<Runtime>> = Arc::new(Mutex::new(
Builder::new()
.threaded_scheduler()
.enable_all()
.build()
.unwrap()
));
}
/// Errors that can be returned by an ApiEndpoint implementation. /// Errors that can be returned by an ApiEndpoint implementation.
#[derive(Debug)] #[derive(Debug)]
@ -323,18 +339,9 @@ impl Client {
} }
pub fn send_request(&self, req: Request<Body>) -> Result<String, Error> { pub fn send_request(&self, req: Request<Body>) -> Result<String, Error> {
let task = self.send_request_async(req); RUNTIME
scope(|s| { .lock()
let handle = s.spawn(|_| {
let mut rt = Builder::new()
.basic_scheduler()
.enable_all()
.build()
.context(ErrorKind::Internal("can't create Tokio runtime".to_owned()))?;
rt.block_on(task)
});
handle.join().unwrap()
})
.unwrap() .unwrap()
.block_on(self.send_request_async(req))
} }
} }

View file

@ -15,4 +15,4 @@
mod client; mod client;
pub mod json_rpc; pub mod json_rpc;
pub use client::{Client, Error as ClientError}; pub use client::{Client, Error as ClientError, RUNTIME};

View file

@ -17,14 +17,12 @@
use crate::api::{self, LocatedTxKernel, OutputListing, OutputPrintable}; use crate::api::{self, LocatedTxKernel, OutputListing, OutputPrintable};
use crate::core::core::{Transaction, TxKernel}; use crate::core::core::{Transaction, TxKernel};
use crate::libwallet::{NodeClient, NodeVersionInfo}; use crate::libwallet::{NodeClient, NodeVersionInfo};
use crossbeam_utils::thread::scope;
use futures::stream::FuturesUnordered; use futures::stream::FuturesUnordered;
use futures::TryStreamExt; use futures::TryStreamExt;
use std::collections::HashMap; use std::collections::HashMap;
use std::env; use std::env;
use tokio::runtime::Builder;
use crate::client_utils::Client; use crate::client_utils::{Client, RUNTIME};
use crate::libwallet; use crate::libwallet;
use crate::util::secp::pedersen; use crate::util::secp::pedersen;
use crate::util::ToHex; use crate::util::ToHex;
@ -251,19 +249,7 @@ impl NodeClient for HTTPNodeClient {
task.try_collect().await task.try_collect().await
}; };
let res = scope(|s| { let res: Result<Vec<Response>, _> = RUNTIME.lock().unwrap().block_on(task);
let handle = s.spawn(|_| {
let mut rt = Builder::new()
.threaded_scheduler()
.enable_all()
.build()
.unwrap();
let res: Result<Vec<Response>, _> = rt.block_on(task);
res
});
handle.join().unwrap()
})
.unwrap();
let results: Vec<OutputPrintable> = match res { let results: Vec<OutputPrintable> = match res {
Ok(resps) => { Ok(resps) => {