From d5ef3d9d12ccc55c33f58ae685e982c176a572ef Mon Sep 17 00:00:00 2001 From: hashmap Date: Wed, 19 Sep 2018 17:10:52 +0200 Subject: [PATCH] Refactor hyper router * Make it simpler to implement middleware * Switch from the current thread runtime to the default one. It enables us to inject TLS support later one and potentially more scalable, unfortunately it involves some additonal cloning of the router, because we can't rely on thread local vars anymore * Introduce `call` entrypoint for Handler, so it's possible to handle any HTTP method in one place, handy for middleware * Implement example of middleware --- api/src/handlers.rs | 35 +++++------------ api/src/rest.rs | 41 +++++++++++++------ api/src/router.rs | 56 ++++++++++++++++++-------- wallet/src/libwallet/controller.rs | 63 ++++++------------------------ 4 files changed, 90 insertions(+), 105 deletions(-) diff --git a/api/src/handlers.rs b/api/src/handlers.rs index e65f05237..e6c265d9b 100644 --- a/api/src/handlers.rs +++ b/api/src/handlers.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::cell::RefCell; use std::collections::HashMap; use std::fmt::Debug; use std::net::SocketAddr; @@ -23,7 +22,6 @@ use failure::ResultExt; use futures::future::{err, ok}; use futures::{Future, Stream}; use hyper::{Body, Request, Response, StatusCode}; -use rest::{Error, ErrorKind}; use serde::{Deserialize, Serialize}; use serde_json; @@ -609,8 +607,6 @@ impl Handler for HeaderHandler { } } - - /// Gets block details given either a hash or an unspent commit /// GET /v1/blocks/ /// GET /v1/blocks/ @@ -846,8 +842,6 @@ fn just_response + Debug>(status: StatusCode, text: T) -> Response resp } -thread_local!( static ROUTER: RefCell> = RefCell::new(None) ); - /// Start all server HTTP handlers. Register all of them with Router /// and runs the corresponding HTTP server. /// @@ -867,29 +861,16 @@ pub fn start_rest_apis( .spawn(move || { let mut apis = ApiServer::new(); - ROUTER.with(|router| { - *router.borrow_mut() = - Some(build_router(chain, tx_pool, peers).expect("unable to build API router")); + let router = build_router(chain, tx_pool, peers).expect("unable to build API router"); - info!(LOGGER, "Starting HTTP API server at {}.", addr); - let socket_addr: SocketAddr = addr.parse().expect("unable to parse socket address"); - apis.start(socket_addr, &handle).unwrap_or_else(|e| { - error!(LOGGER, "Failed to start API HTTP server: {}.", e); - }); + info!(LOGGER, "Starting HTTP API server at {}.", addr); + let socket_addr: SocketAddr = addr.parse().expect("unable to parse socket address"); + apis.start(socket_addr, router).unwrap_or_else(|e| { + error!(LOGGER, "Failed to start API HTTP server: {}.", e); }); }); } -pub fn handle(req: Request) -> ResponseFuture { - ROUTER.with(|router| match *router.borrow() { - Some(ref h) => h.handle(req), - None => { - error!(LOGGER, "No HTTP API router configured"); - response(StatusCode::INTERNAL_SERVER_ERROR, "No router configured") - } - }) -} - fn parse_body(req: Request) -> Box + Send> where for<'de> T: Deserialize<'de> + Send + 'static, @@ -977,7 +958,11 @@ pub fn build_router( }; let mut router = Router::new(); - router.add_route("/v1/", Box::new(index_handler))?; + // example how we can use midlleware + router.add_route( + "/v1/", + Box::new(LoggingMiddleware::new(Box::new(index_handler))), + )?; router.add_route("/v1/blocks/*", Box::new(block_handler))?; router.add_route("/v1/headers/*", Box::new(header_handler))?; router.add_route("/v1/chain", Box::new(chain_tip_handler))?; diff --git a/api/src/rest.rs b/api/src/rest.rs index 3513474a9..14d4dcb93 100644 --- a/api/src/rest.rs +++ b/api/src/rest.rs @@ -19,14 +19,13 @@ //! register them on a ApiServer. use hyper::rt::Future; -use hyper::service::service_fn; -use hyper::{Body, Request, Server}; -use router::ResponseFuture; +use hyper::{rt, Body, Request, Server}; +use router::{Handler, HandlerObj, ResponseFuture, Router}; use std::fmt::{self, Display}; use std::net::SocketAddr; -use tokio::runtime::current_thread::Runtime; use failure::{Backtrace, Context, Fail}; +use util::LOGGER; /// Errors that can be returned by an ApiEndpoint implementation. #[derive(Debug)] @@ -95,18 +94,16 @@ impl ApiServer { } /// Starts the ApiServer at the provided address. - pub fn start(&mut self, addr: SocketAddr, f: &'static F) -> Result<(), String> - where - F: Fn(Request) -> ResponseFuture + Send + Sync + 'static, - { + pub fn start(&mut self, addr: SocketAddr, router: Router) -> Result<(), String> { let server = Server::bind(&addr) - .serve(move || service_fn(f)) + .serve(router) .map_err(|e| eprintln!("server error: {}", e)); - let mut rt = Runtime::new().unwrap(); - if rt.block_on(server).is_err() { - return Err("tokio block_on error".to_owned()); - } + //let mut rt = Runtime::new().unwrap(); + //if rt.block_on(server).is_err() { + // return Err("tokio block_on error".to_owned()); + //} + rt::run(server); Ok(()) } @@ -119,3 +116,21 @@ impl ApiServer { // } } } + +// Simple example of middleware +pub struct LoggingMiddleware { + next: HandlerObj, +} + +impl LoggingMiddleware { + pub fn new(next: HandlerObj) -> LoggingMiddleware { + LoggingMiddleware { next } + } +} + +impl Handler for LoggingMiddleware { + fn call(&self, req: Request) -> ResponseFuture { + debug!(LOGGER, "REST call: {} {}", req.method(), req.uri().path()); + self.next.call(req) + } +} diff --git a/api/src/router.rs b/api/src/router.rs index 3d2c4ebf1..47555f0cf 100644 --- a/api/src/router.rs +++ b/api/src/router.rs @@ -1,6 +1,7 @@ use futures::future; use hyper; use hyper::rt::Future; +use hyper::service::{NewService, Service}; use hyper::{Body, Method, Request, Response, StatusCode}; use std::collections::hash_map::DefaultHasher; use std::hash::{Hash, Hasher}; @@ -49,6 +50,21 @@ pub trait Handler { fn connect(&self, _req: Request) -> ResponseFuture { not_found() } + + fn call(&self, req: Request) -> ResponseFuture { + match req.method() { + &Method::GET => self.get(req), + &Method::POST => self.post(req), + &Method::PUT => self.put(req), + &Method::DELETE => self.delete(req), + &Method::PATCH => self.patch(req), + &Method::OPTIONS => self.options(req), + &Method::CONNECT => self.connect(req), + &Method::TRACE => self.trace(req), + &Method::HEAD => self.head(req), + _ => not_found(), + } + } } #[derive(Fail, Debug)] pub enum RouterError { @@ -70,7 +86,7 @@ struct NodeId(usize); const MAX_CHILDREN: usize = 16; -type HandlerObj = Box; +pub type HandlerObj = Box; #[derive(Clone)] struct Node { @@ -122,7 +138,8 @@ impl Router { let keys = generate_path(route); let mut node_id = self.root(); for key in keys { - node_id = self.find(node_id, key) + node_id = self + .find(node_id, key) .unwrap_or_else(|| self.add_empty_node(node_id, key)); } match self.node(node_id).value() { @@ -145,26 +162,34 @@ impl Router { } self.node(node_id).value().ok_or(RouterError::NoValue) } +} - pub fn handle(&self, req: Request) -> ResponseFuture { +impl Service for Router { + type ReqBody = Body; + type ResBody = Body; + type Error = hyper::Error; + type Future = ResponseFuture; + + fn call(&mut self, req: Request) -> Self::Future { match self.get(req.uri().path()) { Err(_) => not_found(), - Ok(h) => match req.method() { - &Method::GET => h.get(req), - &Method::POST => h.post(req), - &Method::PUT => h.put(req), - &Method::DELETE => h.delete(req), - &Method::PATCH => h.patch(req), - &Method::OPTIONS => h.options(req), - &Method::CONNECT => h.connect(req), - &Method::TRACE => h.trace(req), - &Method::HEAD => h.head(req), - _ => not_found(), - }, + Ok(h) => h.call(req), } } } +impl NewService for Router { + type ReqBody = Body; + type ResBody = Body; + type Error = hyper::Error; + type InitError = hyper::Error; + type Service = Router; + type Future = Box + Send>; + fn new_service(&self) -> Self::Future { + Box::new(future::ok(self.clone())) + } +} + impl Node { fn new(key: u64, value: Option>) -> Node { Node { @@ -303,5 +328,4 @@ mod tests { assert_eq!(call_handler("/v1/zzz/2"), 103); assert_eq!(call_handler("/v1/zzz/2/zzz"), 106); } - } diff --git a/wallet/src/libwallet/controller.rs b/wallet/src/libwallet/controller.rs index b9aebdc12..5bdd86903 100644 --- a/wallet/src/libwallet/controller.rs +++ b/wallet/src/libwallet/controller.rs @@ -16,7 +16,6 @@ //! invocations) as needed. //! Still experimental use api::{ApiServer, Handler, ResponseFuture, Router}; -use std::cell::RefCell; use std::collections::HashMap; use std::marker::PhantomData; use std::net::SocketAddr; @@ -67,8 +66,6 @@ where Ok(()) } -thread_local!(static OWNER_ROUTER: RefCell> = RefCell::new(None)); - /// Listener version, providing same API but listening for requests on a /// port and wrapping the calls pub fn owner_listener(wallet: Box, addr: &str) -> Result<(), Error> @@ -81,38 +78,20 @@ where let wallet_arc = Arc::new(Mutex::new(wallet)); let api_handler = OwnerAPIHandler::new(wallet_arc); - let mut orouter = Router::new(); - orouter + let mut router = Router::new(); + router .add_route("/v1/wallet/owner/**", Box::new(api_handler)) .map_err(|_| ErrorKind::GenericError("Router failed to add route".to_string()))?; - OWNER_ROUTER.with(move |router| { - *router.borrow_mut() = Some(orouter); - let mut apis = ApiServer::new(); - info!(LOGGER, "Starting HTTP Owner API server at {}.", addr); - let socket_addr: SocketAddr = addr.parse().expect("unable to parse socket address"); - apis.start(socket_addr, &handle_owner).unwrap_or_else(|e| { - error!(LOGGER, "Failed to start API HTTP server: {}.", e); - }) + let mut apis = ApiServer::new(); + info!(LOGGER, "Starting HTTP Owner API server at {}.", addr); + let socket_addr: SocketAddr = addr.parse().expect("unable to parse socket address"); + apis.start(socket_addr, router).unwrap_or_else(|e| { + error!(LOGGER, "Failed to start API HTTP server: {}.", e); }); Ok(()) } -fn handle_owner(req: Request) -> ResponseFuture { - OWNER_ROUTER.with(|router| match *router.borrow() { - Some(ref h) => h.handle(req), - None => { - error!(LOGGER, "No HTTP API router configured"); - Box::new(ok(response( - StatusCode::INTERNAL_SERVER_ERROR, - "No router configured", - ))) - } - }) -} - -thread_local!(static FOREIGN_ROUTER: RefCell> = RefCell::new(None)); - /// Listener version, providing same API but listening for requests on a /// port and wrapping the calls pub fn foreign_listener(wallet: Box, addr: &str) -> Result<(), Error> @@ -128,33 +107,15 @@ where .add_route("/v1/wallet/foreign/**", Box::new(api_handler)) .map_err(|_| ErrorKind::GenericError("Router failed to add route".to_string()))?; - FOREIGN_ROUTER.with(move |frouter| { - *frouter.borrow_mut() = Some(router); - let mut apis = ApiServer::new(); - info!(LOGGER, "Starting HTTP Foreign API server at {}.", addr); - let socket_addr: SocketAddr = addr.parse().expect("unable to parse socket address"); - apis.start(socket_addr, &handle_foreign) - .unwrap_or_else(|e| { - error!(LOGGER, "Failed to start API HTTP server: {}.", e); - }); + let mut apis = ApiServer::new(); + info!(LOGGER, "Starting HTTP Foreign API server at {}.", addr); + let socket_addr: SocketAddr = addr.parse().expect("unable to parse socket address"); + apis.start(socket_addr, router).unwrap_or_else(|e| { + error!(LOGGER, "Failed to start API HTTP server: {}.", e); }); - Ok(()) } -fn handle_foreign(req: Request) -> ResponseFuture { - FOREIGN_ROUTER.with(|router| match *router.borrow() { - Some(ref h) => h.handle(req), - None => { - error!(LOGGER, "No HTTP API router configured"); - Box::new(ok(response( - StatusCode::INTERNAL_SERVER_ERROR, - "No router configured", - ))) - } - }) -} - type WalletResponseFuture = Box, Error = Error> + Send>; /// API Handler/Wrapper for owner functions