diff --git a/api/src/rest.rs b/api/src/rest.rs index 529327f0d..c402f34d7 100644 --- a/api/src/rest.rs +++ b/api/src/rest.rs @@ -24,11 +24,13 @@ use std::io::Read; use std::net::ToSocketAddrs; use std::string::ToString; use std::str::FromStr; +use std::mem; -use iron::{Iron, Request, Response, IronResult, IronError, status, headers}; +use iron::{Iron, Request, Response, IronResult, IronError, status, headers, Listening}; use iron::method::Method; use iron::modifiers::Header; use iron::middleware::Handler; +use iron::error::HttpResult; use router::Router; use serde::{Serialize, Deserialize}; use serde::de::DeserializeOwned; @@ -227,6 +229,8 @@ fn extract_param(req: &mut Request, param: &'static str) -> IronResult pub struct ApiServer { root: String, router: Router, + server_listener: Option, + } impl ApiServer { @@ -236,12 +240,24 @@ impl ApiServer { ApiServer { root: root, router: Router::new(), + server_listener: None, } } /// Starts the ApiServer at the provided address. - pub fn start(self, addr: A) -> Result<(), String> { - Iron::new(self.router).http(addr).map(|_| ()).map_err(|e| e.to_string()) + pub fn start(&mut self, addr: A) -> Result<(), String> { + //replace this value to satisfy borrow checker + let r = mem::replace(&mut self.router, Router::new()); + let result = Iron::new(r).http(addr); + let return_value = result.as_ref().map(|_| ()).map_err(|e| e.to_string()); + self.server_listener = Some(result.unwrap()); + return_value + } + + /// Stops the API server + pub fn stop(&mut self){ + let r = mem::replace(&mut self.server_listener, None); + r.unwrap().close().unwrap(); } /// Register a new API endpoint, providing a relative URL for the new diff --git a/grin/Cargo.toml b/grin/Cargo.toml index 8f40b3d43..a23a8023a 100644 --- a/grin/Cargo.toml +++ b/grin/Cargo.toml @@ -12,6 +12,7 @@ grin_store = { path = "../store" } grin_p2p = { path = "../p2p" } grin_pool = { path = "../pool" } grin_util = { path = "../util" } +grin_wallet = { path = "../wallet" } secp256k1zkp = { path = "../secp256k1zkp" } env_logger="^0.3.5" @@ -25,3 +26,4 @@ serde_derive = "~1.0.8" tokio-core="^0.1.1" tokio-timer="^0.1.0" rand = "^0.3" +tiny-keccak = "1.1" diff --git a/grin/tests/simulnet.rs b/grin/tests/simulnet.rs index 2858994f7..fe0328572 100644 --- a/grin/tests/simulnet.rs +++ b/grin/tests/simulnet.rs @@ -16,6 +16,10 @@ extern crate grin_grin as grin; extern crate grin_core as core; extern crate grin_p2p as p2p; extern crate grin_chain as chain; +extern crate grin_api as api; +extern crate grin_wallet as wallet; +extern crate secp256k1zkp as secp; +extern crate tiny_keccak; extern crate env_logger; extern crate futures; @@ -26,12 +30,249 @@ use std::io; use std::thread; use std::time; use std::default::Default; +use std::mem; use futures::{Future, Poll, Async}; use futures::task::park; use tokio_core::reactor; use tokio_timer::Timer; +use secp::Secp256k1; +use secp::key::SecretKey; +use tiny_keccak::Keccak; + +use wallet::WalletConfig; + + +/// Errors that can be returned by LocalServerContainer +#[derive(Debug)] +pub enum Error { + Internal(String), + Argument(String), + NotFound, +} + +/// A top-level container to hold everything that might be running +/// on a server, i.e. server, wallet in send or recieve mode + +struct LocalServerContainer { + pub working_dir: String, + pub server : grin::Server, + + pub enable_mining: bool, + pub enable_wallet: bool, + + pub wallet_port: u16, + wallet_is_running: bool, + + apis: api::ApiServer, +} + +impl LocalServerContainer { + pub fn new(api_addr:String, server_port: u16, event_loop: &reactor::Core) -> Result { + let working_dir = format!("target/test_servers/server-{}", server_port); + let mut s = grin::Server::future( + grin::ServerConfig{ + api_http_addr: api_addr, + db_root: format!("{}/grin-prop", working_dir), + cuckoo_size: 12, + p2p_config: p2p::P2PConfig{port: server_port, ..p2p::P2PConfig::default()}, + ..Default::default() + }, &event_loop.handle()).unwrap(); + Ok((LocalServerContainer { + server: s, + enable_mining: false, + enable_wallet: false, + wallet_port: 30000, + wallet_is_running: false, + working_dir: working_dir, + + apis: api::ApiServer::new("/v1".to_string()), + })) + } + + /// Starts a wallet daemon to receive and returns the + /// listening server url + + pub fn start_wallet(&mut self) -> String { + + //Just use the server address and port number for the wallet seed now + let url = format!("{}:{}", self.server.config.p2p_config.host, + self.wallet_port); + + let mut sha3 = Keccak::new_sha3_256(); + sha3.update(url.as_bytes()); + let mut seed = [0; 32]; + sha3.finalize(&mut seed); + + let s = Secp256k1::new(); + let key = wallet::ExtendedKey::from_seed(&s, &seed[..]) + .expect("Error deriving extended key from seed."); + + println!("Starting the Grin wallet receiving daemon on {} ", self.wallet_port ); + + let mut wallet_config = WalletConfig::default(); + wallet_config.data_file_dir=self.working_dir.clone(); + + self.apis.register_endpoint("/receive".to_string(), wallet::WalletReceiver { + key: key, + config: wallet_config, + }); + + let return_url = url.clone(); + self.apis.start(url).unwrap_or_else(|e| { + println!("Failed to start Grin wallet receiver: {}.", e); + }); + + self.wallet_is_running = true; + return return_url; + } + + /// Stops the wallet daemon + pub fn stop_wallet(&mut self){ + self.apis.stop(); + } +} + +struct LocalServerContainerPool { + event_loop: reactor::Core, + + base_http_addr: String, + base_port_server: u16, + base_port_api: u16, + base_port_wallet: u16, + server_containers: Vec, +} + +impl LocalServerContainerPool { + pub fn new() -> Result { + let servers = Vec::new(); + let mut evtlp = reactor::Core::new().unwrap(); + + Ok((LocalServerContainerPool{ + event_loop: evtlp, + base_http_addr : String::from("0.0.0.0"), + base_port_server: 15000, + base_port_api: 16000, + base_port_wallet: 17000, + server_containers: servers, + })) + } + + pub fn create_server(&mut self, enable_mining:bool, enable_wallet:bool ) { + + let server_port = self.base_port_server+self.server_containers.len() as u16; + let api_port = self.base_port_api+self.server_containers.len() as u16; + + + let api_addr = format!("{}:{}", self.base_http_addr, api_port); + + let mut server_container = LocalServerContainer::new(api_addr, server_port, &self.event_loop).unwrap(); + + server_container.enable_mining = enable_mining; + server_container.enable_wallet = enable_wallet; + + //if we want to start a wallet, use this port + server_container.wallet_port = self.base_port_wallet+self.server_containers.len() as u16; + + self.server_containers.push(server_container); + } + + /// Connects every server to each other as peers + /// + + pub fn connect_all_peers(&self){ + /// just pull out all currently active servers, build a list, + /// and feed into all servers + + let mut server_addresses:Vec = Vec::new(); + for s in &self.server_containers { + let server_address = format!("{}:{}", + s.server.config.p2p_config.host, + s.server.config.p2p_config.port); + server_addresses.push(server_address); + } + + for a in server_addresses { + for s in &self.server_containers { + if format!("{}", s.server.config.p2p_config.host) != a { + s.server.connect_peer(a.parse().unwrap()).unwrap(); + } + } + } + } + + ///Starts all servers, with or without mining + ///TODO: This should accept a closure so tests can determine what + ///to do when the run is finished + + pub fn start_all_servers(&mut self) { + + for s in &mut self.server_containers { + let mut wallet_url = String::from("http://localhost:13416"); + if s.enable_wallet == true { + wallet_url=s.start_wallet(); + //Instead of making all sorts of changes to the api server + //to support futures, just going to pause this thread for + //half a second for the wallet to start + //before continuing + + thread::sleep(time::Duration::from_millis(500)); + } + let mut miner_config = grin::MinerConfig{ + enable_mining: true, + burn_reward: true, + wallet_receiver_url : format!("http://{}", wallet_url), + ..Default::default() + }; + if s.enable_wallet == true { + miner_config.burn_reward = false; + } + if s.enable_mining == true { + println!("starting Miner on port {}", s.server.config.p2p_config.port); + s.server.start_miner(miner_config); + } + + } + + //borrow copy to allow access in closure + let mut server_containers = mem::replace(&mut self.server_containers, Vec::new()); + //let &mut server_containers = self.server_containers; + + self.event_loop.run(Timer::default().sleep(time::Duration::from_secs(60)).and_then(|_| { + //Stop any assocated wallet servers + for s in &mut server_containers { + if s.wallet_is_running{ + s.stop_wallet(); + } + } + //for s in &mut self.server_containers { + // occasionally 2 peers will connect to each other at the same time + //assert!(s.peer_count() >= 4); + //} + Ok(()) + })); + } + +} + +/// Just exercises the structures above, creates 5 servers, all starting wallets, +/// mining and connecting to each other in their own directories + +#[test] +fn simulate_parallel_miners(){ + env_logger::init(); + let num_servers=5; + + let mut server_pool = LocalServerContainerPool::new().unwrap(); + for n in 0..num_servers { + server_pool.create_server(true, true); + } + + server_pool.connect_all_peers(); + server_pool.start_all_servers(); +} + /// Create a network of 5 servers and mine a block, verifying that the block /// gets propagated to all. #[test] @@ -52,6 +293,7 @@ fn simulate_block_propagation() { for n in 0..5 { let s = grin::Server::future( grin::ServerConfig{ + api_http_addr: format!("127.0.0.1:{}", 20000+n), db_root: format!("target/grin-prop-{}", n), cuckoo_size: 12, p2p_config: p2p::P2PConfig{port: 10000+n, ..p2p::P2PConfig::default()}, diff --git a/src/bin/grin.rs b/src/bin/grin.rs index f1ae51934..8415ffe7f 100644 --- a/src/bin/grin.rs +++ b/src/bin/grin.rs @@ -42,6 +42,8 @@ use daemonize::Daemonize; use secp::Secp256k1; +use wallet::WalletConfig; + fn main() { env_logger::init().unwrap(); @@ -68,6 +70,11 @@ fn main() { .short("m") .long("mine") .help("Starts the debugging mining loop")) + .arg(Arg::with_name("wallet_url") + .short("w") + .long("wallet_url") + .help("A listening wallet receiver to which mining rewards will be sent") + .takes_value(true)) .arg(Arg::with_name("config") .short("c") .long("config") @@ -95,6 +102,16 @@ fn main() { .long("pass") .help("Wallet passphrase used to generate the private key seed") .takes_value(true)) + .arg(Arg::with_name("dir") + .short("d") + .long("dir") + .help("Directory in which to store wallet files (defaults to current directory)") + .takes_value(true)) + .arg(Arg::with_name("port") + .short("r") + .long("port") + .help("Port on which to run the wallet receiver when in receiver mode") + .takes_value(true)) .subcommand(SubCommand::with_name("receive") .about("Run the wallet in receiving mode. If an input file is provided, will process it, otherwise runs in server mode waiting for send requests.") .arg(Arg::with_name("input") @@ -154,6 +171,10 @@ fn server_command(server_args: &ArgMatches) { if server_args.is_present("mine") { server_config.mining_config.enable_mining = true; } + if let Some(wallet_url) = server_args.value_of("wallet_url") { + server_config.mining_config.wallet_receiver_url = wallet_url.to_string(); + } + if let Some(seeds) = server_args.values_of("seed") { server_config.seeding_type = grin::Seeding::List(seeds.map(|s| s.to_string()).collect()); } @@ -199,18 +220,35 @@ fn wallet_command(wallet_args: &ArgMatches) { let key = wallet::ExtendedKey::from_seed(&s, &seed[..]) .expect("Error deriving extended key from seed."); + let default_ip = "127.0.0.1"; + let mut addr = format!("{}:13416", default_ip); + + let mut wallet_config = WalletConfig::default(); + if let Some(port) = wallet_args.value_of("port") { + addr = format!("{}:{}", default_ip, port); + wallet_config.api_http_addr = format!("http://{}", addr).to_string(); + } + + if let Some(dir) = wallet_args.value_of("dir") { + wallet_config.data_file_dir = dir.to_string().clone(); + } + match wallet_args.subcommand() { + ("receive", Some(receive_args)) => { if let Some(f) = receive_args.value_of("input") { let mut file = File::open(f).expect("Unable to open transaction file."); let mut contents = String::new(); file.read_to_string(&mut contents).expect("Unable to read transaction file."); - wallet::receive_json_tx(&key, contents.as_str()).unwrap(); + wallet::receive_json_tx(&wallet_config, &key, contents.as_str()).unwrap(); } else { - info!("Starting the Grin wallet receiving daemon..."); + info!("Starting the Grin wallet receiving daemon at {}...", wallet_config.api_http_addr); let mut apis = api::ApiServer::new("/v1".to_string()); - apis.register_endpoint("/receive".to_string(), wallet::WalletReceiver { key: key }); - apis.start("127.0.0.1:13416").unwrap_or_else(|e| { + apis.register_endpoint("/receive".to_string(), wallet::WalletReceiver { + key: key, + config: wallet_config + }); + apis.start(addr).unwrap_or_else(|e| { error!("Failed to start Grin wallet receiver: {}.", e); }); } @@ -224,7 +262,7 @@ fn wallet_command(wallet_args: &ArgMatches) { if let Some(d) = send_args.value_of("dest") { dest = d; } - wallet::issue_send_tx(&key, amount, dest.to_string()).unwrap(); + wallet::issue_send_tx(&wallet_config, &key, amount, dest.to_string()).unwrap(); } _ => panic!("Unknown wallet command, use 'grin help wallet' for details"), } diff --git a/wallet/src/checker.rs b/wallet/src/checker.rs index 77d775e6b..fcba9748c 100644 --- a/wallet/src/checker.rs +++ b/wallet/src/checker.rs @@ -29,7 +29,7 @@ pub fn refresh_outputs(config: &WalletConfig, ext_key: &ExtendedKey) { let secp = secp::Secp256k1::with_caps(secp::ContextFlag::Commit); // operate within a lock on wallet data - WalletData::with_wallet(|wallet_data| { + WalletData::with_wallet(&config.data_file_dir, |wallet_data| { // check each output that's not spent for out in &mut wallet_data.outputs { diff --git a/wallet/src/lib.rs b/wallet/src/lib.rs index 59c4110c1..dca3decfb 100644 --- a/wallet/src/lib.rs +++ b/wallet/src/lib.rs @@ -38,3 +38,4 @@ mod types; pub use extkey::ExtendedKey; pub use receiver::{WalletReceiver, receive_json_tx}; pub use sender::issue_send_tx; +pub use types::WalletConfig; diff --git a/wallet/src/receiver.rs b/wallet/src/receiver.rs index 7a0fafd2f..672357b8b 100644 --- a/wallet/src/receiver.rs +++ b/wallet/src/receiver.rs @@ -69,12 +69,13 @@ struct TxWrapper { /// Receive an already well formed JSON transaction issuance and finalize the /// transaction, adding our receiving output, to broadcast to the rest of the /// network. -pub fn receive_json_tx(ext_key: &ExtendedKey, partial_tx_str: &str) -> Result<(), Error> { +pub fn receive_json_tx(config: &WalletConfig, ext_key: &ExtendedKey, partial_tx_str: &str) -> Result<(), Error> { + let (amount, blinding, partial_tx) = partial_tx_from_json(partial_tx_str)?; - let final_tx = receive_transaction(ext_key, amount, blinding, partial_tx)?; + let final_tx = receive_transaction(&config, ext_key, amount, blinding, partial_tx)?; let tx_hex = util::to_hex(ser::ser_vec(&final_tx).unwrap()); - let config = WalletConfig::default(); + let url = format!("{}/v1/pool/push", config.api_http_addr.as_str()); api::client::post(url.as_str(), &TxWrapper { tx_hex: tx_hex })?; Ok(()) @@ -98,6 +99,7 @@ pub struct CbData { #[derive(Clone)] pub struct WalletReceiver { pub key: ExtendedKey, + pub config: WalletConfig, } impl ApiEndpoint for WalletReceiver { @@ -118,7 +120,7 @@ impl ApiEndpoint for WalletReceiver { match op.as_str() { "coinbase" => { let (out, kern) = - receive_coinbase(&self.key, input.amount).map_err(|e| { + receive_coinbase(&self.config, &self.key, input.amount).map_err(|e| { api::Error::Internal(format!("Error building coinbase: {:?}", e)) })?; let out_bin = @@ -140,11 +142,11 @@ impl ApiEndpoint for WalletReceiver { } /// Build a coinbase output and the corresponding kernel -fn receive_coinbase(ext_key: &ExtendedKey, amount: u64) -> Result<(Output, TxKernel), Error> { +fn receive_coinbase(config: &WalletConfig, ext_key: &ExtendedKey, amount: u64) -> Result<(Output, TxKernel), Error> { let secp = secp::Secp256k1::with_caps(secp::ContextFlag::Commit); // operate within a lock on wallet data - WalletData::with_wallet(|wallet_data| { + WalletData::with_wallet(&config.data_file_dir, |wallet_data| { // derive a new private for the reward let next_child = wallet_data.next_child(ext_key.fingerprint); @@ -165,7 +167,8 @@ fn receive_coinbase(ext_key: &ExtendedKey, amount: u64) -> Result<(Output, TxKer } /// Builds a full transaction from the partial one sent to us for transfer -fn receive_transaction(ext_key: &ExtendedKey, +fn receive_transaction(config: &WalletConfig, + ext_key: &ExtendedKey, amount: u64, blinding: SecretKey, partial: Transaction) @@ -174,7 +177,7 @@ fn receive_transaction(ext_key: &ExtendedKey, let secp = secp::Secp256k1::with_caps(secp::ContextFlag::Commit); // operate within a lock on wallet data - WalletData::with_wallet(|wallet_data| { + WalletData::with_wallet(&config.data_file_dir, |wallet_data| { let next_child = wallet_data.next_child(ext_key.fingerprint); let out_key = ext_key.derive(&secp, next_child).map_err(|e| Error::Key(e))?; diff --git a/wallet/src/sender.rs b/wallet/src/sender.rs index 6898b0f0c..1674e92d7 100644 --- a/wallet/src/sender.rs +++ b/wallet/src/sender.rs @@ -25,10 +25,10 @@ use types::*; /// wallet /// UTXOs. The destination can be "stdout" (for command line) or a URL to the /// recipients wallet receiver (to be implemented). -pub fn issue_send_tx(ext_key: &ExtendedKey, amount: u64, dest: String) -> Result<(), Error> { +pub fn issue_send_tx(config: &WalletConfig, ext_key: &ExtendedKey, amount: u64, dest: String) -> Result<(), Error> { checker::refresh_outputs(&WalletConfig::default(), ext_key); - let (tx, blind_sum) = build_send_tx(ext_key, amount)?; + let (tx, blind_sum) = build_send_tx(config, ext_key, amount)?; let json_tx = partial_tx_to_json(amount, blind_sum, tx); if dest == "stdout" { println!("{}", json_tx); @@ -42,12 +42,12 @@ pub fn issue_send_tx(ext_key: &ExtendedKey, amount: u64, dest: String) -> Result /// Builds a transaction to send to someone from the HD seed associated with the /// wallet and the amount to send. Handles reading through the wallet data file, /// selecting outputs to spend and building the change. -fn build_send_tx(ext_key: &ExtendedKey, amount: u64) -> Result<(Transaction, SecretKey), Error> { +fn build_send_tx(config: &WalletConfig, ext_key: &ExtendedKey, amount: u64) -> Result<(Transaction, SecretKey), Error> { // first, rebuild the private key from the seed let secp = secp::Secp256k1::with_caps(secp::ContextFlag::Commit); // operate within a lock on wallet data - WalletData::with_wallet(|wallet_data| { + WalletData::with_wallet(&config.data_file_dir, |wallet_data| { // second, check from our local wallet data for outputs to spend let (coins, change) = wallet_data.select(ext_key.fingerprint, amount); diff --git a/wallet/src/types.rs b/wallet/src/types.rs index 17c192648..493b03e19 100644 --- a/wallet/src/types.rs +++ b/wallet/src/types.rs @@ -17,6 +17,7 @@ use std::fs::{self, File, OpenOptions}; use std::io::Write; use std::num; use std::path::Path; +use std::path::MAIN_SEPARATOR; use serde_json; @@ -78,11 +79,15 @@ impl From for Error { #[derive(Debug, Clone)] pub struct WalletConfig { pub api_http_addr: String, + pub data_file_dir: String, } impl Default for WalletConfig { fn default() -> WalletConfig { - WalletConfig { api_http_addr: "http://127.0.0.1:13415".to_string() } + WalletConfig { + api_http_addr: "http://127.0.0.1:13415".to_string(), + data_file_dir: ".".to_string(), + } } } @@ -140,23 +145,31 @@ impl WalletData { /// Note that due to the impossibility to do an actual file lock easily /// across operating systems, this just creates a lock file with a "should /// not exist" option. - pub fn with_wallet(f: F) -> Result + pub fn with_wallet(data_file_dir:&str, f: F) -> Result where F: FnOnce(&mut WalletData) -> T { + //create directory if it doesn't exist + fs::create_dir_all(data_file_dir).unwrap_or_else(|why| { + info!("! {:?}", why.kind()); + }); + + let data_file_path = &format!("{}{}{}", data_file_dir, MAIN_SEPARATOR, DAT_FILE); + let lock_file_path = &format!("{}{}{}", data_file_dir, MAIN_SEPARATOR, LOCK_FILE); + // create the lock files, if it already exists, will produce an error - OpenOptions::new().write(true).create_new(true).open(LOCK_FILE).map_err(|e| { + OpenOptions::new().write(true).create_new(true).open(lock_file_path).map_err(|e| { Error::WalletData(format!("Could not create wallet lock file. Either \ some other process is using the wallet or there's a write access \ issue.")) - })?; + })?; // do what needs to be done - let mut wdat = WalletData::read_or_create()?; + let mut wdat = WalletData::read_or_create(data_file_path)?; let res = f(&mut wdat); - wdat.write()?; + wdat.write(data_file_path)?; // delete the lock file - fs::remove_file(LOCK_FILE).map_err(|e| { + fs::remove_file(lock_file_path).map_err(|e| { Error::WalletData(format!("Could not remove wallet lock file. Maybe insufficient \ rights?")) })?; @@ -165,9 +178,9 @@ impl WalletData { } /// Read the wallet data or created a brand new one if it doesn't exist yet - fn read_or_create() -> Result { - if Path::new(DAT_FILE).exists() { - WalletData::read() + fn read_or_create(data_file_path:&str) -> Result { + if Path::new(data_file_path).exists() { + WalletData::read(data_file_path) } else { // just create a new instance, it will get written afterward Ok(WalletData { outputs: vec![] }) @@ -175,21 +188,21 @@ impl WalletData { } /// Read the wallet data from disk. - fn read() -> Result { - let data_file = File::open(DAT_FILE) - .map_err(|e| Error::WalletData(format!("Could not open {}: {}", DAT_FILE, e)))?; + fn read(data_file_path:&str) -> Result { + let data_file = File::open(data_file_path) + .map_err(|e| Error::WalletData(format!("Could not open {}: {}", data_file_path, e)))?; serde_json::from_reader(data_file) - .map_err(|e| Error::WalletData(format!("Error reading {}: {}", DAT_FILE, e))) + .map_err(|e| Error::WalletData(format!("Error reading {}: {}", data_file_path, e))) } /// Write the wallet data to disk. - fn write(&self) -> Result<(), Error> { - let mut data_file = File::create(DAT_FILE) - .map_err(|e| Error::WalletData(format!("Could not create {}: {}", DAT_FILE, e)))?; + fn write(&self, data_file_path:&str) -> Result<(), Error> { + let mut data_file = File::create(data_file_path) + .map_err(|e| Error::WalletData(format!("Could not create {}: {}", data_file_path, e)))?; let res_json = serde_json::to_vec_pretty(self) .map_err(|_| Error::WalletData(format!("Error serializing wallet data.")))?; data_file.write_all(res_json.as_slice()) - .map_err(|e| Error::WalletData(format!("Error writing {}: {}", DAT_FILE, e))) + .map_err(|e| Error::WalletData(format!("Error writing {}: {}", data_file_path, e))) } /// Append a new output information to the wallet data.