From efdbcd600a8745170ed394814c6c7904b039833b Mon Sep 17 00:00:00 2001 From: Ignotus Peverell Date: Tue, 17 Oct 2017 12:18:21 +0000 Subject: [PATCH] Miner process wait until sync. Fix #185 --- grin/src/adapters.rs | 66 ++++++++++++++++++------------------- grin/src/seed.rs | 78 ++++++++++++++++++++++++-------------------- grin/src/server.rs | 11 ++++++- grin/src/sync.rs | 12 ++++--- pow/Cargo.toml | 2 +- 5 files changed, 95 insertions(+), 74 deletions(-) diff --git a/grin/src/adapters.rs b/grin/src/adapters.rs index 216825cb7..ac63f4dc8 100644 --- a/grin/src/adapters.rs +++ b/grin/src/adapters.rs @@ -167,8 +167,11 @@ impl NetAdapter for NetToChainAdapter { /// Find good peers we know with the provided capability and return their /// addresses. fn find_peer_addrs(&self, capab: p2p::Capabilities) -> Vec { - let peers = self.peer_store - .find_peers(State::Healthy, capab, p2p::MAX_PEER_ADDRS as usize); + let peers = self.peer_store.find_peers( + State::Healthy, + capab, + p2p::MAX_PEER_ADDRS as usize, + ); debug!(LOGGER, "Got {} peer addrs to send.", peers.len()); map_vec!(peers, |p| p.addr) } @@ -210,10 +213,11 @@ impl NetAdapter for NetToChainAdapter { } impl NetToChainAdapter { - pub fn new(chain_ref: Arc, - tx_pool: Arc>>, - peer_store: Arc) - -> NetToChainAdapter { + pub fn new( + chain_ref: Arc, + tx_pool: Arc>>, + peer_store: Arc, + ) -> NetToChainAdapter { NetToChainAdapter { chain: chain_ref, peer_store: peer_store, @@ -227,19 +231,15 @@ impl NetToChainAdapter { pub fn start_sync(&self, sync: sync::Syncer) { let arc_sync = Arc::new(sync); self.syncer.init(arc_sync.clone()); - let spawn_result = thread::Builder::new() - .name("syncer".to_string()) - .spawn(move || { - let sync_run_result = arc_sync.run(); - match sync_run_result { - Ok(_) => {} - Err(_) => {} - } - }); - match spawn_result { - Ok(_) => {} - Err(_) => {} - } + let _ = thread::Builder::new().name("syncer".to_string()).spawn( + move || { + let _ = arc_sync.run(); + }, + ); + } + + pub fn syncing(&self) -> bool { + self.syncer.borrow().syncing() } /// Prepare options for the chain pipeline @@ -284,8 +284,9 @@ impl ChainAdapter for ChainToPoolAndNetAdapter { } impl ChainToPoolAndNetAdapter { - pub fn new(tx_pool: Arc>>) - -> ChainToPoolAndNetAdapter { + pub fn new( + tx_pool: Arc>>, + ) -> ChainToPoolAndNetAdapter { ChainToPoolAndNetAdapter { tx_pool: tx_pool, p2p: OneTime::new(), @@ -317,19 +318,19 @@ impl PoolToChainAdapter { impl pool::BlockChain for PoolToChainAdapter { fn get_unspent(&self, output_ref: &Commitment) -> Result { - self.chain - .borrow() - .get_unspent(output_ref) - .map_err(|e| match e { + self.chain.borrow().get_unspent(output_ref).map_err( + |e| match e { chain::types::Error::OutputNotFound => pool::PoolError::OutputNotFound, chain::types::Error::OutputSpent => pool::PoolError::OutputSpent, _ => pool::PoolError::GenericPoolError, - }) + }, + ) } - fn get_block_header_by_output_commit(&self, - commit: &Commitment) - -> Result { + fn get_block_header_by_output_commit( + &self, + commit: &Commitment, + ) -> Result { self.chain .borrow() .get_block_header_by_output_commit(commit) @@ -337,9 +338,8 @@ impl pool::BlockChain for PoolToChainAdapter { } fn head_header(&self) -> Result { - self.chain - .borrow() - .head_header() - .map_err(|_| pool::PoolError::GenericPoolError) + self.chain.borrow().head_header().map_err(|_| { + pool::PoolError::GenericPoolError + }) } } diff --git a/grin/src/seed.rs b/grin/src/seed.rs index 574dfbc54..0860ec3e0 100644 --- a/grin/src/seed.rs +++ b/grin/src/seed.rs @@ -45,10 +45,11 @@ pub struct Seeder { } impl Seeder { - pub fn new(capabilities: p2p::Capabilities, - peer_store: Arc, - p2p: Arc) - -> Seeder { + pub fn new( + capabilities: p2p::Capabilities, + peer_store: Arc, + p2p: Arc, + ) -> Seeder { Seeder { peer_store: peer_store, p2p: p2p, @@ -56,17 +57,20 @@ impl Seeder { } } - pub fn connect_and_monitor(&self, - h: reactor::Handle, - seed_list: Box, Error = String>>) { + pub fn connect_and_monitor( + &self, + h: reactor::Handle, + seed_list: Box, Error = String>>, + ) { // open a channel with a listener that connects every peer address sent below // max peer count let (tx, rx) = futures::sync::mpsc::unbounded(); h.spawn(self.listen_for_addrs(h.clone(), rx)); // check seeds and start monitoring connections - let seeder = self.connect_to_seeds(tx.clone(), seed_list) - .join(self.monitor_peers(tx.clone())); + let seeder = self.connect_to_seeds(tx.clone(), seed_list).join( + self.monitor_peers(tx.clone()), + ); h.spawn(seeder.map(|_| ()).map_err(|e| { error!(LOGGER, "Seeding or peer monitoring error: {}", e); @@ -74,9 +78,10 @@ impl Seeder { })); } - fn monitor_peers(&self, - tx: mpsc::UnboundedSender) - -> Box> { + fn monitor_peers( + &self, + tx: mpsc::UnboundedSender, + ) -> Box> { let peer_store = self.peer_store.clone(); let p2p_server = self.p2p.clone(); @@ -92,8 +97,8 @@ impl Seeder { for p in disconnected { if p.is_banned() { debug!(LOGGER, "Marking peer {} as banned.", p.info.addr); - let update_result = peer_store - .update_state(p.info.addr, p2p::State::Banned); + let update_result = + peer_store.update_state(p.info.addr, p2p::State::Banned); match update_result { Ok(()) => {} Err(_) => {} @@ -131,10 +136,11 @@ impl Seeder { // Check if we have any pre-existing peer in db. If so, start with those, // otherwise use the seeds provided. - fn connect_to_seeds(&self, - tx: mpsc::UnboundedSender, - seed_list: Box, Error = String>>) - -> Box> { + fn connect_to_seeds( + &self, + tx: mpsc::UnboundedSender, + seed_list: Box, Error = String>>, + ) -> Box> { let peer_store = self.peer_store.clone(); // a thread pool is required so we don't block the event loop with a @@ -178,10 +184,11 @@ impl Seeder { /// addresses to and initiate a connection if the max peer count isn't /// exceeded. A request for more peers is also automatically sent after /// connection. - fn listen_for_addrs(&self, - h: reactor::Handle, - rx: mpsc::UnboundedReceiver) - -> Box> { + fn listen_for_addrs( + &self, + h: reactor::Handle, + rx: mpsc::UnboundedReceiver, + ) -> Box> { let capab = self.capabilities; let p2p_store = self.peer_store.clone(); let p2p_server = self.p2p.clone(); @@ -224,10 +231,8 @@ pub fn web_seeds(h: reactor::Handle) -> Box, Error }) .and_then(|res| { // collect all chunks and split around whitespace to get a list of SocketAddr - res.body() - .collect() - .map_err(|e| e.to_string()) - .and_then(|chunks| { + res.body().collect().map_err(|e| e.to_string()).and_then( + |chunks| { let res = chunks.iter().fold("".to_string(), |acc, ref chunk| { acc + str::from_utf8(&chunk[..]).unwrap() }); @@ -235,7 +240,8 @@ pub fn web_seeds(h: reactor::Handle) -> Box, Error .map(|s| s.parse().unwrap()) .collect::>(); Ok(addrs) - }) + }, + ) }) }); Box::new(seeds) @@ -243,8 +249,9 @@ pub fn web_seeds(h: reactor::Handle) -> Box, Error /// Convenience function when the seed list is immediately known. Mostly used /// for tests. -pub fn predefined_seeds(addrs_str: Vec) - -> Box, Error = String>> { +pub fn predefined_seeds( + addrs_str: Vec, +) -> Box, Error = String>> { let seeds = future::ok(()).and_then(move |_| { Ok( addrs_str @@ -256,12 +263,13 @@ pub fn predefined_seeds(addrs_str: Vec) Box::new(seeds) } -fn connect_and_req(capab: p2p::Capabilities, - peer_store: Arc, - p2p: Arc, - h: reactor::Handle, - addr: SocketAddr) - -> Box> { +fn connect_and_req( + capab: p2p::Capabilities, + peer_store: Arc, + p2p: Arc, + h: reactor::Handle, + addr: SocketAddr, +) -> Box> { let fut = p2p.connect_peer(addr, h).then(move |p| { match p { Ok(Some(p)) => { diff --git a/grin/src/server.rs b/grin/src/server.rs index ebeb35512..0fa361223 100644 --- a/grin/src/server.rs +++ b/grin/src/server.rs @@ -51,6 +51,7 @@ pub struct Server { chain: Arc, /// in-memory transaction pool tx_pool: Arc>>, + net_adapter: Arc, } impl Server { @@ -148,6 +149,7 @@ impl Server { p2p: p2p_server, chain: shared_chain, tx_pool: tx_pool, + net_adapter: net_adapter, }) } @@ -173,10 +175,17 @@ impl Server { pub fn start_miner(&self, config: pow::types::MinerConfig) { let cuckoo_size = global::sizeshift(); let proof_size = global::proofsize(); + let net_adapter = self.net_adapter.clone(); let mut miner = miner::Miner::new(config.clone(), self.chain.clone(), self.tx_pool.clone()); miner.set_debug_output_id(format!("Port {}", self.config.p2p_config.unwrap().port)); - thread::spawn(move || { miner.run_loop(config.clone(), cuckoo_size as u32, proof_size); }); + thread::spawn(move || { + let secs_5 = time::Duration::from_secs(5); + while net_adapter.syncing() { + thread::sleep(secs_5); + } + miner.run_loop(config.clone(), cuckoo_size as u32, proof_size); + }); } /// The chain head diff --git a/grin/src/sync.rs b/grin/src/sync.rs index 6b59a771b..b66662a33 100644 --- a/grin/src/sync.rs +++ b/grin/src/sync.rs @@ -146,12 +146,16 @@ impl Syncer { // clean up potentially dead downloads let twenty_sec_ago = Instant::now() - Duration::from_secs(20); - let too_old_pos = (0..blocks_downloading.len()).filter(|p| { - blocks_downloading[*p].1 < twenty_sec_ago - }).collect::>(); + let too_old_pos = (0..blocks_downloading.len()) + .filter(|p| blocks_downloading[*p].1 < twenty_sec_ago) + .collect::>(); for too_old in too_old_pos { let block_h = blocks_downloading.remove(too_old); - debug!(LOGGER, "Download request expired for {}, will re-issue.", block_h.0); + debug!( + LOGGER, + "Download request expired for {}, will re-issue.", + block_h.0 + ); blocks_to_download.insert(0, block_h.0); } diff --git a/pow/Cargo.toml b/pow/Cargo.toml index a45f03743..fc3370631 100644 --- a/pow/Cargo.toml +++ b/pow/Cargo.toml @@ -21,7 +21,7 @@ git = "https://github.com/mimblewimble/cuckoo-miner" tag="grin_integration_14" #path = "../../cuckoo-miner" #uncomment this feature to turn off plugin builds -#features=["no-plugin-build"] +features=["no-plugin-build"] #uncomment this feature to enable cuda builds (cuda toolkit must be installed) #features=["build-cuda-plugins"]