Remove Iron dependency and update hyper to version 0.12 (#1241)

* Remove Iron dependecy and update hyper to version 0.12 #876

* REMOVE ME

* Revert "REMOVE ME"

This reverts commit e9a976eee98a2d5a4dfae5d9e1e4f5ed640c05d3.

* Rebase and start updating libwallet

Libwallet doesn't compile yet.

* Wallet compiles

* Grin compiles

* No compilation errors in tests

* All tests pass

* Reeturn future from handler

* Refactoring

* Fix lifetime issue one more time

I have to force push to rollback all the work done in last 2 days

* Fix wallet send issue

* Clean up
This commit is contained in:
e-max 2018-08-01 11:44:07 +02:00 committed by Yeastplume
parent b040aaa434
commit 25e3d9e7d3
21 changed files with 1434 additions and 1252 deletions

603
Cargo.lock generated

File diff suppressed because it is too large Load diff

View file

@ -6,19 +6,21 @@ workspace = ".."
publish = false
[dependencies]
failure = "0.1"
failure_derive = "0.1"
hyper = "0.10"
iron = "0.5"
failure = "0.1.1"
failure_derive = "0.1.1"
hyper = "0.12"
lazy_static = "1"
mount = "0.3"
regex = "1"
router = "0.5"
serde = "1"
serde_derive = "1"
serde_json = "1"
slog = { version = "~2.2", features = ["max_level_trace", "release_max_level_trace"] }
urlencoded = "0.5" # 0.6+ lacks trait `plugin::Plugin<iron::Request<'_, '_>>`
tokio = "0.1.7"
tokio-core = "0.1.17"
http = "0.1.5"
futures = "0.1.21"
url = "1.7.0"
grin_core = { path = "../core" }
grin_chain = { path = "../chain" }

View file

@ -15,12 +15,15 @@
//! High level JSON/HTTP client API
use failure::{Fail, ResultExt};
use hyper;
use hyper::client::Response;
use hyper::status::{StatusClass, StatusCode};
use http::uri::{InvalidUri, Uri};
use hyper::header::{ACCEPT, USER_AGENT};
use hyper::rt::{Future, Stream};
use hyper::{Body, Client, Request};
use serde::{Deserialize, Serialize};
use serde_json;
use std::io::Read;
use futures::future::{err, ok, Either};
use tokio_core::reactor::Core;
use rest::{Error, ErrorKind};
@ -31,63 +34,104 @@ pub fn get<'a, T>(url: &'a str) -> Result<T, Error>
where
for<'de> T: Deserialize<'de>,
{
let client = hyper::Client::new();
let res = check_error(client.get(url).send())?;
serde_json::from_reader(res).map_err(|e| {
e.context(ErrorKind::Internal(
"Server returned invalid JSON".to_owned(),
)).into()
})
let uri = url.parse::<Uri>().map_err::<Error, _>(|e: InvalidUri| {
e.context(ErrorKind::Argument(format!("Invalid url {}", url)))
.into()
})?;
let req = Request::builder()
.method("GET")
.uri(uri)
.header(USER_AGENT, "grin-client")
.header(ACCEPT, "application/json")
.body(Body::empty())
.map_err(|_e| ErrorKind::RequestError("Bad request".to_owned()))?;
handle_request(req)
}
/// Helper function to easily issue a HTTP POST request with the provided JSON
/// object as body on a given URL that returns a JSON object. Handles request
/// building, JSON serialization and deserialization, and response code
/// checking.
pub fn post<'a, IN>(url: &'a str, input: &IN) -> Result<(), Error>
pub fn post<IN, OUT>(url: &str, input: &IN) -> Result<OUT, Error>
where
IN: Serialize,
for<'de> OUT: Deserialize<'de>,
{
let req = create_post_request(url, input)?;
handle_request(req)
}
/// Helper function to easily issue a HTTP POST request with the provided JSON
/// object as body on a given URL that returns nothing. Handles request
/// building, JSON serialization, and response code
/// checking.
pub fn post_no_ret<IN>(url: &str, input: &IN) -> Result<(), Error>
where
IN: Serialize,
{
let in_json = serde_json::to_string(input).context(ErrorKind::Internal(
"Could not serialize data to JSON".to_owned(),
))?;
let client = hyper::Client::new();
let _res = check_error(client.post(url).body(&mut in_json.as_bytes()).send())?;
let req = create_post_request(url, input)?;
send_request(req)?;
Ok(())
}
// convert hyper error and check for non success response codes
fn check_error(res: hyper::Result<Response>) -> Result<Response, Error> {
if let Err(e) = res {
return Err(
e.context(ErrorKind::Internal("Error during request".to_owned()))
.into(),
);
}
let mut response = res.unwrap();
match response.status.class() {
StatusClass::Success => Ok(response),
StatusClass::ServerError => Err(ErrorKind::Internal(format!(
"Server error: {}",
err_msg(&mut response)
)))?,
StatusClass::ClientError => if response.status == StatusCode::NotFound {
Err(ErrorKind::NotFound)?
} else {
Err(ErrorKind::Argument(format!(
"Argument error: {}",
err_msg(&mut response)
)))?
},
_ => Err(ErrorKind::Internal(format!("Unrecognized error.")))?,
}
fn create_post_request<IN>(url: &str, input: &IN) -> Result<Request<Body>, Error>
where
IN: Serialize,
{
let json = serde_json::to_string(input).context(ErrorKind::Internal(
"Could not serialize data to JSON".to_owned(),
))?;
let uri = url.parse::<Uri>().map_err::<Error, _>(|e: InvalidUri| {
e.context(ErrorKind::Argument(format!("Invalid url {}", url)))
.into()
})?;
Request::builder()
.method("POST")
.uri(uri)
.header(USER_AGENT, "grin-client")
.header(ACCEPT, "application/json")
.body(json.into())
.map_err::<Error, _>(|e| {
e.context(ErrorKind::RequestError("Bad request".to_owned()))
.into()
})
}
fn err_msg(resp: &mut Response) -> String {
let mut msg = String::new();
if let Err(_) = resp.read_to_string(&mut msg) {
"<no message>".to_owned()
} else {
msg
}
fn handle_request<T>(req: Request<Body>) -> Result<T, Error>
where
for<'de> T: Deserialize<'de>,
{
let data = send_request(req)?;
serde_json::from_str(&data).map_err(|e| {
e.context(ErrorKind::ResponseError("Cannot parse response".to_owned()))
.into()
})
}
fn send_request(req: Request<Body>) -> Result<String, Error> {
let mut event_loop = Core::new().unwrap();
let client = Client::new();
let task = client
.request(req)
.map_err(|_e| ErrorKind::RequestError("Cannot make request".to_owned()))
.and_then(|resp| {
if !resp.status().is_success() {
Either::A(err(ErrorKind::RequestError(
"Wrong response code".to_owned(),
)))
} else {
Either::B(
resp.into_body()
.map_err(|_e| {
ErrorKind::RequestError("Cannot read response body".to_owned())
})
.concat2()
.and_then(|ch| ok(String::from_utf8_lossy(&ch.to_vec()).to_string())),
)
}
});
Ok(event_loop.run(task)?)
}

View file

@ -12,16 +12,20 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::io::Read;
use std::cell::RefCell;
use std::collections::HashMap;
use std::fmt::Debug;
use std::net::SocketAddr;
use std::sync::{Arc, RwLock, Weak};
use std::thread;
use failure::{Fail, ResultExt};
use iron::prelude::{IronError, IronResult, Plugin, Request, Response};
use iron::{status, Handler};
use serde::Serialize;
use failure::ResultExt;
use futures::future::{err, ok};
use futures::{Future, Stream};
use hyper::{Body, Request, Response, StatusCode};
use rest::{Error, ErrorKind};
use serde::{Deserialize, Serialize};
use serde_json;
use urlencoded::UrlEncodedQuery;
use chain;
use core::core::hash::{Hash, Hashed};
@ -31,10 +35,13 @@ use p2p;
use p2p::types::ReasonForBan;
use pool;
use regex::Regex;
use rest::{ApiServer, Error, ErrorKind};
use rest::*;
use router::{Handler, ResponseFuture, Router, RouterError};
use types::*;
use url::form_urlencoded;
use util;
use util::secp::pedersen::Commitment;
use util::{self, LOGGER};
use util::LOGGER;
// All handlers use `Weak` references instead of `Arc` to avoid cycles that
// can never be destroyed. These 2 functions are simple helpers to reduce the
@ -52,7 +59,7 @@ struct IndexHandler {
impl IndexHandler {}
impl Handler for IndexHandler {
fn handle(&self, _req: &mut Request) -> IronResult<Response> {
fn get(&self, _req: Request<Body>) -> ResponseFuture {
json_response_pretty(&self.list)
}
}
@ -90,14 +97,16 @@ impl OutputHandler {
Err(ErrorKind::NotFound)?
}
fn outputs_by_ids(&self, req: &mut Request) -> Vec<Output> {
let mut commitments: Vec<&str> = vec![];
if let Ok(params) = req.get_ref::<UrlEncodedQuery>() {
if let Some(ids) = params.get("id") {
for id in ids {
for id in id.split(",") {
commitments.push(id.clone());
}
fn outputs_by_ids(&self, req: &Request<Body>) -> Vec<Output> {
let mut commitments: Vec<String> = vec![];
let params = form_urlencoded::parse(req.uri().query().unwrap().as_bytes())
.into_owned()
.collect::<Vec<(String, String)>>();
for (k, id) in params {
if k == "id" {
for id in id.split(",") {
commitments.push(id.to_owned());
}
}
}
@ -106,7 +115,7 @@ impl OutputHandler {
let mut outputs: Vec<Output> = vec![];
for x in commitments {
if let Ok(output) = self.get_output(x) {
if let Ok(output) = self.get_output(&x) {
outputs.push(output);
}
}
@ -158,36 +167,41 @@ impl OutputHandler {
}
// returns outputs for a specified range of blocks
fn outputs_block_batch(&self, req: &mut Request) -> Vec<BlockOutputs> {
fn outputs_block_batch(&self, req: &Request<Body>) -> Vec<BlockOutputs> {
let mut commitments: Vec<Commitment> = vec![];
let mut start_height = 1;
let mut end_height = 1;
let mut include_rp = false;
if let Ok(params) = req.get_ref::<UrlEncodedQuery>() {
if let Some(ids) = params.get("id") {
for id in ids {
for id in id.split(",") {
if let Ok(x) = util::from_hex(String::from(id)) {
commitments.push(Commitment::from_vec(x));
}
let params = form_urlencoded::parse(req.uri().query().unwrap().as_bytes())
.into_owned()
.fold(HashMap::new(), |mut hm, (k, v)| {
hm.entry(k).or_insert(vec![]).push(v);
hm
});
if let Some(ids) = params.get("id") {
for id in ids {
for id in id.split(",") {
if let Ok(x) = util::from_hex(String::from(id)) {
commitments.push(Commitment::from_vec(x));
}
}
}
if let Some(heights) = params.get("start_height") {
for height in heights {
start_height = height.parse().unwrap();
}
}
if let Some(heights) = params.get("start_height") {
for height in heights {
start_height = height.parse().unwrap();
}
if let Some(heights) = params.get("end_height") {
for height in heights {
end_height = height.parse().unwrap();
}
}
if let Some(_) = params.get("include_rp") {
include_rp = true;
}
if let Some(heights) = params.get("end_height") {
for height in heights {
end_height = height.parse().unwrap();
}
}
if let Some(_) = params.get("include_rp") {
include_rp = true;
}
debug!(
LOGGER,
@ -211,16 +225,17 @@ impl OutputHandler {
}
impl Handler for OutputHandler {
fn handle(&self, req: &mut Request) -> IronResult<Response> {
let url = req.url.clone();
let mut path_elems = url.path();
if *path_elems.last().unwrap() == "" {
path_elems.pop();
}
match *path_elems.last().unwrap() {
"byids" => json_response(&self.outputs_by_ids(req)),
"byheight" => json_response(&self.outputs_block_batch(req)),
_ => Ok(Response::with((status::BadRequest, ""))),
fn get(&self, req: Request<Body>) -> ResponseFuture {
match req.uri()
.path()
.trim_right_matches("/")
.rsplit("/")
.next()
.unwrap()
{
"byids" => json_response(&self.outputs_by_ids(&req)),
"byheight" => json_response(&self.outputs_block_batch(&req)),
_ => response(StatusCode::BAD_REQUEST, ""),
}
}
}
@ -306,18 +321,20 @@ impl TxHashSetHandler {
}
impl Handler for TxHashSetHandler {
fn handle(&self, req: &mut Request) -> IronResult<Response> {
let url = req.url.clone();
let mut path_elems = url.path();
fn get(&self, req: Request<Body>) -> ResponseFuture {
let mut start_index = 1;
let mut max = 100;
let mut id = "";
if *path_elems.last().unwrap() == "" {
path_elems.pop();
}
let mut id = "".to_owned();
// TODO: probably need to set a reasonable max limit here
let mut last_n = 10;
if let Ok(params) = req.get_ref::<UrlEncodedQuery>() {
if let Some(query_string) = req.uri().query() {
let params = form_urlencoded::parse(query_string.as_bytes())
.into_owned()
.fold(HashMap::new(), |mut hm, (k, v)| {
hm.entry(k).or_insert(vec![]).push(v);
hm
});
if let Some(nums) = params.get("n") {
for num in nums {
if let Ok(n) = str::parse(num) {
@ -340,19 +357,26 @@ impl Handler for TxHashSetHandler {
}
}
if let Some(ids) = params.get("id") {
for i in ids {
id = i;
if !ids.is_empty() {
id = ids.last().unwrap().to_owned();
}
}
}
match *path_elems.last().unwrap() {
match req.uri()
.path()
.trim_right()
.trim_right_matches("/")
.rsplit("/")
.next()
.unwrap()
{
"roots" => json_response_pretty(&self.get_roots()),
"lastoutputs" => json_response_pretty(&self.get_last_n_output(last_n)),
"lastrangeproofs" => json_response_pretty(&self.get_last_n_rangeproof(last_n)),
"lastkernels" => json_response_pretty(&self.get_last_n_kernel(last_n)),
"outputs" => json_response_pretty(&self.outputs(start_index, max)),
"merkleproof" => json_response_pretty(&self.get_merkle_proof_for_output(id).unwrap()),
_ => Ok(Response::with((status::BadRequest, ""))),
"merkleproof" => json_response_pretty(&self.get_merkle_proof_for_output(&id).unwrap()),
_ => response(StatusCode::BAD_REQUEST, ""),
}
}
}
@ -362,7 +386,7 @@ pub struct PeersAllHandler {
}
impl Handler for PeersAllHandler {
fn handle(&self, _req: &mut Request) -> IronResult<Response> {
fn get(&self, _req: Request<Body>) -> ResponseFuture {
let peers = &w(&self.peers).all_peers();
json_response_pretty(&peers)
}
@ -373,7 +397,7 @@ pub struct PeersConnectedHandler {
}
impl Handler for PeersConnectedHandler {
fn handle(&self, _req: &mut Request) -> IronResult<Response> {
fn get(&self, _req: Request<Body>) -> ResponseFuture {
let mut peers = vec![];
for p in &w(&self.peers).connected_peers() {
let p = p.read().unwrap();
@ -385,62 +409,54 @@ impl Handler for PeersConnectedHandler {
}
/// Peer operations
/// GET /v1/peers/10.12.12.13
/// POST /v1/peers/10.12.12.13/ban
/// POST /v1/peers/10.12.12.13/unban
pub struct PeerPostHandler {
pub struct PeerHandler {
pub peers: Weak<p2p::Peers>,
}
impl Handler for PeerPostHandler {
fn handle(&self, req: &mut Request) -> IronResult<Response> {
let url = req.url.clone();
let mut path_elems = url.path();
if *path_elems.last().unwrap() == "" {
path_elems.pop();
impl Handler for PeerHandler {
fn get(&self, req: Request<Body>) -> ResponseFuture {
if let Ok(addr) = req.uri()
.path()
.trim_right_matches("/")
.rsplit("/")
.next()
.unwrap()
.parse()
{
match w(&self.peers).get_peer(addr) {
Ok(peer) => json_response(&peer),
Err(_) => response(StatusCode::BAD_REQUEST, ""),
}
} else {
response(
StatusCode::BAD_REQUEST,
format!("url unrecognized: {}", req.uri().path()),
)
}
match *path_elems.last().unwrap() {
}
fn post(&self, req: Request<Body>) -> ResponseFuture {
let mut path_elems = req.uri().path().trim_right_matches("/").rsplit("/");
match path_elems.next().unwrap() {
"ban" => {
path_elems.pop();
if let Ok(addr) = path_elems.last().unwrap().parse() {
if let Ok(addr) = path_elems.next().unwrap().parse() {
w(&self.peers).ban_peer(&addr, ReasonForBan::ManualBan);
Ok(Response::with((status::Ok, "")))
response(StatusCode::OK, "")
} else {
Ok(Response::with((status::BadRequest, "")))
response(StatusCode::BAD_REQUEST, "bad address to ban")
}
}
"unban" => {
path_elems.pop();
if let Ok(addr) = path_elems.last().unwrap().parse() {
if let Ok(addr) = path_elems.next().unwrap().parse() {
w(&self.peers).unban_peer(&addr);
Ok(Response::with((status::Ok, "")))
response(StatusCode::OK, "")
} else {
Ok(Response::with((status::BadRequest, "")))
response(StatusCode::BAD_REQUEST, "bad address to unban")
}
}
_ => Ok(Response::with((status::BadRequest, ""))),
}
}
}
/// Get details about a given peer
pub struct PeerGetHandler {
pub peers: Weak<p2p::Peers>,
}
impl Handler for PeerGetHandler {
fn handle(&self, req: &mut Request) -> IronResult<Response> {
let url = req.url.clone();
let mut path_elems = url.path();
if *path_elems.last().unwrap() == "" {
path_elems.pop();
}
if let Ok(addr) = path_elems.last().unwrap().parse() {
match w(&self.peers).get_peer(addr) {
Ok(peer) => json_response(&peer),
Err(_) => Ok(Response::with((status::BadRequest, ""))),
}
} else {
Ok(Response::with((status::BadRequest, "")))
_ => response(StatusCode::BAD_REQUEST, "unrecognized command"),
}
}
}
@ -459,7 +475,7 @@ impl StatusHandler {
}
impl Handler for StatusHandler {
fn handle(&self, _req: &mut Request) -> IronResult<Response> {
fn get(&self, _req: Request<Body>) -> ResponseFuture {
json_response(&self.get_status())
}
}
@ -477,7 +493,7 @@ impl ChainHandler {
}
impl Handler for ChainHandler {
fn handle(&self, _req: &mut Request) -> IronResult<Response> {
fn get(&self, _req: Request<Body>) -> ResponseFuture {
json_response(&self.get_tip())
}
}
@ -489,10 +505,10 @@ pub struct ChainValidationHandler {
}
impl Handler for ChainValidationHandler {
fn handle(&self, _req: &mut Request) -> IronResult<Response> {
fn get(&self, _req: Request<Body>) -> ResponseFuture {
// TODO - read skip_rproofs from query params
w(&self.chain).validate(true).unwrap();
Ok(Response::with((status::Ok, "{}")))
response(StatusCode::OK, "")
}
}
@ -504,9 +520,9 @@ pub struct ChainCompactHandler {
}
impl Handler for ChainCompactHandler {
fn handle(&self, _req: &mut Request) -> IronResult<Response> {
fn get(&self, _req: Request<Body>) -> ResponseFuture {
w(&self.chain).compact().unwrap();
Ok(Response::with((status::Ok, "{}")))
response(StatusCode::OK, "")
}
}
@ -529,7 +545,9 @@ impl HeaderHandler {
check_block_param(&input)?;
let vec = util::from_hex(input).unwrap();
let h = Hash::from_vec(&vec);
let header = w(&self.chain).get_block_header(&h).context(ErrorKind::NotFound)?;
let header = w(&self.chain)
.get_block_header(&h)
.context(ErrorKind::NotFound)?;
Ok(BlockHeaderPrintable::from_header(&header))
}
}
@ -573,58 +591,82 @@ impl BlockHandler {
}
fn check_block_param(input: &String) -> Result<(), Error> {
lazy_static! {
static ref RE: Regex = Regex::new(r"[0-9a-fA-F]{64}").unwrap();
}
if !RE.is_match(&input) {
return Err(ErrorKind::Argument(
"Not a valid hash or height.".to_owned(),
))?;
}
return Ok(())
lazy_static! {
static ref RE: Regex = Regex::new(r"[0-9a-fA-F]{64}").unwrap();
}
if !RE.is_match(&input) {
return Err(ErrorKind::Argument(
"Not a valid hash or height.".to_owned(),
))?;
}
return Ok(());
}
impl Handler for BlockHandler {
fn handle(&self, req: &mut Request) -> IronResult<Response> {
let url = req.url.clone();
let mut path_elems = url.path();
if *path_elems.last().unwrap() == "" {
path_elems.pop();
}
let el = *path_elems.last().unwrap();
let h = self.parse_input(el.to_string())
.map_err(|e| IronError::new(Fail::compat(e), status::BadRequest))?;
fn get(&self, req: Request<Body>) -> ResponseFuture {
let el = req.uri()
.path()
.trim_right_matches("/")
.rsplit("/")
.next()
.unwrap();
let mut compact = false;
if let Ok(params) = req.get_ref::<UrlEncodedQuery>() {
if let Some(_) = params.get("compact") {
compact = true;
let h = self.parse_input(el.to_string());
if h.is_err() {
error!(LOGGER, "block_handler: bad parameter {}", el.to_string());
return response(StatusCode::BAD_REQUEST, "");
}
let h = h.unwrap();
if let Some(param) = req.uri().query() {
if param == "compact" {
match self.get_compact_block(&h) {
Ok(b) => json_response(&b),
Err(_) => {
error!(LOGGER, "block_handler: can not get compact block {}", h);
response(StatusCode::INTERNAL_SERVER_ERROR, "")
}
}
} else {
debug!(
LOGGER,
"block_handler: unsupported query parameter {}", param
);
response(StatusCode::BAD_REQUEST, "")
}
}
if compact {
let b = self.get_compact_block(&h)
.map_err(|e| IronError::new(Fail::compat(e), status::InternalServerError))?;
json_response(&b)
} else {
let b = self.get_block(&h)
.map_err(|e| IronError::new(Fail::compat(e), status::InternalServerError))?;
json_response(&b)
match self.get_block(&h) {
Ok(b) => json_response(&b),
Err(_) => {
error!(LOGGER, "block_handler: can not get block {}", h);
response(StatusCode::INTERNAL_SERVER_ERROR, "")
}
}
}
}
}
impl Handler for HeaderHandler {
fn handle(&self, req: &mut Request) -> IronResult<Response> {
let url = req.url.clone();
let mut path_elems = url.path();
if *path_elems.last().unwrap() == "" {
path_elems.pop();
fn get(&self, req: Request<Body>) -> ResponseFuture {
let el = req.uri()
.path()
.trim_right_matches("/")
.rsplit("/")
.next()
.unwrap();
match self.get_header(el.to_string()) {
Ok(h) => json_response(&h),
Err(_) => {
error!(
LOGGER,
"header_handler: can not get header {}",
el.to_string()
);
response(StatusCode::INTERNAL_SERVER_ERROR, "")
}
}
let el = *path_elems.last().unwrap();
let h = self.get_header(el.to_string())
.map_err(|e| IronError::new(Fail::compat(e), status::InternalServerError))?;
json_response(&h)
}
}
@ -635,9 +677,9 @@ struct PoolInfoHandler<T> {
impl<T> Handler for PoolInfoHandler<T>
where
T: pool::BlockChain + Send + Sync + 'static,
T: pool::BlockChain + Send + Sync,
{
fn handle(&self, _req: &mut Request) -> IronResult<Response> {
fn get(&self, _req: Request<Body>) -> ResponseFuture {
let pool_arc = w(&self.tx_pool);
let pool = pool_arc.read().unwrap();
@ -658,78 +700,106 @@ struct PoolPushHandler<T> {
tx_pool: Weak<RwLock<pool::TransactionPool<T>>>,
}
impl<T> PoolPushHandler<T>
where
T: pool::BlockChain + Send + Sync + 'static,
{
fn update_pool(&self, req: Request<Body>) -> Box<Future<Item = (), Error = Error> + Send> {
let params = match req.uri().query() {
Some(query_string) => form_urlencoded::parse(query_string.as_bytes())
.into_owned()
.fold(HashMap::new(), |mut hm, (k, v)| {
hm.entry(k).or_insert(vec![]).push(v);
hm
}),
None => HashMap::new(),
};
let fluff = params.get("fluff").is_some();
let pool_arc = w(&self.tx_pool).clone();
Box::new(
parse_body(req)
.and_then(move |wrapper: TxWrapper| {
util::from_hex(wrapper.tx_hex)
.map_err(|_| ErrorKind::RequestError("Bad request".to_owned()).into())
})
.and_then(move |tx_bin| {
ser::deserialize(&mut &tx_bin[..])
.map_err(|_| ErrorKind::RequestError("Bad request".to_owned()).into())
})
.and_then(move |tx: Transaction| {
let source = pool::TxSource {
debug_name: "push-api".to_string(),
identifier: "?.?.?.?".to_string(),
};
info!(
LOGGER,
"Pushing transaction with {} inputs and {} outputs to pool.",
tx.inputs.len(),
tx.outputs.len()
);
// Push to tx pool.
let mut tx_pool = pool_arc.write().unwrap();
tx_pool
.add_to_pool(source, tx, !fluff)
.map_err(|_| ErrorKind::RequestError("Bad request".to_owned()).into())
}),
)
}
}
impl<T> Handler for PoolPushHandler<T>
where
T: pool::BlockChain + Send + Sync + 'static,
{
fn handle(&self, req: &mut Request) -> IronResult<Response> {
let wrapper: TxWrapper = serde_json::from_reader(req.body.by_ref())
.map_err(|e| IronError::new(Fail::compat(e), status::BadRequest))?;
let tx_bin = util::from_hex(wrapper.tx_hex)
.map_err(|e| IronError::new(Fail::compat(e), status::BadRequest))?;
let tx: Transaction = ser::deserialize(&mut &tx_bin[..])
.map_err(|e| IronError::new(Fail::compat(e), status::BadRequest))?;
let source = pool::TxSource {
debug_name: "push-api".to_string(),
identifier: "?.?.?.?".to_string(),
};
info!(
LOGGER,
"Pushing transaction with {} inputs and {} outputs to pool.",
tx.inputs.len(),
tx.outputs.len()
);
let mut fluff = false;
if let Ok(params) = req.get_ref::<UrlEncodedQuery>() {
if let Some(_) = params.get("fluff") {
fluff = true;
}
}
// Push to tx pool.
let res = {
let pool_arc = w(&self.tx_pool);
let mut tx_pool = pool_arc.write().unwrap();
tx_pool.add_to_pool(source, tx, !fluff)
};
match res {
Ok(()) => Ok(Response::with(status::Ok)),
Err(e) => {
debug!(LOGGER, "error - {:?}", e);
Err(IronError::new(Fail::compat(e), status::BadRequest))
}
}
fn post(&self, req: Request<Body>) -> ResponseFuture {
Box::new(
self.update_pool(req)
.and_then(|_| ok(just_response(StatusCode::OK, "")))
.or_else(|_| ok(just_response(StatusCode::BAD_REQUEST, ""))),
)
}
}
// Utility to serialize a struct into JSON and produce a sensible IronResult
// Utility to serialize a struct into JSON and produce a sensible Response
// out of it.
fn json_response<T>(s: &T) -> IronResult<Response>
fn json_response<T>(s: &T) -> ResponseFuture
where
T: Serialize,
{
match serde_json::to_string(s) {
Ok(json) => Ok(Response::with((status::Ok, json))),
Err(_) => Ok(Response::with((status::InternalServerError, ""))),
Ok(json) => response(StatusCode::OK, json),
Err(_) => response(StatusCode::INTERNAL_SERVER_ERROR, ""),
}
}
// pretty-printed version of above
fn json_response_pretty<T>(s: &T) -> IronResult<Response>
fn json_response_pretty<T>(s: &T) -> ResponseFuture
where
T: Serialize,
{
match serde_json::to_string_pretty(s) {
Ok(json) => Ok(Response::with((status::Ok, json))),
Err(_) => Ok(Response::with((status::InternalServerError, ""))),
Ok(json) => response(StatusCode::OK, json),
Err(_) => response(StatusCode::INTERNAL_SERVER_ERROR, ""),
}
}
/// Start all server HTTP handlers. Register all of them with Iron
fn response<T: Into<Body> + Debug>(status: StatusCode, text: T) -> ResponseFuture {
Box::new(ok(just_response(status, text)))
}
fn just_response<T: Into<Body> + Debug>(status: StatusCode, text: T) -> Response<Body> {
debug!(LOGGER, "HTTP API -> status: {}, text: {:?}", status, text);
let mut resp = Response::new(text.into());
*resp.status_mut() = status;
resp
}
thread_local!( static ROUTER: RefCell<Option<Router>> = RefCell::new(None) );
/// Start all server HTTP handlers. Register all of them with Router
/// and runs the corresponding HTTP server.
///
/// Hyper currently has a bug that prevents clean shutdown. In order
@ -748,98 +818,132 @@ pub fn start_rest_apis<T>(
let _ = thread::Builder::new()
.name("apis".to_string())
.spawn(move || {
// build handlers and register them under the appropriate endpoint
let output_handler = OutputHandler {
chain: chain.clone(),
};
let block_handler = BlockHandler {
chain: chain.clone(),
};
let header_handler = HeaderHandler {
chain: chain.clone(),
};
let chain_tip_handler = ChainHandler {
chain: chain.clone(),
};
let chain_compact_handler = ChainCompactHandler {
chain: chain.clone(),
};
let chain_validation_handler = ChainValidationHandler {
chain: chain.clone(),
};
let status_handler = StatusHandler {
chain: chain.clone(),
peers: peers.clone(),
};
let txhashset_handler = TxHashSetHandler {
chain: chain.clone(),
};
let pool_info_handler = PoolInfoHandler {
tx_pool: tx_pool.clone(),
};
let pool_push_handler = PoolPushHandler {
tx_pool: tx_pool.clone(),
};
let peers_all_handler = PeersAllHandler {
peers: peers.clone(),
};
let peers_connected_handler = PeersConnectedHandler {
peers: peers.clone(),
};
let peer_post_handler = PeerPostHandler {
peers: peers.clone(),
};
let peer_get_handler = PeerGetHandler {
peers: peers.clone(),
};
let mut apis = ApiServer::new();
let route_list = vec![
"get blocks".to_string(),
"get chain".to_string(),
"get chain/compact".to_string(),
"get chain/validate".to_string(),
"get chain/outputs".to_string(),
"post chain/height-index".to_string(),
"get status".to_string(),
"get txhashset/roots".to_string(),
"get txhashset/lastoutputs?n=10".to_string(),
"get txhashset/lastrangeproofs".to_string(),
"get txhashset/lastkernels".to_string(),
"get txhashset/outputs?start_index=1&max=100".to_string(),
"get pool".to_string(),
"post pool/push".to_string(),
"post peers/a.b.c.d:p/ban".to_string(),
"post peers/a.b.c.d:p/unban".to_string(),
"get peers/all".to_string(),
"get peers/connected".to_string(),
"get peers/a.b.c.d".to_string(),
];
let index_handler = IndexHandler { list: route_list };
ROUTER.with(|router| {
*router.borrow_mut() =
Some(build_router(chain, tx_pool, peers).expect("unbale to build API router"));
let router = router!(
index: get "/" => index_handler,
blocks: get "/blocks/*" => block_handler,
headers: get "/headers/*" => header_handler,
chain_tip: get "/chain" => chain_tip_handler,
chain_compact: get "/chain/compact" => chain_compact_handler,
chain_validate: get "/chain/validate" => chain_validation_handler,
chain_outputs: get "/chain/outputs/*" => output_handler,
status: get "/status" => status_handler,
txhashset_roots: get "/txhashset/*" => txhashset_handler,
pool_info: get "/pool" => pool_info_handler,
pool_push: post "/pool/push" => pool_push_handler,
peers_all: get "/peers/all" => peers_all_handler,
peers_connected: get "/peers/connected" => peers_connected_handler,
peer: post "/peers/*" => peer_post_handler,
peer: get "/peers/*" => peer_get_handler
);
let mut apis = ApiServer::new("/v1".to_string());
apis.register_handler(router);
info!(LOGGER, "Starting HTTP API server at {}.", addr);
apis.start(&addr[..]).unwrap_or_else(|e| {
error!(LOGGER, "Failed to start API HTTP server: {}.", e);
info!(LOGGER, "Starting HTTP API server at {}.", addr);
let socket_addr: SocketAddr = addr.parse().expect("unable to parse socket address");
apis.start(socket_addr, &handle).unwrap_or_else(|e| {
error!(LOGGER, "Failed to start API HTTP server: {}.", e);
});
});
});
}
pub fn handle(req: Request<Body>) -> ResponseFuture {
ROUTER.with(|router| match *router.borrow() {
Some(ref h) => h.handle(req),
None => {
error!(LOGGER, "No HTTP API router configured");
response(StatusCode::INTERNAL_SERVER_ERROR, "No router configured")
}
})
}
fn parse_body<T>(req: Request<Body>) -> Box<Future<Item = T, Error = Error> + Send>
where
for<'de> T: Deserialize<'de> + Send + 'static,
{
Box::new(
req.into_body()
.concat2()
.map_err(|_e| ErrorKind::RequestError("Failed to read request".to_owned()).into())
.and_then(|body| match serde_json::from_reader(&body.to_vec()[..]) {
Ok(obj) => ok(obj),
Err(_) => err(ErrorKind::RequestError("Invalid request body".to_owned()).into()),
}),
)
}
pub fn build_router<T>(
chain: Weak<chain::Chain>,
tx_pool: Weak<RwLock<pool::TransactionPool<T>>>,
peers: Weak<p2p::Peers>,
) -> Result<Router, RouterError>
where
T: pool::BlockChain + Send + Sync + 'static,
{
let route_list = vec![
"get blocks".to_string(),
"get chain".to_string(),
"get chain/compact".to_string(),
"get chain/validate".to_string(),
"get chain/outputs".to_string(),
"get status".to_string(),
"get txhashset/roots".to_string(),
"get txhashset/lastoutputs?n=10".to_string(),
"get txhashset/lastrangeproofs".to_string(),
"get txhashset/lastkernels".to_string(),
"get txhashset/outputs?start_index=1&max=100".to_string(),
"get pool".to_string(),
"post pool/push".to_string(),
"post peers/a.b.c.d:p/ban".to_string(),
"post peers/a.b.c.d:p/unban".to_string(),
"get peers/all".to_string(),
"get peers/connected".to_string(),
"get peers/a.b.c.d".to_string(),
];
let index_handler = IndexHandler { list: route_list };
let output_handler = OutputHandler {
chain: chain.clone(),
};
let block_handler = BlockHandler {
chain: chain.clone(),
};
let header_handler = HeaderHandler {
chain: chain.clone(),
};
let chain_tip_handler = ChainHandler {
chain: chain.clone(),
};
let chain_compact_handler = ChainCompactHandler {
chain: chain.clone(),
};
let chain_validation_handler = ChainValidationHandler {
chain: chain.clone(),
};
let status_handler = StatusHandler {
chain: chain.clone(),
peers: peers.clone(),
};
let txhashset_handler = TxHashSetHandler {
chain: chain.clone(),
};
let pool_info_handler = PoolInfoHandler {
tx_pool: tx_pool.clone(),
};
let pool_push_handler = PoolPushHandler {
tx_pool: tx_pool.clone(),
};
let peers_all_handler = PeersAllHandler {
peers: peers.clone(),
};
let peers_connected_handler = PeersConnectedHandler {
peers: peers.clone(),
};
let peer_handler = PeerHandler {
peers: peers.clone(),
};
let mut router = Router::new();
router.add_route("/v1/", Box::new(index_handler))?;
router.add_route("/v1/blocks/*", Box::new(block_handler))?;
router.add_route("/v1/headers/*", Box::new(header_handler))?;
router.add_route("/v1/chain", Box::new(chain_tip_handler))?;
router.add_route("/v1/chain/outputs/*", Box::new(output_handler))?;
router.add_route("/v1/chain/compact", Box::new(chain_compact_handler))?;
router.add_route("/v1/chain/validate", Box::new(chain_validation_handler))?;
router.add_route("/v1/txhashset/*", Box::new(txhashset_handler))?;
router.add_route("/v1/status", Box::new(status_handler))?;
router.add_route("/v1/pool", Box::new(pool_info_handler))?;
router.add_route("/v1/pool/push", Box::new(pool_push_handler))?;
router.add_route("/v1/peers/all", Box::new(peers_all_handler))?;
router.add_route("/v1/peers/connected", Box::new(peers_connected_handler))?;
router.add_route("/v1/peers/**", Box::new(peer_handler))?;
Ok(router)
}

View file

@ -18,31 +18,34 @@ extern crate grin_p2p as p2p;
extern crate grin_pool as pool;
extern crate grin_store as store;
extern crate grin_util as util;
extern crate url;
extern crate failure;
#[macro_use]
extern crate failure_derive;
extern crate hyper;
extern crate iron;
#[macro_use]
extern crate lazy_static;
extern crate mount;
extern crate regex;
#[macro_use]
extern crate router;
extern crate serde;
#[macro_use]
extern crate serde_derive;
extern crate serde_json;
#[macro_use]
extern crate slog;
extern crate urlencoded;
extern crate futures;
extern crate http;
extern crate tokio;
extern crate tokio_core;
pub mod client;
mod handlers;
mod rest;
mod router;
mod types;
pub use handlers::start_rest_apis;
pub use rest::*;
pub use router::*;
pub use types::*;

View file

@ -18,20 +18,17 @@
//! To use it, just have your service(s) implement the ApiEndpoint trait and
//! register them on a ApiServer.
use hyper::rt::Future;
use hyper::service::service_fn;
use hyper::{Body, Request, Server};
use router::ResponseFuture;
use std::fmt::{self, Display};
use std::mem;
use std::net::ToSocketAddrs;
use std::string::ToString;
use std::net::SocketAddr;
use tokio::runtime::current_thread::Runtime;
use failure::{Backtrace, Context, Fail};
use iron::Listening;
use iron::middleware::Handler;
use iron::prelude::Iron;
use mount::Mount;
use router::Router;
/// Errors that can be returned by an ApiEndpoint implementation.
#[derive(Debug)]
pub struct Error {
inner: Context<ErrorKind>,
@ -45,6 +42,10 @@ pub enum ErrorKind {
Argument(String),
#[fail(display = "Not found.")]
NotFound,
#[fail(display = "Request error: {}", _0)]
RequestError(String),
#[fail(display = "ResponseError error: {}", _0)]
ResponseError(String),
}
impl Fail for Error {
@ -84,45 +85,37 @@ impl From<Context<ErrorKind>> for Error {
}
/// HTTP server allowing the registration of ApiEndpoint implementations.
pub struct ApiServer {
root: String,
router: Router,
mount: Mount,
server_listener: Option<Listening>,
}
pub struct ApiServer {}
impl ApiServer {
/// Creates a new ApiServer that will serve ApiEndpoint implementations
/// under the root URL.
pub fn new(root: String) -> ApiServer {
ApiServer {
root: root,
router: Router::new(),
mount: Mount::new(),
server_listener: None,
}
pub fn new() -> ApiServer {
ApiServer {}
}
/// Starts the ApiServer at the provided address.
pub fn start<A: ToSocketAddrs>(&mut self, addr: A) -> Result<(), String> {
// replace this value to satisfy borrow checker
let r = mem::replace(&mut self.router, Router::new());
let mut m = mem::replace(&mut self.mount, Mount::new());
m.mount("/", r);
let result = Iron::new(m).http(addr);
let return_value = result.as_ref().map(|_| ()).map_err(|e| e.to_string());
self.server_listener = Some(result.unwrap());
return_value
pub fn start<F>(&mut self, addr: SocketAddr, f: &'static F) -> Result<(), String>
where
F: Fn(Request<Body>) -> ResponseFuture + Send + Sync + 'static,
{
let server = Server::bind(&addr)
.serve(move || service_fn(f))
.map_err(|e| eprintln!("server error: {}", e));
let mut rt = Runtime::new().unwrap();
if rt.block_on(server).is_err() {
return Err("tokio block_on error".to_owned());
}
Ok(())
}
/// Stops the API server
pub fn stop(&mut self) {
let r = mem::replace(&mut self.server_listener, None);
r.unwrap().close().unwrap();
}
/// Registers an iron handler (via mount)
pub fn register_handler<H: Handler>(&mut self, handler: H) -> &mut Mount {
self.mount.mount(&self.root, handler)
// TODO implement proper stop, the following method doesn't
// work for current_thread runtime.
// if let Some(rt) = self.rt.take() {
// rt.shutdown_now().wait().unwrap();
// }
}
}

309
api/src/router.rs Normal file
View file

@ -0,0 +1,309 @@
use futures::future;
use hyper;
use hyper::rt::Future;
use hyper::{Body, Method, Request, Response, StatusCode};
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::sync::Arc;
use util::LOGGER;
lazy_static! {
static ref WILDCARD_HASH: u64 = calculate_hash(&"*");
static ref WILDCARD_STOP_HASH: u64 = calculate_hash(&"**");
}
pub type ResponseFuture = Box<Future<Item = Response<Body>, Error = hyper::Error> + Send>;
pub trait Handler {
fn get(&self, _req: Request<Body>) -> ResponseFuture {
not_found()
}
fn post(&self, _req: Request<Body>) -> ResponseFuture {
not_found()
}
fn put(&self, _req: Request<Body>) -> ResponseFuture {
not_found()
}
fn patch(&self, _req: Request<Body>) -> ResponseFuture {
not_found()
}
fn delete(&self, _req: Request<Body>) -> ResponseFuture {
not_found()
}
fn head(&self, _req: Request<Body>) -> ResponseFuture {
not_found()
}
fn options(&self, _req: Request<Body>) -> ResponseFuture {
not_found()
}
fn trace(&self, _req: Request<Body>) -> ResponseFuture {
not_found()
}
fn connect(&self, _req: Request<Body>) -> ResponseFuture {
not_found()
}
}
#[derive(Fail, Debug)]
pub enum RouterError {
#[fail(display = "Route already exists")]
RouteAlreadyExists,
#[fail(display = "Route not found")]
RouteNotFound,
#[fail(display = "Value not found")]
NoValue,
}
#[derive(Clone)]
pub struct Router {
nodes: Vec<Node>,
}
#[derive(Debug, Clone, Copy)]
struct NodeId(usize);
const MAX_CHILDREN: usize = 16;
type HandlerObj = Box<Handler>;
#[derive(Clone)]
struct Node {
key: u64,
value: Option<Arc<HandlerObj>>,
children: [NodeId; MAX_CHILDREN],
children_count: usize,
}
impl Router {
pub fn new() -> Router {
let root = Node::new(calculate_hash(&""), None);
let mut nodes = vec![];
nodes.push(root);
Router { nodes }
}
fn root(&self) -> NodeId {
NodeId(0)
}
fn node(&self, id: NodeId) -> &Node {
&self.nodes[id.0]
}
fn node_mut(&mut self, id: NodeId) -> &mut Node {
&mut self.nodes[id.0]
}
fn find(&self, parent: NodeId, key: u64) -> Option<NodeId> {
let node = self.node(parent);
node.children
.iter()
.find(|&id| {
let node_key = self.node(*id).key;
node_key == key || node_key == *WILDCARD_HASH || node_key == *WILDCARD_STOP_HASH
})
.cloned()
}
fn add_empty_node(&mut self, parent: NodeId, key: u64) -> NodeId {
let id = NodeId(self.nodes.len());
self.nodes.push(Node::new(key, None));
self.node_mut(parent).add_child(id);
id
}
pub fn add_route(&mut self, route: &'static str, value: HandlerObj) -> Result<(), RouterError> {
let keys = generate_path(route);
let mut node_id = self.root();
for key in keys {
node_id = self.find(node_id, key)
.unwrap_or_else(|| self.add_empty_node(node_id, key));
}
match self.node(node_id).value() {
None => {
self.node_mut(node_id).set_value(value);
Ok(())
}
Some(_) => Err(RouterError::RouteAlreadyExists),
}
}
pub fn get(&self, path: &str) -> Result<Arc<HandlerObj>, RouterError> {
let keys = generate_path(path);
let mut node_id = self.root();
for key in keys {
node_id = self.find(node_id, key).ok_or(RouterError::RouteNotFound)?;
if self.node(node_id).key == *WILDCARD_STOP_HASH {
debug!(LOGGER, "ROUTER stop card");
break;
}
}
self.node(node_id).value().ok_or(RouterError::NoValue)
}
pub fn handle(&self, req: Request<Body>) -> ResponseFuture {
match self.get(req.uri().path()) {
Err(_) => not_found(),
Ok(h) => match req.method() {
&Method::GET => h.get(req),
&Method::POST => h.post(req),
&Method::PUT => h.put(req),
&Method::DELETE => h.delete(req),
&Method::PATCH => h.patch(req),
&Method::OPTIONS => h.options(req),
&Method::CONNECT => h.connect(req),
&Method::TRACE => h.trace(req),
&Method::HEAD => h.head(req),
_ => not_found(),
},
}
}
}
impl Node {
fn new(key: u64, value: Option<Arc<HandlerObj>>) -> Node {
Node {
key,
value,
children: [NodeId(0); MAX_CHILDREN],
children_count: 0,
}
}
fn value(&self) -> Option<Arc<HandlerObj>> {
match &self.value {
None => None,
Some(v) => Some(v.clone()),
}
}
fn set_value(&mut self, value: HandlerObj) {
self.value = Some(Arc::new(value));
}
fn add_child(&mut self, child_id: NodeId) {
if self.children_count == MAX_CHILDREN {
panic!("Can't add a route, children limit exceeded");
}
self.children[self.children_count] = child_id;
self.children_count += 1;
}
}
pub fn not_found() -> ResponseFuture {
let mut response = Response::new(Body::empty());
*response.status_mut() = StatusCode::NOT_FOUND;
Box::new(future::ok(response))
}
fn calculate_hash<T: Hash>(t: &T) -> u64 {
let mut s = DefaultHasher::new();
t.hash(&mut s);
s.finish()
}
fn generate_path(route: &str) -> Vec<u64> {
route
.split('/')
.skip(1)
.map(|path| calculate_hash(&path))
.collect()
}
#[cfg(test)]
mod tests {
use super::*;
use tokio::prelude::future::ok;
use tokio_core::reactor::Core;
struct HandlerImpl(u16);
impl Handler for HandlerImpl {
fn get(&self, _req: Request<Body>) -> ResponseFuture {
Box::new(future::ok(
Response::builder()
.status(self.0)
.body(Body::default())
.unwrap(),
))
}
}
#[test]
fn test_add_route() {
let mut routes = Router::new();
routes
.add_route("/v1/users", Box::new(HandlerImpl(1)))
.unwrap();
assert!(
routes
.add_route("/v1/users", Box::new(HandlerImpl(2)))
.is_err()
);
routes
.add_route("/v1/users/xxx", Box::new(HandlerImpl(3)))
.unwrap();
routes
.add_route("/v1/users/xxx/yyy", Box::new(HandlerImpl(3)))
.unwrap();
routes
.add_route("/v1/zzz/*", Box::new(HandlerImpl(3)))
.unwrap();
assert!(
routes
.add_route("/v1/zzz/ccc", Box::new(HandlerImpl(2)))
.is_err()
);
routes
.add_route("/v1/zzz/*/zzz", Box::new(HandlerImpl(6)))
.unwrap();
}
#[test]
fn test_get() {
let mut routes = Router::new();
routes
.add_route("/v1/users", Box::new(HandlerImpl(101)))
.unwrap();
routes
.add_route("/v1/users/xxx", Box::new(HandlerImpl(103)))
.unwrap();
routes
.add_route("/v1/users/xxx/yyy", Box::new(HandlerImpl(103)))
.unwrap();
routes
.add_route("/v1/zzz/*", Box::new(HandlerImpl(103)))
.unwrap();
routes
.add_route("/v1/zzz/*/zzz", Box::new(HandlerImpl(106)))
.unwrap();
let call_handler = |url| {
let mut event_loop = Core::new().unwrap();
let task = routes
.get(url)
.unwrap()
.get(Request::new(Body::default()))
.and_then(|resp| ok(resp.status().as_u16()));
event_loop.run(task).unwrap()
};
assert_eq!(call_handler("/v1/users"), 101);
assert_eq!(call_handler("/v1/users/xxx"), 103);
assert!(routes.get("/v1/users/yyy").is_err());
assert_eq!(call_handler("/v1/users/xxx/yyy"), 103);
assert!(routes.get("/v1/zzz").is_err());
assert_eq!(call_handler("/v1/zzz/1"), 103);
assert_eq!(call_handler("/v1/zzz/2"), 103);
assert_eq!(call_handler("/v1/zzz/2/zzz"), 106);
}
}

View file

@ -27,8 +27,10 @@ use util::LOGGER;
use peer::Peer;
use store::{PeerData, PeerStore, State};
use types::{Capabilities, ChainAdapter, Direction, Error, NetAdapter, P2PConfig, ReasonForBan,
TxHashSetRead, MAX_PEER_ADDRS};
use types::{
Capabilities, ChainAdapter, Direction, Error, NetAdapter, P2PConfig, ReasonForBan,
TxHashSetRead, MAX_PEER_ADDRS,
};
pub struct Peers {
pub adapter: Arc<ChainAdapter>,
@ -615,17 +617,8 @@ impl ChainAdapter for Peers {
self.adapter.txhashset_receive_ready()
}
fn txhashset_write(
&self,
h: Hash,
txhashset_data: File,
peer_addr: SocketAddr,
) -> bool {
if !self.adapter.txhashset_write(
h,
txhashset_data,
peer_addr,
) {
fn txhashset_write(&self, h: Hash, txhashset_data: File, peer_addr: SocketAddr) -> bool {
if !self.adapter.txhashset_write(h, txhashset_data, peer_addr) {
debug!(
LOGGER,
"Received a bad txhashset data from {}, the peer will be banned", &peer_addr

View file

@ -20,8 +20,10 @@ use std::sync::Arc;
use conn::{Message, MessageHandler, Response};
use core::core;
use core::core::hash::{Hash, Hashed};
use msg::{BanReason, GetPeerAddrs, Headers, Locator, PeerAddrs, Ping, Pong, SockAddr,
TxHashSetArchive, TxHashSetRequest, Type};
use msg::{
BanReason, GetPeerAddrs, Headers, Locator, PeerAddrs, Ping, Pong, SockAddr, TxHashSetArchive,
TxHashSetRequest, Type,
};
use rand::{self, Rng};
use types::{Error, NetAdapter};
use util::LOGGER;
@ -241,21 +243,19 @@ impl MessageHandler for Protocol {
);
return Err(Error::BadMessage);
}
let mut tmp = env::temp_dir();
tmp.push("txhashset.zip");
let mut save_txhashset_to_file = |file| -> Result<(), Error> {
let mut save_txhashset_to_file = |file| -> Result<(), Error> {
let mut tmp_zip = File::create(file)?;
msg.copy_attachment(sm_arch.bytes as usize, &mut tmp_zip)?;
tmp_zip.sync_all()?;
Ok(())
};
if let Err(e) = save_txhashset_to_file(tmp.clone()){
if let Err(e) = save_txhashset_to_file(tmp.clone()) {
error!(
LOGGER,
"handle_payload: txhashset archive save to file fail. err={:?}",
e
"handle_payload: txhashset archive save to file fail. err={:?}", e
);
return Err(e);
}
@ -267,11 +267,8 @@ impl MessageHandler for Protocol {
);
let tmp_zip = File::open(tmp)?;
let res = self.adapter.txhashset_write(
sm_arch.hash,
tmp_zip,
self.addr,
);
let res = self.adapter
.txhashset_write(sm_arch.hash, tmp_zip, self.addr);
debug!(
LOGGER,

View file

@ -254,12 +254,7 @@ impl ChainAdapter for DummyAdapter {
false
}
fn txhashset_write(
&self,
_h: Hash,
_txhashset_data: File,
_peer_addr: SocketAddr,
) -> bool {
fn txhashset_write(&self, _h: Hash, _txhashset_data: File, _peer_addr: SocketAddr) -> bool {
false
}
}

View file

@ -6,11 +6,10 @@ workspace = ".."
publish = false
[dependencies]
hyper = "0.10"
hyper = "0.12"
itertools = "0.7"
lmdb-zero = "0.4.4"
rand = "0.3"
router = "0.6"
slog = { version = "~2.2", features = ["max_level_trace", "release_max_level_trace"] }
serde = "1"
serde_derive = "1"

View file

@ -16,7 +16,6 @@
//! a mining worker implementation
//!
use std::io::Read;
use std::net::{SocketAddr, ToSocketAddrs};
use std::str;
use std::sync::atomic::{AtomicBool, Ordering};
@ -26,7 +25,7 @@ use std::time;
use chrono::prelude::{Utc};
use chrono::Duration;
use hyper;
use api;
use p2p;
use pool::DandelionConfig;
@ -35,7 +34,7 @@ use util::LOGGER;
const SEEDS_URL: &'static str = "http://grin-tech.org/seeds.txt";
// DNS Seeds with contact email associated
const DNS_SEEDS: &'static [&'static str] = &[
"t3.seed.grin-tech.org", // igno.peverell@protonmail.com
"t3.seed.grin-tech.org", // igno.peverell@protonmail.com
];
pub fn connect_and_monitor(
@ -268,22 +267,7 @@ pub fn dns_seeds() -> Box<Fn() -> Vec<SocketAddr> + Send> {
/// http. Easy method until we have a set of DNS names we can rely on.
pub fn web_seeds() -> Box<Fn() -> Vec<SocketAddr> + Send> {
Box::new(|| {
let client = hyper::Client::new();
debug!(LOGGER, "Retrieving seed nodes from {}", &SEEDS_URL);
// http get, filtering out non 200 results
let mut res = client
.get(SEEDS_URL)
.send()
.expect("Failed to resolve seeds.");
if res.status != hyper::Ok {
panic!("Failed to resolve seeds, got status {}.", res.status);
}
let mut buf = vec![];
res.read_to_end(&mut buf)
.expect("Could not read seed list.");
let text = str::from_utf8(&buf[..]).expect("Corrupted seed list.");
let text: String = api::client::get(SEEDS_URL).expect("Failed to resolve seeds");
let addrs = text.split_whitespace()
.map(|s| s.parse().unwrap())
.collect::<Vec<_>>();

View file

@ -188,15 +188,19 @@ fn test_p2p() {
let base_addr = server_config_one.base_addr;
let api_server_port = server_config_one.api_server_port;
// Check that when we get peer connected the peer is here
let peers_connected = get_connected_peers(&base_addr, api_server_port);
assert!(peers_connected.is_ok());
assert_eq!(peers_connected.unwrap().len(), 1);
// Check that peer all is also working
let mut peers_all = get_all_peers(&base_addr, api_server_port);
assert!(peers_all.is_ok());
assert_eq!(peers_all.unwrap().len(), 1);
let pall = peers_all.unwrap();
println!("Peers: {:?}", &pall);
assert_eq!(pall.len(), 1);
// Check that when we get peer connected the peer is here
let peers_connected = get_connected_peers(&base_addr, api_server_port);
assert!(peers_connected.is_ok());
let pc = peers_connected.unwrap();
println!("Peers connected: {:?}", &pc);
assert_eq!(pc.len(), 1);
// Check that the peer status is Healthy
let addr = format!(
@ -431,7 +435,7 @@ pub fn ban_peer(base_addr: &String, api_server_port: u16, peer_addr: &String) ->
"http://{}:{}/v1/peers/{}/ban",
base_addr, api_server_port, peer_addr
);
api::client::post(url.as_str(), &"").map_err(|e| Error::API(e))
api::client::post_no_ret(url.as_str(), &"").map_err(|e| Error::API(e))
}
pub fn unban_peer(
@ -443,7 +447,7 @@ pub fn unban_peer(
"http://{}:{}/v1/peers/{}/unban",
base_addr, api_server_port, peer_addr
);
api::client::post(url.as_str(), &"").map_err(|e| Error::API(e))
api::client::post_no_ret(url.as_str(), &"").map_err(|e| Error::API(e))
}
pub fn get_peer(
@ -455,7 +459,10 @@ pub fn get_peer(
"http://{}:{}/v1/peers/{}",
base_addr, api_server_port, peer_addr
);
api::client::get::<p2p::PeerData>(url.as_str()).map_err(|e| Error::API(e))
api::client::get::<p2p::PeerData>(url.as_str()).map_err(|e| {
println!("got error {:}", e);
Error::API(e)
})
}
pub fn get_connected_peers(

View file

@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
extern crate router;
#[macro_use]
extern crate slog;

View file

@ -337,30 +337,27 @@ impl LocalServerContainer {
.unwrap_or_else(|e| panic!("Error creating wallet: {:?} Config: {:?}", e, config));
wallet.keychain = Some(keychain);
let _ =
wallet::controller::owner_single_use(
Arc::new(Mutex::new(Box::new(wallet))),
|api| {
let result = api.issue_send_tx(
amount,
minimum_confirmations,
wallet::controller::owner_single_use(Arc::new(Mutex::new(Box::new(wallet))), |api| {
let result = api.issue_send_tx(
amount,
minimum_confirmations,
dest,
max_outputs,
selection_strategy == "all",
);
match result {
Ok(_) => println!(
"Tx sent: {} grin to {} (strategy '{}')",
core::core::amount_to_hr_string(amount),
dest,
max_outputs,
selection_strategy == "all",
);
match result {
Ok(_) => println!(
"Tx sent: {} grin to {} (strategy '{}')",
core::core::amount_to_hr_string(amount),
dest,
selection_strategy,
),
Err(e) => {
println!("Tx not sent to {}: {:?}", dest, e);
}
};
Ok(())
},
).unwrap_or_else(|e| panic!("Error creating wallet: {:?} Config: {:?}", e, config));
selection_strategy,
),
Err(e) => {
println!("Tx not sent to {}: {:?}", dest, e);
}
};
Ok(())
}).unwrap_or_else(|e| panic!("Error creating wallet: {:?} Config: {:?}", e, config));
}
/// Stops the running wallet server

View file

@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
extern crate router;
#[macro_use]
extern crate slog;
@ -59,7 +58,6 @@ fn basic_wallet_transactions() {
let coinbase_wallet_config = { coinbase_wallet.lock().unwrap().wallet_config.clone() };
let coinbase_seed = LocalServerContainer::get_wallet_seed(&coinbase_wallet_config);
let _ = thread::spawn(move || {
let mut w = coinbase_wallet.lock().unwrap();
w.run_wallet(0);
@ -72,7 +70,6 @@ fn basic_wallet_transactions() {
let target_wallet = Arc::new(Mutex::new(LocalServerContainer::new(recp_config).unwrap()));
let target_wallet_cloned = target_wallet.clone();
let recp_wallet_config = { target_wallet.lock().unwrap().wallet_config.clone() };
let recp_seed = LocalServerContainer::get_wallet_seed(&recp_wallet_config);
//Start up a second wallet, to receive
let _ = thread::spawn(move || {
@ -80,16 +77,16 @@ fn basic_wallet_transactions() {
w.run_wallet(0);
});
let mut server_config = LocalServerContainerConfig::default();
server_config.name = String::from("server_one");
server_config.p2p_server_port = 30000;
server_config.api_server_port = 30001;
server_config.start_miner = true;
server_config.start_wallet = false;
server_config.coinbase_wallet_address =
String::from(format!("http://{}:{}", server_config.base_addr, 10002));
// Spawn server and let it run for a bit
let _ = thread::spawn(move || {
let mut server_config = LocalServerContainerConfig::default();
server_config.name = String::from("server_one");
server_config.p2p_server_port = 30000;
server_config.api_server_port = 30001;
server_config.start_miner = true;
server_config.start_wallet = false;
server_config.coinbase_wallet_address =
String::from(format!("http://{}:{}", server_config.base_addr, 10002));
let mut server_one = LocalServerContainer::new(server_config).unwrap();
server_one.run_server(120);
});

View file

@ -56,7 +56,7 @@ pub fn ban_peer(config: &ServerConfig, peer_addr: &SocketAddr) {
config.api_http_addr,
peer_addr.to_string()
);
match api::client::post(url.as_str(), &params).map_err(|e| Error::API(e)) {
match api::client::post_no_ret(url.as_str(), &params).map_err(|e| Error::API(e)) {
Ok(_) => writeln!(e, "Successfully banned peer {}", peer_addr.to_string()).unwrap(),
Err(_) => writeln!(e, "Failed to ban peer {}", peer_addr).unwrap(),
};
@ -71,7 +71,7 @@ pub fn unban_peer(config: &ServerConfig, peer_addr: &SocketAddr) {
config.api_http_addr,
peer_addr.to_string()
);
match api::client::post(url.as_str(), &params).map_err(|e| Error::API(e)) {
match api::client::post_no_ret(url.as_str(), &params).map_err(|e| Error::API(e)) {
Ok(_) => writeln!(e, "Successfully unbanned peer {}", peer_addr).unwrap(),
Err(_) => writeln!(e, "Failed to unban peer {}", peer_addr).unwrap(),
};

View file

@ -12,20 +12,19 @@ byteorder = "1"
failure = "0.1"
failure_derive = "0.1"
futures = "0.1"
hyper = "0.11"
iron = "0.5"
hyper = "0.12"
prettytable-rs = "0.7"
rand = "0.3"
router = "0.5"
serde = "1"
serde_derive = "1"
serde_json = "1"
slog = { version = "~2.2", features = ["max_level_trace", "release_max_level_trace"] }
term = "0.5"
tokio = "0.1.7"
tokio-core = "0.1"
tokio-retry = "0.1"
uuid = { version = "0.6", features = ["serde", "v4"] }
urlencoded = "0.5"
url = "1.7.0"
chrono = { version = "0.4.4", features = ["serde"] }
grin_api = { path = "../api" }

View file

@ -18,13 +18,6 @@
use failure::ResultExt;
use libwallet::types::*;
use std::collections::HashMap;
use std::io;
use futures::{Future, Stream};
use hyper::header::ContentType;
use hyper::{self, Method, Request};
use serde_json;
use tokio_core::reactor;
use api;
use error::{Error, ErrorKind};
@ -90,38 +83,9 @@ impl WalletClient for HTTPWalletClient {
let url = format!("{}/v1/wallet/foreign/receive_tx", dest);
debug!(LOGGER, "Posting transaction slate to {}", url);
let mut core = reactor::Core::new().context(libwallet::ErrorKind::ClientCallback(
"Sending transaction: Initialise API",
))?;
let client = hyper::Client::new(&core.handle());
let url_pool = url.to_owned();
let mut req = Request::new(
Method::Post,
url_pool
.parse::<hyper::Uri>()
.context(libwallet::ErrorKind::ClientCallback(
"Sending transaction: parsing URL",
))?,
);
req.headers_mut().set(ContentType::json());
let json = serde_json::to_string(&slate).context(libwallet::ErrorKind::ClientCallback(
"Sending transaction: parsing response",
))?;
req.set_body(json);
let work = client.request(req).and_then(|res| {
res.body().concat2().and_then(move |body| {
let slate: Slate = serde_json::from_slice(&body)
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
Ok(slate)
})
});
let res = core.run(work)
.context(libwallet::ErrorKind::ClientCallback(
"Sending transaction: posting request",
))?;
let res = api::client::post(url.as_str(), slate).context(
libwallet::ErrorKind::ClientCallback("Posting transaction slate"),
)?;
Ok(res)
}
@ -134,7 +98,7 @@ impl WalletClient for HTTPWalletClient {
} else {
url = format!("{}/v1/pool/push", dest);
}
api::client::post(url.as_str(), tx).context(libwallet::ErrorKind::ClientCallback(
api::client::post_no_ret(url.as_str(), tx).context(libwallet::ErrorKind::ClientCallback(
"Posting transaction to node",
))?;
Ok(())
@ -247,30 +211,8 @@ pub fn create_coinbase(dest: &str, block_fees: &BlockFees) -> Result<CbData, Err
/// Makes a single request to the wallet API to create a new coinbase output.
fn single_create_coinbase(url: &str, block_fees: &BlockFees) -> Result<CbData, Error> {
let mut core = reactor::Core::new().context(ErrorKind::GenericError(
"Could not create reactor".to_owned(),
let res = api::client::post(url, block_fees).context(ErrorKind::GenericError(
"Posting create coinbase".to_string(),
))?;
let client = hyper::Client::new(&core.handle());
let mut req = Request::new(
Method::Post,
url.parse::<hyper::Uri>().context(ErrorKind::Uri)?,
);
req.headers_mut().set(ContentType::json());
let json = serde_json::to_string(&block_fees).context(ErrorKind::Format)?;
trace!(LOGGER, "Sending coinbase request: {:?}", json);
req.set_body(json);
let work = client.request(req).and_then(|res| {
res.body().concat2().and_then(move |body| {
trace!(LOGGER, "Returned Body: {:?}", body);
let coinbase: CbData =
serde_json::from_slice(&body).map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
Ok(coinbase)
})
});
let res = core.run(work)
.context(ErrorKind::GenericError("Could not run core".to_owned()))?;
Ok(res)
}

View file

@ -27,7 +27,7 @@ extern crate serde_json;
extern crate slog;
extern crate chrono;
extern crate term;
extern crate urlencoded;
extern crate url;
extern crate uuid;
extern crate bodyparser;
@ -36,9 +36,7 @@ extern crate failure;
extern crate failure_derive;
extern crate futures;
extern crate hyper;
extern crate iron;
#[macro_use]
extern crate router;
extern crate tokio;
extern crate tokio_core;
extern crate tokio_retry;

View file

@ -15,18 +15,18 @@
//! Controller for wallet.. instantiates and handles listeners (or single-run
//! invocations) as needed.
//! Still experimental
use api::ApiServer;
use api::{ApiServer, Handler, ResponseFuture, Router};
use std::cell::RefCell;
use std::collections::HashMap;
use std::marker::PhantomData;
use std::net::SocketAddr;
use std::sync::{Arc, Mutex};
use bodyparser;
use iron::prelude::{IronError, IronResult, Plugin, Request, Response};
use iron::{status, Handler, Headers};
use serde::Serialize;
use futures::future::{err, ok};
use futures::{Future, Stream};
use hyper::{Body, Request, Response, StatusCode};
use serde::{Deserialize, Serialize};
use serde_json;
use urlencoded::UrlEncodedQuery;
use failure::Fail;
use keychain::Keychain;
use libtx::slate::Slate;
@ -35,6 +35,7 @@ use libwallet::types::{
BlockFees, CbData, OutputData, SendTXArgs, TxLogEntry, WalletBackend, WalletClient, WalletInfo,
};
use libwallet::{Error, ErrorKind};
use url::form_urlencoded;
use util::LOGGER;
@ -64,72 +65,102 @@ where
Ok(())
}
thread_local!(static OWNER_ROUTER: RefCell<Option<Router>> = RefCell::new(None));
/// Listener version, providing same API but listening for requests on a
/// port and wrapping the calls
pub fn owner_listener<T: ?Sized, C, K>(wallet: Box<T>, addr: &str) -> Result<(), Error>
where
T: WalletBackend<C, K>,
OwnerAPIGetHandler<T, C, K>: Handler,
OwnerAPIPostHandler<T, C, K>: Handler,
C: WalletClient,
K: Keychain,
T: WalletBackend<C, K> + Send + Sync + 'static,
OwnerAPIHandler<T, C, K>: Handler,
C: WalletClient + 'static,
K: Keychain + 'static,
{
let wallet_arc = Arc::new(Mutex::new(wallet));
let api_get_handler = OwnerAPIGetHandler::new(wallet_arc.clone());
let api_post_handler = OwnerAPIPostHandler::new(wallet_arc);
let api_options_handler = OwnerAPIOptionsHandler {};
let api_handler = OwnerAPIHandler::new(wallet_arc);
let router = router!(
owner_options: options "/wallet/owner/*" => api_options_handler,
owner_get: get "/wallet/owner/*" => api_get_handler,
owner_post: post "/wallet/owner/*" => api_post_handler,
);
let mut orouter = Router::new();
orouter
.add_route("/v1/wallet/owner/**", Box::new(api_handler))
.map_err(|_| ErrorKind::GenericError("Router failed to add route".to_string()))?;
let mut apis = ApiServer::new("/v1".to_string());
apis.register_handler(router);
match apis.start(addr) {
Err(e) => error!(
LOGGER,
"Failed to start Grin wallet owner API listener: {}.", e
),
Ok(_) => info!(LOGGER, "Grin wallet owner API listener started at {}", addr),
};
OWNER_ROUTER.with(move |router| {
*router.borrow_mut() = Some(orouter);
let mut apis = ApiServer::new();
info!(LOGGER, "Starting HTTP Owner API server at {}.", addr);
let socket_addr: SocketAddr = addr.parse().expect("unable to parse socket address");
apis.start(socket_addr, &handle_owner).unwrap_or_else(|e| {
error!(LOGGER, "Failed to start API HTTP server: {}.", e);
})
});
Ok(())
}
fn handle_owner(req: Request<Body>) -> ResponseFuture {
OWNER_ROUTER.with(|router| match *router.borrow() {
Some(ref h) => h.handle(req),
None => {
error!(LOGGER, "No HTTP API router configured");
Box::new(ok(response(
StatusCode::INTERNAL_SERVER_ERROR,
"No router configured",
)))
}
})
}
thread_local!(static FOREIGN_ROUTER: RefCell<Option<Router>> = RefCell::new(None));
/// Listener version, providing same API but listening for requests on a
/// port and wrapping the calls
pub fn foreign_listener<T: ?Sized, C, K>(wallet: Box<T>, addr: &str) -> Result<(), Error>
where
T: WalletBackend<C, K>,
ForeignAPIHandler<T, C, K>: Handler,
C: WalletClient,
K: Keychain,
T: WalletBackend<C, K> + Send + Sync + 'static,
C: WalletClient + 'static,
K: Keychain + 'static,
{
let api_handler = ForeignAPIHandler::new(Arc::new(Mutex::new(wallet)));
let router = router!(
receive_tx: post "/wallet/foreign/*" => api_handler,
);
let mut router = Router::new();
router
.add_route("/v1/wallet/foreign/**", Box::new(api_handler))
.map_err(|_| ErrorKind::GenericError("Router failed to add route".to_string()))?;
FOREIGN_ROUTER.with(move |frouter| {
*frouter.borrow_mut() = Some(router);
let mut apis = ApiServer::new();
info!(LOGGER, "Starting HTTP Foreign API server at {}.", addr);
let socket_addr: SocketAddr = addr.parse().expect("unable to parse socket address");
apis.start(socket_addr, &handle_foreign)
.unwrap_or_else(|e| {
error!(LOGGER, "Failed to start API HTTP server: {}.", e);
});
});
let mut apis = ApiServer::new("/v1".to_string());
apis.register_handler(router);
match apis.start(addr) {
Err(e) => error!(
LOGGER,
"Failed to start Grin wallet foreign listener: {}.", e
),
Ok(_) => info!(LOGGER, "Grin wallet foreign listener started at {}", addr),
};
Ok(())
}
/// API Handler/Wrapper for owner functions
pub struct OwnerAPIGetHandler<T: ?Sized, C, K>
fn handle_foreign(req: Request<Body>) -> ResponseFuture {
FOREIGN_ROUTER.with(|router| match *router.borrow() {
Some(ref h) => h.handle(req),
None => {
error!(LOGGER, "No HTTP API router configured");
Box::new(ok(response(
StatusCode::INTERNAL_SERVER_ERROR,
"No router configured",
)))
}
})
}
type WalletResponseFuture = Box<Future<Item = Response<Body>, Error = Error> + Send>;
/// API Handler/Wrapper for owner functions
pub struct OwnerAPIHandler<T: ?Sized, C, K>
where
T: WalletBackend<C, K>,
C: WalletClient,
K: Keychain,
T: WalletBackend<C, K> + Send + Sync + 'static,
C: WalletClient + 'static,
K: Keychain + 'static,
{
/// Wallet instance
pub wallet: Arc<Mutex<Box<T>>>,
@ -137,15 +168,15 @@ where
phantom_c: PhantomData<C>,
}
impl<T: ?Sized, C, K> OwnerAPIGetHandler<T, C, K>
impl<T: ?Sized, C, K> OwnerAPIHandler<T, C, K>
where
T: WalletBackend<C, K>,
C: WalletClient,
K: Keychain,
T: WalletBackend<C, K> + Send + Sync + 'static,
C: WalletClient + 'static,
K: Keychain + 'static,
{
/// Create a new owner API handler for GET methods
pub fn new(wallet: Arc<Mutex<Box<T>>>) -> OwnerAPIGetHandler<T, C, K> {
OwnerAPIGetHandler {
pub fn new(wallet: Arc<Mutex<Box<T>>>) -> OwnerAPIHandler<T, C, K> {
OwnerAPIHandler {
wallet,
phantom: PhantomData,
phantom_c: PhantomData,
@ -154,23 +185,23 @@ where
fn retrieve_outputs(
&self,
req: &mut Request,
api: &mut APIOwner<T, C, K>,
req: &Request<Body>,
api: APIOwner<T, C, K>,
) -> Result<(bool, Vec<OutputData>), Error> {
let mut update_from_node = false;
let mut id = None;
let mut show_spent = false;
if let Ok(params) = req.get_ref::<UrlEncodedQuery>() {
if let Some(_) = params.get("refresh") {
update_from_node = true;
}
if let Some(_) = params.get("show_spent") {
show_spent = true;
}
if let Some(ids) = params.get("tx_id") {
for i in ids {
id = Some(i.parse().unwrap());
}
let params = parse_params(req);
if let Some(_) = params.get("refresh") {
update_from_node = true;
}
if let Some(_) = params.get("show_spent") {
show_spent = true;
}
if let Some(ids) = params.get("tx_id") {
for i in ids {
id = Some(i.parse().unwrap());
}
}
api.retrieve_outputs(show_spent, update_from_node, id)
@ -178,19 +209,20 @@ where
fn retrieve_txs(
&self,
req: &mut Request,
api: &mut APIOwner<T, C, K>,
req: &Request<Body>,
api: APIOwner<T, C, K>,
) -> Result<(bool, Vec<TxLogEntry>), Error> {
let mut id = None;
let mut update_from_node = false;
if let Ok(params) = req.get_ref::<UrlEncodedQuery>() {
if let Some(_) = params.get("refresh") {
update_from_node = true;
}
if let Some(ids) = params.get("id") {
for i in ids {
id = Some(i.parse().unwrap());
}
let params = parse_params(req);
if let Some(_) = params.get("refresh") {
update_from_node = true;
}
if let Some(ids) = params.get("id") {
for i in ids {
id = Some(i.parse().unwrap());
}
}
api.retrieve_txs(update_from_node, id)
@ -198,212 +230,129 @@ where
fn retrieve_summary_info(
&self,
req: &mut Request,
api: &mut APIOwner<T, C, K>,
req: &Request<Body>,
mut api: APIOwner<T, C, K>,
) -> Result<(bool, WalletInfo), Error> {
let mut update_from_node = false;
if let Ok(params) = req.get_ref::<UrlEncodedQuery>() {
if let Some(_) = params.get("refresh") {
update_from_node = true;
}
}
let update_from_node = param_exists(req, "refresh");
api.retrieve_summary_info(update_from_node)
}
fn node_height(
&self,
_req: &mut Request,
api: &mut APIOwner<T, C, K>,
_req: &Request<Body>,
mut api: APIOwner<T, C, K>,
) -> Result<(u64, bool), Error> {
api.node_height()
}
fn handle_request(
&self,
req: &mut Request,
api: &mut APIOwner<T, C, K>,
) -> IronResult<Response> {
let url = req.url.clone();
let path_elems = url.path();
match *path_elems.last().unwrap() {
"retrieve_outputs" => json_response(&self.retrieve_outputs(req, api)
.map_err(|e| IronError::new(Fail::compat(e), status::BadRequest))?),
"retrieve_txs" => json_response(&self.retrieve_txs(req, api)
.map_err(|e| IronError::new(Fail::compat(e), status::BadRequest))?),
"retrieve_summary_info" => json_response(&self.retrieve_summary_info(req, api)
.map_err(|e| IronError::new(Fail::compat(e), status::BadRequest))?),
"node_height" => json_response(&self.node_height(req, api)
.map_err(|e| IronError::new(Fail::compat(e), status::BadRequest))?),
_ => Err(IronError::new(
Fail::compat(ErrorKind::Hyper),
status::BadRequest,
)),
}
}
}
impl<T: ?Sized, C, K> Handler for OwnerAPIGetHandler<T, C, K>
where
T: WalletBackend<C, K> + Send + Sync + 'static,
C: WalletClient + 'static,
K: Keychain + 'static,
{
fn handle(&self, req: &mut Request) -> IronResult<Response> {
let mut api = APIOwner::new(self.wallet.clone());
let mut resp_json = self.handle_request(req, &mut api);
if !resp_json.is_err() {
resp_json
.as_mut()
.unwrap()
.headers
.set_raw("access-control-allow-origin", vec![b"*".to_vec()]);
}
resp_json
}
}
/// Handles all owner API POST requests
pub struct OwnerAPIPostHandler<T: ?Sized, C, K>
where
T: WalletBackend<C, K>,
C: WalletClient,
K: Keychain,
{
/// Wallet instance
pub wallet: Arc<Mutex<Box<T>>>,
phantom: PhantomData<K>,
phantom_c: PhantomData<C>,
}
impl<T: ?Sized, C, K> OwnerAPIPostHandler<T, C, K>
where
T: WalletBackend<C, K>,
C: WalletClient,
K: Keychain,
{
/// New POST handler
pub fn new(wallet: Arc<Mutex<Box<T>>>) -> OwnerAPIPostHandler<T, C, K> {
OwnerAPIPostHandler {
wallet,
phantom: PhantomData,
phantom_c: PhantomData,
}
fn handle_get_request(&self, req: &Request<Body>) -> Result<Response<Body>, Error> {
let api = APIOwner::new(self.wallet.clone());
Ok(match req.uri()
.path()
.trim_right_matches("/")
.rsplit("/")
.next()
.unwrap()
{
"retrieve_outputs" => json_response(&self.retrieve_outputs(req, api)?),
"retrieve_summary_info" => json_response(&self.retrieve_summary_info(req, api)?),
"node_height" => json_response(&self.node_height(req, api)?),
"retrieve_txs" => json_response(&self.retrieve_txs(req, api)?),
_ => response(StatusCode::BAD_REQUEST, ""),
})
}
fn issue_send_tx(
&self,
req: &mut Request,
api: &mut APIOwner<T, C, K>,
) -> Result<Slate, Error> {
let struct_body = req.get::<bodyparser::Struct<SendTXArgs>>();
match struct_body {
Ok(Some(args)) => api.issue_send_tx(
req: Request<Body>,
mut api: APIOwner<T, C, K>,
) -> Box<Future<Item = Slate, Error = Error> + Send> {
Box::new(parse_body(req).and_then(move |args: SendTXArgs| {
api.issue_send_tx(
args.amount,
args.minimum_confirmations,
&args.dest,
args.max_outputs,
args.selection_strategy_is_use_all,
),
Ok(None) => {
error!(LOGGER, "Missing request body: issue_send_tx");
Err(ErrorKind::GenericError(
"Invalid request body: issue_send_tx".to_owned(),
))?
}
Err(e) => {
error!(LOGGER, "Invalid request body: issue_send_tx {:?}", e);
Err(ErrorKind::GenericError(
"Invalid request body: issue_send_tx".to_owned(),
))?
}
}
)
}))
}
fn issue_burn_tx(&self, _req: &mut Request, api: &mut APIOwner<T, C, K>) -> Result<(), Error> {
// TODO: Args
api.issue_burn_tx(60, 10, 1000)
}
fn handle_request(
fn issue_burn_tx(
&self,
req: &mut Request,
api: &mut APIOwner<T, C, K>,
) -> Result<String, Error> {
let url = req.url.clone();
let path_elems = url.path();
match *path_elems.last().unwrap() {
"issue_send_tx" => json_response_pretty(&self.issue_send_tx(req, api)?),
"issue_burn_tx" => json_response_pretty(&self.issue_burn_tx(req, api)?),
_ => Err(ErrorKind::GenericError(
_req: Request<Body>,
mut api: APIOwner<T, C, K>,
) -> Box<Future<Item = (), Error = Error> + Send> {
// TODO: Args
Box::new(match api.issue_burn_tx(60, 10, 1000) {
Ok(_) => ok(()),
Err(e) => err(e),
})
}
fn handle_post_request(&self, req: Request<Body>) -> WalletResponseFuture {
let api = APIOwner::new(self.wallet.clone());
match req.uri()
.path()
.trim_right_matches("/")
.rsplit("/")
.next()
.unwrap()
{
"issue_send_tx" => Box::new(
self.issue_send_tx(req, api)
.and_then(|slate| ok(json_response_pretty(&slate))),
),
"issue_burn_tx" => Box::new(
self.issue_burn_tx(req, api)
.and_then(|_| ok(response(StatusCode::OK, ""))),
),
_ => Box::new(err(ErrorKind::GenericError(
"Unknown error handling post request".to_owned(),
))?,
).into())),
}
}
fn create_error_response(&self, e: Error) -> IronResult<Response> {
let mut headers = Headers::new();
headers.set_raw("access-control-allow-origin", vec![b"*".to_vec()]);
headers.set_raw(
"access-control-allow-headers",
vec![b"Content-Type".to_vec()],
);
let message = format!("{}", e.kind());
let mut r = Response::with((status::InternalServerError, message));
r.headers = headers;
Ok(r)
}
fn create_ok_response(&self, json: &str) -> IronResult<Response> {
let mut headers = Headers::new();
headers.set_raw("access-control-allow-origin", vec![b"*".to_vec()]);
let mut r = Response::with((status::Ok, json));
r.headers = headers;
Ok(r)
}
}
impl<T: ?Sized, C, K> Handler for OwnerAPIPostHandler<T, C, K>
impl<T: ?Sized, C, K> Handler for OwnerAPIHandler<T, C, K>
where
T: WalletBackend<C, K> + Send + Sync + 'static,
C: WalletClient + 'static,
K: Keychain + 'static,
{
fn handle(&self, req: &mut Request) -> IronResult<Response> {
let mut api = APIOwner::new(self.wallet.clone());
let resp = match self.handle_request(req, &mut api) {
Ok(r) => self.create_ok_response(&r),
fn get(&self, req: Request<Body>) -> ResponseFuture {
match self.handle_get_request(&req) {
Ok(r) => Box::new(ok(r)),
Err(e) => {
error!(LOGGER, "Request Error: {:?}", e);
self.create_error_response(e)
Box::new(ok(create_error_response(e)))
}
};
resp
}
}
fn post(&self, req: Request<Body>) -> ResponseFuture {
Box::new(
self.handle_post_request(req)
.and_then(|r| ok(r))
.or_else(|e| {
error!(LOGGER, "Request Error: {:?}", e);
ok(create_error_response(e))
}),
)
}
fn options(&self, _req: Request<Body>) -> ResponseFuture {
Box::new(ok(create_ok_response("{}")))
}
}
/// Options handler
pub struct OwnerAPIOptionsHandler {}
impl Handler for OwnerAPIOptionsHandler where {
fn handle(&self, _req: &mut Request) -> IronResult<Response> {
let mut resp_json = Ok(Response::with((status::Ok, "{}")));
let mut headers = Headers::new();
headers.set_raw("access-control-allow-origin", vec![b"*".to_vec()]);
headers.set_raw(
"access-control-allow-headers",
vec![b"Content-Type".to_vec()],
);
resp_json.as_mut().unwrap().headers = headers;
resp_json
}
}
/// API Handler/Wrapper for foreign functions
pub struct ForeignAPIHandler<T: ?Sized, C, K>
where
T: WalletBackend<C, K>,
C: WalletClient,
K: Keychain,
T: WalletBackend<C, K> + Send + Sync + 'static,
C: WalletClient + 'static,
K: Keychain + 'static,
{
/// Wallet instance
pub wallet: Arc<Mutex<Box<T>>>,
@ -413,9 +362,9 @@ where
impl<T: ?Sized, C, K> ForeignAPIHandler<T, C, K>
where
T: WalletBackend<C, K>,
C: WalletClient,
K: Keychain,
T: WalletBackend<C, K> + Send + Sync + 'static,
C: WalletClient + 'static,
K: Keychain + 'static,
{
/// create a new api handler
pub fn new(wallet: Arc<Mutex<Box<T>>>) -> ForeignAPIHandler<T, C, K> {
@ -428,55 +377,43 @@ where
fn build_coinbase(
&self,
req: &mut Request,
api: &mut APIForeign<T, C, K>,
) -> Result<CbData, Error> {
let struct_body = req.get::<bodyparser::Struct<BlockFees>>();
match struct_body {
Ok(Some(block_fees)) => api.build_coinbase(&block_fees),
Ok(None) => {
error!(LOGGER, "Missing request body: build_coinbase");
Err(ErrorKind::GenericError(
"Invalid request body: build_coinbase".to_owned(),
))?
}
Err(e) => {
error!(LOGGER, "Invalid request body: build_coinbase: {:?}", e);
Err(ErrorKind::GenericError(
"Invalid request body: build_coinbase".to_owned(),
))?
}
}
req: Request<Body>,
mut api: APIForeign<T, C, K>,
) -> Box<Future<Item = CbData, Error = Error> + Send> {
Box::new(parse_body(req).and_then(move |block_fees| api.build_coinbase(&block_fees)))
}
fn receive_tx(&self, req: &mut Request, api: &mut APIForeign<T, C, K>) -> Result<Slate, Error> {
let struct_body = req.get::<bodyparser::Struct<Slate>>();
if let Ok(Some(mut slate)) = struct_body {
api.receive_tx(&mut slate)?;
Ok(slate.clone())
} else {
Err(ErrorKind::GenericError(
"Invalid request body: receive_tx".to_owned(),
))?
}
}
fn handle_request(
fn receive_tx(
&self,
req: &mut Request,
api: &mut APIForeign<T, C, K>,
) -> IronResult<Response> {
let url = req.url.clone();
let path_elems = url.path();
match *path_elems.last().unwrap() {
"build_coinbase" => json_response(&self.build_coinbase(req, api)
.map_err(|e| IronError::new(Fail::compat(e), status::BadRequest))?),
"receive_tx" => json_response(&self.receive_tx(req, api)
.map_err(|e| IronError::new(Fail::compat(e), status::BadRequest))?),
_ => Err(IronError::new(
Fail::compat(ErrorKind::Hyper),
status::BadRequest,
)),
req: Request<Body>,
mut api: APIForeign<T, C, K>,
) -> Box<Future<Item = Slate, Error = Error> + Send> {
Box::new(
parse_body(req).and_then(move |mut slate| match api.receive_tx(&mut slate) {
Ok(_) => ok(slate.clone()),
Err(e) => err(e),
}),
)
}
fn handle_request(&self, req: Request<Body>) -> WalletResponseFuture {
let api = *APIForeign::new(self.wallet.clone());
match req.uri()
.path()
.trim_right_matches("/")
.rsplit("/")
.next()
.unwrap()
{
"build_coinbase" => Box::new(
self.build_coinbase(req, api)
.and_then(|res| ok(json_response(&res))),
),
"receive_tx" => Box::new(
self.receive_tx(req, api)
.and_then(|res| ok(json_response(&res))),
),
_ => Box::new(ok(response(StatusCode::BAD_REQUEST, "unknown action"))),
}
}
}
@ -486,32 +423,92 @@ where
C: WalletClient + Send + Sync + 'static,
K: Keychain + 'static,
{
fn handle(&self, req: &mut Request) -> IronResult<Response> {
let mut api = APIForeign::new(self.wallet.clone());
let resp_json = self.handle_request(req, &mut *api);
resp_json
fn post(&self, req: Request<Body>) -> ResponseFuture {
Box::new(self.handle_request(req).and_then(|r| ok(r)).or_else(|e| {
error!(LOGGER, "Request Error: {:?}", e);
ok(create_error_response(e))
}))
}
}
// Utility to serialize a struct into JSON and produce a sensible IronResult
// Utility to serialize a struct into JSON and produce a sensible Response
// out of it.
fn json_response<T>(s: &T) -> IronResult<Response>
fn json_response<T>(s: &T) -> Response<Body>
where
T: Serialize,
{
match serde_json::to_string(s) {
Ok(json) => Ok(Response::with((status::Ok, json))),
Err(_) => Ok(Response::with((status::InternalServerError, ""))),
Ok(json) => response(StatusCode::OK, json),
Err(_) => response(StatusCode::INTERNAL_SERVER_ERROR, ""),
}
}
// pretty-printed version of above
fn json_response_pretty<T>(s: &T) -> Result<String, Error>
fn json_response_pretty<T>(s: &T) -> Response<Body>
where
T: Serialize,
{
match serde_json::to_string_pretty(s) {
Ok(json) => Ok(json),
Err(_) => Err(ErrorKind::Format)?,
Ok(json) => response(StatusCode::OK, json),
Err(_) => response(StatusCode::INTERNAL_SERVER_ERROR, ""),
}
}
fn create_error_response(e: Error) -> Response<Body> {
Response::builder()
.status(StatusCode::INTERNAL_SERVER_ERROR)
.header("access-control-allow-origin", "*")
.header("access-control-allow-headers", "Content-Type")
.body(format!("{}", e.kind()).into())
.unwrap()
}
fn create_ok_response(json: &str) -> Response<Body> {
Response::builder()
.status(StatusCode::OK)
.header("access-control-allow-origin", "*")
.body(json.to_string().into())
.unwrap()
}
fn response<T: Into<Body>>(status: StatusCode, text: T) -> Response<Body> {
Response::builder()
.status(status)
.header("access-control-allow-origin", "*")
.body(text.into())
.unwrap()
//let mut resp = Response::new(text.into());
//*resp.status_mut() = status;
//resp
}
fn parse_params(req: &Request<Body>) -> HashMap<String, Vec<String>> {
match req.uri().query() {
Some(query_string) => form_urlencoded::parse(query_string.as_bytes())
.into_owned()
.fold(HashMap::new(), |mut hm, (k, v)| {
hm.entry(k).or_insert(vec![]).push(v);
hm
}),
None => HashMap::new(),
}
}
fn param_exists(req: &Request<Body>, param: &str) -> bool {
parse_params(req).get(param).is_some()
}
fn parse_body<T>(req: Request<Body>) -> Box<Future<Item = T, Error = Error> + Send>
where
for<'de> T: Deserialize<'de> + Send + 'static,
{
Box::new(
req.into_body()
.concat2()
.map_err(|_| ErrorKind::GenericError("Failed to read request".to_owned()).into())
.and_then(|body| match serde_json::from_reader(&body.to_vec()[..]) {
Ok(obj) => ok(obj),
Err(_) => err(ErrorKind::GenericError("Invalid request body".to_owned()).into()),
}),
)
}