small improvement on the servers test (#1737)

* cherry-pick from master for #1736
This commit is contained in:
Gary Yu 2018-10-14 20:13:49 +08:00 committed by GitHub
parent 43f4f92730
commit 2e6a242827
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 81 additions and 47 deletions

View file

@ -268,7 +268,7 @@ impl MessageHandler for Protocol {
let mut tmp_zip = BufWriter::new(File::create(file)?); let mut tmp_zip = BufWriter::new(File::create(file)?);
let total_size = sm_arch.bytes as usize; let total_size = sm_arch.bytes as usize;
let mut downloaded_size: usize = 0; let mut downloaded_size: usize = 0;
let mut request_size = cmp::min(48_000, sm_arch.bytes) as usize; let mut request_size = cmp::min(48_000, total_size);
while request_size > 0 { while request_size > 0 {
downloaded_size += msg.copy_attachment(request_size, &mut tmp_zip)?; downloaded_size += msg.copy_attachment(request_size, &mut tmp_zip)?;
request_size = cmp::min(48_000, total_size - downloaded_size); request_size = cmp::min(48_000, total_size - downloaded_size);

View file

@ -58,7 +58,7 @@ pub struct Server {
/// To be passed around to collect stats and info /// To be passed around to collect stats and info
state_info: ServerStateInfo, state_info: ServerStateInfo,
/// Stop flag /// Stop flag
stop: Arc<AtomicBool>, pub stop: Arc<AtomicBool>,
} }
impl Server { impl Server {
@ -89,7 +89,7 @@ impl Server {
if let Some(s) = enable_test_miner { if let Some(s) = enable_test_miner {
if s { if s {
serv.start_test_miner(test_miner_wallet_url); serv.start_test_miner(test_miner_wallet_url, serv.stop.clone());
} }
} }
@ -334,7 +334,8 @@ impl Server {
/// Start mining for blocks internally on a separate thread. Relies on /// Start mining for blocks internally on a separate thread. Relies on
/// internal miner, and should only be used for automated testing. Burns /// internal miner, and should only be used for automated testing. Burns
/// reward if wallet_listener_url is 'None' /// reward if wallet_listener_url is 'None'
pub fn start_test_miner(&self, wallet_listener_url: Option<String>) { pub fn start_test_miner(&self, wallet_listener_url: Option<String>, stop: Arc<AtomicBool>) {
info!(LOGGER, "start_test_miner - start",);
let sync_state = self.sync_state.clone(); let sync_state = self.sync_state.clone();
let config_wallet_url = match wallet_listener_url.clone() { let config_wallet_url = match wallet_listener_url.clone() {
Some(u) => u, Some(u) => u,
@ -355,7 +356,7 @@ impl Server {
self.chain.clone(), self.chain.clone(),
self.tx_pool.clone(), self.tx_pool.clone(),
self.verifier_cache.clone(), self.verifier_cache.clone(),
self.stop.clone(), stop,
); );
miner.set_debug_output_id(format!("Port {}", self.config.p2p_config.port)); miner.set_debug_output_id(format!("Port {}", self.config.p2p_config.port));
let _ = thread::Builder::new() let _ = thread::Builder::new()
@ -462,7 +463,8 @@ impl Server {
} }
/// Stops the test miner without stopping the p2p layer /// Stops the test miner without stopping the p2p layer
pub fn stop_test_miner(&self) { pub fn stop_test_miner(&self, stop: Arc<AtomicBool>) {
self.stop.store(true, Ordering::Relaxed); stop.store(true, Ordering::Relaxed);
info!(LOGGER, "stop_test_miner - stop",);
} }
} }

View file

@ -135,7 +135,7 @@ impl Miner {
// nothing has changed. We only want to create a new key_id for each new block. // nothing has changed. We only want to create a new key_id for each new block.
let mut key_id = None; let mut key_id = None;
loop { while !self.stop.load(Ordering::Relaxed) {
trace!(LOGGER, "in miner loop. key_id: {:?}", key_id); trace!(LOGGER, "in miner loop. key_id: {:?}", key_id);
// get the latest chain state and build a block on top of it // get the latest chain state and build a block on top of it
@ -183,10 +183,11 @@ impl Miner {
); );
key_id = block_fees.key_id(); key_id = block_fees.key_id();
} }
}
if self.stop.load(Ordering::Relaxed) { info!(
break; LOGGER,
} "(Server ID: {}) test miner exit.", self.debug_output_id
} );
} }
} }

View file

@ -222,7 +222,7 @@ impl LocalServerContainer {
"starting test Miner on port {}", "starting test Miner on port {}",
self.config.p2p_server_port self.config.p2p_server_port
); );
s.start_test_miner(wallet_url); s.start_test_miner(wallet_url, s.stop.clone());
} }
for p in &mut self.peer_list { for p in &mut self.peer_list {
@ -266,7 +266,7 @@ impl LocalServerContainer {
let client = HTTPWalletClient::new(&self.wallet_config.check_node_api_http_addr, None); let client = HTTPWalletClient::new(&self.wallet_config.check_node_api_http_addr, None);
if let Err(e) = r { if let Err(_e) = r {
//panic!("Error initializing wallet seed: {}", e); //panic!("Error initializing wallet seed: {}", e);
} }
@ -322,7 +322,7 @@ impl LocalServerContainer {
minimum_confirmations: u64, minimum_confirmations: u64,
selection_strategy: &str, selection_strategy: &str,
dest: &str, dest: &str,
fluff: bool, _fluff: bool,
) { ) {
let amount = core::core::amount_from_hr_string(amount) let amount = core::core::amount_from_hr_string(amount)
.expect("Could not parse amount as a number with optional decimal point."); .expect("Could not parse amount as a number with optional decimal point.");

View file

@ -20,16 +20,20 @@ extern crate grin_p2p as p2p;
extern crate grin_servers as servers; extern crate grin_servers as servers;
extern crate grin_util as util; extern crate grin_util as util;
extern crate grin_wallet as wallet; extern crate grin_wallet as wallet;
#[macro_use]
extern crate slog;
mod framework; mod framework;
use std::default::Default; use std::default::Default;
use std::sync::atomic::AtomicBool;
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use std::{thread, time}; use std::{thread, time};
use core::core::hash::Hashed; use core::core::hash::Hashed;
use core::global::{self, ChainTypes}; use core::global::{self, ChainTypes};
use util::LOGGER;
use wallet::controller; use wallet::controller;
use wallet::libtx::slate::Slate; use wallet::libtx::slate::Slate;
use wallet::libwallet::types::{WalletBackend, WalletInst}; use wallet::libwallet::types::{WalletBackend, WalletInst};
@ -86,7 +90,7 @@ fn simulate_seeding() {
// Create a server pool // Create a server pool
let mut pool_config = LocalServerContainerPoolConfig::default(); let mut pool_config = LocalServerContainerPoolConfig::default();
pool_config.base_name = String::from(test_name_dir); pool_config.base_name = test_name_dir.to_string();
pool_config.run_length_in_seconds = 30; pool_config.run_length_in_seconds = 30;
// have to use different ports because of tests being run in parallel // have to use different ports because of tests being run in parallel
@ -110,10 +114,10 @@ fn simulate_seeding() {
// point next servers at first seed // point next servers at first seed
server_config.is_seeding = false; server_config.is_seeding = false;
server_config.seed_addr = String::from(format!( server_config.seed_addr = format!(
"{}:{}", "{}:{}",
server_config.base_addr, server_config.p2p_server_port server_config.base_addr, server_config.p2p_server_port
)); );
for _ in 0..4 { for _ in 0..4 {
pool.create_server(&mut server_config); pool.create_server(&mut server_config);
@ -138,15 +142,13 @@ fn simulate_seeding() {
} }
/// Create 1 server, start it mining, then connect 4 other peers mining and /// Create 1 server, start it mining, then connect 4 other peers mining and
/// using the first /// using the first as a seed. Meant to test the evolution of mining difficulty with miners
/// as a seed. Meant to test the evolution of mining difficulty with miners /// running at different rates.
/// running at ///
/// different rates /// TODO: Just going to comment this out as an automatically run test for the time
// Just going to comment this out as an automatically run test for the time /// being, As it's more for actively testing and hurts CI a lot
// being, #[ignore]
// As it's more for actively testing and hurts CI a lot #[test]
//#[test]
#[allow(dead_code)]
fn simulate_parallel_mining() { fn simulate_parallel_mining() {
global::set_mining_mode(ChainTypes::AutomatedTesting); global::set_mining_mode(ChainTypes::AutomatedTesting);
@ -155,7 +157,7 @@ fn simulate_parallel_mining() {
// Create a server pool // Create a server pool
let mut pool_config = LocalServerContainerPoolConfig::default(); let mut pool_config = LocalServerContainerPoolConfig::default();
pool_config.base_name = String::from(test_name_dir); pool_config.base_name = test_name_dir.to_string();
pool_config.run_length_in_seconds = 60; pool_config.run_length_in_seconds = 60;
// have to use different ports because of tests being run in parallel // have to use different ports because of tests being run in parallel
pool_config.base_api_port = 30040; pool_config.base_api_port = 30040;
@ -174,10 +176,10 @@ fn simulate_parallel_mining() {
// point next servers at first seed // point next servers at first seed
server_config.is_seeding = false; server_config.is_seeding = false;
server_config.seed_addr = String::from(format!( server_config.seed_addr = format!(
"{}:{}", "{}:{}",
server_config.base_addr, server_config.p2p_server_port server_config.base_addr, server_config.p2p_server_port
)); );
// And create 4 more, then let them run for a while // And create 4 more, then let them run for a while
for i in 1..4 { for i in 1..4 {
@ -219,7 +221,8 @@ fn simulate_block_propagation() {
} }
// start mining // start mining
servers[0].start_test_miner(None); let stop = Arc::new(AtomicBool::new(false));
servers[0].start_test_miner(None, stop.clone());
// monitor for a change of head on a different server and check whether // monitor for a change of head on a different server and check whether
// chain height has changed // chain height has changed
@ -238,9 +241,15 @@ fn simulate_block_propagation() {
} }
thread::sleep(time::Duration::from_millis(1_000)); thread::sleep(time::Duration::from_millis(1_000));
time_spent += 1; time_spent += 1;
if time_spent >= 60 { if time_spent >= 30 {
info!(LOGGER, "simulate_block_propagation - fail on timeout",);
break; break;
} }
// stop mining after 8s
if time_spent == 8 {
servers[0].stop_test_miner(stop.clone());
}
} }
for n in 0..5 { for n in 0..5 {
servers[n].stop(); servers[n].stop();
@ -265,21 +274,30 @@ fn simulate_full_sync() {
let s1 = servers::Server::new(framework::config(1000, "grin-sync", 1000)).unwrap(); let s1 = servers::Server::new(framework::config(1000, "grin-sync", 1000)).unwrap();
// mine a few blocks on server 1 // mine a few blocks on server 1
s1.start_test_miner(None); let stop = Arc::new(AtomicBool::new(false));
s1.start_test_miner(None, stop.clone());
thread::sleep(time::Duration::from_secs(8)); thread::sleep(time::Duration::from_secs(8));
s1.stop_test_miner(stop);
let s2 = servers::Server::new(framework::config(1001, "grin-sync", 1000)).unwrap(); let s2 = servers::Server::new(framework::config(1001, "grin-sync", 1000)).unwrap();
// Get the current header from s1. // Get the current header from s1.
let s1_header = s1.chain.head_header().unwrap(); let s1_header = s1.chain.head_header().unwrap();
info!(
LOGGER,
"simulate_full_sync - s1 header head: {} at {}",
s1_header.hash(),
s1_header.height
);
// Wait for s2 to sync up to and including the header from s1. // Wait for s2 to sync up to and including the header from s1.
let mut time_spent = 0; let mut time_spent = 0;
while s2.head().height < s1_header.height { while s2.head().height < s1_header.height {
thread::sleep(time::Duration::from_millis(1_000)); thread::sleep(time::Duration::from_millis(1_000));
time_spent += 1; time_spent += 1;
if time_spent >= 60 { if time_spent >= 30 {
println!( info!(
LOGGER,
"sync fail. s2.head().height: {}, s1_header.height: {}", "sync fail. s2.head().height: {}, s1_header.height: {}",
s2.head().height, s2.head().height,
s1_header.height s1_header.height
@ -314,29 +332,36 @@ fn simulate_fast_sync() {
// start s1 and mine enough blocks to get beyond the fast sync horizon // start s1 and mine enough blocks to get beyond the fast sync horizon
let s1 = servers::Server::new(framework::config(2000, "grin-fast", 2000)).unwrap(); let s1 = servers::Server::new(framework::config(2000, "grin-fast", 2000)).unwrap();
s1.start_test_miner(None); let stop = Arc::new(AtomicBool::new(false));
s1.start_test_miner(None, stop.clone());
while s1.head().height < 20 { while s1.head().height < 20 {
thread::sleep(time::Duration::from_millis(1_000)); thread::sleep(time::Duration::from_millis(1_000));
} }
s1.stop_test_miner(stop);
let mut conf = config(2001, "grin-fast", 2000); let mut conf = config(2001, "grin-fast", 2000);
conf.archive_mode = Some(false); conf.archive_mode = Some(false);
let s2 = servers::Server::new(conf).unwrap(); let s2 = servers::Server::new(conf).unwrap();
while s2.header_head().height < 1 {
let _ = s2.ping_peers();
thread::sleep(time::Duration::from_millis(1_000));
}
s1.stop_test_miner();
// Get the current header from s1. // Get the current header from s1.
let s1_header = s1.chain.head_header().unwrap(); let s1_header = s1.chain.head_header().unwrap();
// Wait for s2 to sync up to and including the header from s1. // Wait for s2 to sync up to and including the header from s1.
let mut total_wait = 0;
while s2.head().height < s1_header.height { while s2.head().height < s1_header.height {
thread::sleep(time::Duration::from_millis(1_000)); thread::sleep(time::Duration::from_millis(1_000));
total_wait += 1;
if total_wait >= 30 {
error!(
LOGGER,
"simulate_fast_sync test fail on timeout! s2 height: {}, s1 height: {}",
s2.head().height,
s1_header.height,
);
break;
}
} }
// Confirm both s1 and s2 see a consistent header at that height. // Confirm both s1 and s2 see a consistent header at that height.
@ -364,7 +389,7 @@ fn simulate_fast_sync_double() {
let s1 = servers::Server::new(framework::config(3000, "grin-double-fast1", 3000)).unwrap(); let s1 = servers::Server::new(framework::config(3000, "grin-double-fast1", 3000)).unwrap();
// mine a few blocks on server 1 // mine a few blocks on server 1
s1.start_test_miner(None); s1.start_test_miner(None, s1.stop.clone());
thread::sleep(time::Duration::from_secs(8)); thread::sleep(time::Duration::from_secs(8));
{ {
@ -447,7 +472,7 @@ fn replicate_tx_fluff_failure() {
s1_config.dandelion_config.relay_secs = Some(1); s1_config.dandelion_config.relay_secs = Some(1);
let s1 = servers::Server::new(s1_config.clone()).unwrap(); let s1 = servers::Server::new(s1_config.clone()).unwrap();
// Mine off of server 1 // Mine off of server 1
s1.start_test_miner(s1_config.test_miner_wallet_url); s1.start_test_miner(s1_config.test_miner_wallet_url, s1.stop.clone());
thread::sleep(time::Duration::from_secs(5)); thread::sleep(time::Duration::from_secs(5));
// Server 2 (another node) // Server 2 (another node)

View file

@ -33,6 +33,8 @@ use std::io::prelude::{BufRead, Write};
use std::net::TcpStream; use std::net::TcpStream;
use std::process; use std::process;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use std::{thread, time}; use std::{thread, time};
use core::global::{self, ChainTypes}; use core::global::{self, ChainTypes};
@ -149,12 +151,13 @@ fn basic_stratum_server() {
info!(LOGGER, "stratum server and worker stats verification ok"); info!(LOGGER, "stratum server and worker stats verification ok");
// Start mining blocks // Start mining blocks
s.start_test_miner(None); let stop = Arc::new(AtomicBool::new(false));
s.start_test_miner(None, stop.clone());
info!(LOGGER, "test miner started"); info!(LOGGER, "test miner started");
// This test is supposed to complete in 3 seconds, // This test is supposed to complete in 3 seconds,
// so let's set a timeout on 10s to avoid infinite waiting happened in Travis-CI. // so let's set a timeout on 10s to avoid infinite waiting happened in Travis-CI.
let handler = thread::spawn(|| { let _handler = thread::spawn(|| {
thread::sleep(time::Duration::from_secs(10)); thread::sleep(time::Duration::from_secs(10));
error!(LOGGER, "basic_stratum_server test fail on timeout!"); error!(LOGGER, "basic_stratum_server test fail on timeout!");
thread::sleep(time::Duration::from_millis(100)); thread::sleep(time::Duration::from_millis(100));
@ -164,9 +167,12 @@ fn basic_stratum_server() {
// Simulate a worker lost connection // Simulate a worker lost connection
workers.remove(1); workers.remove(1);
// Wait for a few mined blocks
thread::sleep(time::Duration::from_secs(3));
s.stop_test_miner(stop);
// Verify blocks are being broadcast to workers // Verify blocks are being broadcast to workers
let expected = String::from("job"); let expected = String::from("job");
thread::sleep(time::Duration::from_secs(3)); // Wait for a few mined blocks
let mut jobtemplate = String::new(); let mut jobtemplate = String::new();
let _st = workers[2].read_line(&mut jobtemplate); let _st = workers[2].read_line(&mut jobtemplate);
let job_template: Value = serde_json::from_str(&jobtemplate).unwrap(); let job_template: Value = serde_json::from_str(&jobtemplate).unwrap();