Update hyper/tokio/futures dependencies

This commit is contained in:
Jasper van der Maarel 2020-02-24 16:20:14 +01:00
parent 9213559548
commit 8ca3f9994b
No known key found for this signature in database
GPG key ID: CF87FC5B5B33452F
8 changed files with 259 additions and 953 deletions

662
Cargo.lock generated

File diff suppressed because it is too large Load diff

View file

@ -13,8 +13,8 @@ edition = "2018"
[dependencies]
failure = "0.1"
failure_derive = "0.1"
futures = "0.1"
hyper = "0.12"
futures = "0.3"
hyper = "0.13"
rand = "0.5"
serde = "1"
serde_derive = "1"
@ -23,9 +23,7 @@ log = "0.4"
prettytable-rs = "0.7"
ring = "0.16"
term = "0.5"
tokio = "= 0.1.11"
tokio-core = "0.1"
tokio-retry = "0.1"
tokio = { version = "0.2", features = ["full"] }
uuid = { version = "0.7", features = ["serde", "v4"] }
url = "1.7.0"
chrono = { version = "0.4.4", features = ["serde"] }

View file

@ -13,7 +13,7 @@ edition = "2018"
blake2-rfc = "0.2"
failure = "0.1"
failure_derive = "0.1"
futures = "0.1"
futures = "0.3"
rand = "0.5"
semver = "0.9"
serde = "1"
@ -21,13 +21,9 @@ serde_derive = "1"
serde_json = "1"
log = "0.4"
ring = "0.16"
tokio = "= 0.1.11"
tokio-core = "0.1"
tokio-retry = "0.1"
tokio = { version = "0.2", features = ["full"] }
uuid = { version = "0.7", features = ["serde", "v4"] }
chrono = { version = "0.4.4", features = ["serde"] }
jsonrpc-client-core = "0.5.0"
jsonrpc-client-http = "0.5.0"
#http client (copied from grin)
http = "0.1.5"
@ -36,12 +32,8 @@ hyper-timeout = "0.3"
#Socks/Tor
byteorder = "1"
hyper = "0.12"
#hyper-tls = "0.1"
tokio-tcp = "0.1"
tokio-io = "0.1"
#native-tls = "0.1"
#tokio-tls = "0.1"
hyper = "0.13"
hyper-socks2 = "0.4"
ed25519-dalek = "1.0.0-pre.1"
data-encoding = "2"
regex = "1.3"

View file

