diff --git a/src/main.rs b/src/main.rs index e880030..ff9f871 100644 --- a/src/main.rs +++ b/src/main.rs @@ -9,9 +9,6 @@ use std::path::PathBuf; use std::sync::Arc; use tokio::runtime::Runtime; -#[macro_use] -extern crate lazy_static; - #[macro_use] extern crate clap; @@ -141,12 +138,7 @@ fn real_main() -> Result<(), Box> { }); // Start the mwixnet server - server::listen( - &server_config, - Arc::new(wallet), - Arc::new(node), - &stop_state, - ) + server::listen(server_config, Arc::new(wallet), Arc::new(node), stop_state) } async fn build_signals_fut() { diff --git a/src/onion.rs b/src/onion.rs index fe409d8..68b3833 100644 --- a/src/onion.rs +++ b/src/onion.rs @@ -15,10 +15,14 @@ use std::fmt; type HmacSha256 = Hmac; type RawBytes = Vec; +/// A data packet with layers of encryption #[derive(Clone, Debug, PartialEq)] pub struct Onion { + /// The onion originator's portion of the shared secret pub ephemeral_pubkey: PublicKey, + /// The pedersen commitment before adjusting the excess and subtracting the fee pub commit: Commitment, + /// The encrypted payloads which represent the layers of the onion pub enc_payloads: Vec, } diff --git a/src/server.rs b/src/server.rs index 483cbf5..5cfe8c5 100644 --- a/src/server.rs +++ b/src/server.rs @@ -15,14 +15,21 @@ use jsonrpc_http_server::*; use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::sync::{Arc, Mutex}; +use std::time::Duration; #[derive(Clone, Debug, PartialEq)] struct Submission { + /// The total excess for the output commitment excess: SecretKey, + /// The derived output commitment after applying excess and fee output_commit: Commitment, + /// The rangeproof, included only for the final hop (node N) rangeproof: Option, + /// Transaction input being spent input: Input, + /// Transaction fee fee: u64, + /// The remaining onion after peeling off our layer onion: Onion, } @@ -33,12 +40,8 @@ pub struct SwapReq { comsig: ComSignature, } -lazy_static! { - static ref SERVER_STATE: Mutex> = Mutex::new(HashMap::new()); -} - #[rpc(server)] -pub trait Server { +pub trait RPCServer { #[rpc(name = "swap")] fn swap(&self, swap: SwapReq) -> Result; @@ -48,28 +51,41 @@ pub trait Server { } #[derive(Clone)] -struct ServerImpl { +struct Server { server_config: ServerConfig, - stop_state: Arc, wallet: Arc, node: Arc, + submissions: Arc>>, } -impl ServerImpl { - fn new( - server_config: ServerConfig, - stop_state: Arc, - wallet: Arc, - node: Arc, - ) -> Self { - ServerImpl { +impl Server { + fn new(server_config: ServerConfig, wallet: Arc, node: Arc) -> Self { + Server { server_config, - stop_state, wallet, node, + submissions: Arc::new(Mutex::new(HashMap::new())), } } + /// Spin up an instance of the JSON-RPC HTTP server. + fn start_rpc(&self) -> jsonrpc_http_server::Server { + let mut io = IoHandler::new(); + io.extend_with(Server::to_delegate(self.clone())); + + ServerBuilder::new(io) + .cors(DomainsValidation::Disabled) + .request_middleware(|request: hyper::Request| { + if request.uri() == "/v1" { + request.into() + } else { + jsonrpc_http_server::Response::bad_request("Only v1 supported").into() + } + }) + .start_http(&self.server_config.addr) + .expect("Unable to start RPC server") + } + /// The fee base to use. For now, just using the default. fn get_fee_base(&self) -> u64 { DEFAULT_ACCEPT_FEE_BASE @@ -86,7 +102,7 @@ impl ServerImpl { /// /// Currently only a single mix node is used. Milestone 3 will include support for multiple mix nodes. fn execute_round(&self) -> crate::error::Result<()> { - let mut locked_state = SERVER_STATE.lock().unwrap(); + let mut locked_state = self.submissions.lock().unwrap(); let next_block_height = self.node.get_chain_height()? + 1; let spendable: Vec = locked_state @@ -142,7 +158,7 @@ impl ServerImpl { } } -impl Server for ServerImpl { +impl RPCServer for Server { /// Implements the 'swap' API fn swap(&self, swap: SwapReq) -> Result { // milestone 3: check that enc_payloads length matches number of configured servers @@ -205,7 +221,7 @@ impl Server for ServerImpl { return Err(Error::invalid_params("Rangeproof expected")); } - let mut locked = SERVER_STATE.lock().unwrap(); + let mut locked = self.submissions.lock().unwrap(); if locked.contains_key(&swap.onion.commit) { return Err(Error::invalid_params("swap already called for coin")); } @@ -227,56 +243,40 @@ impl Server for ServerImpl { /// Spin up the JSON-RPC web server pub fn listen( - server_config: &ServerConfig, + server_config: ServerConfig, wallet: Arc, node: Arc, - stop_state: &Arc, + stop_state: Arc, ) -> std::result::Result<(), Box> { - let server_impl = Arc::new(ServerImpl::new( + let server = Arc::new(Server::new( server_config.clone(), - stop_state.clone(), wallet.clone(), node.clone(), )); - let mut io = IoHandler::new(); - io.extend_with(ServerImpl::to_delegate(server_impl.as_ref().clone())); + let rpc_server = server.start_rpc(); + println!("Server listening on {}", server.server_config.addr); - let server = ServerBuilder::new(io) - .cors(DomainsValidation::Disabled) - .request_middleware(|request: hyper::Request| { - if request.uri() == "/v1" { - request.into() - } else { - jsonrpc_http_server::Response::bad_request("Only v1 supported").into() - } - }) - .start_http(&server_config.addr) - .expect("Unable to start RPC server"); - println!("Server listening on {}", server_config.addr); - - let close_handle = server.close_handle(); - - let config = server_config.clone(); + let close_handle = rpc_server.close_handle(); let round_handle = std::thread::spawn(move || { let mut secs = 0; loop { - if server_impl.as_ref().stop_state.is_stopped() { + if stop_state.is_stopped() { close_handle.close(); break; } - std::thread::sleep(std::time::Duration::from_secs(1)); - secs = (secs + 1) % config.interval_s; + std::thread::sleep(Duration::from_secs(1)); + secs = (secs + 1) % server.server_config.interval_s; if secs == 0 { - let _ = server_impl.as_ref().execute_round(); + let _ = server.execute_round(); secs = 0; } } }); - server.wait(); + rpc_server.wait(); round_handle.join().unwrap(); Ok(()) @@ -293,11 +293,8 @@ mod tests { use crate::wallet::{MockWallet, Wallet}; use grin_core::core::{Committed, FeeFields, Transaction}; - use grin_util::StopState; use std::net::TcpListener; use std::sync::Arc; - use std::thread; - use std::time::Duration; use hyper::{Body, Client, Request, Response}; use tokio::runtime; @@ -324,26 +321,18 @@ mod tests { wallet_owner_secret_path: None, }; - let threaded_rt = runtime::Runtime::new()?; - let (shutdown_sender, shutdown_receiver) = futures::channel::oneshot::channel(); + let server_impl = Arc::new(server::ServerImpl::new( + server_config.clone(), + wallet.clone(), + node.clone(), + )); + + // Start the server + let rpc_server = server_impl.start_server(); + let uri = format!("http://{}/v1", server_config.addr); - let stop_state = Arc::new(StopState::new()); - let stop_state_clone = stop_state.clone(); - - // Spawn the server task - threaded_rt.spawn(async move { - server::listen(&server_config, wallet, node, &stop_state).unwrap() - }); - - threaded_rt.spawn(async move { - futures::executor::block_on(shutdown_receiver).unwrap(); - stop_state_clone.stop(); - }); - - // Wait for listener - thread::sleep(Duration::from_millis(500)); - + let threaded_rt = runtime::Runtime::new()?; let do_request = async move { let request = Request::post(uri) .header("Content-Type", "application/json") @@ -356,15 +345,15 @@ mod tests { let response = threaded_rt.block_on(do_request)?; let response_str: String = threaded_rt.block_on(body_to_string(response)); - // Wait for at least one round to execute - thread::sleep(Duration::from_millis(1500)); - - shutdown_sender.send(()).ok(); - // Wait for shutdown - thread::sleep(Duration::from_millis(500)); threaded_rt.shutdown_background(); + // Execute one round + server_impl.execute_round()?; + + // Stop the server + rpc_server.close(); + Ok(response_str) }