mirror of
https://github.com/mimblewimble/grin.git
synced 2025-03-13 20:41:08 +03:00
Upgrade Tokio to v1.x and Hyper to v0.14 (#3804)
* update of tokio and related dependencies to 1.x * update to hyper 0.14 * fixes to http connector for tests
This commit is contained in:
parent
b93d88b58c
commit
8e79856168
12 changed files with 324 additions and 461 deletions
551
Cargo.lock
generated
551
Cargo.lock
generated
File diff suppressed because it is too large
Load diff
|
@ -11,7 +11,7 @@ edition = "2018"
|
|||
|
||||
[dependencies]
|
||||
easy-jsonrpc-mw = "0.5.4"
|
||||
hyper = "0.13"
|
||||
hyper = { version = "0.14", features = ["full"] }
|
||||
lazy_static = "1"
|
||||
regex = "1"
|
||||
ring = "0.16"
|
||||
|
@ -20,15 +20,17 @@ serde_derive = "1"
|
|||
serde_json = "1"
|
||||
thiserror = "1"
|
||||
log = "0.4"
|
||||
tokio = { version = "0.2", features = ["full"] }
|
||||
tokio-rustls = "0.13"
|
||||
tokio = { version = "1", features = ["full"] }
|
||||
tokio-rustls = "0.23"
|
||||
http = "0.2"
|
||||
hyper-rustls = "0.20"
|
||||
hyper-timeout = "0.3"
|
||||
hyper-rustls = "0.23"
|
||||
hyper-timeout = "0.4"
|
||||
futures = "0.3"
|
||||
rustls = "0.17"
|
||||
rustls = "0.20"
|
||||
rustls-pemfile = "1.0"
|
||||
async-stream = "0.3"
|
||||
url = "2.1"
|
||||
bytes = "0.5"
|
||||
bytes = "1"
|
||||
|
||||
grin_core = { path = "../core", version = "5.4.0-alpha.0" }
|
||||
grin_chain = { path = "../chain", version = "5.4.0-alpha.0" }
|
||||
|
|
|
@ -216,7 +216,12 @@ where
|
|||
}
|
||||
|
||||
async fn send_request_async(req: Request<Body>, timeout: TimeOut) -> Result<String, Error> {
|
||||
let https = hyper_rustls::HttpsConnector::new();
|
||||
let https = hyper_rustls::HttpsConnectorBuilder::new()
|
||||
.with_native_roots()
|
||||
.https_or_http()
|
||||
.enable_http1()
|
||||
.build();
|
||||
|
||||
let (connect, read, write) = (
|
||||
Some(timeout.connect),
|
||||
Some(timeout.read),
|
||||
|
@ -242,7 +247,7 @@ async fn send_request_async(req: Request<Body>, timeout: TimeOut) -> Result<Stri
|
|||
.into());
|
||||
}
|
||||
|
||||
let raw = body::to_bytes(resp)
|
||||
let raw = body::to_bytes(resp.into_body())
|
||||
.await
|
||||
.map_err(|e| Error::RequestError(format!("Cannot read response body: {}", e)))?;
|
||||
|
||||
|
@ -250,8 +255,7 @@ async fn send_request_async(req: Request<Body>, timeout: TimeOut) -> Result<Stri
|
|||
}
|
||||
|
||||
pub fn send_request(req: Request<Body>, timeout: TimeOut) -> Result<String, Error> {
|
||||
let mut rt = Builder::new()
|
||||
.basic_scheduler()
|
||||
let rt = Builder::new_current_thread()
|
||||
.enable_all()
|
||||
.build()
|
||||
.map_err(|e| Error::RequestError(format!("{}", e)))?;
|
||||
|
|
|
@ -97,7 +97,7 @@ pub enum Error {
|
|||
/// Json error
|
||||
Json(serde_json::Error),
|
||||
/// Client error
|
||||
Hyper(hyper::error::Error),
|
||||
Hyper(hyper::Error),
|
||||
/// Error response
|
||||
Rpc(RpcError),
|
||||
/// Response to a request did not have the expected nonce
|
||||
|
@ -120,8 +120,8 @@ impl From<serde_json::Error> for Error {
|
|||
}
|
||||
}
|
||||
|
||||
impl From<hyper::error::Error> for Error {
|
||||
fn from(e: hyper::error::Error) -> Error {
|
||||
impl From<hyper::Error> for Error {
|
||||
fn from(e: hyper::Error) -> Error {
|
||||
Error::Hyper(e)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -21,11 +21,10 @@
|
|||
use crate::router::{Handler, HandlerObj, ResponseFuture, Router, RouterError};
|
||||
use crate::web::response;
|
||||
use futures::channel::oneshot;
|
||||
use futures::TryStreamExt;
|
||||
use hyper::server::accept;
|
||||
use hyper::service::make_service_fn;
|
||||
use hyper::{Body, Request, Server, StatusCode};
|
||||
use rustls::internal::pemfile;
|
||||
use rustls_pemfile as pemfile;
|
||||
use std::convert::Infallible;
|
||||
use std::fs::File;
|
||||
use std::net::SocketAddr;
|
||||
|
@ -33,7 +32,6 @@ use std::sync::Arc;
|
|||
use std::{io, thread};
|
||||
use tokio::net::TcpListener;
|
||||
use tokio::runtime::Runtime;
|
||||
use tokio::stream::StreamExt;
|
||||
use tokio_rustls::TlsAcceptor;
|
||||
|
||||
/// Errors that can be returned by an ApiEndpoint implementation.
|
||||
|
@ -83,8 +81,10 @@ impl TLSConfig {
|
|||
})?;
|
||||
let mut reader = io::BufReader::new(certfile);
|
||||
|
||||
pemfile::certs(&mut reader)
|
||||
.map_err(|_| Error::Internal("failed to load certificate".to_string()))
|
||||
let certs = pemfile::certs(&mut reader)
|
||||
.map_err(|_| Error::Internal("failed to load certificate".to_string()))?;
|
||||
|
||||
Ok(certs.into_iter().map(rustls::Certificate).collect())
|
||||
}
|
||||
|
||||
fn load_private_key(&self) -> Result<rustls::PrivateKey, Error> {
|
||||
|
@ -97,15 +97,19 @@ impl TLSConfig {
|
|||
if keys.len() != 1 {
|
||||
return Err(Error::Internal("expected a single private key".to_string()));
|
||||
}
|
||||
Ok(keys[0].clone())
|
||||
Ok(rustls::PrivateKey(keys[0].clone()))
|
||||
}
|
||||
|
||||
pub fn build_server_config(&self) -> Result<Arc<rustls::ServerConfig>, Error> {
|
||||
let certs = self.load_certs()?;
|
||||
let key = self.load_private_key()?;
|
||||
let mut cfg = rustls::ServerConfig::new(rustls::NoClientAuth::new());
|
||||
cfg.set_single_cert(certs, key)
|
||||
|
||||
let cfg = rustls::ServerConfig::builder()
|
||||
.with_safe_defaults()
|
||||
.with_no_client_auth()
|
||||
.with_single_cert(certs, key)
|
||||
.map_err(|e| Error::Internal(format!("set single certificate failed {}", e)))?;
|
||||
|
||||
Ok(Arc::new(cfg))
|
||||
}
|
||||
}
|
||||
|
@ -175,7 +179,7 @@ impl ApiServer {
|
|||
server.await
|
||||
};
|
||||
|
||||
let mut rt = Runtime::new()
|
||||
let rt = Runtime::new()
|
||||
.map_err(|e| eprintln!("HTTP API server error: {}", e))
|
||||
.unwrap();
|
||||
if let Err(e) = rt.block_on(server) {
|
||||
|
@ -214,13 +218,26 @@ impl ApiServer {
|
|||
.name("apis".to_string())
|
||||
.spawn(move || {
|
||||
let server = async move {
|
||||
let mut listener = TcpListener::bind(&addr).await.expect("failed to bind");
|
||||
let listener = listener
|
||||
.incoming()
|
||||
.and_then(move |s| acceptor.accept(s))
|
||||
.filter(|r| r.is_ok());
|
||||
let listener = TcpListener::bind(&addr).await.expect("failed to bind");
|
||||
|
||||
let server = Server::builder(accept::from_stream(listener))
|
||||
let tls_stream = async_stream::stream! {
|
||||
loop {
|
||||
let (socket, _addr) = match listener.accept().await {
|
||||
Ok(conn) => conn,
|
||||
Err(e) => {
|
||||
eprintln!("Error accepting connection: {}", e);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
match acceptor.accept(socket).await {
|
||||
Ok(stream) => yield Ok::<_, std::io::Error>(stream),
|
||||
Err(_) => continue,
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
let server = Server::builder(accept::from_stream(tls_stream))
|
||||
.serve(make_service_fn(move |_| {
|
||||
let router = router.clone();
|
||||
async move { Ok::<_, Infallible>(router) }
|
||||
|
@ -232,7 +249,7 @@ impl ApiServer {
|
|||
server.await
|
||||
};
|
||||
|
||||
let mut rt = Runtime::new()
|
||||
let rt = Runtime::new()
|
||||
.map_err(|e| eprintln!("HTTP API server error: {}", e))
|
||||
.unwrap();
|
||||
if let Err(e) = rt.block_on(server) {
|
||||
|
|
|
@ -1,12 +1,12 @@
|
|||
use crate::rest::*;
|
||||
use crate::router::ResponseFuture;
|
||||
use bytes::Buf;
|
||||
use futures::future::ok;
|
||||
use hyper::body;
|
||||
use hyper::{Body, Request, Response, StatusCode};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::HashMap;
|
||||
use std::fmt::Debug;
|
||||
use std::io::Cursor;
|
||||
use url::form_urlencoded;
|
||||
|
||||
/// Parse request body
|
||||
|
@ -18,7 +18,8 @@ where
|
|||
.await
|
||||
.map_err(|e| Error::RequestError(format!("Failed to read request: {}", e)))?;
|
||||
|
||||
serde_json::from_reader(raw.bytes())
|
||||
let cursor = Cursor::new(raw);
|
||||
serde_json::from_reader(cursor)
|
||||
.map_err(|e| Error::RequestError(format!("Invalid request body: {}", e)))
|
||||
}
|
||||
|
||||
|
|
|
@ -188,7 +188,7 @@ where
|
|||
B: 'a + Backend<T>,
|
||||
{
|
||||
/// Build a new prunable Merkle Mountain Range using the provided backend.
|
||||
pub fn new(backend: &'a mut B) -> PMMR<'_, T, B> {
|
||||
pub fn new(backend: &'a mut B) -> PMMR<'a, T, B> {
|
||||
PMMR {
|
||||
backend,
|
||||
size: 0,
|
||||
|
@ -198,7 +198,7 @@ where
|
|||
|
||||
/// Build a new prunable Merkle Mountain Range pre-initialized until
|
||||
/// size with the provided backend.
|
||||
pub fn at(backend: &'a mut B, size: u64) -> PMMR<'_, T, B> {
|
||||
pub fn at(backend: &'a mut B, size: u64) -> PMMR<'a, T, B> {
|
||||
PMMR {
|
||||
backend,
|
||||
size,
|
||||
|
|
|
@ -41,7 +41,7 @@ where
|
|||
B: 'a + Backend<T>,
|
||||
{
|
||||
/// Build a new readonly PMMR.
|
||||
pub fn new(backend: &'a B) -> ReadonlyPMMR<'_, T, B> {
|
||||
pub fn new(backend: &'a B) -> ReadonlyPMMR<'a, T, B> {
|
||||
ReadonlyPMMR {
|
||||
backend,
|
||||
size: 0,
|
||||
|
@ -51,7 +51,7 @@ where
|
|||
|
||||
/// Build a new readonly PMMR pre-initialized to
|
||||
/// size with the provided backend.
|
||||
pub fn at(backend: &'a B, size: u64) -> ReadonlyPMMR<'_, T, B> {
|
||||
pub fn at(backend: &'a B, size: u64) -> ReadonlyPMMR<'a, T, B> {
|
||||
ReadonlyPMMR {
|
||||
backend,
|
||||
size,
|
||||
|
|
|
@ -40,7 +40,7 @@ where
|
|||
B: 'a + Backend<T>,
|
||||
{
|
||||
/// Build a new readonly PMMR.
|
||||
pub fn new(backend: &'a B) -> RewindablePMMR<'_, T, B> {
|
||||
pub fn new(backend: &'a B) -> RewindablePMMR<'a, T, B> {
|
||||
RewindablePMMR {
|
||||
backend,
|
||||
last_pos: 0,
|
||||
|
@ -50,7 +50,7 @@ where
|
|||
|
||||
/// Build a new readonly PMMR pre-initialized to
|
||||
/// last_pos with the provided backend.
|
||||
pub fn at(backend: &'a B, last_pos: u64) -> RewindablePMMR<'_, T, B> {
|
||||
pub fn at(backend: &'a B, last_pos: u64) -> RewindablePMMR<'a, T, B> {
|
||||
RewindablePMMR {
|
||||
backend,
|
||||
last_pos,
|
||||
|
|
|
@ -10,8 +10,8 @@ workspace = ".."
|
|||
edition = "2018"
|
||||
|
||||
[dependencies]
|
||||
hyper = "0.13"
|
||||
hyper-rustls = "0.20"
|
||||
hyper = { version = "0.14", features = ["full"] }
|
||||
hyper-rustls = "0.23"
|
||||
fs2 = "0.4"
|
||||
futures = "0.3"
|
||||
http = "0.2"
|
||||
|
@ -22,8 +22,10 @@ log = "0.4"
|
|||
serde_derive = "1"
|
||||
serde_json = "1"
|
||||
chrono = "0.4.11"
|
||||
tokio = {version = "0.2", features = ["full"] }
|
||||
tokio-util = { version = "0.2", features = ["codec"] }
|
||||
tokio = { version = "1", features = ["full"] }
|
||||
tokio-util = { version = "0.7", features = ["codec"] }
|
||||
async-stream = "0.3"
|
||||
rustls = "0.20"
|
||||
walkdir = "2.3.1"
|
||||
|
||||
grin_api = { path = "../api", version = "5.4.0-alpha.0" }
|
||||
|
|
|
@ -220,7 +220,12 @@ impl WebHook {
|
|||
nthreads, timeout
|
||||
);
|
||||
|
||||
let https = HttpsConnector::new();
|
||||
let https = hyper_rustls::HttpsConnectorBuilder::new()
|
||||
.with_native_roots()
|
||||
.https_only()
|
||||
.enable_http1()
|
||||
.build();
|
||||
|
||||
let client = Client::builder()
|
||||
.pool_idle_timeout(keep_alive)
|
||||
.build::<_, hyper::Body>(https);
|
||||
|
@ -231,10 +236,9 @@ impl WebHook {
|
|||
header_received_url,
|
||||
block_accepted_url,
|
||||
client,
|
||||
runtime: Builder::new()
|
||||
.threaded_scheduler()
|
||||
runtime: Builder::new_multi_thread()
|
||||
.enable_all()
|
||||
.core_threads(nthreads as usize)
|
||||
.worker_threads(nthreads as usize)
|
||||
.build()
|
||||
.unwrap(),
|
||||
}
|
||||
|
|
|
@ -602,64 +602,72 @@ impl Handler {
|
|||
fn accept_connections(listen_addr: SocketAddr, handler: Arc<Handler>) {
|
||||
info!("Start tokio stratum server");
|
||||
let task = async move {
|
||||
let mut listener = TcpListener::bind(&listen_addr).await.unwrap_or_else(|_| {
|
||||
let listener = TcpListener::bind(&listen_addr).await.unwrap_or_else(|_| {
|
||||
panic!("Stratum: Failed to bind to listen address {}", listen_addr)
|
||||
});
|
||||
let server = listener
|
||||
.incoming()
|
||||
.filter_map(|s| async { s.map_err(|e| error!("accept error = {:?}", e)).ok() })
|
||||
.for_each(move |socket| {
|
||||
let handler = handler.clone();
|
||||
async move {
|
||||
// Spawn a task to process the connection
|
||||
let (tx, mut rx) = mpsc::unbounded();
|
||||
|
||||
let worker_id = handler.workers.add_worker(tx);
|
||||
info!("Worker {} connected", worker_id);
|
||||
|
||||
let framed = Framed::new(socket, LinesCodec::new());
|
||||
let (mut writer, mut reader) = framed.split();
|
||||
|
||||
let h = handler.clone();
|
||||
let read = async move {
|
||||
while let Some(line) = reader
|
||||
.try_next()
|
||||
.await
|
||||
.map_err(|e| error!("error reading line: {}", e))?
|
||||
{
|
||||
let request = serde_json::from_str(&line)
|
||||
.map_err(|e| error!("error serializing line: {}", e))?;
|
||||
let resp = h.handle_rpc_requests(request, worker_id);
|
||||
h.workers.send_to(worker_id, resp);
|
||||
}
|
||||
|
||||
Result::<_, ()>::Ok(())
|
||||
};
|
||||
|
||||
let write = async move {
|
||||
while let Some(line) = rx.next().await {
|
||||
writer
|
||||
.send(line)
|
||||
.await
|
||||
.map_err(|e| error!("error writing line: {}", e))?;
|
||||
}
|
||||
|
||||
Result::<_, ()>::Ok(())
|
||||
};
|
||||
|
||||
let task = async move {
|
||||
pin_mut!(read, write);
|
||||
futures::future::select(read, write).await;
|
||||
handler.workers.remove_worker(worker_id);
|
||||
info!("Worker {} disconnected", worker_id);
|
||||
};
|
||||
tokio::spawn(task);
|
||||
let server = async_stream::stream! {
|
||||
loop {
|
||||
match listener.accept().await {
|
||||
Ok((socket, _)) => yield socket,
|
||||
Err(e) => {
|
||||
error!("accept error = {:?}", e);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
.for_each(move |socket| {
|
||||
let handler = handler.clone();
|
||||
async move {
|
||||
// Spawn a task to process the connection
|
||||
let (tx, mut rx) = mpsc::unbounded();
|
||||
|
||||
let worker_id = handler.workers.add_worker(tx);
|
||||
info!("Worker {} connected", worker_id);
|
||||
|
||||
let framed = Framed::new(socket, LinesCodec::new());
|
||||
let (mut writer, mut reader) = framed.split();
|
||||
|
||||
let h = handler.clone();
|
||||
let read = async move {
|
||||
while let Some(line) = reader
|
||||
.try_next()
|
||||
.await
|
||||
.map_err(|e| error!("error reading line: {}", e))?
|
||||
{
|
||||
let request = serde_json::from_str(&line)
|
||||
.map_err(|e| error!("error serializing line: {}", e))?;
|
||||
let resp = h.handle_rpc_requests(request, worker_id);
|
||||
h.workers.send_to(worker_id, resp);
|
||||
}
|
||||
|
||||
Result::<_, ()>::Ok(())
|
||||
};
|
||||
|
||||
let write = async move {
|
||||
while let Some(line) = rx.next().await {
|
||||
writer
|
||||
.send(line)
|
||||
.await
|
||||
.map_err(|e| error!("error writing line: {}", e))?;
|
||||
}
|
||||
|
||||
Result::<_, ()>::Ok(())
|
||||
};
|
||||
|
||||
let task = async move {
|
||||
pin_mut!(read, write);
|
||||
futures::future::select(read, write).await;
|
||||
handler.workers.remove_worker(worker_id);
|
||||
info!("Worker {} disconnected", worker_id);
|
||||
};
|
||||
tokio::spawn(task);
|
||||
}
|
||||
});
|
||||
server.await
|
||||
};
|
||||
|
||||
let mut rt = Runtime::new().unwrap();
|
||||
let rt = Runtime::new().unwrap();
|
||||
rt.block_on(task);
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue