Merge pull request #1557 from yourowncrypto/api-refactor

Refactor hyper router
This commit is contained in:
hashmap 2018-09-19 19:33:48 +02:00 committed by GitHub
commit 521ce901e4
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 90 additions and 105 deletions

View file

@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
use std::cell::RefCell;
use std::collections::HashMap; use std::collections::HashMap;
use std::fmt::Debug; use std::fmt::Debug;
use std::net::SocketAddr; use std::net::SocketAddr;
@ -23,7 +22,6 @@ use failure::ResultExt;
use futures::future::{err, ok}; use futures::future::{err, ok};
use futures::{Future, Stream}; use futures::{Future, Stream};
use hyper::{Body, Request, Response, StatusCode}; use hyper::{Body, Request, Response, StatusCode};
use rest::{Error, ErrorKind};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use serde_json; use serde_json;
@ -609,8 +607,6 @@ impl Handler for HeaderHandler {
} }
} }
/// Gets block details given either a hash or an unspent commit /// Gets block details given either a hash or an unspent commit
/// GET /v1/blocks/<hash> /// GET /v1/blocks/<hash>
/// GET /v1/blocks/<height> /// GET /v1/blocks/<height>
@ -846,8 +842,6 @@ fn just_response<T: Into<Body> + Debug>(status: StatusCode, text: T) -> Response
resp resp
} }
thread_local!( static ROUTER: RefCell<Option<Router>> = RefCell::new(None) );
/// Start all server HTTP handlers. Register all of them with Router /// Start all server HTTP handlers. Register all of them with Router
/// and runs the corresponding HTTP server. /// and runs the corresponding HTTP server.
/// ///
@ -867,29 +861,16 @@ pub fn start_rest_apis(
.spawn(move || { .spawn(move || {
let mut apis = ApiServer::new(); let mut apis = ApiServer::new();
ROUTER.with(|router| { let router = build_router(chain, tx_pool, peers).expect("unable to build API router");
*router.borrow_mut() =
Some(build_router(chain, tx_pool, peers).expect("unable to build API router"));
info!(LOGGER, "Starting HTTP API server at {}.", addr); info!(LOGGER, "Starting HTTP API server at {}.", addr);
let socket_addr: SocketAddr = addr.parse().expect("unable to parse socket address"); let socket_addr: SocketAddr = addr.parse().expect("unable to parse socket address");
apis.start(socket_addr, &handle).unwrap_or_else(|e| { apis.start(socket_addr, router).unwrap_or_else(|e| {
error!(LOGGER, "Failed to start API HTTP server: {}.", e); error!(LOGGER, "Failed to start API HTTP server: {}.", e);
});
}); });
}); });
} }
pub fn handle(req: Request<Body>) -> ResponseFuture {
ROUTER.with(|router| match *router.borrow() {
Some(ref h) => h.handle(req),
None => {
error!(LOGGER, "No HTTP API router configured");
response(StatusCode::INTERNAL_SERVER_ERROR, "No router configured")
}
})
}
fn parse_body<T>(req: Request<Body>) -> Box<Future<Item = T, Error = Error> + Send> fn parse_body<T>(req: Request<Body>) -> Box<Future<Item = T, Error = Error> + Send>
where where
for<'de> T: Deserialize<'de> + Send + 'static, for<'de> T: Deserialize<'de> + Send + 'static,
@ -977,7 +958,11 @@ pub fn build_router(
}; };
let mut router = Router::new(); let mut router = Router::new();
router.add_route("/v1/", Box::new(index_handler))?; // example how we can use midlleware
router.add_route(
"/v1/",
Box::new(LoggingMiddleware::new(Box::new(index_handler))),
)?;
router.add_route("/v1/blocks/*", Box::new(block_handler))?; router.add_route("/v1/blocks/*", Box::new(block_handler))?;
router.add_route("/v1/headers/*", Box::new(header_handler))?; router.add_route("/v1/headers/*", Box::new(header_handler))?;
router.add_route("/v1/chain", Box::new(chain_tip_handler))?; router.add_route("/v1/chain", Box::new(chain_tip_handler))?;

