Miner process wait until sync. Fix #185

This commit is contained in:
Ignotus Peverell 2017-10-17 12:18:21 +00:00
parent 60efa62896
commit efdbcd600a
No known key found for this signature in database
GPG key ID: 99CD25F39F8F8211
5 changed files with 95 additions and 74 deletions

View file

@ -167,8 +167,11 @@ impl NetAdapter for NetToChainAdapter {
/// Find good peers we know with the provided capability and return their /// Find good peers we know with the provided capability and return their
/// addresses. /// addresses.
fn find_peer_addrs(&self, capab: p2p::Capabilities) -> Vec<SocketAddr> { fn find_peer_addrs(&self, capab: p2p::Capabilities) -> Vec<SocketAddr> {
let peers = self.peer_store let peers = self.peer_store.find_peers(
.find_peers(State::Healthy, capab, p2p::MAX_PEER_ADDRS as usize); State::Healthy,
capab,
p2p::MAX_PEER_ADDRS as usize,
);
debug!(LOGGER, "Got {} peer addrs to send.", peers.len()); debug!(LOGGER, "Got {} peer addrs to send.", peers.len());
map_vec!(peers, |p| p.addr) map_vec!(peers, |p| p.addr)
} }
@ -210,10 +213,11 @@ impl NetAdapter for NetToChainAdapter {
} }
impl NetToChainAdapter { impl NetToChainAdapter {
pub fn new(chain_ref: Arc<chain::Chain>, pub fn new(
chain_ref: Arc<chain::Chain>,
tx_pool: Arc<RwLock<pool::TransactionPool<PoolToChainAdapter>>>, tx_pool: Arc<RwLock<pool::TransactionPool<PoolToChainAdapter>>>,
peer_store: Arc<PeerStore>) peer_store: Arc<PeerStore>,
-> NetToChainAdapter { ) -> NetToChainAdapter {
NetToChainAdapter { NetToChainAdapter {
chain: chain_ref, chain: chain_ref,
peer_store: peer_store, peer_store: peer_store,
@ -227,19 +231,15 @@ impl NetToChainAdapter {
pub fn start_sync(&self, sync: sync::Syncer) { pub fn start_sync(&self, sync: sync::Syncer) {
let arc_sync = Arc::new(sync); let arc_sync = Arc::new(sync);
self.syncer.init(arc_sync.clone()); self.syncer.init(arc_sync.clone());
let spawn_result = thread::Builder::new() let _ = thread::Builder::new().name("syncer".to_string()).spawn(
.name("syncer".to_string()) move || {
.spawn(move || { let _ = arc_sync.run();
let sync_run_result = arc_sync.run(); },
match sync_run_result { );
Ok(_) => {}
Err(_) => {}
}
});
match spawn_result {
Ok(_) => {}
Err(_) => {}
} }
pub fn syncing(&self) -> bool {
self.syncer.borrow().syncing()
} }
/// Prepare options for the chain pipeline /// Prepare options for the chain pipeline
@ -284,8 +284,9 @@ impl ChainAdapter for ChainToPoolAndNetAdapter {
} }
impl ChainToPoolAndNetAdapter { impl ChainToPoolAndNetAdapter {
pub fn new(tx_pool: Arc<RwLock<pool::TransactionPool<PoolToChainAdapter>>>) pub fn new(
-> ChainToPoolAndNetAdapter { tx_pool: Arc<RwLock<pool::TransactionPool<PoolToChainAdapter>>>,
) -> ChainToPoolAndNetAdapter {
ChainToPoolAndNetAdapter { ChainToPoolAndNetAdapter {
tx_pool: tx_pool, tx_pool: tx_pool,
p2p: OneTime::new(), p2p: OneTime::new(),
@ -317,19 +318,19 @@ impl PoolToChainAdapter {
impl pool::BlockChain for PoolToChainAdapter { impl pool::BlockChain for PoolToChainAdapter {
fn get_unspent(&self, output_ref: &Commitment) -> Result<Output, pool::PoolError> { fn get_unspent(&self, output_ref: &Commitment) -> Result<Output, pool::PoolError> {
self.chain self.chain.borrow().get_unspent(output_ref).map_err(
.borrow() |e| match e {
.get_unspent(output_ref)
.map_err(|e| match e {
chain::types::Error::OutputNotFound => pool::PoolError::OutputNotFound, chain::types::Error::OutputNotFound => pool::PoolError::OutputNotFound,
chain::types::Error::OutputSpent => pool::PoolError::OutputSpent, chain::types::Error::OutputSpent => pool::PoolError::OutputSpent,
_ => pool::PoolError::GenericPoolError, _ => pool::PoolError::GenericPoolError,
}) },
)
} }
fn get_block_header_by_output_commit(&self, fn get_block_header_by_output_commit(
commit: &Commitment) &self,
-> Result<BlockHeader, pool::PoolError> { commit: &Commitment,
) -> Result<BlockHeader, pool::PoolError> {
self.chain self.chain
.borrow() .borrow()
.get_block_header_by_output_commit(commit) .get_block_header_by_output_commit(commit)
@ -337,9 +338,8 @@ impl pool::BlockChain for PoolToChainAdapter {
} }
fn head_header(&self) -> Result<BlockHeader, pool::PoolError> { fn head_header(&self) -> Result<BlockHeader, pool::PoolError> {
self.chain self.chain.borrow().head_header().map_err(|_| {
.borrow() pool::PoolError::GenericPoolError
.head_header() })
.map_err(|_| pool::PoolError::GenericPoolError)
} }
} }

View file

@ -45,10 +45,11 @@ pub struct Seeder {
} }
impl Seeder { impl Seeder {
pub fn new(capabilities: p2p::Capabilities, pub fn new(
capabilities: p2p::Capabilities,
peer_store: Arc<p2p::PeerStore>, peer_store: Arc<p2p::PeerStore>,
p2p: Arc<p2p::Server>) p2p: Arc<p2p::Server>,
-> Seeder { ) -> Seeder {
Seeder { Seeder {
peer_store: peer_store, peer_store: peer_store,
p2p: p2p, p2p: p2p,
@ -56,17 +57,20 @@ impl Seeder {
} }
} }
pub fn connect_and_monitor(&self, pub fn connect_and_monitor(
&self,
h: reactor::Handle, h: reactor::Handle,
seed_list: Box<Future<Item = Vec<SocketAddr>, Error = String>>) { seed_list: Box<Future<Item = Vec<SocketAddr>, Error = String>>,
) {
// open a channel with a listener that connects every peer address sent below // open a channel with a listener that connects every peer address sent below
// max peer count // max peer count
let (tx, rx) = futures::sync::mpsc::unbounded(); let (tx, rx) = futures::sync::mpsc::unbounded();
h.spawn(self.listen_for_addrs(h.clone(), rx)); h.spawn(self.listen_for_addrs(h.clone(), rx));
// check seeds and start monitoring connections // check seeds and start monitoring connections
let seeder = self.connect_to_seeds(tx.clone(), seed_list) let seeder = self.connect_to_seeds(tx.clone(), seed_list).join(
.join(self.monitor_peers(tx.clone())); self.monitor_peers(tx.clone()),
);
h.spawn(seeder.map(|_| ()).map_err(|e| { h.spawn(seeder.map(|_| ()).map_err(|e| {
error!(LOGGER, "Seeding or peer monitoring error: {}", e); error!(LOGGER, "Seeding or peer monitoring error: {}", e);
@ -74,9 +78,10 @@ impl Seeder {
})); }));
} }
fn monitor_peers(&self, fn monitor_peers(
tx: mpsc::UnboundedSender<SocketAddr>) &self,
-> Box<Future<Item = (), Error = String>> { tx: mpsc::UnboundedSender<SocketAddr>,
) -> Box<Future<Item = (), Error = String>> {
let peer_store = self.peer_store.clone(); let peer_store = self.peer_store.clone();
let p2p_server = self.p2p.clone(); let p2p_server = self.p2p.clone();
@ -92,8 +97,8 @@ impl Seeder {
for p in disconnected { for p in disconnected {
if p.is_banned() { if p.is_banned() {
debug!(LOGGER, "Marking peer {} as banned.", p.info.addr); debug!(LOGGER, "Marking peer {} as banned.", p.info.addr);
let update_result = peer_store let update_result =
.update_state(p.info.addr, p2p::State::Banned); peer_store.update_state(p.info.addr, p2p::State::Banned);
match update_result { match update_result {
Ok(()) => {} Ok(()) => {}
Err(_) => {} Err(_) => {}
@ -131,10 +136,11 @@ impl Seeder {
// Check if we have any pre-existing peer in db. If so, start with those, // Check if we have any pre-existing peer in db. If so, start with those,
// otherwise use the seeds provided. // otherwise use the seeds provided.
fn connect_to_seeds(&self, fn connect_to_seeds(
&self,
tx: mpsc::UnboundedSender<SocketAddr>, tx: mpsc::UnboundedSender<SocketAddr>,
seed_list: Box<Future<Item = Vec<SocketAddr>, Error = String>>) seed_list: Box<Future<Item = Vec<SocketAddr>, Error = String>>,
-> Box<Future<Item = (), Error = String>> { ) -> Box<Future<Item = (), Error = String>> {
let peer_store = self.peer_store.clone(); let peer_store = self.peer_store.clone();
// a thread pool is required so we don't block the event loop with a // a thread pool is required so we don't block the event loop with a
@ -178,10 +184,11 @@ impl Seeder {
/// addresses to and initiate a connection if the max peer count isn't /// addresses to and initiate a connection if the max peer count isn't
/// exceeded. A request for more peers is also automatically sent after /// exceeded. A request for more peers is also automatically sent after
/// connection. /// connection.
fn listen_for_addrs(&self, fn listen_for_addrs(
&self,
h: reactor::Handle, h: reactor::Handle,
rx: mpsc::UnboundedReceiver<SocketAddr>) rx: mpsc::UnboundedReceiver<SocketAddr>,
-> Box<Future<Item = (), Error = ()>> { ) -> Box<Future<Item = (), Error = ()>> {
let capab = self.capabilities; let capab = self.capabilities;
let p2p_store = self.peer_store.clone(); let p2p_store = self.peer_store.clone();
let p2p_server = self.p2p.clone(); let p2p_server = self.p2p.clone();
@ -224,10 +231,8 @@ pub fn web_seeds(h: reactor::Handle) -> Box<Future<Item = Vec<SocketAddr>, Error
}) })
.and_then(|res| { .and_then(|res| {
// collect all chunks and split around whitespace to get a list of SocketAddr // collect all chunks and split around whitespace to get a list of SocketAddr
res.body() res.body().collect().map_err(|e| e.to_string()).and_then(
.collect() |chunks| {
.map_err(|e| e.to_string())
.and_then(|chunks| {
let res = chunks.iter().fold("".to_string(), |acc, ref chunk| { let res = chunks.iter().fold("".to_string(), |acc, ref chunk| {
acc + str::from_utf8(&chunk[..]).unwrap() acc + str::from_utf8(&chunk[..]).unwrap()
}); });
@ -235,7 +240,8 @@ pub fn web_seeds(h: reactor::Handle) -> Box<Future<Item = Vec<SocketAddr>, Error
.map(|s| s.parse().unwrap()) .map(|s| s.parse().unwrap())
.collect::<Vec<_>>(); .collect::<Vec<_>>();
Ok(addrs) Ok(addrs)
}) },
)
}) })
}); });
Box::new(seeds) Box::new(seeds)
@ -243,8 +249,9 @@ pub fn web_seeds(h: reactor::Handle) -> Box<Future<Item = Vec<SocketAddr>, Error
/// Convenience function when the seed list is immediately known. Mostly used /// Convenience function when the seed list is immediately known. Mostly used
/// for tests. /// for tests.
pub fn predefined_seeds(addrs_str: Vec<String>) pub fn predefined_seeds(
-> Box<Future<Item = Vec<SocketAddr>, Error = String>> { addrs_str: Vec<String>,
) -> Box<Future<Item = Vec<SocketAddr>, Error = String>> {
let seeds = future::ok(()).and_then(move |_| { let seeds = future::ok(()).and_then(move |_| {
Ok( Ok(
addrs_str addrs_str
@ -256,12 +263,13 @@ pub fn predefined_seeds(addrs_str: Vec<String>)
Box::new(seeds) Box::new(seeds)
} }
fn connect_and_req(capab: p2p::Capabilities, fn connect_and_req(
capab: p2p::Capabilities,
peer_store: Arc<p2p::PeerStore>, peer_store: Arc<p2p::PeerStore>,
p2p: Arc<p2p::Server>, p2p: Arc<p2p::Server>,
h: reactor::Handle, h: reactor::Handle,
addr: SocketAddr) addr: SocketAddr,
-> Box<Future<Item = (), Error = ()>> { ) -> Box<Future<Item = (), Error = ()>> {
let fut = p2p.connect_peer(addr, h).then(move |p| { let fut = p2p.connect_peer(addr, h).then(move |p| {
match p { match p {
Ok(Some(p)) => { Ok(Some(p)) => {

View file

@ -51,6 +51,7 @@ pub struct Server {
chain: Arc<chain::Chain>, chain: Arc<chain::Chain>,
/// in-memory transaction pool /// in-memory transaction pool
tx_pool: Arc<RwLock<pool::TransactionPool<PoolToChainAdapter>>>, tx_pool: Arc<RwLock<pool::TransactionPool<PoolToChainAdapter>>>,
net_adapter: Arc<NetToChainAdapter>,
} }
impl Server { impl Server {
@ -148,6 +149,7 @@ impl Server {
p2p: p2p_server, p2p: p2p_server,
chain: shared_chain, chain: shared_chain,
tx_pool: tx_pool, tx_pool: tx_pool,
net_adapter: net_adapter,
}) })
} }
@ -173,10 +175,17 @@ impl Server {
pub fn start_miner(&self, config: pow::types::MinerConfig) { pub fn start_miner(&self, config: pow::types::MinerConfig) {
let cuckoo_size = global::sizeshift(); let cuckoo_size = global::sizeshift();
let proof_size = global::proofsize(); let proof_size = global::proofsize();
let net_adapter = self.net_adapter.clone();
let mut miner = miner::Miner::new(config.clone(), self.chain.clone(), self.tx_pool.clone()); let mut miner = miner::Miner::new(config.clone(), self.chain.clone(), self.tx_pool.clone());
miner.set_debug_output_id(format!("Port {}", self.config.p2p_config.unwrap().port)); miner.set_debug_output_id(format!("Port {}", self.config.p2p_config.unwrap().port));
thread::spawn(move || { miner.run_loop(config.clone(), cuckoo_size as u32, proof_size); }); thread::spawn(move || {
let secs_5 = time::Duration::from_secs(5);
while net_adapter.syncing() {
thread::sleep(secs_5);
}
miner.run_loop(config.clone(), cuckoo_size as u32, proof_size);
});
} }
/// The chain head /// The chain head

View file

@ -146,12 +146,16 @@ impl Syncer {
// clean up potentially dead downloads // clean up potentially dead downloads
let twenty_sec_ago = Instant::now() - Duration::from_secs(20); let twenty_sec_ago = Instant::now() - Duration::from_secs(20);
let too_old_pos = (0..blocks_downloading.len()).filter(|p| { let too_old_pos = (0..blocks_downloading.len())
blocks_downloading[*p].1 < twenty_sec_ago .filter(|p| blocks_downloading[*p].1 < twenty_sec_ago)
}).collect::<Vec<_>>(); .collect::<Vec<_>>();
for too_old in too_old_pos { for too_old in too_old_pos {
let block_h = blocks_downloading.remove(too_old); let block_h = blocks_downloading.remove(too_old);
debug!(LOGGER, "Download request expired for {}, will re-issue.", block_h.0); debug!(
LOGGER,
"Download request expired for {}, will re-issue.",
block_h.0
);
blocks_to_download.insert(0, block_h.0); blocks_to_download.insert(0, block_h.0);
} }

View file

@ -21,7 +21,7 @@ git = "https://github.com/mimblewimble/cuckoo-miner"
tag="grin_integration_14" tag="grin_integration_14"
#path = "../../cuckoo-miner" #path = "../../cuckoo-miner"
#uncomment this feature to turn off plugin builds #uncomment this feature to turn off plugin builds
#features=["no-plugin-build"] features=["no-plugin-build"]
#uncomment this feature to enable cuda builds (cuda toolkit must be installed) #uncomment this feature to enable cuda builds (cuda toolkit must be installed)
#features=["build-cuda-plugins"] #features=["build-cuda-plugins"]