@ -14,16 +14,11 @@
//! High level JSON/HTTP client API
use crate::client_utils::Socksv5Connector;
use crate::util::to_base64;
use failure::{Backtrace, Context, Fail, ResultExt};
use futures::future::result;
use futures::future::{err, ok, Either};
use futures::stream::Stream;
use http::uri::{InvalidUri, Uri};
use hyper::body;
use hyper::header::{ACCEPT, AUTHORIZATION, CONTENT_TYPE, USER_AGENT};
use hyper::rt::Future;
use hyper::{self, Body, Request};
use hyper::{self, Body, Client as HyperClient, Request, Uri};
use hyper_rustls;
use hyper_timeout::TimeoutConnector;
use serde::{Deserialize, Serialize};
@ -89,8 +84,6 @@ impl From<Context<ErrorKind>> for Error {
}
}
pub type ClientResponseFuture<T> = Box<dyn Future<Item = T, Error = Error> + Send>;
pub struct Client {
/// Whether to use socks proxy
pub use_socks: bool,
@ -120,18 +113,16 @@ impl Client {
/// Helper function to easily issue an async HTTP GET request against a given
/// URL that returns a future. Handles request building, JSON deserialization
/// and response code checking.
pub fn _get_async<'a, T>(
pub async fn _get_async<'a, T>(
&self,
url: &'a str,
api_secret: Option<String>,
) -> ClientResponseFuture<T>
) -> Result<T, Error>
where
for<'de> T: Deserialize<'de> + Send + 'static,
{
match self.build_request(url, "GET", api_secret, None) {
Ok(req) => Box::new(self.handle_request_async(req)),
Err(e) => Box::new(err(e)),
}
self.handle_request_async(self.build_request(url, "GET", api_secret, None)?)
.await
}
/// Helper function to easily issue a HTTP GET request
@ -165,21 +156,19 @@ impl Client {
/// provided JSON object as body on a given URL that returns a future. Handles
/// request building, JSON serialization and deserialization, and response code
/// checking.
pub fn post_async<IN, OUT>(
pub async fn post_async<IN, OUT>(
&self,
url: &str,
input: &IN,
api_secret: Option<String>,
) -> ClientResponseFuture<OUT>
) -> Result<OUT, Error>
where
IN: Serialize,
OUT: Send + 'static,
for<'de> OUT: Deserialize<'de>,
{
match self.create_post_request(url, api_secret, input) {
Ok(req) => Box::new(self.handle_request_async(req)),
Err(e) => Box::new(err(e)),
}
self.handle_request_async(self.create_post_request(url, api_secret, input)?)
.await
}
/// Helper function to easily issue a HTTP POST request with the provided JSON
@ -204,19 +193,18 @@ impl Client {
/// provided JSON object as body on a given URL that returns a future. Handles
/// request building, JSON serialization and deserialization, and response code
/// checking.
pub fn _post_no_ret_async<IN>(
pub async fn _post_no_ret_async<IN>(
&self,
url: &str,
api_secret: Option<String>,
input: &IN,
) -> ClientResponseFuture<()>
) -> Result<(), Error>
where
IN: Serialize,
{
match self.create_post_request(url, api_secret, input) {
Ok(req) => Box::new(self.send_request_async(req).and_then(|_| ok(()))),
Err(e) => Box::new(err(e)),
}
self.send_request_async(self.create_post_request(url, api_secret, input)?)
.await?;
Ok(())
}
fn build_request(
@ -226,14 +214,13 @@ impl Client {
api_secret: Option<String>,
body: Option<String>,
) -> Result<Request<Body>, Error> {
let uri = url.parse::<Uri>().map_err::<Error, _>(|e: InvalidUri| {
e.context(ErrorKind::Argument(format!("Invalid url {}", url)))
.into()
})?;
let uri: Uri = url
.parse()
.map_err(|_| ErrorKind::RequestError(format!("Invalid url {}", url)))?;
let mut builder = Request::builder();
if let Some(api_secret) = api_secret {
let basic_auth = format!("Basic {}", to_base64(&format!("grin:{}", api_secret)));
builder.header(AUTHORIZATION, basic_auth);
builder = builder.header(AUTHORIZATION, basic_auth);
}
builder
@ -277,124 +264,70 @@ impl Client {
})
}
fn handle_request_async<T>(&self, req: Request<Body>) -> ClientResponseFuture<T>
async fn handle_request_async<T>(&self, req: Request<Body>) -> Result<T, Error>
where
for<'de> T: Deserialize<'de> + Send + 'static,
{
Box::new(self.send_request_async(req).and_then(|data| {
serde_json::from_str(&data).map_err(|e| {
e.context(ErrorKind::ResponseError("Cannot parse response".to_owned()))
.into()
})
}))
let data = self.send_request_async(req).await?;
let ser = serde_json::from_str(&data)
.map_err(|e| e.context(ErrorKind::ResponseError("Cannot parse response".to_owned())))?;
Ok(ser)
}
fn send_request_async(
&self,
req: Request<Body>,
) -> Box<dyn Future<Item = String, Error = Error> + Send> {
//TODO: redundant code, enjoy figuring out type params for dynamic dispatch of client
match self.use_socks {
false => {
let https = hyper_rustls::HttpsConnector::new(1);
async fn send_request_async(&self, req: Request<Body>) -> Result<String, Error> {
let resp = if !self.use_socks {
let https = hyper_rustls::HttpsConnector::new();
let mut connector = TimeoutConnector::new(https);
connector.set_connect_timeout(Some(Duration::from_secs(20)));
connector.set_read_timeout(Some(Duration::from_secs(20)));
connector.set_write_timeout(Some(Duration::from_secs(20)));
let client = hyper::Client::builder().build::<_, hyper::Body>(connector);
Box::new(
client
.request(req)
.map_err(|e| {
ErrorKind::RequestError(format!("Cannot make request: {}", e)).into()
})
.and_then(|resp| {
if !resp.status().is_success() {
Either::A(err(ErrorKind::RequestError(format!(
"Wrong response code: {} with data {:?}",
resp.status(),
resp.body()
))
.into()))
let client = HyperClient::builder().build::<_, Body>(connector);
client.request(req).await
} else {
Either::B(
resp.into_body()
.map_err(|e| {
ErrorKind::RequestError(format!(
"Cannot read response body: {}",
e
))
.into()
})
.concat2()
.and_then(|ch| {
ok(String::from_utf8_lossy(&ch.to_vec()).to_string())
}),
)
}
}),
)
}
true => {
let addr = match self.socks_proxy_addr {
Some(a) => a,
None => {
return Box::new(result(Err(ErrorKind::RequestError(
"Can't parse Socks proxy address".to_string(),
)
.into())))
}
let addr = self.socks_proxy_addr.ok_or_else(|| {
ErrorKind::RequestError("Missing Socks proxy address".to_string())
})?;
let auth = format!("{}:{}", addr.ip(), addr.port());
let https = hyper_rustls::HttpsConnector::new();
let socks = hyper_socks2::SocksConnector {
proxy_addr: hyper::Uri::builder()
.scheme("socks5")
.authority(auth.as_str())
.path_and_query("/")
.build()
.map_err(|_| {
ErrorKind::RequestError("Can't parse Socks proxy address".to_string())
})?,
auth: None,
connector: https,
};
let socks_connector = Socksv5Connector::new(addr);
let mut connector = TimeoutConnector::new(socks_connector);
let mut connector = TimeoutConnector::new(socks);
connector.set_connect_timeout(Some(Duration::from_secs(20)));
connector.set_read_timeout(Some(Duration::from_secs(20)));
connector.set_write_timeout(Some(Duration::from_secs(20)));
let client = hyper::Client::builder().build::<_, hyper::Body>(connector);
Box::new(
client
.request(req)
.map_err(|e| {
ErrorKind::RequestError(format!("Cannot make request: {}", e)).into()
})
.and_then(|resp| {
if !resp.status().is_success() {
Either::A(err(ErrorKind::RequestError(format!(
"Wrong response code: {} with data {:?}",
resp.status(),
resp.body()
))
.into()))
} else {
Either::B(
resp.into_body()
.map_err(|e| {
ErrorKind::RequestError(format!(
"Cannot read response body: {}",
e
))
.into()
})
.concat2()
.and_then(|ch| {
ok(String::from_utf8_lossy(&ch.to_vec()).to_string())
}),
)
}
}),
)
}
}
let client = HyperClient::builder().build::<_, Body>(connector);
client.request(req).await
};
let resp =
resp.map_err(|e| ErrorKind::RequestError(format!("Cannot make request: {}", e)))?;
let raw = body::to_bytes(resp)
.await
.map_err(|e| ErrorKind::RequestError(format!("Cannot read response body: {}", e)))?;
Ok(String::from_utf8_lossy(&raw).to_string())
}
pub fn send_request(&self, req: Request<Body>) -> Result<String, Error> {
let task = self.send_request_async(req);
let mut rt = Builder::new()
.core_threads(1)
.basic_scheduler()
.enable_all()
.build()
.context(ErrorKind::Internal("can't create Tokio runtime".to_owned()))?;
let res = rt.block_on(task);
let _ = rt.shutdown_now().wait();
res
rt.block_on(task)
}
}

View file

@ -14,7 +14,5 @@
mod client;
pub mod json_rpc;
mod socksv5;
pub use self::socksv5::Socksv5Connector;
pub use client::{Client, Error as ClientError};

View file

@ -1,254 +0,0 @@
// MIT License
//
// Copyright (c) 2017 Vesa Vilhonen
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.
// Copyright 2019 The Grin Developers
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use byteorder::{BigEndian, WriteBytesExt};
use futures::future::ok;
use futures::{Future, IntoFuture};
//use hyper_tls::MaybeHttpsStream;
//use native_tls::TlsConnector;
use std::io::{self, Error, ErrorKind, Write};
use std::net::SocketAddr;
use tokio_io::io::{read_exact, write_all};
use tokio_tcp::TcpStream;
//use tokio_tls::TlsConnectorExt;
use hyper::client::connect::{Connect, Connected, Destination};
pub struct Socksv5Connector {
proxy_addr: SocketAddr,
creds: Option<(Vec<u8>, Vec<u8>)>,
}
impl Socksv5Connector {
pub fn new(proxy_addr: SocketAddr) -> Socksv5Connector {
Socksv5Connector {
proxy_addr,
creds: None,
}
}
pub fn _new_with_creds<T: Into<Vec<u8>>>(
proxy_addr: SocketAddr,
creds: (T, T),
) -> io::Result<Socksv5Connector> {
let username = creds.0.into();
let password = creds.1.into();
if username.len() > 255 || password.len() > 255 {
Err(Error::new(ErrorKind::Other, "invalid credentials"))
} else {
Ok(Socksv5Connector {
proxy_addr,
creds: Some((username, password)),
})
}
}
}
impl Connect for Socksv5Connector {
type Transport = TcpStream;
type Error = Error;
type Future = Box<dyn Future<Item = (Self::Transport, Connected), Error = Self::Error> + Send>;
fn connect(&self, dst: Destination) -> Self::Future {
let creds = self.creds.clone();
Box::new(
TcpStream::connect(&self.proxy_addr)
.and_then(move |socket| do_handshake(socket, dst, creds)),
)
}
}
type HandshakeFutureConnected<T> = Box<dyn Future<Item = (T, Connected), Error = Error> + Send>;
type HandshakeFuture<T> = Box<dyn Future<Item = T, Error = Error> + Send>;
fn auth_negotiation(
socket: TcpStream,
creds: Option<(Vec<u8>, Vec<u8>)>,
) -> HandshakeFuture<TcpStream> {
let (username, password) = creds.unwrap();
let mut creds_msg: Vec<u8> = Vec::with_capacity(username.len() + password.len() + 3);
creds_msg.push(1);
creds_msg.push(username.len() as u8);
creds_msg.extend_from_slice(&username);
creds_msg.push(password.len() as u8);
creds_msg.extend_from_slice(&password);
Box::new(
write_all(socket, creds_msg)
.and_then(|(socket, _)| read_exact(socket, [0; 2]))
.and_then(|(socket, resp)| {
if resp[0] == 1 && resp[1] == 0 {
Ok(socket)
} else {
Err(Error::new(ErrorKind::InvalidData, "unauthorized"))
}
}),
)
}
fn answer_hello(
socket: TcpStream,
response: [u8; 2],
creds: Option<(Vec<u8>, Vec<u8>)>,
) -> HandshakeFuture<TcpStream> {
if response[0] == 5 && response[1] == 0 {
Box::new(ok(socket))
} else if response[0] == 5 && response[1] == 2 && creds.is_some() {
Box::new(auth_negotiation(socket, creds).and_then(ok))
} else {
Box::new(
Err(Error::new(
ErrorKind::InvalidData,
"wrong response from socks server",
))
.into_future(),
)
}
}
fn write_addr(socket: TcpStream, req: Destination) -> HandshakeFuture<TcpStream> {
let host = req.host();
if host.len() > u8::max_value() as usize {
return Box::new(Err(Error::new(ErrorKind::InvalidInput, "Host too long")).into_future());
}
let port = match req.port() {
Some(port) => port,
_ if req.scheme() == "https" => 443,
_ if req.scheme() == "http" => 80,
_ => {
return Box::new(
Err(Error::new(
ErrorKind::InvalidInput,
"Supports only http/https",
))
.into_future(),
)
}
};
let mut packet = Vec::new();
packet.write_all(&[5, 1, 0]).unwrap();
packet.write_u8(3).unwrap();
packet.write_u8(host.as_bytes().len() as u8).unwrap();
packet.write_all(host.as_bytes()).unwrap();
packet.write_u16::<BigEndian>(port).unwrap();
Box::new(write_all(socket, packet).map(|(socket, _)| socket))
}
fn read_response(socket: TcpStream, response: [u8; 3]) -> HandshakeFuture<TcpStream> {
if response[0] != 5 {
return Box::new(Err(Error::new(ErrorKind::Other, "invalid version")).into_future());
}
match response[1] {
0 => {}
1 => {
return Box::new(
Err(Error::new(ErrorKind::Other, "general SOCKS server failure")).into_future(),
)
}
2 => {
return Box::new(
Err(Error::new(
ErrorKind::Other,
"connection not allowed by ruleset",
))
.into_future(),
)
}
3 => {
return Box::new(Err(Error::new(ErrorKind::Other, "network unreachable")).into_future())
}
4 => return Box::new(Err(Error::new(ErrorKind::Other, "host unreachable")).into_future()),
5 => {
return Box::new(Err(Error::new(ErrorKind::Other, "connection refused")).into_future())
}
6 => return Box::new(Err(Error::new(ErrorKind::Other, "TTL expired")).into_future()),
7 => {
return Box::new(
Err(Error::new(ErrorKind::Other, "command not supported")).into_future(),
)
}
8 => {
return Box::new(
Err(Error::new(ErrorKind::Other, "address kind not supported")).into_future(),
)
}
_ => return Box::new(Err(Error::new(ErrorKind::Other, "unknown error")).into_future()),
};
if response[2] != 0 {
return Box::new(
Err(Error::new(ErrorKind::InvalidData, "invalid reserved byt")).into_future(),
);
}
Box::new(
read_exact(socket, [0; 1])
.and_then(|(socket, response)| match response[0] {
1 => read_exact(socket, [0; 6]),
_ => unimplemented!(),
})
.map(|(socket, _)| socket),
)
}
fn do_handshake(
socket: TcpStream,
req: Destination,
creds: Option<(Vec<u8>, Vec<u8>)>,
) -> HandshakeFutureConnected<TcpStream> {
let _is_https = req.scheme() == "https";
let _host = req.host();
let method: u8 = creds.clone().map(|_| 2).unwrap_or(0);
let established = write_all(socket, [5, 1, method])
.and_then(|(socket, _)| read_exact(socket, [0; 2]))
.and_then(|(socket, response)| answer_hello(socket, response, creds))
.and_then(|socket| write_addr(socket, req))
.and_then(|socket| read_exact(socket, [0; 3]))
.and_then(|(socket, response)| read_response(socket, response));
/*if is_https {
Box::new(established.and_then(move |socket| {
let tls = TlsConnector::builder().unwrap().build().unwrap();
tls.connect_async(&host, socket)
.map_err(|err| Error::new(ErrorKind::Other, err))
.map(|socket| MaybeHttpsStream::Https(socket))
}))
} else {*/
//Box::new(established.map(|socket| TcpStream::Http(socket)))
Box::new(established.map(|socket| (socket, Connected::new())))
/*}*/
}

View file

@ -259,7 +259,7 @@ impl EncryptedWalletSeed {
let mut key = [0; 64];
pbkdf2::derive(
ring::pbkdf2::PBKDF2_HMAC_SHA512,
NonZeroU32::new_unchecked(100),
NonZeroU32::new(100).unwrap(),
&salt,
password,
&mut key,
@ -271,7 +271,7 @@ impl EncryptedWalletSeed {
enc_bytes.push(0);
}
let unbound_key = aead::UnboundKey::new(&aead::CHACHA20_POLY1305, &key).unwrap();
let sealing_key: aead::SealingKey<RandomNonce> =
let mut sealing_key: aead::SealingKey<RandomNonce> =
aead::BoundKey::new(unbound_key, RandomNonce);
let aad = aead::Aad::empty();
sealing_key.seal_in_place_append_tag(aad, &mut enc_bytes);
@ -301,7 +301,7 @@ impl EncryptedWalletSeed {
let mut key = [0; 32];
pbkdf2::derive(
ring::pbkdf2::PBKDF2_HMAC_SHA512,
NonZeroU32::new_unchecked(100),
NonZeroU32::new(100).unwrap(),
&salt,
password,
&mut key,
@ -311,7 +311,8 @@ impl EncryptedWalletSeed {
n.copy_from_slice(&nonce[0..12]);
let nonce = OpeningNonce(n);
let unbound_key = aead::UnboundKey::new(&aead::CHACHA20_POLY1305, &key).unwrap();
let opening_key: aead::OpeningKey<OpeningNonce> = aead::BoundKey::new(unbound_key, nonce);
let mut opening_key: aead::OpeningKey<OpeningNonce> =
aead::BoundKey::new(unbound_key, nonce);
let aad = aead::Aad::empty();
opening_key.open_in_place(aad, &mut encrypted_seed);

View file

@ -17,7 +17,8 @@
use crate::api::{self, LocatedTxKernel, OutputListing, OutputPrintable};
use crate::core::core::{Transaction, TxKernel};
use crate::libwallet::{NodeClient, NodeVersionInfo};
use futures::{stream, Future, Stream};
use futures::stream::FuturesUnordered;
use futures::TryStreamExt;
use std::collections::HashMap;
use std::env;
use tokio::runtime::Builder;
@ -199,7 +200,6 @@ impl NodeClient for HTTPNodeClient {
.map(|commit| format!("{}", util::to_hex(commit.as_ref().to_vec())))
.collect();
let mut tasks = Vec::new();
// going to leave this here even though we're moving
// to the json RPC api to keep the functionality of
// parallelizing larger requests. Will raise default
@ -223,23 +223,39 @@ impl NodeClient for HTTPNodeClient {
trace!("Output query chunk size is: {}", chunk_size);
let url = format!("{}{}", self.node_url(), ENDPOINT);
let client = Client::new();
/*let res = client.post::<Request, Response>(url.as_str(), self.node_api_secret(), &req);*/
for query_chunk in query_params.chunks(chunk_size) {
let params = json!([query_chunk, null, null, false, false]);
let req = build_request("get_outputs", &params);
let task = async move {
let client = Client::new();
let params: Vec<_> = query_params
.chunks(chunk_size)
.map(|c| json!([c, null, null, false, false]))
.collect();
let mut reqs = Vec::with_capacity(params.len());
for p in &params {
reqs.push(build_request("get_outputs", p));
}
let mut tasks = Vec::with_capacity(params.len());
for req in &reqs {
tasks.push(client.post_async::<Request, Response>(
url.as_str(),
&req,
req,
self.node_api_secret(),
));
}
let task = stream::futures_unordered(tasks).collect();
let mut rt = Builder::new().core_threads(1).build().unwrap();
let res = rt.block_on(task);
let _ = rt.shutdown_now().wait();
let task: FuturesUnordered<_> = tasks.into_iter().collect();
task.try_collect().await
};
let mut rt = Builder::new()
.threaded_scheduler()
.enable_all()
.build()
.unwrap();
let res: Result<Vec<Response>, _> = rt.block_on(task);
let results: Vec<OutputPrintable> = match res {
Ok(resps) => {
let mut results = vec![];