View file

@ -19,14 +19,13 @@
//! register them on a ApiServer. //! register them on a ApiServer.
use hyper::rt::Future; use hyper::rt::Future;
use hyper::service::service_fn; use hyper::{rt, Body, Request, Server};
use hyper::{Body, Request, Server}; use router::{Handler, HandlerObj, ResponseFuture, Router};
use router::ResponseFuture;
use std::fmt::{self, Display}; use std::fmt::{self, Display};
use std::net::SocketAddr; use std::net::SocketAddr;
use tokio::runtime::current_thread::Runtime;
use failure::{Backtrace, Context, Fail}; use failure::{Backtrace, Context, Fail};
use util::LOGGER;
/// Errors that can be returned by an ApiEndpoint implementation. /// Errors that can be returned by an ApiEndpoint implementation.
#[derive(Debug)] #[derive(Debug)]
@ -95,18 +94,16 @@ impl ApiServer {
} }
/// Starts the ApiServer at the provided address. /// Starts the ApiServer at the provided address.
pub fn start<F>(&mut self, addr: SocketAddr, f: &'static F) -> Result<(), String> pub fn start(&mut self, addr: SocketAddr, router: Router) -> Result<(), String> {
where
F: Fn(Request<Body>) -> ResponseFuture + Send + Sync + 'static,
{
let server = Server::bind(&addr) let server = Server::bind(&addr)
.serve(move || service_fn(f)) .serve(router)
.map_err(|e| eprintln!("server error: {}", e)); .map_err(|e| eprintln!("server error: {}", e));
let mut rt = Runtime::new().unwrap(); //let mut rt = Runtime::new().unwrap();
if rt.block_on(server).is_err() { //if rt.block_on(server).is_err() {
return Err("tokio block_on error".to_owned()); // return Err("tokio block_on error".to_owned());
} //}
rt::run(server);
Ok(()) Ok(())
} }
@ -119,3 +116,21 @@ impl ApiServer {
// } // }
} }
} }
// Simple example of middleware
pub struct LoggingMiddleware {
next: HandlerObj,
}
impl LoggingMiddleware {
pub fn new(next: HandlerObj) -> LoggingMiddleware {
LoggingMiddleware { next }
}
}
impl Handler for LoggingMiddleware {
fn call(&self, req: Request<Body>) -> ResponseFuture {
debug!(LOGGER, "REST call: {} {}", req.method(), req.uri().path());
self.next.call(req)
}
}

View file

