Refactor hyper router

* Make it simpler to implement middleware
* Switch from the current thread runtime to the default one. It enables us to inject TLS support later one and potentially more scalable, unfortunately it involves some additonal cloning of the router, because we can't rely on thread local vars anymore
* Introduce `call` entrypoint for Handler, so it's possible to handle any HTTP method in one place, handy for middleware
* Implement example of middleware
This commit is contained in:
hashmap 2018-09-19 17:10:52 +02:00
parent ba72e6e29e
commit d5ef3d9d12
No known key found for this signature in database
GPG key ID: 5EA3C2D2455ED9C8
4 changed files with 90 additions and 105 deletions

View file

@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::cell::RefCell;
use std::collections::HashMap;
use std::fmt::Debug;
use std::net::SocketAddr;
@ -23,7 +22,6 @@ use failure::ResultExt;
use futures::future::{err, ok};
use futures::{Future, Stream};
use hyper::{Body, Request, Response, StatusCode};
use rest::{Error, ErrorKind};
use serde::{Deserialize, Serialize};
use serde_json;
@ -609,8 +607,6 @@ impl Handler for HeaderHandler {
}
}
/// Gets block details given either a hash or an unspent commit
/// GET /v1/blocks/<hash>
/// GET /v1/blocks/<height>
@ -846,8 +842,6 @@ fn just_response<T: Into<Body> + Debug>(status: StatusCode, text: T) -> Response
resp
}
thread_local!( static ROUTER: RefCell<Option<Router>> = RefCell::new(None) );
/// Start all server HTTP handlers. Register all of them with Router
/// and runs the corresponding HTTP server.
///
@ -867,29 +861,16 @@ pub fn start_rest_apis(
.spawn(move || {
let mut apis = ApiServer::new();
ROUTER.with(|router| {
*router.borrow_mut() =
Some(build_router(chain, tx_pool, peers).expect("unable to build API router"));
let router = build_router(chain, tx_pool, peers).expect("unable to build API router");
info!(LOGGER, "Starting HTTP API server at {}.", addr);
let socket_addr: SocketAddr = addr.parse().expect("unable to parse socket address");
apis.start(socket_addr, &handle).unwrap_or_else(|e| {
error!(LOGGER, "Failed to start API HTTP server: {}.", e);
});
info!(LOGGER, "Starting HTTP API server at {}.", addr);
let socket_addr: SocketAddr = addr.parse().expect("unable to parse socket address");
apis.start(socket_addr, router).unwrap_or_else(|e| {
error!(LOGGER, "Failed to start API HTTP server: {}.", e);
});
});
}
pub fn handle(req: Request<Body>) -> ResponseFuture {
ROUTER.with(|router| match *router.borrow() {
Some(ref h) => h.handle(req),
None => {
error!(LOGGER, "No HTTP API router configured");
response(StatusCode::INTERNAL_SERVER_ERROR, "No router configured")
}
})
}
fn parse_body<T>(req: Request<Body>) -> Box<Future<Item = T, Error = Error> + Send>
where
for<'de> T: Deserialize<'de> + Send + 'static,
@ -977,7 +958,11 @@ pub fn build_router(
};
let mut router = Router::new();
router.add_route("/v1/", Box::new(index_handler))?;
// example how we can use midlleware
router.add_route(
"/v1/",
Box::new(LoggingMiddleware::new(Box::new(index_handler))),
)?;
router.add_route("/v1/blocks/*", Box::new(block_handler))?;
router.add_route("/v1/headers/*", Box::new(header_handler))?;
router.add_route("/v1/chain", Box::new(chain_tip_handler))?;

View file

@ -19,14 +19,13 @@
//! register them on a ApiServer.
use hyper::rt::Future;
use hyper::service::service_fn;
use hyper::{Body, Request, Server};
use router::ResponseFuture;
use hyper::{rt, Body, Request, Server};
use router::{Handler, HandlerObj, ResponseFuture, Router};
use std::fmt::{self, Display};
use std::net::SocketAddr;
use tokio::runtime::current_thread::Runtime;
use failure::{Backtrace, Context, Fail};
use util::LOGGER;
/// Errors that can be returned by an ApiEndpoint implementation.
#[derive(Debug)]
@ -95,18 +94,16 @@ impl ApiServer {
}
/// Starts the ApiServer at the provided address.
pub fn start<F>(&mut self, addr: SocketAddr, f: &'static F) -> Result<(), String>
where
F: Fn(Request<Body>) -> ResponseFuture + Send + Sync + 'static,
{
pub fn start(&mut self, addr: SocketAddr, router: Router) -> Result<(), String> {
let server = Server::bind(&addr)
.serve(move || service_fn(f))
.serve(router)
.map_err(|e| eprintln!("server error: {}", e));
let mut rt = Runtime::new().unwrap();
if rt.block_on(server).is_err() {
return Err("tokio block_on error".to_owned());
}
//let mut rt = Runtime::new().unwrap();
//if rt.block_on(server).is_err() {
// return Err("tokio block_on error".to_owned());
//}
rt::run(server);
Ok(())
}
@ -119,3 +116,21 @@ impl ApiServer {
// }
}
}
// Simple example of middleware
pub struct LoggingMiddleware {
next: HandlerObj,
}
impl LoggingMiddleware {
pub fn new(next: HandlerObj) -> LoggingMiddleware {
LoggingMiddleware { next }
}
}
impl Handler for LoggingMiddleware {
fn call(&self, req: Request<Body>) -> ResponseFuture {
debug!(LOGGER, "REST call: {} {}", req.method(), req.uri().path());
self.next.call(req)
}
}

View file

@ -1,6 +1,7 @@
use futures::future;
use hyper;
use hyper::rt::Future;
use hyper::service::{NewService, Service};
use hyper::{Body, Method, Request, Response, StatusCode};
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
@ -49,6 +50,21 @@ pub trait Handler {
fn connect(&self, _req: Request<Body>) -> ResponseFuture {
not_found()
}
fn call(&self, req: Request<Body>) -> ResponseFuture {
match req.method() {
&Method::GET => self.get(req),
&Method::POST => self.post(req),
&Method::PUT => self.put(req),
&Method::DELETE => self.delete(req),
&Method::PATCH => self.patch(req),
&Method::OPTIONS => self.options(req),
&Method::CONNECT => self.connect(req),
&Method::TRACE => self.trace(req),
&Method::HEAD => self.head(req),
_ => not_found(),
}
}
}
#[derive(Fail, Debug)]
pub enum RouterError {
@ -70,7 +86,7 @@ struct NodeId(usize);
const MAX_CHILDREN: usize = 16;
type HandlerObj = Box<Handler>;
pub type HandlerObj = Box<Handler + Send + Sync>;
#[derive(Clone)]
struct Node {
@ -122,7 +138,8 @@ impl Router {
let keys = generate_path(route);
let mut node_id = self.root();
for key in keys {
node_id = self.find(node_id, key)
node_id = self
.find(node_id, key)
.unwrap_or_else(|| self.add_empty_node(node_id, key));
}
match self.node(node_id).value() {
@ -145,26 +162,34 @@ impl Router {
}
self.node(node_id).value().ok_or(RouterError::NoValue)
}
}
pub fn handle(&self, req: Request<Body>) -> ResponseFuture {
impl Service for Router {
type ReqBody = Body;
type ResBody = Body;
type Error = hyper::Error;
type Future = ResponseFuture;
fn call(&mut self, req: Request<Self::ReqBody>) -> Self::Future {
match self.get(req.uri().path()) {
Err(_) => not_found(),
Ok(h) => match req.method() {
&Method::GET => h.get(req),
&Method::POST => h.post(req),
&Method::PUT => h.put(req),
&Method::DELETE => h.delete(req),
&Method::PATCH => h.patch(req),
&Method::OPTIONS => h.options(req),
&Method::CONNECT => h.connect(req),
&Method::TRACE => h.trace(req),
&Method::HEAD => h.head(req),
_ => not_found(),
},
Ok(h) => h.call(req),
}
}
}
impl NewService for Router {
type ReqBody = Body;
type ResBody = Body;
type Error = hyper::Error;
type InitError = hyper::Error;
type Service = Router;
type Future = Box<Future<Item = Self::Service, Error = Self::InitError> + Send>;
fn new_service(&self) -> Self::Future {
Box::new(future::ok(self.clone()))
}
}
impl Node {
fn new(key: u64, value: Option<Arc<HandlerObj>>) -> Node {
Node {
@ -303,5 +328,4 @@ mod tests {
assert_eq!(call_handler("/v1/zzz/2"), 103);
assert_eq!(call_handler("/v1/zzz/2/zzz"), 106);
}
}

View file

@ -16,7 +16,6 @@
//! invocations) as needed.
//! Still experimental
use api::{ApiServer, Handler, ResponseFuture, Router};
use std::cell::RefCell;
use std::collections::HashMap;
use std::marker::PhantomData;
use std::net::SocketAddr;
@ -67,8 +66,6 @@ where
Ok(())
}
thread_local!(static OWNER_ROUTER: RefCell<Option<Router>> = RefCell::new(None));
/// Listener version, providing same API but listening for requests on a
/// port and wrapping the calls
pub fn owner_listener<T: ?Sized, C, K>(wallet: Box<T>, addr: &str) -> Result<(), Error>
@ -81,38 +78,20 @@ where
let wallet_arc = Arc::new(Mutex::new(wallet));
let api_handler = OwnerAPIHandler::new(wallet_arc);
let mut orouter = Router::new();
orouter
let mut router = Router::new();
router
.add_route("/v1/wallet/owner/**", Box::new(api_handler))
.map_err(|_| ErrorKind::GenericError("Router failed to add route".to_string()))?;
OWNER_ROUTER.with(move |router| {
*router.borrow_mut() = Some(orouter);
let mut apis = ApiServer::new();
info!(LOGGER, "Starting HTTP Owner API server at {}.", addr);
let socket_addr: SocketAddr = addr.parse().expect("unable to parse socket address");
apis.start(socket_addr, &handle_owner).unwrap_or_else(|e| {
error!(LOGGER, "Failed to start API HTTP server: {}.", e);
})
let mut apis = ApiServer::new();
info!(LOGGER, "Starting HTTP Owner API server at {}.", addr);
let socket_addr: SocketAddr = addr.parse().expect("unable to parse socket address");
apis.start(socket_addr, router).unwrap_or_else(|e| {
error!(LOGGER, "Failed to start API HTTP server: {}.", e);
});
Ok(())
}
fn handle_owner(req: Request<Body>) -> ResponseFuture {
OWNER_ROUTER.with(|router| match *router.borrow() {
Some(ref h) => h.handle(req),
None => {
error!(LOGGER, "No HTTP API router configured");
Box::new(ok(response(
StatusCode::INTERNAL_SERVER_ERROR,
"No router configured",
)))
}
})
}
thread_local!(static FOREIGN_ROUTER: RefCell<Option<Router>> = RefCell::new(None));
/// Listener version, providing same API but listening for requests on a
/// port and wrapping the calls
pub fn foreign_listener<T: ?Sized, C, K>(wallet: Box<T>, addr: &str) -> Result<(), Error>
@ -128,33 +107,15 @@ where
.add_route("/v1/wallet/foreign/**", Box::new(api_handler))
.map_err(|_| ErrorKind::GenericError("Router failed to add route".to_string()))?;
FOREIGN_ROUTER.with(move |frouter| {
*frouter.borrow_mut() = Some(router);
let mut apis = ApiServer::new();
info!(LOGGER, "Starting HTTP Foreign API server at {}.", addr);
let socket_addr: SocketAddr = addr.parse().expect("unable to parse socket address");
apis.start(socket_addr, &handle_foreign)
.unwrap_or_else(|e| {
error!(LOGGER, "Failed to start API HTTP server: {}.", e);
});
let mut apis = ApiServer::new();
info!(LOGGER, "Starting HTTP Foreign API server at {}.", addr);
let socket_addr: SocketAddr = addr.parse().expect("unable to parse socket address");
apis.start(socket_addr, router).unwrap_or_else(|e| {
error!(LOGGER, "Failed to start API HTTP server: {}.", e);
});
Ok(())
}
fn handle_foreign(req: Request<Body>) -> ResponseFuture {
FOREIGN_ROUTER.with(|router| match *router.borrow() {
Some(ref h) => h.handle(req),
None => {
error!(LOGGER, "No HTTP API router configured");
Box::new(ok(response(
StatusCode::INTERNAL_SERVER_ERROR,
"No router configured",
)))
}
})
}
type WalletResponseFuture = Box<Future<Item = Response<Body>, Error = Error> + Send>;
/// API Handler/Wrapper for owner functions