cleanup server to improve testability

This commit is contained in:
scilio 2022-07-28 17:53:48 -04:00
parent e902f78e32
commit 326eb943f3
3 changed files with 67 additions and 82 deletions

View file

@ -9,9 +9,6 @@ use std::path::PathBuf;
use std::sync::Arc; use std::sync::Arc;
use tokio::runtime::Runtime; use tokio::runtime::Runtime;
#[macro_use]
extern crate lazy_static;
#[macro_use] #[macro_use]
extern crate clap; extern crate clap;
@ -141,12 +138,7 @@ fn real_main() -> Result<(), Box<dyn std::error::Error>> {
}); });
// Start the mwixnet server // Start the mwixnet server
server::listen( server::listen(server_config, Arc::new(wallet), Arc::new(node), stop_state)
&server_config,
Arc::new(wallet),
Arc::new(node),
&stop_state,
)
} }
async fn build_signals_fut() { async fn build_signals_fut() {

View file

@ -15,10 +15,14 @@ use std::fmt;
type HmacSha256 = Hmac<Sha256>; type HmacSha256 = Hmac<Sha256>;
type RawBytes = Vec<u8>; type RawBytes = Vec<u8>;
/// A data packet with layers of encryption
#[derive(Clone, Debug, PartialEq)] #[derive(Clone, Debug, PartialEq)]
pub struct Onion { pub struct Onion {
/// The onion originator's portion of the shared secret
pub ephemeral_pubkey: PublicKey, pub ephemeral_pubkey: PublicKey,
/// The pedersen commitment before adjusting the excess and subtracting the fee
pub commit: Commitment, pub commit: Commitment,
/// The encrypted payloads which represent the layers of the onion
pub enc_payloads: Vec<RawBytes>, pub enc_payloads: Vec<RawBytes>,
} }

View file

@ -15,14 +15,21 @@ use jsonrpc_http_server::*;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use std::time::Duration;
#[derive(Clone, Debug, PartialEq)] #[derive(Clone, Debug, PartialEq)]
struct Submission { struct Submission {
/// The total excess for the output commitment
excess: SecretKey, excess: SecretKey,
/// The derived output commitment after applying excess and fee
output_commit: Commitment, output_commit: Commitment,
/// The rangeproof, included only for the final hop (node N)
rangeproof: Option<RangeProof>, rangeproof: Option<RangeProof>,
/// Transaction input being spent
input: Input, input: Input,
/// Transaction fee
fee: u64, fee: u64,
/// The remaining onion after peeling off our layer
onion: Onion, onion: Onion,
} }
@ -33,12 +40,8 @@ pub struct SwapReq {
comsig: ComSignature, comsig: ComSignature,
} }
lazy_static! {
static ref SERVER_STATE: Mutex<HashMap<Commitment, Submission>> = Mutex::new(HashMap::new());
}
#[rpc(server)] #[rpc(server)]
pub trait Server { pub trait RPCServer {
#[rpc(name = "swap")] #[rpc(name = "swap")]
fn swap(&self, swap: SwapReq) -> Result<Value>; fn swap(&self, swap: SwapReq) -> Result<Value>;
@ -48,28 +51,41 @@ pub trait Server {
} }
#[derive(Clone)] #[derive(Clone)]
struct ServerImpl { struct Server {
server_config: ServerConfig, server_config: ServerConfig,
stop_state: Arc<StopState>,
wallet: Arc<dyn Wallet>, wallet: Arc<dyn Wallet>,
node: Arc<dyn GrinNode>, node: Arc<dyn GrinNode>,
submissions: Arc<Mutex<HashMap<Commitment, Submission>>>,
} }
impl ServerImpl { impl Server {
fn new( fn new(server_config: ServerConfig, wallet: Arc<dyn Wallet>, node: Arc<dyn GrinNode>) -> Self {
server_config: ServerConfig, Server {
stop_state: Arc<StopState>,
wallet: Arc<dyn Wallet>,
node: Arc<dyn GrinNode>,
) -> Self {
ServerImpl {
server_config, server_config,
stop_state,
wallet, wallet,
node, node,
submissions: Arc::new(Mutex::new(HashMap::new())),
} }
} }
/// Spin up an instance of the JSON-RPC HTTP server.
fn start_rpc(&self) -> jsonrpc_http_server::Server {
let mut io = IoHandler::new();
io.extend_with(Server::to_delegate(self.clone()));
ServerBuilder::new(io)
.cors(DomainsValidation::Disabled)
.request_middleware(|request: hyper::Request<hyper::Body>| {
if request.uri() == "/v1" {
request.into()
} else {
jsonrpc_http_server::Response::bad_request("Only v1 supported").into()
}
})
.start_http(&self.server_config.addr)
.expect("Unable to start RPC server")
}
/// The fee base to use. For now, just using the default. /// The fee base to use. For now, just using the default.
fn get_fee_base(&self) -> u64 { fn get_fee_base(&self) -> u64 {
DEFAULT_ACCEPT_FEE_BASE DEFAULT_ACCEPT_FEE_BASE
@ -86,7 +102,7 @@ impl ServerImpl {
/// ///
/// Currently only a single mix node is used. Milestone 3 will include support for multiple mix nodes. /// Currently only a single mix node is used. Milestone 3 will include support for multiple mix nodes.
fn execute_round(&self) -> crate::error::Result<()> { fn execute_round(&self) -> crate::error::Result<()> {
let mut locked_state = SERVER_STATE.lock().unwrap(); let mut locked_state = self.submissions.lock().unwrap();
let next_block_height = self.node.get_chain_height()? + 1; let next_block_height = self.node.get_chain_height()? + 1;
let spendable: Vec<Submission> = locked_state let spendable: Vec<Submission> = locked_state
@ -142,7 +158,7 @@ impl ServerImpl {
} }
} }
impl Server for ServerImpl { impl RPCServer for Server {
/// Implements the 'swap' API /// Implements the 'swap' API
fn swap(&self, swap: SwapReq) -> Result<Value> { fn swap(&self, swap: SwapReq) -> Result<Value> {
// milestone 3: check that enc_payloads length matches number of configured servers // milestone 3: check that enc_payloads length matches number of configured servers
@ -205,7 +221,7 @@ impl Server for ServerImpl {
return Err(Error::invalid_params("Rangeproof expected")); return Err(Error::invalid_params("Rangeproof expected"));
} }
let mut locked = SERVER_STATE.lock().unwrap(); let mut locked = self.submissions.lock().unwrap();
if locked.contains_key(&swap.onion.commit) { if locked.contains_key(&swap.onion.commit) {
return Err(Error::invalid_params("swap already called for coin")); return Err(Error::invalid_params("swap already called for coin"));
} }
@ -227,56 +243,40 @@ impl Server for ServerImpl {
/// Spin up the JSON-RPC web server /// Spin up the JSON-RPC web server
pub fn listen( pub fn listen(
server_config: &ServerConfig, server_config: ServerConfig,
wallet: Arc<dyn Wallet>, wallet: Arc<dyn Wallet>,
node: Arc<dyn GrinNode>, node: Arc<dyn GrinNode>,
stop_state: &Arc<StopState>, stop_state: Arc<StopState>,
) -> std::result::Result<(), Box<dyn std::error::Error>> { ) -> std::result::Result<(), Box<dyn std::error::Error>> {
let server_impl = Arc::new(ServerImpl::new( let server = Arc::new(Server::new(
server_config.clone(), server_config.clone(),
stop_state.clone(),
wallet.clone(), wallet.clone(),
node.clone(), node.clone(),
)); ));
let mut io = IoHandler::new(); let rpc_server = server.start_rpc();
io.extend_with(ServerImpl::to_delegate(server_impl.as_ref().clone())); println!("Server listening on {}", server.server_config.addr);
let server = ServerBuilder::new(io) let close_handle = rpc_server.close_handle();
.cors(DomainsValidation::Disabled)
.request_middleware(|request: hyper::Request<hyper::Body>| {
if request.uri() == "/v1" {
request.into()
} else {
jsonrpc_http_server::Response::bad_request("Only v1 supported").into()
}
})
.start_http(&server_config.addr)
.expect("Unable to start RPC server");
println!("Server listening on {}", server_config.addr);
let close_handle = server.close_handle();
let config = server_config.clone();
let round_handle = std::thread::spawn(move || { let round_handle = std::thread::spawn(move || {
let mut secs = 0; let mut secs = 0;
loop { loop {
if server_impl.as_ref().stop_state.is_stopped() { if stop_state.is_stopped() {
close_handle.close(); close_handle.close();
break; break;
} }
std::thread::sleep(std::time::Duration::from_secs(1)); std::thread::sleep(Duration::from_secs(1));
secs = (secs + 1) % config.interval_s; secs = (secs + 1) % server.server_config.interval_s;
if secs == 0 { if secs == 0 {
let _ = server_impl.as_ref().execute_round(); let _ = server.execute_round();
secs = 0; secs = 0;
} }
} }
}); });
server.wait(); rpc_server.wait();
round_handle.join().unwrap(); round_handle.join().unwrap();
Ok(()) Ok(())
@ -293,11 +293,8 @@ mod tests {
use crate::wallet::{MockWallet, Wallet}; use crate::wallet::{MockWallet, Wallet};
use grin_core::core::{Committed, FeeFields, Transaction}; use grin_core::core::{Committed, FeeFields, Transaction};
use grin_util::StopState;
use std::net::TcpListener; use std::net::TcpListener;
use std::sync::Arc; use std::sync::Arc;
use std::thread;
use std::time::Duration;
use hyper::{Body, Client, Request, Response}; use hyper::{Body, Client, Request, Response};
use tokio::runtime; use tokio::runtime;
@ -324,26 +321,18 @@ mod tests {
wallet_owner_secret_path: None, wallet_owner_secret_path: None,
}; };
let threaded_rt = runtime::Runtime::new()?; let server_impl = Arc::new(server::ServerImpl::new(
let (shutdown_sender, shutdown_receiver) = futures::channel::oneshot::channel(); server_config.clone(),
wallet.clone(),
node.clone(),
));
// Start the server
let rpc_server = server_impl.start_server();
let uri = format!("http://{}/v1", server_config.addr); let uri = format!("http://{}/v1", server_config.addr);
let stop_state = Arc::new(StopState::new()); let threaded_rt = runtime::Runtime::new()?;
let stop_state_clone = stop_state.clone();
// Spawn the server task
threaded_rt.spawn(async move {
server::listen(&server_config, wallet, node, &stop_state).unwrap()
});
threaded_rt.spawn(async move {
futures::executor::block_on(shutdown_receiver).unwrap();
stop_state_clone.stop();
});
// Wait for listener
thread::sleep(Duration::from_millis(500));
let do_request = async move { let do_request = async move {
let request = Request::post(uri) let request = Request::post(uri)
.header("Content-Type", "application/json") .header("Content-Type", "application/json")
@ -356,15 +345,15 @@ mod tests {
let response = threaded_rt.block_on(do_request)?; let response = threaded_rt.block_on(do_request)?;
let response_str: String = threaded_rt.block_on(body_to_string(response)); let response_str: String = threaded_rt.block_on(body_to_string(response));
// Wait for at least one round to execute
thread::sleep(Duration::from_millis(1500));
shutdown_sender.send(()).ok();
// Wait for shutdown // Wait for shutdown
thread::sleep(Duration::from_millis(500));
threaded_rt.shutdown_background(); threaded_rt.shutdown_background();
// Execute one round
server_impl.execute_round()?;
// Stop the server
rpc_server.close();
Ok(response_str) Ok(response_str)
} }