diff --git a/api/src/handlers.rs b/api/src/handlers.rs
index 3cd1ef6e6..e2960ddf8 100644
--- a/api/src/handlers.rs
+++ b/api/src/handlers.rs
@@ -791,9 +791,9 @@ impl Handler for PoolPushHandler {
 /// except during tests).
 pub fn start_rest_apis(
 	addr: String,
-	chain: Weak<chain::Chain>,
-	tx_pool: Weak<RwLock<pool::TransactionPool>>,
-	peers: Weak<p2p::Peers>,
+	chain: Arc<chain::Chain>,
+	tx_pool: Arc<RwLock<pool::TransactionPool>>,
+	peers: Arc<p2p::Peers>,
 	api_secret: Option<String>,
 	tls_config: Option<TLSConfig>,
 ) -> bool {
@@ -813,9 +813,9 @@ pub fn start_rest_apis(
 }
 
 pub fn build_router(
-	chain: Weak<chain::Chain>,
-	tx_pool: Weak<RwLock<pool::TransactionPool>>,
-	peers: Weak<p2p::Peers>,
+	chain: Arc<chain::Chain>,
+	tx_pool: Arc<RwLock<pool::TransactionPool>>,
+	peers: Arc<p2p::Peers>,
 ) -> Result<Router, RouterError> {
 	let route_list = vec![
 		"get blocks".to_string(),
@@ -840,45 +840,45 @@ pub fn build_router(
 	let index_handler = IndexHandler { list: route_list };
 
 	let output_handler = OutputHandler {
-		chain: chain.clone(),
+		chain: Arc::downgrade(&chain),
 	};
 	let block_handler = BlockHandler {
-		chain: chain.clone(),
+		chain: Arc::downgrade(&chain),
 	};
 	let header_handler = HeaderHandler {
-		chain: chain.clone(),
+		chain: Arc::downgrade(&chain),
 	};
 	let chain_tip_handler = ChainHandler {
-		chain: chain.clone(),
+		chain: Arc::downgrade(&chain),
 	};
 	let chain_compact_handler = ChainCompactHandler {
-		chain: chain.clone(),
+		chain: Arc::downgrade(&chain),
 	};
 	let chain_validation_handler = ChainValidationHandler {
-		chain: chain.clone(),
+		chain: Arc::downgrade(&chain),
 	};
 	let status_handler = StatusHandler {
-		chain: chain.clone(),
-		peers: peers.clone(),
+		chain: Arc::downgrade(&chain),
+		peers: Arc::downgrade(&peers),
 	};
 	let txhashset_handler = TxHashSetHandler {
-		chain: chain.clone(),
+		chain: Arc::downgrade(&chain),
 	};
 	let pool_info_handler = PoolInfoHandler {
-		tx_pool: tx_pool.clone(),
+		tx_pool: Arc::downgrade(&tx_pool),
 	};
 	let pool_push_handler = PoolPushHandler {
-		tx_pool: tx_pool.clone(),
+		tx_pool: Arc::downgrade(&tx_pool),
 	};
 	let peers_all_handler = PeersAllHandler {
-		peers: peers.clone(),
+		peers: Arc::downgrade(&peers),
 	};
 	let peers_connected_handler = PeersConnectedHandler {
-		peers: peers.clone(),
+		peers: Arc::downgrade(&peers),
 	};
 	let peer_handler = PeerHandler {
-		peers: peers.clone(),
+		peers: Arc::downgrade(&peers),
 	};
 
 	let mut router = Router::new();
diff --git a/pool/src/transaction_pool.rs b/pool/src/transaction_pool.rs
index 8ede35797..218e02948 100644
--- a/pool/src/transaction_pool.rs
+++ b/pool/src/transaction_pool.rs
@@ -61,6 +61,10 @@ impl TransactionPool {
 		}
 	}
 
+	pub fn chain_head(&self) -> Result<BlockHeader, PoolError> {
+		self.blockchain.chain_head()
+	}
+
 	fn add_to_stempool(&mut self, entry: PoolEntry, header: &BlockHeader) -> Result<(), PoolError> {
 		// Add tx to stempool (passing in all txs from txpool to validate against).
 		self.stempool
diff --git a/servers/src/common/adapters.rs b/servers/src/common/adapters.rs
index 2be2142c1..71f27d396 100644
--- a/servers/src/common/adapters.rs
+++ b/servers/src/common/adapters.rs
@@ -17,7 +17,6 @@
 
 use std::fs::File;
 use std::net::SocketAddr;
-use std::ops::Deref;
 use std::sync::{Arc, RwLock, Weak};
 use std::thread;
 use std::time::Instant;
@@ -35,17 +34,6 @@ use pool;
 use store;
 use util::{OneTime, LOGGER};
 
-// All adapters use `Weak` references instead of `Arc` to avoid cycles that
-// can never be destroyed. These 2 functions are simple helpers to reduce the
-// boilerplate of dealing with `Weak`.
-fn w<T>(weak: &Weak<T>) -> Arc<T> {
-	weak.upgrade().unwrap()
-}
-
-fn wo<T>(weak_one: &OneTime<Weak<T>>) -> Arc<T> {
-	w(weak_one.borrow().deref())
-}
-
 /// Implementation of the NetAdapter for the . Gets notified when new
 /// blocks and transactions are received and forwards to the chain and pool
 /// implementations.
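// Illustration only, not part of the patch: a minimal, self-contained sketch of
// the ownership pattern the adapters standardize on above. Callers hand the
// adapter an `Arc`, the adapter immediately downgrades it to a `Weak` (so it
// never keeps the value alive and cannot form a reference cycle), and a small
// accessor upgrades it back on each use, failing hard if the owner is gone.
// The names `Owner` and `Adapter` are made up for this example.
use std::sync::{Arc, Weak};

struct Owner {
	name: String,
}

struct Adapter {
	owner: Weak<Owner>,
}

impl Adapter {
	// Take an Arc at the API boundary, keep only a Weak internally.
	fn new(owner: Arc<Owner>) -> Adapter {
		Adapter {
			owner: Arc::downgrade(&owner),
		}
	}

	// Upgrade on demand, mirroring the new `chain()` / `peers()` accessors.
	fn owner(&self) -> Arc<Owner> {
		self.owner
			.upgrade()
			.expect("Failed to upgrade weak ref to our owner.")
	}
}

fn main() {
	let owner = Arc::new(Owner {
		name: "chain".to_string(),
	});
	let adapter = Adapter::new(owner.clone());
	// The adapter holds no strong reference; only the caller owns the value.
	assert_eq!(Arc::strong_count(&owner), 1);
	println!("adapter sees: {}", adapter.owner().name);
}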
@@ -61,11 +49,11 @@ pub struct NetToChainAdapter {
 
 impl p2p::ChainAdapter for NetToChainAdapter {
 	fn total_difficulty(&self) -> Difficulty {
-		w(&self.chain).head().unwrap().total_difficulty
+		self.chain().head().unwrap().total_difficulty
 	}
 
 	fn total_height(&self) -> u64 {
-		w(&self.chain).head().unwrap().height
+		self.chain().head().unwrap().height
 	}
 
 	fn transaction_received(&self, tx: core::Transaction, stem: bool) {
@@ -80,7 +68,7 @@ impl p2p::ChainAdapter for NetToChainAdapter {
 		};
 
 		let tx_hash = tx.hash();
-		let header = w(&self.chain).head_header().unwrap();
+		let header = self.chain().head_header().unwrap();
 
 		debug!(
 			LOGGER,
@@ -140,7 +128,10 @@ impl p2p::ChainAdapter for NetToChainAdapter {
 			}
 		} else {
 			// check at least the header is valid before hydrating
-			if let Err(e) = w(&self.chain).process_block_header(&cb.header, self.chain_opts()) {
+			if let Err(e) = self
+				.chain()
+				.process_block_header(&cb.header, self.chain_opts())
+			{
 				debug!(LOGGER, "Invalid compact block header {}: {}", cb_hash, e);
 				return !e.is_bad_data();
 			}
@@ -170,12 +161,7 @@ impl p2p::ChainAdapter for NetToChainAdapter {
 				}
 			};
 
-			let chain = self
-				.chain
-				.upgrade()
-				.expect("failed to upgrade weak ref to chain");
-
-			if let Ok(prev) = chain.get_block_header(&cb.header.previous) {
+			if let Ok(prev) = self.chain().get_block_header(&cb.header.previous) {
 				if block
 					.validate(
 						&prev.total_kernel_offset,
@@ -220,7 +206,7 @@ impl p2p::ChainAdapter for NetToChainAdapter {
 
 		// pushing the new block header through the header chain pipeline
 		// we will go ask for the block if this is a new header
-		let res = w(&self.chain).process_block_header(&bh, self.chain_opts());
+		let res = self.chain().process_block_header(&bh, self.chain_opts());
 
 		if let &Err(ref e) = &res {
 			debug!(
@@ -234,7 +220,7 @@ impl p2p::ChainAdapter for NetToChainAdapter {
 					LOGGER,
 					"header_received: {} is a bad header, resetting header head", bhash
 				);
-				let _ = w(&self.chain).reset_head();
+				let _ = self.chain().reset_head();
 				return false;
 			} else {
 				// we got an error when trying to process the block header
@@ -264,7 +250,7 @@ impl p2p::ChainAdapter for NetToChainAdapter {
 		}
 
 		// try to add headers to our header chain
-		let res = w(&self.chain).sync_block_headers(&bhs, self.chain_opts());
+		let res = self.chain().sync_block_headers(&bhs, self.chain_opts());
 
 		if let &Err(ref e) = &res {
 			debug!(LOGGER, "Block headers refused by chain: {:?}", e);
@@ -289,7 +275,7 @@ impl p2p::ChainAdapter for NetToChainAdapter {
 		let hh = header.height;
 		let mut headers = vec![];
 		for h in (hh + 1)..(hh + (p2p::MAX_BLOCK_HEADERS as u64)) {
-			let header = w(&self.chain).get_header_by_height(h);
+			let header = self.chain().get_header_by_height(h);
 			match header {
 				Ok(head) => headers.push(head),
 				Err(e) => match e.kind() {
@@ -313,7 +299,7 @@ impl p2p::ChainAdapter for NetToChainAdapter {
 
 	/// Gets a full block by its hash.
 	fn get_block(&self, h: Hash) -> Option<core::Block> {
-		let b = w(&self.chain).get_block(&h);
+		let b = self.chain().get_block(&h);
 		match b {
 			Ok(b) => Some(b),
 			_ => None,
@@ -324,7 +310,7 @@ impl p2p::ChainAdapter for NetToChainAdapter {
 	/// the required indexes for a consumer to rewind to a consistent state
 	/// at the provided block hash.
 	fn txhashset_read(&self, h: Hash) -> Option<p2p::TxHashSetRead> {
-		match w(&self.chain).txhashset_read(h.clone()) {
+		match self.chain().txhashset_read(h.clone()) {
 			Ok((out_index, kernel_index, read)) => Some(p2p::TxHashSetRead {
 				output_index: out_index,
 				kernel_index: kernel_index,
 				read: read,
@@ -354,7 +340,9 @@ impl p2p::ChainAdapter for NetToChainAdapter {
 			return true;
 		}
 
-		if let Err(e) = w(&self.chain).txhashset_write(h, txhashset_data, self.sync_state.as_ref())
+		if let Err(e) = self
+			.chain()
+			.txhashset_write(h, txhashset_data, self.sync_state.as_ref())
 		{
 			error!(LOGGER, "Failed to save txhashset archive: {}", e);
 			let is_good_data = !e.is_bad_data();
@@ -372,7 +360,7 @@ impl NetToChainAdapter {
 	pub fn new(
 		sync_state: Arc<SyncState>,
 		archive_mode: bool,
-		chain_ref: Weak<chain::Chain>,
+		chain: Arc<chain::Chain>,
 		tx_pool: Arc<RwLock<pool::TransactionPool>>,
 		verifier_cache: Arc<RwLock<VerifierCache>>,
 		config: ServerConfig,
@@ -380,7 +368,7 @@ impl NetToChainAdapter {
 		NetToChainAdapter {
 			sync_state,
 			archive_mode,
-			chain: chain_ref,
+			chain: Arc::downgrade(&chain),
 			tx_pool,
 			verifier_cache,
 			peers: OneTime::new(),
@@ -390,8 +378,21 @@ impl NetToChainAdapter {
 
 	/// Initialize a NetToChainAdaptor with reference to a Peers object.
 	/// Should only be called once.
-	pub fn init(&self, peers: Weak<p2p::Peers>) {
-		self.peers.init(peers);
+	pub fn init(&self, peers: Arc<p2p::Peers>) {
+		self.peers.init(Arc::downgrade(&peers));
+	}
+
+	fn peers(&self) -> Arc<p2p::Peers> {
+		self.peers
+			.borrow()
+			.upgrade()
+			.expect("Failed to upgrade weak ref to our peers.")
+	}
+
+	fn chain(&self) -> Arc<chain::Chain> {
+		self.chain
+			.upgrade()
+			.expect("Failed to upgrade weak ref to our chain.")
 	}
 
 	// recursively go back through the locator vector and stop when we find
@@ -402,13 +403,12 @@ impl NetToChainAdapter {
 			return None;
 		}
 
-		let chain = w(&self.chain);
-		let known = chain.get_block_header(&locator[0]);
+		let known = self.chain().get_block_header(&locator[0]);
 
 		match known {
 			Ok(header) => {
 				// even if we know the block, it may not be on our winning chain
-				let known_winning = chain.get_header_by_height(header.height);
+				let known_winning = self.chain().get_header_by_height(header.height);
 				if let Ok(known_winning) = known_winning {
 					if known_winning.hash() != header.hash() {
 						self.find_common_header(locator[1..].to_vec())
@@ -434,9 +434,8 @@ impl NetToChainAdapter {
 	// pushing the new block through the chain pipeline
 	// remembering to reset the head if we have a bad block
 	fn process_block(&self, b: core::Block, addr: SocketAddr) -> bool {
-		let chain = w(&self.chain);
 		if !self.archive_mode {
-			let head = chain.head().unwrap();
+			let head = self.chain().head().unwrap();
 			// we have a fast sync'd node and are sent a block older than our horizon,
 			// only sync can do something with that
 			if b.header.height < head
@@ -449,7 +448,7 @@ impl NetToChainAdapter {
 
 		let prev_hash = b.header.previous;
 		let bhash = b.hash();
-		match chain.process_block(b, self.chain_opts()) {
+		match self.chain().process_block(b, self.chain_opts()) {
 			Ok(tip) => {
 				self.validate_chain(bhash);
 				self.check_compact(tip);
@@ -460,7 +459,7 @@ impl NetToChainAdapter {
 						LOGGER,
 						"adapter: process_block: {} is a bad block, resetting head", bhash
 					);
-					let _ = chain.reset_head();
+					let _ = self.chain().reset_head();
 
 					// we potentially changed the state of the system here
 					// so check everything is still ok
@@ -472,7 +471,7 @@ impl NetToChainAdapter {
 				match e.kind() {
 					chain::ErrorKind::Orphan => {
 						// make sure we did not miss the parent block
-						if !chain.is_orphan(&prev_hash) && !self.sync_state.is_syncing() {
+						if !self.chain().is_orphan(&prev_hash) && !self.sync_state.is_syncing() {
 							debug!(LOGGER, "adapter: process_block: received an orphan block, checking the parent: {:}", prev_hash);
 							self.request_block_by_hash(prev_hash, &addr)
 						}
@@ -498,8 +497,7 @@ impl NetToChainAdapter {
 		// We are out of consensus at this point and want to track the problem
 		// down as soon as possible.
 		// Skip this if we are currently syncing (too slow).
-		let chain = w(&self.chain);
-		if chain.head().unwrap().height > 0
+		if self.chain().head().unwrap().height > 0
 			&& !self.sync_state.is_syncing()
 			&& self.config.chain_validation_mode == ChainValidationMode::EveryBlock
 		{
@@ -510,8 +508,7 @@ impl NetToChainAdapter {
 				"adapter: process_block: ***** validating full chain state at {}", bhash,
 			);
-			let chain = w(&self.chain);
-			chain
+			self.chain()
 				.validate(true)
 				.expect("chain validation failed, hard stop");
@@ -533,7 +530,7 @@ impl NetToChainAdapter {
 		// trigger compaction every 2000 blocks, uses a different thread to avoid
 		// blocking the caller thread (likely a peer)
 		if tip.height % 2000 == 0 {
-			let chain = w(&self.chain);
+			let chain = self.chain().clone();
 			let _ = thread::Builder::new()
 				.name("compactor".to_string())
 				.spawn(move || {
@@ -570,8 +567,8 @@ impl NetToChainAdapter {
 	where
 		F: Fn(&p2p::Peer, Hash) -> Result<(), p2p::Error>,
 	{
-		match w(&self.chain).block_exists(h) {
-			Ok(false) => match wo(&self.peers).get_connected_peer(addr) {
+		match self.chain().block_exists(h) {
+			Ok(false) => match self.peers().get_connected_peer(addr) {
 				None => debug!(
 					LOGGER,
 					"send_block_request_to_peer: can't send request to peer {:?}, not connected",
@@ -637,10 +634,10 @@ impl ChainAdapter for ChainToPoolAndNetAdapter {
 		if opts.contains(Options::MINE) {
 			// propagate compact block out if we mined the block
 			let cb: CompactBlock = b.clone().into();
-			wo(&self.peers).broadcast_compact_block(&cb);
+			self.peers().broadcast_compact_block(&cb);
 		} else {
 			// "header first" propagation if we are not the originator of this block
-			wo(&self.peers).broadcast_header(&b.header);
+			self.peers().broadcast_header(&b.header);
 		}
 	}
 }
@@ -660,8 +657,15 @@ impl ChainToPoolAndNetAdapter {
 
 	/// Initialize a ChainToPoolAndNetAdapter instance with handle to a Peers
 	/// object. Should only be called once.
-	pub fn init(&self, peers: Weak<p2p::Peers>) {
-		self.peers.init(peers);
+	pub fn init(&self, peers: Arc<p2p::Peers>) {
+		self.peers.init(Arc::downgrade(&peers));
+	}
+
+	fn peers(&self) -> Arc<p2p::Peers> {
+		self.peers
+			.borrow()
+			.upgrade()
+			.expect("Failed to upgrade weak ref to our peers.")
 	}
 }
 
@@ -673,13 +677,13 @@ pub struct PoolToNetAdapter {
 
 impl pool::PoolAdapter for PoolToNetAdapter {
 	fn stem_tx_accepted(&self, tx: &core::Transaction) -> Result<(), pool::PoolError> {
-		wo(&self.peers)
+		self.peers()
 			.broadcast_stem_transaction(tx)
 			.map_err(|_| pool::PoolError::DandelionError)?;
 		Ok(())
 	}
 	fn tx_accepted(&self, tx: &core::Transaction) {
-		wo(&self.peers).broadcast_transaction(tx);
+		self.peers().broadcast_transaction(tx);
 	}
 }
 
@@ -692,8 +696,15 @@ impl PoolToNetAdapter {
 	}
 
 	/// Setup the p2p server on the adapter
-	pub fn init(&self, peers: Weak<p2p::Peers>) {
-		self.peers.init(peers);
+	pub fn init(&self, peers: Arc<p2p::Peers>) {
+		self.peers.init(Arc::downgrade(&peers));
+	}
+
+	fn peers(&self) -> Arc<p2p::Peers> {
+		self.peers
+			.borrow()
+			.upgrade()
+			.expect("Failed to upgrade weak ref to our peers.")
 	}
 }
 
@@ -714,44 +725,51 @@ impl PoolToChainAdapter {
 	}
 
 	/// Set the pool adapter's chain. Should only be called once.
-	pub fn set_chain(&self, chain_ref: Weak<chain::Chain>) {
-		self.chain.init(chain_ref);
+	pub fn set_chain(&self, chain_ref: Arc<chain::Chain>) {
+		self.chain.init(Arc::downgrade(&chain_ref));
+	}
+
+	fn chain(&self) -> Arc<chain::Chain> {
+		self.chain
+			.borrow()
+			.upgrade()
+			.expect("Failed to upgrade the weak ref to our chain.")
 	}
 }
 
 impl pool::BlockChain for PoolToChainAdapter {
 	fn chain_head(&self) -> Result<BlockHeader, pool::PoolError> {
-		wo(&self.chain)
+		self.chain()
 			.head_header()
 			.map_err(|_| pool::PoolError::Other(format!("failed to get head_header")))
 	}
 
 	fn get_block_header(&self, hash: &Hash) -> Result<BlockHeader, pool::PoolError> {
-		wo(&self.chain)
+		self.chain()
 			.get_block_header(hash)
 			.map_err(|_| pool::PoolError::Other(format!("failed to get block_header")))
 	}
 
 	fn get_block_sums(&self, hash: &Hash) -> Result<BlockSums, pool::PoolError> {
-		wo(&self.chain)
+		self.chain()
 			.get_block_sums(hash)
 			.map_err(|_| pool::PoolError::Other(format!("failed to get block_sums")))
 	}
 
 	fn validate_tx(&self, tx: &Transaction) -> Result<(), pool::PoolError> {
-		wo(&self.chain)
+		self.chain()
 			.validate_tx(tx)
 			.map_err(|_| pool::PoolError::Other(format!("failed to validate tx")))
 	}
 
 	fn verify_coinbase_maturity(&self, tx: &Transaction) -> Result<(), pool::PoolError> {
-		wo(&self.chain)
+		self.chain()
 			.verify_coinbase_maturity(tx)
 			.map_err(|_| pool::PoolError::ImmatureCoinbase)
 	}
 
 	fn verify_tx_lock_height(&self, tx: &Transaction) -> Result<(), pool::PoolError> {
-		wo(&self.chain)
+		self.chain()
 			.verify_tx_lock_height(tx)
 			.map_err(|_| pool::PoolError::ImmatureTransaction)
 	}
diff --git a/servers/src/grin/dandelion_monitor.rs b/servers/src/grin/dandelion_monitor.rs
index 9b0623195..f4cd4a7c0 100644
--- a/servers/src/grin/dandelion_monitor.rs
+++ b/servers/src/grin/dandelion_monitor.rs
@@ -53,8 +53,6 @@ pub fn monitor_transactions(
 		let patience_secs = dandelion_config.patience_secs.unwrap();
 		thread::sleep(Duration::from_secs(patience_secs));
 
-		let tx_pool = tx_pool.clone();
-
 		// Step 1: find all "ToStem" entries in stempool from last run.
 		// Aggregate them up to give a single (valid) aggregated tx and propagate it
 		// to the next Dandelion relay along the stem.
@@ -90,7 +88,7 @@ fn process_stem_phase(
 ) -> Result<(), PoolError> {
 	let mut tx_pool = tx_pool.write().unwrap();
 
-	let header = tx_pool.blockchain.chain_head()?;
+	let header = tx_pool.chain_head()?;
 
 	let txpool_tx = tx_pool.txpool.aggregate_transaction()?;
 	let stem_txs = tx_pool
@@ -137,7 +135,7 @@ fn process_fluff_phase(
 ) -> Result<(), PoolError> {
 	let mut tx_pool = tx_pool.write().unwrap();
 
-	let header = tx_pool.blockchain.chain_head()?;
+	let header = tx_pool.chain_head()?;
 
 	let txpool_tx = tx_pool.txpool.aggregate_transaction()?;
 	let stem_txs = tx_pool
@@ -239,7 +237,7 @@ fn process_expired_entries(
 	{
 		let mut tx_pool = tx_pool.write().unwrap();
 
-		let header = tx_pool.blockchain.chain_head()?;
+		let header = tx_pool.chain_head()?;
 
 		for entry in expired_entries {
 			let src = TxSource {
diff --git a/servers/src/grin/server.rs b/servers/src/grin/server.rs
index b25923f27..31fac890b 100644
--- a/servers/src/grin/server.rs
+++ b/servers/src/grin/server.rs
@@ -168,14 +168,14 @@ impl Server {
 			archive_mode,
 		)?);
 
-		pool_adapter.set_chain(Arc::downgrade(&shared_chain));
+		pool_adapter.set_chain(shared_chain.clone());
 
 		let awaiting_peers = Arc::new(AtomicBool::new(false));
 
 		let net_adapter = Arc::new(NetToChainAdapter::new(
 			sync_state.clone(),
 			archive_mode,
-			Arc::downgrade(&shared_chain),
+			shared_chain.clone(),
 			tx_pool.clone(),
 			verifier_cache.clone(),
 			config.clone(),
@@ -197,9 +197,9 @@ impl Server {
 			archive_mode,
 			block_1_hash,
 		)?);
-		chain_adapter.init(Arc::downgrade(&p2p_server.peers));
-		pool_net_adapter.init(Arc::downgrade(&p2p_server.peers));
-		net_adapter.init(Arc::downgrade(&p2p_server.peers));
+		chain_adapter.init(p2p_server.peers.clone());
+		pool_net_adapter.init(p2p_server.peers.clone());
+		net_adapter.init(p2p_server.peers.clone());
 
 		if config.p2p_config.seeding_type.clone() != p2p::Seeding::Programmatic {
 			let seeder = match config.p2p_config.seeding_type.clone() {
@@ -258,9 +258,9 @@ impl Server {
 		let api_secret = get_first_line(config.api_secret_path.clone());
 		api::start_rest_apis(
 			config.api_http_addr.clone(),
-			Arc::downgrade(&shared_chain),
-			Arc::downgrade(&tx_pool),
-			Arc::downgrade(&p2p_server.peers),
+			shared_chain.clone(),
+			tx_pool.clone(),
+			p2p_server.peers.clone(),
 			api_secret,
 			None,
 		);
diff --git a/util/src/lib.rs b/util/src/lib.rs
index aaa378a1d..9fd57f094 100644
--- a/util/src/lib.rs
+++ b/util/src/lib.rs
@@ -58,9 +58,9 @@ pub mod macros;
 
 // other utils
 use byteorder::{BigEndian, ByteOrder};
-use std::cell::{Ref, RefCell};
 #[allow(unused_imports)]
 use std::ops::Deref;
+use std::sync::{Arc, RwLock};
 
 mod hex;
 pub use hex::*;
@@ -70,44 +70,42 @@ pub mod file;
 /// Compress and decompress zip bz2 archives
 pub mod zip;
 
-/// Encapsulation of a RefCell<Option<T>> for one-time initialization after
-/// construction. This implementation will purposefully fail hard if not used
-/// properly, for example if it's not initialized before being first used
+/// Encapsulation of a RwLock<Option<T>> for one-time initialization.
+/// This implementation will purposefully fail hard if not used
+/// properly, for example if not initialized before being first used
 /// (borrowed).
 #[derive(Clone)]
 pub struct OneTime<T> {
-	/// inner
-	inner: RefCell<Option<T>>,
+	/// The inner value.
+	inner: Arc<RwLock<Option<T>>>,
 }
 
-unsafe impl<T> Sync for OneTime<T> {}
-unsafe impl<T> Send for OneTime<T> {}
-
-impl<T> OneTime<T> {
+impl<T> OneTime<T>
+where
+	T: Clone,
+{
 	/// Builds a new uninitialized OneTime.
 	pub fn new() -> OneTime<T> {
 		OneTime {
-			inner: RefCell::new(None),
+			inner: Arc::new(RwLock::new(None)),
 		}
 	}
 
 	/// Initializes the OneTime, should only be called once after construction.
+	/// Will panic (via assert) if called more than once.
 	pub fn init(&self, value: T) {
-		let mut inner_mut = self.inner.borrow_mut();
-		*inner_mut = Some(value);
-	}
-
-	/// Whether the OneTime has been initialized
-	pub fn is_initialized(&self) -> bool {
-		match self.inner.try_borrow() {
-			Ok(inner) => inner.is_some(),
-			Err(_) => false,
-		}
+		let mut inner = self.inner.write().unwrap();
+		assert!(inner.is_none());
+		*inner = Some(value);
 	}
 
 	/// Borrows the OneTime, should only be called after initialization.
-	pub fn borrow(&self) -> Ref<T> {
-		Ref::map(self.inner.borrow(), |o| o.as_ref().unwrap())
+	/// Will panic (via expect) if called before initialization.
+	pub fn borrow(&self) -> T {
+		let inner = self.inner.read().unwrap();
+		inner
+			.clone()
+			.expect("Cannot borrow one_time before initialization.")
 	}
 }
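// Illustration only, not part of the patch: a minimal sketch of the reworked
// OneTime contract, as exercised by the adapters above. `init` may be called
// exactly once and `borrow` returns a clone of the stored value, so storing a
// `Weak<T>` (which is cheap to clone) and upgrading after the borrow works
// well. The `OneTime` below is a trimmed copy of the implementation in the
// diff; `peers` and `slot` are made-up names for the example.
use std::sync::{Arc, RwLock, Weak};

#[derive(Clone)]
pub struct OneTime<T> {
	inner: Arc<RwLock<Option<T>>>,
}

impl<T> OneTime<T>
where
	T: Clone,
{
	pub fn new() -> OneTime<T> {
		OneTime {
			inner: Arc::new(RwLock::new(None)),
		}
	}

	pub fn init(&self, value: T) {
		let mut inner = self.inner.write().unwrap();
		assert!(inner.is_none());
		*inner = Some(value);
	}

	pub fn borrow(&self) -> T {
		let inner = self.inner.read().unwrap();
		inner
			.clone()
			.expect("Cannot borrow one_time before initialization.")
	}
}

fn main() {
	let peers = Arc::new(vec!["peer_a", "peer_b"]);

	// Adapters store a Weak inside the OneTime, exactly as their init() does.
	let slot: OneTime<Weak<Vec<&str>>> = OneTime::new();
	slot.init(Arc::downgrade(&peers));

	// borrow() hands back a cloned Weak; upgrade it to reach the shared value.
	let upgraded = slot.borrow().upgrade().expect("peers dropped");
	assert_eq!(upgraded.len(), 2);

	// A second init() would trip the assert!, and borrowing before init()
	// would panic via expect() — the "fail hard" behavior described above.
}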