Update hyper/tokio/futures dependencies (#3214)

* Update hyper, tokio, futures versions

* Update stratum server

* Update API

* Update webhooks
This commit is contained in:
jaspervdm 2020-02-18 23:45:27 +01:00 committed by GitHub
parent 2d4a2c30ce
commit 6bca34c6a8
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
13 changed files with 838 additions and 772 deletions

935
Cargo.lock generated

File diff suppressed because it is too large Load diff

View file

@ -13,24 +13,23 @@ edition = "2018"
easy-jsonrpc-mw = "0.5.3"
failure = "0.1.1"
failure_derive = "0.1.1"
hyper = "0.12"
hyper = "0.13"
lazy_static = "1"
regex = "1"
ring = "0.13"
ring = "0.16"
serde = "1"
serde_derive = "1"
serde_json = "1"
log = "0.4"
tokio = "0.1.7"
tokio-core = "0.1.17"
tokio-tcp = "0.1"
tokio-rustls = "0.7"
tokio = { version = "0.2", features = ["full"] }
tokio-rustls = "0.12"
http = "0.1.5"
hyper-rustls = "0.14"
hyper-timeout = "0.2"
futures = "0.1.21"
rustls = "0.13"
hyper-rustls = "0.19"
hyper-timeout = "0.3"
futures = "0.3"
rustls = "0.16"
url = "1.7.0"
bytes = "0.5"
grin_core = { path = "../core", version = "3.1.0-beta.1" }
grin_chain = { path = "../chain", version = "3.1.0-beta.1" }

View file

@ -139,5 +139,5 @@ fn unauthorized_response(basic_realm: &HeaderValue) -> ResponseFuture {
.header(WWW_AUTHENTICATE, basic_realm)
.body(Body::empty())
.unwrap();
Box::new(ok(response))
Box::pin(ok(response))
}

View file

@ -17,23 +17,22 @@
use crate::rest::{Error, ErrorKind};
use crate::util::to_base64;
use failure::{Fail, ResultExt};
use futures::future::{err, ok, Either};
use http::uri::{InvalidUri, Uri};
use hyper::body;
use hyper::header::{ACCEPT, AUTHORIZATION, CONTENT_TYPE, USER_AGENT};
use hyper::rt::{Future, Stream};
use hyper::{Body, Client, Request};
use hyper_rustls;
use hyper_timeout::TimeoutConnector;
use serde::{Deserialize, Serialize};
use serde_json;
use std::time::Duration;
use tokio::runtime::Runtime;
pub type ClientResponseFuture<T> = Box<dyn Future<Item = T, Error = Error> + Send>;
use tokio::runtime::Builder;
/// Helper function to easily issue a HTTP GET request against a given URL that
/// returns a JSON object. Handles request building, JSON deserialization and
/// response code checking.
/// This function spawns a new Tokio runtime, which means it is pretty inefficient for multiple
/// requests. In those situations you are probably better off creating a runtime once and spawning
/// `get_async` tasks on it
pub fn get<T>(url: &str, api_secret: Option<String>) -> Result<T, Error>
where
for<'de> T: Deserialize<'de>,
@ -44,22 +43,18 @@ where
/// Helper function to easily issue an async HTTP GET request against a given
/// URL that returns a future. Handles request building, JSON deserialization
/// and response code checking.
pub fn get_async<T>(url: &str, api_secret: Option<String>) -> ClientResponseFuture<T>
pub async fn get_async<T>(url: &str, api_secret: Option<String>) -> Result<T, Error>
where
for<'de> T: Deserialize<'de> + Send + 'static,
{
match build_request(url, "GET", api_secret, None) {
Ok(req) => Box::new(handle_request_async(req)),
Err(e) => Box::new(err(e)),
}
handle_request_async(build_request(url, "GET", api_secret, None)?).await
}
/// Helper function to easily issue a HTTP GET request
/// on a given URL that returns nothing. Handles request
/// building and response code checking.
pub fn get_no_ret(url: &str, api_secret: Option<String>) -> Result<(), Error> {
let req = build_request(url, "GET", api_secret, None)?;
send_request(req)?;
send_request(build_request(url, "GET", api_secret, None)?)?;
Ok(())
}
@ -80,20 +75,17 @@ where
/// provided JSON object as body on a given URL that returns a future. Handles
/// request building, JSON serialization and deserialization, and response code
/// checking.
pub fn post_async<IN, OUT>(
pub async fn post_async<IN, OUT>(
url: &str,
input: &IN,
api_secret: Option<String>,
) -> ClientResponseFuture<OUT>
) -> Result<OUT, Error>
where
IN: Serialize,
OUT: Send + 'static,
for<'de> OUT: Deserialize<'de>,
{
match create_post_request(url, api_secret, input) {
Ok(req) => Box::new(handle_request_async(req)),
Err(e) => Box::new(err(e)),
}
handle_request_async(create_post_request(url, api_secret, input)?).await
}
/// Helper function to easily issue a HTTP POST request with the provided JSON
@ -104,8 +96,7 @@ pub fn post_no_ret<IN>(url: &str, api_secret: Option<String>, input: &IN) -> Res
where
IN: Serialize,
{
let req = create_post_request(url, api_secret, input)?;
send_request(req)?;
send_request(create_post_request(url, api_secret, input)?)?;
Ok(())
}
@ -113,18 +104,16 @@ where
/// provided JSON object as body on a given URL that returns a future. Handles
/// request building, JSON serialization and deserialization, and response code
/// checking.
pub fn post_no_ret_async<IN>(
pub async fn post_no_ret_async<IN>(
url: &str,
api_secret: Option<String>,
input: &IN,
) -> ClientResponseFuture<()>
) -> Result<(), Error>
where
IN: Serialize,
{
match create_post_request(url, api_secret, input) {
Ok(req) => Box::new(send_request_async(req).and_then(|_| ok(()))),
Err(e) => Box::new(err(e)),
}
send_request_async(create_post_request(url, api_secret, input)?).await?;
Ok(())
}
fn build_request(
@ -133,19 +122,15 @@ fn build_request(
api_secret: Option<String>,
body: Option<String>,
) -> Result<Request<Body>, Error> {
let uri = url.parse::<Uri>().map_err::<Error, _>(|e: InvalidUri| {
e.context(ErrorKind::Argument(format!("Invalid url {}", url)))
.into()
})?;
let mut builder = Request::builder();
if let Some(api_secret) = api_secret {
let basic_auth = format!("Basic {}", to_base64(&format!("grin:{}", api_secret)));
builder.header(AUTHORIZATION, basic_auth);
builder = builder.header(AUTHORIZATION, basic_auth);
}
builder
.method(method)
.uri(uri)
.uri(url)
.header(USER_AGENT, "grin-client")
.header(ACCEPT, "application/json")
.header(CONTENT_TYPE, "application/json")
@ -183,55 +168,50 @@ where
})
}
fn handle_request_async<T>(req: Request<Body>) -> ClientResponseFuture<T>
async fn handle_request_async<T>(req: Request<Body>) -> Result<T, Error>
where
for<'de> T: Deserialize<'de> + Send + 'static,
{
Box::new(send_request_async(req).and_then(|data| {
serde_json::from_str(&data).map_err(|e| {
e.context(ErrorKind::ResponseError("Cannot parse response".to_owned()))
.into()
})
}))
let data = send_request_async(req).await?;
let ser = serde_json::from_str(&data)
.map_err(|e| e.context(ErrorKind::ResponseError("Cannot parse response".to_owned())))?;
Ok(ser)
}
fn send_request_async(req: Request<Body>) -> Box<dyn Future<Item = String, Error = Error> + Send> {
let https = hyper_rustls::HttpsConnector::new(1);
async fn send_request_async(req: Request<Body>) -> Result<String, Error> {
let https = hyper_rustls::HttpsConnector::new();
let mut connector = TimeoutConnector::new(https);
connector.set_connect_timeout(Some(Duration::from_secs(20)));
connector.set_read_timeout(Some(Duration::from_secs(20)));
connector.set_write_timeout(Some(Duration::from_secs(20)));
let client = Client::builder().build::<_, hyper::Body>(connector);
Box::new(
client
let client = Client::builder().build::<_, Body>(connector);
let resp = client
.request(req)
.map_err(|e| ErrorKind::RequestError(format!("Cannot make request: {}", e)).into())
.and_then(|resp| {
.await
.map_err(|e| ErrorKind::RequestError(format!("Cannot make request: {}", e)))?;
if !resp.status().is_success() {
Either::A(err(ErrorKind::RequestError(format!(
return Err(ErrorKind::RequestError(format!(
"Wrong response code: {} with data {:?}",
resp.status(),
resp.body()
))
.into()))
} else {
Either::B(
resp.into_body()
.map_err(|e| {
ErrorKind::RequestError(format!("Cannot read response body: {}", e))
.into()
})
.concat2()
.and_then(|ch| ok(String::from_utf8_lossy(&ch.to_vec()).to_string())),
)
.into());
}
}),
)
let raw = body::to_bytes(resp)
.await
.map_err(|e| ErrorKind::RequestError(format!("Cannot read response body: {}", e)))?;
Ok(String::from_utf8_lossy(&raw).to_string())
}
pub fn send_request(req: Request<Body>) -> Result<String, Error> {
let task = send_request_async(req);
let mut rt =
Runtime::new().context(ErrorKind::Internal("can't create Tokio runtime".to_owned()))?;
Ok(rt.block_on(task)?)
let mut rt = Builder::new()
.basic_scheduler()
.enable_all()
.build()
.map_err(|e| ErrorKind::RequestError(format!("{}", e)))?;
rt.block_on(send_request_async(req))
}

View file

@ -56,8 +56,6 @@ use crate::util::to_base64;
use crate::util::RwLock;
use crate::web::*;
use easy_jsonrpc_mw::{Handler, MaybeReply};
use futures::future::ok;
use futures::Future;
use hyper::{Body, Request, Response, StatusCode};
use serde::Serialize;
use std::net::SocketAddr;
@ -139,8 +137,6 @@ pub fn node_apis(
}
}
type NodeResponseFuture = Box<dyn Future<Item = Response<Body>, Error = Error> + Send>;
/// V2 API Handler/Wrapper for owner functions
pub struct OwnerAPIHandlerV2 {
pub chain: Weak<Chain>,
@ -157,48 +153,40 @@ impl OwnerAPIHandlerV2 {
sync_state,
}
}
fn call_api(
&self,
req: Request<Body>,
api: Owner,
) -> Box<dyn Future<Item = serde_json::Value, Error = Error> + Send> {
Box::new(parse_body(req).and_then(move |val: serde_json::Value| {
let owner_api = &api as &dyn OwnerRpc;
match owner_api.handle_request(val) {
MaybeReply::Reply(r) => ok(r),
MaybeReply::DontReply => {
// Since it's http, we need to return something. We return [] because jsonrpc
// clients will parse it as an empty batch response.
ok(serde_json::json!([]))
}
}
}))
}
fn handle_post_request(&self, req: Request<Body>) -> NodeResponseFuture {
impl crate::router::Handler for OwnerAPIHandlerV2 {
fn post(&self, req: Request<Body>) -> ResponseFuture {
let api = Owner::new(
self.chain.clone(),
self.peers.clone(),
self.sync_state.clone(),
);
Box::new(
self.call_api(req, api)
.and_then(|resp| ok(json_response_pretty(&resp))),
)
}
}
impl crate::router::Handler for OwnerAPIHandlerV2 {
fn post(&self, req: Request<Body>) -> ResponseFuture {
Box::new(self.handle_post_request(req).and_then(ok).or_else(|e| {
Box::pin(async move {
match parse_body(req).await {
Ok(val) => {
let owner_api = &api as &dyn OwnerRpc;
let res = match owner_api.handle_request(val) {
MaybeReply::Reply(r) => r,
MaybeReply::DontReply => {
// Since it's http, we need to return something. We return [] because jsonrpc
// clients will parse it as an empty batch response.
serde_json::json!([])
}
};
Ok(json_response_pretty(&res))
}
Err(e) => {
error!("Request Error: {:?}", e);
ok(create_error_response(e))
}))
Ok(create_error_response(e))
}
}
})
}
fn options(&self, _req: Request<Body>) -> ResponseFuture {
Box::new(ok(create_ok_response("{}")))
Box::pin(async { Ok(create_ok_response("{}")) })
}
}
@ -222,48 +210,40 @@ impl ForeignAPIHandlerV2 {
sync_state,
}
}
fn call_api(
&self,
req: Request<Body>,
api: Foreign,
) -> Box<dyn Future<Item = serde_json::Value, Error = Error> + Send> {
Box::new(parse_body(req).and_then(move |val: serde_json::Value| {
let foreign_api = &api as &dyn ForeignRpc;
match foreign_api.handle_request(val) {
MaybeReply::Reply(r) => ok(r),
MaybeReply::DontReply => {
// Since it's http, we need to return something. We return [] because jsonrpc
// clients will parse it as an empty batch response.
ok(serde_json::json!([]))
}
}
}))
}
fn handle_post_request(&self, req: Request<Body>) -> NodeResponseFuture {
impl crate::router::Handler for ForeignAPIHandlerV2 {
fn post(&self, req: Request<Body>) -> ResponseFuture {
let api = Foreign::new(
self.chain.clone(),
self.tx_pool.clone(),
self.sync_state.clone(),
);
Box::new(
self.call_api(req, api)
.and_then(|resp| ok(json_response_pretty(&resp))),
)
}
}
impl crate::router::Handler for ForeignAPIHandlerV2 {
fn post(&self, req: Request<Body>) -> ResponseFuture {
Box::new(self.handle_post_request(req).and_then(ok).or_else(|e| {
Box::pin(async move {
match parse_body(req).await {
Ok(val) => {
let foreign_api = &api as &dyn ForeignRpc;
let res = match foreign_api.handle_request(val) {
MaybeReply::Reply(r) => r,
MaybeReply::DontReply => {
// Since it's http, we need to return something. We return [] because jsonrpc
// clients will parse it as an empty batch response.
serde_json::json!([])
}
};
Ok(json_response_pretty(&res))
}
Err(e) => {
error!("Request Error: {:?}", e);
ok(create_error_response(e))
}))
Ok(create_error_response(e))
}
}
})
}
fn options(&self, _req: Request<Body>) -> ResponseFuture {
Box::new(ok(create_ok_response("{}")))
Box::pin(async { Ok(create_ok_response("{}")) })
}
}
@ -308,7 +288,7 @@ fn create_ok_response(json: &str) -> Response<Body> {
/// Whenever the status code is `StatusCode::OK` the text parameter should be
/// valid JSON as the content type header will be set to `application/json'
fn response<T: Into<Body>>(status: StatusCode, text: T) -> Response<Body> {
let mut builder = &mut Response::builder();
let mut builder = Response::builder();
builder = builder
.status(status)

View file

@ -24,8 +24,6 @@ use crate::util;
use crate::util::RwLock;
use crate::web::*;
use failure::ResultExt;
use futures::future::{err, ok};
use futures::Future;
use hyper::{Body, Request, StatusCode};
use std::sync::Weak;
@ -102,30 +100,23 @@ pub struct PoolPushHandler {
pub tx_pool: Weak<RwLock<pool::TransactionPool>>,
}
impl PoolPushHandler {
fn update_pool(&self, req: Request<Body>) -> Box<dyn Future<Item = (), Error = Error> + Send> {
async fn update_pool(
pool: Weak<RwLock<pool::TransactionPool>>,
req: Request<Body>,
) -> Result<(), Error> {
let pool = w(&pool)?;
let params = QueryParams::from(req.uri().query());
let fluff = params.get("fluff").is_some();
let pool_arc = match w(&self.tx_pool) {
Ok(p) => p,
Err(e) => return Box::new(err(e)),
};
Box::new(
parse_body(req)
.and_then(move |wrapper: TxWrapper| {
util::from_hex(wrapper.tx_hex)
.map_err(|e| ErrorKind::RequestError(format!("Bad request: {}", e)).into())
})
.and_then(move |tx_bin| {
let wrapper: TxWrapper = parse_body(req).await?;
let tx_bin = util::from_hex(wrapper.tx_hex)
.map_err(|e| ErrorKind::RequestError(format!("Bad request: {}", e)))?;
// All wallet api interaction explicitly uses protocol version 1 for now.
let version = ProtocolVersion(1);
let tx: Transaction = ser::deserialize(&mut &tx_bin[..], version)
.map_err(|e| ErrorKind::RequestError(format!("Bad request: {}", e)))?;
ser::deserialize(&mut &tx_bin[..], version)
.map_err(|e| ErrorKind::RequestError(format!("Bad request: {}", e)).into())
})
.and_then(move |tx: Transaction| {
let source = pool::TxSource::PushApi;
info!(
"Pushing transaction {} to pool (inputs: {}, outputs: {}, kernels: {})",
@ -136,7 +127,7 @@ impl PoolPushHandler {
);
// Push to tx pool.
let mut tx_pool = pool_arc.write();
let mut tx_pool = pool.write();
let header = tx_pool
.blockchain
.chain_head()
@ -145,22 +136,19 @@ impl PoolPushHandler {
.add_to_pool(source, tx, !fluff, &header)
.context(ErrorKind::Internal("Failed to update pool".to_owned()))?;
Ok(())
}),
)
}
}
impl Handler for PoolPushHandler {
fn post(&self, req: Request<Body>) -> ResponseFuture {
Box::new(
self.update_pool(req)
.and_then(|_| ok(just_response(StatusCode::OK, "")))
.or_else(|e| {
ok(just_response(
StatusCode::INTERNAL_SERVER_ERROR,
format!("failed: {}", e),
))
}),
)
let pool = self.tx_pool.clone();
Box::pin(async move {
let res = match update_pool(pool, req).await {
Ok(_) => just_response(StatusCode::OK, ""),
Err(e) => {
just_response(StatusCode::INTERNAL_SERVER_ERROR, format!("failed: {}", e))
}
};
Ok(res)
})
}
}

View file

@ -21,19 +21,22 @@
use crate::router::{Handler, HandlerObj, ResponseFuture, Router, RouterError};
use crate::web::response;
use failure::{Backtrace, Context, Fail, ResultExt};
use futures::sync::oneshot;
use futures::Stream;
use hyper::rt::Future;
use hyper::{rt, Body, Request, Server, StatusCode};
use futures::channel::oneshot;
use futures::TryStreamExt;
use hyper::server::accept;
use hyper::service::make_service_fn;
use hyper::{Body, Request, Server, StatusCode};
use rustls;
use rustls::internal::pemfile;
use std::convert::Infallible;
use std::fmt::{self, Display};
use std::fs::File;
use std::net::SocketAddr;
use std::sync::Arc;
use std::{io, thread};
use tokio_rustls::ServerConfigExt;
use tokio_tcp;
use tokio::net::TcpListener;
use tokio::runtime::Runtime;
use tokio_rustls::TlsAcceptor;
/// Errors that can be returned by an ApiEndpoint implementation.
#[derive(Debug)]
@ -199,13 +202,23 @@ impl ApiServer {
thread::Builder::new()
.name("apis".to_string())
.spawn(move || {
let server = Server::bind(&addr)
.serve(router)
let server = async move {
let server = Server::bind(&addr).serve(make_service_fn(move |_| {
let router = router.clone();
async move { Ok::<_, Infallible>(router) }
}));
// TODO graceful shutdown is unstable, investigate
//.with_graceful_shutdown(rx)
.map_err(|e| eprintln!("HTTP API server error: {}", e));
rt::run(server);
server.await
};
let mut rt = Runtime::new()
.map_err(|e| eprintln!("HTTP API server error: {}", e))
.unwrap();
if let Err(e) = rt.block_on(server) {
eprintln!("HTTP API server error: {}", e)
}
})
.map_err(|_| ErrorKind::Internal("failed to spawn API thread".to_string()).into())
}
@ -225,28 +238,31 @@ impl ApiServer {
.into());
}
let tls_conf = conf.build_server_config()?;
let acceptor = TlsAcceptor::from(conf.build_server_config()?);
thread::Builder::new()
.name("apis".to_string())
.spawn(move || {
let listener = tokio_tcp::TcpListener::bind(&addr).expect("failed to bind");
let tls = listener
.incoming()
.and_then(move |s| tls_conf.accept_async(s))
.then(|r| match r {
Ok(x) => Ok::<_, io::Error>(Some(x)),
Err(e) => {
error!("accept_async failed: {}", e);
Ok(None)
}
})
.filter_map(|x| x);
let server = Server::builder(tls)
.serve(router)
.map_err(|e| eprintln!("HTTP API server error: {}", e));
let server = async move {
let mut listener = TcpListener::bind(&addr).await.expect("failed to bind");
let listener = listener.incoming().and_then(move |s| acceptor.accept(s));
rt::run(server);
let server = Server::builder(accept::from_stream(listener)).serve(
make_service_fn(move |_| {
let router = router.clone();
async move { Ok::<_, Infallible>(router) }
}),
);
server.await
};
let mut rt = Runtime::new()
.map_err(|e| eprintln!("HTTP API server error: {}", e))
.unwrap();
if let Err(e) = rt.block_on(server) {
eprintln!("HTTP API server error: {}", e)
}
})
.map_err(|_| ErrorKind::Internal("failed to spawn API thread".to_string()).into())
}

View file

@ -12,21 +12,23 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use futures::future;
use futures::future::{self, Future};
use hyper;
use hyper::rt::Future;
use hyper::service::{NewService, Service};
use hyper::service::Service;
use hyper::{Body, Method, Request, Response, StatusCode};
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
lazy_static! {
static ref WILDCARD_HASH: u64 = calculate_hash(&"*");
static ref WILDCARD_STOP_HASH: u64 = calculate_hash(&"**");
}
pub type ResponseFuture = Box<dyn Future<Item = Response<Body>, Error = hyper::Error> + Send>;
pub type ResponseFuture =
Pin<Box<dyn Future<Output = Result<Response<Body>, hyper::Error>> + Send>>;
pub trait Handler {
fn get(&self, _req: Request<Body>) -> ResponseFuture {
@ -203,13 +205,16 @@ impl Router {
}
}
impl Service for Router {
type ReqBody = Body;
type ResBody = Body;
impl Service<Request<Body>> for Router {
type Response = Response<Body>;
type Error = hyper::Error;
type Future = ResponseFuture;
fn call(&mut self, req: Request<Self::ReqBody>) -> Self::Future {
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, req: Request<Body>) -> Self::Future {
match self.get(req.uri().path()) {
Err(_) => not_found(),
Ok(mut handlers) => match handlers.next() {
@ -220,18 +225,6 @@ impl Service for Router {
}
}
impl NewService for Router {
type ReqBody = Body;
type ResBody = Body;
type Error = hyper::Error;
type InitError = hyper::Error;
type Service = Router;
type Future = Box<dyn Future<Item = Self::Service, Error = Self::InitError> + Send>;
fn new_service(&self) -> Self::Future {
Box::new(future::ok(self.clone()))
}
}
impl Node {
fn new(key: u64, value: Option<HandlerObj>) -> Node {
Node {
@ -276,7 +269,7 @@ impl Node {
pub fn not_found() -> ResponseFuture {
let mut response = Response::new(Body::empty());
*response.status_mut() = StatusCode::NOT_FOUND;
Box::new(future::ok(response))
Box::pin(future::ok(response))
}
fn calculate_hash<T: Hash>(t: &T) -> u64 {
@ -305,19 +298,20 @@ fn collect_node_middleware(handlers: &mut Vec<HandlerObj>, node: &Node) {
mod tests {
use super::*;
use tokio::prelude::future::ok;
use tokio_core::reactor::Core;
use futures::executor::block_on;
struct HandlerImpl(u16);
impl Handler for HandlerImpl {
fn get(&self, _req: Request<Body>) -> ResponseFuture {
Box::new(future::ok(
Response::builder()
.status(self.0)
let code = self.0;
Box::pin(async move {
let res = Response::builder()
.status(code)
.body(Body::default())
.unwrap(),
))
.unwrap();
Ok(res)
})
}
}
@ -358,15 +352,18 @@ mod tests {
.unwrap();
let call_handler = |url| {
let mut event_loop = Core::new().unwrap();
let task = routes
let task = async {
let resp = routes
.get(url)
.unwrap()
.next()
.unwrap()
.get(Request::new(Body::default()))
.and_then(|resp| ok(resp.status().as_u16()));
event_loop.run(task).unwrap()
.await
.unwrap();
resp.status().as_u16()
};
block_on(task)
};
assert_eq!(call_handler("/v1/users"), 101);

View file

@ -1,7 +1,8 @@
use crate::rest::*;
use crate::router::ResponseFuture;
use futures::future::{err, ok};
use futures::{Future, Stream};
use bytes::Buf;
use futures::future::ok;
use hyper::body;
use hyper::{Body, Request, Response, StatusCode};
use serde::{Deserialize, Serialize};
use serde_json;
@ -10,21 +11,16 @@ use std::fmt::Debug;
use url::form_urlencoded;
/// Parse request body
pub fn parse_body<T>(req: Request<Body>) -> Box<dyn Future<Item = T, Error = Error> + Send>
pub async fn parse_body<T>(req: Request<Body>) -> Result<T, Error>
where
for<'de> T: Deserialize<'de> + Send + 'static,
{
Box::new(
req.into_body()
.concat2()
.map_err(|e| ErrorKind::RequestError(format!("Failed to read request: {}", e)).into())
.and_then(|body| match serde_json::from_reader(&body.to_vec()[..]) {
Ok(obj) => ok(obj),
Err(e) => {
err(ErrorKind::RequestError(format!("Invalid request body: {}", e)).into())
}
}),
)
let raw = body::to_bytes(req.into_body())
.await
.map_err(|e| ErrorKind::RequestError(format!("Failed to read request: {}", e)))?;
serde_json::from_reader(raw.bytes())
.map_err(|e| ErrorKind::RequestError(format!("Invalid request body: {}", e)).into())
}
/// Convert Result to ResponseFuture
@ -83,7 +79,7 @@ pub fn just_response<T: Into<Body> + Debug>(status: StatusCode, text: T) -> Resp
/// Text response as future
pub fn response<T: Into<Body> + Debug>(status: StatusCode, text: T) -> ResponseFuture {
Box::new(ok(just_response(status, text)))
Box::pin(ok(just_response(status, text)))
}
pub struct QueryParams {

View file

@ -1 +1,2 @@
hard_tabs = true
edition = "2018"

View file

@ -10,10 +10,10 @@ workspace = ".."
edition = "2018"
[dependencies]
hyper = "0.12"
hyper-rustls = "0.14"
hyper = "0.13"
hyper-rustls = "0.19"
fs2 = "0.4"
futures = "0.1"
futures = "0.3"
http = "0.1"
lmdb-zero = "0.4.4"
rand = "0.6"
@ -22,7 +22,8 @@ log = "0.4"
serde_derive = "1"
serde_json = "1"
chrono = "0.4.4"
tokio = "0.1.11"
tokio = {version = "0.2", features = ["full"] }
tokio-util = { version = "0.2", features = ["codec"] }
walkdir = "2.2.9"
grin_api = { path = "../api", version = "3.1.0-beta.1" }

View file

@ -24,7 +24,7 @@ use crate::common::types::{ServerConfig, WebHooksConfig};
use crate::core::core;
use crate::core::core::hash::Hashed;
use crate::p2p::types::PeerAddr;
use futures::future::Future;
use futures::TryFutureExt;
use hyper::client::HttpConnector;
use hyper::header::HeaderValue;
use hyper::Client;
@ -33,7 +33,7 @@ use hyper_rustls::HttpsConnector;
use serde::Serialize;
use serde_json::{json, to_string};
use std::time::Duration;
use tokio::runtime::Runtime;
use tokio::runtime::{Builder, Runtime};
/// Returns the list of event hooks that will be initialized for network events
pub fn init_net_hooks(config: &ServerConfig) -> Vec<Box<dyn NetEvents + Send + Sync>> {
@ -153,7 +153,7 @@ fn parse_url(value: &Option<String>) -> Option<hyper::Uri> {
Ok(value) => value,
Err(_) => panic!("Invalid url : {}", url),
};
let scheme = uri.scheme_part().map(|s| s.as_str());
let scheme = uri.scheme().map(|s| s.as_str());
if (scheme != Some("http")) && (scheme != Some("https")) {
panic!(
"Invalid url scheme {}, expected one of ['http', https']",
@ -199,7 +199,7 @@ impl WebHook {
nthreads, timeout
);
let https = HttpsConnector::new(nthreads as usize);
let https = HttpsConnector::new();
let client = Client::builder()
.keep_alive_timeout(keep_alive)
.build::<_, hyper::Body>(https);
@ -210,7 +210,12 @@ impl WebHook {
header_received_url,
block_accepted_url,
client,
runtime: Runtime::new().unwrap(),
runtime: Builder::new()
.threaded_scheduler()
.enable_all()
.core_threads(nthreads as usize)
.build()
.unwrap(),
}
}
@ -235,16 +240,11 @@ impl WebHook {
HeaderValue::from_static("application/json"),
);
let future = self
.client
.request(req)
.map(|_res| {})
.map_err(move |_res| {
let future = self.client.request(req).map_err(move |_res| {
warn!("Error sending POST request to {}", url);
});
let handle = self.runtime.executor();
handle.spawn(future);
self.runtime.spawn(future);
}
fn make_request<T: Serialize>(&self, payload: &T, uri: &Option<hyper::Uri>) -> bool {
if let Some(url) = uri {

View file

@ -14,11 +14,12 @@
//! Mining Stratum Server
use futures::future::Future;
use futures::stream::Stream;
use tokio::io::AsyncRead;
use tokio::io::{lines, write_all};
use futures::channel::mpsc;
use futures::pin_mut;
use futures::{SinkExt, StreamExt, TryStreamExt};
use tokio::net::TcpListener;
use tokio::runtime::Runtime;
use tokio_util::codec::{Framed, LinesCodec};
use crate::util::RwLock;
use chrono::prelude::Utc;
@ -26,7 +27,6 @@ use serde;
use serde_json;
use serde_json::Value;
use std::collections::HashMap;
use std::io::BufReader;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::{Duration, SystemTime};
@ -44,8 +44,6 @@ use crate::mining::mine_block;
use crate::pool;
use crate::util;
use futures::sync::mpsc;
type Tx = mpsc::UnboundedSender<String>;
// ----------------------------------------
@ -600,53 +598,68 @@ impl Handler {
// Worker Factory Thread Function
fn accept_connections(listen_addr: SocketAddr, handler: Arc<Handler>) {
info!("Start tokio stratum server");
let listener = TcpListener::bind(&listen_addr).expect(&format!(
let task = async move {
let mut listener = TcpListener::bind(&listen_addr).await.expect(&format!(
"Stratum: Failed to bind to listen address {}",
listen_addr
));
let server = listener
.incoming()
.filter_map(|s| async { s.map_err(|e| error!("accept error = {:?}", e)).ok() })
.for_each(move |socket| {
let handler = handler.clone();
async move {
// Spawn a task to process the connection
let (tx, rx) = mpsc::unbounded();
let (tx, mut rx) = mpsc::unbounded();
let worker_id = handler.workers.add_worker(tx);
info!("Worker {} connected", worker_id);
let (reader, writer) = socket.split();
let reader = BufReader::new(reader);
let framed = Framed::new(socket, LinesCodec::new());
let (mut writer, mut reader) = framed.split();
let h = handler.clone();
let workers = h.workers.clone();
let input = lines(reader)
.for_each(move |line| {
let request = serde_json::from_str(&line)?;
let read = async move {
while let Some(line) = reader
.try_next()
.await
.map_err(|e| error!("error reading line: {}", e))?
{
let request = serde_json::from_str(&line)
.map_err(|e| error!("error serializing line: {}", e))?;
let resp = h.handle_rpc_requests(request, worker_id);
workers.send_to(worker_id, resp);
Ok(())
})
.map_err(|e| error!("error {}", e));
h.workers.send_to(worker_id, resp);
}
let output = rx.fold(writer, |writer, s| {
let s2 = s + "\n";
write_all(writer, s2.into_bytes())
.map(|(writer, _)| writer)
.map_err(|e| error!("cannot send {}", e))
});
Result::<_, ()>::Ok(())
};
let workers = handler.workers.clone();
let both = output.map(|_| ()).select(input);
tokio::spawn(both.then(move |_| {
workers.remove_worker(worker_id);
let write = async move {
while let Some(line) = rx.next().await {
let line = line + "\n";
writer
.send(line)
.await
.map_err(|e| error!("error writing line: {}", e))?;
}
Result::<_, ()>::Ok(())
};
let task = async move {
pin_mut!(read, write);
futures::future::select(read, write).await;
handler.workers.remove_worker(worker_id);
info!("Worker {} disconnected", worker_id);
Ok(())
}));
Ok(())
})
.map_err(|err| {
error!("accept error = {:?}", err);
};
tokio::spawn(task);
}
});
tokio::run(server.map(|_| ()).map_err(|_| ()));
server.await
};
let mut rt = Runtime::new().unwrap();
rt.block_on(task);
}
// ----------------------------------------