@ -1,6 +1,7 @@
use futures::future; use futures::future;
use hyper; use hyper;
use hyper::rt::Future; use hyper::rt::Future;
use hyper::service::{NewService, Service};
use hyper::{Body, Method, Request, Response, StatusCode}; use hyper::{Body, Method, Request, Response, StatusCode};
use std::collections::hash_map::DefaultHasher; use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher}; use std::hash::{Hash, Hasher};
@ -49,6 +50,21 @@ pub trait Handler {
fn connect(&self, _req: Request<Body>) -> ResponseFuture { fn connect(&self, _req: Request<Body>) -> ResponseFuture {
not_found() not_found()
} }
fn call(&self, req: Request<Body>) -> ResponseFuture {
match req.method() {
&Method::GET => self.get(req),
&Method::POST => self.post(req),
&Method::PUT => self.put(req),
&Method::DELETE => self.delete(req),
&Method::PATCH => self.patch(req),
&Method::OPTIONS => self.options(req),
&Method::CONNECT => self.connect(req),
&Method::TRACE => self.trace(req),
&Method::HEAD => self.head(req),
_ => not_found(),
}
}
} }
#[derive(Fail, Debug)] #[derive(Fail, Debug)]
pub enum RouterError { pub enum RouterError {
@ -70,7 +86,7 @@ struct NodeId(usize);
const MAX_CHILDREN: usize = 16; const MAX_CHILDREN: usize = 16;
type HandlerObj = Box<Handler>; pub type HandlerObj = Box<Handler + Send + Sync>;
#[derive(Clone)] #[derive(Clone)]
struct Node { struct Node {
@ -122,7 +138,8 @@ impl Router {
let keys = generate_path(route); let keys = generate_path(route);
let mut node_id = self.root(); let mut node_id = self.root();
for key in keys { for key in keys {
node_id = self.find(node_id, key) node_id = self
.find(node_id, key)
.unwrap_or_else(|| self.add_empty_node(node_id, key)); .unwrap_or_else(|| self.add_empty_node(node_id, key));
} }
match self.node(node_id).value() { match self.node(node_id).value() {
@ -145,26 +162,34 @@ impl Router {
} }
self.node(node_id).value().ok_or(RouterError::NoValue) self.node(node_id).value().ok_or(RouterError::NoValue)
} }
}
pub fn handle(&self, req: Request<Body>) -> ResponseFuture { impl Service for Router {
type ReqBody = Body;
type ResBody = Body;
type Error = hyper::Error;
type Future = ResponseFuture;
fn call(&mut self, req: Request<Self::ReqBody>) -> Self::Future {
match self.get(req.uri().path()) { match self.get(req.uri().path()) {
Err(_) => not_found(), Err(_) => not_found(),
Ok(h) => match req.method() { Ok(h) => h.call(req),
&Method::GET => h.get(req),
&Method::POST => h.post(req),
&Method::PUT => h.put(req),
&Method::DELETE => h.delete(req),
&Method::PATCH => h.patch(req),
&Method::OPTIONS => h.options(req),
&Method::CONNECT => h.connect(req),
&Method::TRACE => h.trace(req),
&Method::HEAD => h.head(req),
_ => not_found(),
},
} }
} }
} }
impl NewService for Router {
type ReqBody = Body;
type ResBody = Body;
type Error = hyper::Error;
type InitError = hyper::Error;
type Service = Router;
type Future = Box<Future<Item = Self::Service, Error = Self::InitError> + Send>;
fn new_service(&self) -> Self::Future {
Box::new(future::ok(self.clone()))
}
}
impl Node { impl Node {
fn new(key: u64, value: Option<Arc<HandlerObj>>) -> Node { fn new(key: u64, value: Option<Arc<HandlerObj>>) -> Node {
Node { Node {
@ -303,5 +328,4 @@ mod tests {
assert_eq!(call_handler("/v1/zzz/2"), 103); assert_eq!(call_handler("/v1/zzz/2"), 103);
assert_eq!(call_handler("/v1/zzz/2/zzz"), 106); assert_eq!(call_handler("/v1/zzz/2/zzz"), 106);
} }
} }

View file

@ -16,7 +16,6 @@
//! invocations) as needed. //! invocations) as needed.
//! Still experimental //! Still experimental
use api::{ApiServer, Handler, ResponseFuture, Router}; use api::{ApiServer, Handler, ResponseFuture, Router};
use std::cell::RefCell;
use std::collections::HashMap; use std::collections::HashMap;
use std::marker::PhantomData; use std::marker::PhantomData;
use std::net::SocketAddr; use std::net::SocketAddr;
@ -67,8 +66,6 @@ where
Ok(()) Ok(())
} }
thread_local!(static OWNER_ROUTER: RefCell<Option<Router>> = RefCell::new(None));
/// Listener version, providing same API but listening for requests on a /// Listener version, providing same API but listening for requests on a
/// port and wrapping the calls /// port and wrapping the calls
pub fn owner_listener<T: ?Sized, C, K>(wallet: Box<T>, addr: &str) -> Result<(), Error> pub fn owner_listener<T: ?Sized, C, K>(wallet: Box<T>, addr: &str) -> Result<(), Error>
@ -81,38 +78,20 @@ where
let wallet_arc = Arc::new(Mutex::new(wallet)); let wallet_arc = Arc::new(Mutex::new(wallet));
let api_handler = OwnerAPIHandler::new(wallet_arc); let api_handler = OwnerAPIHandler::new(wallet_arc);
let mut orouter = Router::new(); let mut router = Router::new();
orouter router
.add_route("/v1/wallet/owner/**", Box::new(api_handler)) .add_route("/v1/wallet/owner/**", Box::new(api_handler))
.map_err(|_| ErrorKind::GenericError("Router failed to add route".to_string()))?; .map_err(|_| ErrorKind::GenericError("Router failed to add route".to_string()))?;
OWNER_ROUTER.with(move |router| { let mut apis = ApiServer::new();
*router.borrow_mut() = Some(orouter); info!(LOGGER, "Starting HTTP Owner API server at {}.", addr);
let mut apis = ApiServer::new(); let socket_addr: SocketAddr = addr.parse().expect("unable to parse socket address");
info!(LOGGER, "Starting HTTP Owner API server at {}.", addr); apis.start(socket_addr, router).unwrap_or_else(|e| {
let socket_addr: SocketAddr = addr.parse().expect("unable to parse socket address"); error!(LOGGER, "Failed to start API HTTP server: {}.", e);
apis.start(socket_addr, &handle_owner).unwrap_or_else(|e| {
error!(LOGGER, "Failed to start API HTTP server: {}.", e);
})
}); });
Ok(()) Ok(())
} }
fn handle_owner(req: Request<Body>) -> ResponseFuture {
OWNER_ROUTER.with(|router| match *router.borrow() {
Some(ref h) => h.handle(req),
None => {
error!(LOGGER, "No HTTP API router configured");
Box::new(ok(response(
StatusCode::INTERNAL_SERVER_ERROR,
"No router configured",
)))
}
})
}
thread_local!(static FOREIGN_ROUTER: RefCell<Option<Router>> = RefCell::new(None));
/// Listener version, providing same API but listening for requests on a /// Listener version, providing same API but listening for requests on a
/// port and wrapping the calls /// port and wrapping the calls
pub fn foreign_listener<T: ?Sized, C, K>(wallet: Box<T>, addr: &str) -> Result<(), Error> pub fn foreign_listener<T: ?Sized, C, K>(wallet: Box<T>, addr: &str) -> Result<(), Error>
@ -128,33 +107,15 @@ where
.add_route("/v1/wallet/foreign/**", Box::new(api_handler)) .add_route("/v1/wallet/foreign/**", Box::new(api_handler))
.map_err(|_| ErrorKind::GenericError("Router failed to add route".to_string()))?; .map_err(|_| ErrorKind::GenericError("Router failed to add route".to_string()))?;
FOREIGN_ROUTER.with(move |frouter| { let mut apis = ApiServer::new();
*frouter.borrow_mut() = Some(router); info!(LOGGER, "Starting HTTP Foreign API server at {}.", addr);
let mut apis = ApiServer::new(); let socket_addr: SocketAddr = addr.parse().expect("unable to parse socket address");
info!(LOGGER, "Starting HTTP Foreign API server at {}.", addr); apis.start(socket_addr, router).unwrap_or_else(|e| {
let socket_addr: SocketAddr = addr.parse().expect("unable to parse socket address"); error!(LOGGER, "Failed to start API HTTP server: {}.", e);
apis.start(socket_addr, &handle_foreign)
.unwrap_or_else(|e| {
error!(LOGGER, "Failed to start API HTTP server: {}.", e);
});
}); });
Ok(()) Ok(())
} }
fn handle_foreign(req: Request<Body>) -> ResponseFuture {
FOREIGN_ROUTER.with(|router| match *router.borrow() {
Some(ref h) => h.handle(req),
None => {
error!(LOGGER, "No HTTP API router configured");
Box::new(ok(response(
StatusCode::INTERNAL_SERVER_ERROR,
"No router configured",
)))
}
})
}
type WalletResponseFuture = Box<Future<Item = Response<Body>, Error = Error> + Send>; type WalletResponseFuture = Box<Future<Item = Response<Body>, Error = Error> + Send>;
/// API Handler/Wrapper for owner functions /// API Handler/Wrapper for owner functions