Replace RefCell with RwLock in OneTime (and cleanup Weak usage) (#1694)

* rustfmt

* use RwLock in OneTime (and not RefCell)

* rustfmt

* put weak refs back and cleanup

* rustfmt

* revert weak in adapters
This commit is contained in:
Antioch Peverell 2018-10-09 16:53:57 +01:00 committed by GitHub
parent 02c30cb302
commit a676eb1b39
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 138 additions and 120 deletions

View file

@ -791,9 +791,9 @@ impl Handler for PoolPushHandler {
/// except during tests).
pub fn start_rest_apis(
addr: String,
chain: Weak<chain::Chain>,
tx_pool: Weak<RwLock<pool::TransactionPool>>,
peers: Weak<p2p::Peers>,
chain: Arc<chain::Chain>,
tx_pool: Arc<RwLock<pool::TransactionPool>>,
peers: Arc<p2p::Peers>,
api_secret: Option<String>,
tls_config: Option<TLSConfig>,
) -> bool {
@ -813,9 +813,9 @@ pub fn start_rest_apis(
}
pub fn build_router(
chain: Weak<chain::Chain>,
tx_pool: Weak<RwLock<pool::TransactionPool>>,
peers: Weak<p2p::Peers>,
chain: Arc<chain::Chain>,
tx_pool: Arc<RwLock<pool::TransactionPool>>,
peers: Arc<p2p::Peers>,
) -> Result<Router, RouterError> {
let route_list = vec![
"get blocks".to_string(),
@ -840,45 +840,45 @@ pub fn build_router(
let index_handler = IndexHandler { list: route_list };
let output_handler = OutputHandler {
chain: chain.clone(),
chain: Arc::downgrade(&chain),
};
let block_handler = BlockHandler {
chain: chain.clone(),
chain: Arc::downgrade(&chain),
};
let header_handler = HeaderHandler {
chain: chain.clone(),
chain: Arc::downgrade(&chain),
};
let chain_tip_handler = ChainHandler {
chain: chain.clone(),
chain: Arc::downgrade(&chain),
};
let chain_compact_handler = ChainCompactHandler {
chain: chain.clone(),
chain: Arc::downgrade(&chain),
};
let chain_validation_handler = ChainValidationHandler {
chain: chain.clone(),
chain: Arc::downgrade(&chain),
};
let status_handler = StatusHandler {
chain: chain.clone(),
peers: peers.clone(),
chain: Arc::downgrade(&chain),
peers: Arc::downgrade(&peers),
};
let txhashset_handler = TxHashSetHandler {
chain: chain.clone(),
chain: Arc::downgrade(&chain),
};
let pool_info_handler = PoolInfoHandler {
tx_pool: tx_pool.clone(),
tx_pool: Arc::downgrade(&tx_pool),
};
let pool_push_handler = PoolPushHandler {
tx_pool: tx_pool.clone(),
tx_pool: Arc::downgrade(&tx_pool),
};
let peers_all_handler = PeersAllHandler {
peers: peers.clone(),
peers: Arc::downgrade(&peers),
};
let peers_connected_handler = PeersConnectedHandler {
peers: peers.clone(),
peers: Arc::downgrade(&peers),
};
let peer_handler = PeerHandler {
peers: peers.clone(),
peers: Arc::downgrade(&peers),
};
let mut router = Router::new();

View file

@ -61,6 +61,10 @@ impl TransactionPool {
}
}
pub fn chain_head(&self) -> Result<BlockHeader, PoolError> {
self.blockchain.chain_head()
}
fn add_to_stempool(&mut self, entry: PoolEntry, header: &BlockHeader) -> Result<(), PoolError> {
// Add tx to stempool (passing in all txs from txpool to validate against).
self.stempool

View file

@ -17,7 +17,6 @@
use std::fs::File;
use std::net::SocketAddr;
use std::ops::Deref;
use std::sync::{Arc, RwLock, Weak};
use std::thread;
use std::time::Instant;
@ -35,17 +34,6 @@ use pool;
use store;
use util::{OneTime, LOGGER};
// All adapters use `Weak` references instead of `Arc` to avoid cycles that
// can never be destroyed. These 2 functions are simple helpers to reduce the
// boilerplate of dealing with `Weak`.
fn w<T>(weak: &Weak<T>) -> Arc<T> {
weak.upgrade().unwrap()
}
fn wo<T>(weak_one: &OneTime<Weak<T>>) -> Arc<T> {
w(weak_one.borrow().deref())
}
/// Implementation of the NetAdapter for the . Gets notified when new
/// blocks and transactions are received and forwards to the chain and pool
/// implementations.
@ -61,11 +49,11 @@ pub struct NetToChainAdapter {
impl p2p::ChainAdapter for NetToChainAdapter {
fn total_difficulty(&self) -> Difficulty {
w(&self.chain).head().unwrap().total_difficulty
self.chain().head().unwrap().total_difficulty
}
fn total_height(&self) -> u64 {
w(&self.chain).head().unwrap().height
self.chain().head().unwrap().height
}
fn transaction_received(&self, tx: core::Transaction, stem: bool) {
@ -80,7 +68,7 @@ impl p2p::ChainAdapter for NetToChainAdapter {
};
let tx_hash = tx.hash();
let header = w(&self.chain).head_header().unwrap();
let header = self.chain().head_header().unwrap();
debug!(
LOGGER,
@ -140,7 +128,10 @@ impl p2p::ChainAdapter for NetToChainAdapter {
}
} else {
// check at least the header is valid before hydrating
if let Err(e) = w(&self.chain).process_block_header(&cb.header, self.chain_opts()) {
if let Err(e) = self
.chain()
.process_block_header(&cb.header, self.chain_opts())
{
debug!(LOGGER, "Invalid compact block header {}: {}", cb_hash, e);
return !e.is_bad_data();
}
@ -170,12 +161,7 @@ impl p2p::ChainAdapter for NetToChainAdapter {
}
};
let chain = self
.chain
.upgrade()
.expect("failed to upgrade weak ref to chain");
if let Ok(prev) = chain.get_block_header(&cb.header.previous) {
if let Ok(prev) = self.chain().get_block_header(&cb.header.previous) {
if block
.validate(
&prev.total_kernel_offset,
@ -220,7 +206,7 @@ impl p2p::ChainAdapter for NetToChainAdapter {
// pushing the new block header through the header chain pipeline
// we will go ask for the block if this is a new header
let res = w(&self.chain).process_block_header(&bh, self.chain_opts());
let res = self.chain().process_block_header(&bh, self.chain_opts());
if let &Err(ref e) = &res {
debug!(
@ -234,7 +220,7 @@ impl p2p::ChainAdapter for NetToChainAdapter {
LOGGER,
"header_received: {} is a bad header, resetting header head", bhash
);
let _ = w(&self.chain).reset_head();
let _ = self.chain().reset_head();
return false;
} else {
// we got an error when trying to process the block header
@ -264,7 +250,7 @@ impl p2p::ChainAdapter for NetToChainAdapter {
}
// try to add headers to our header chain
let res = w(&self.chain).sync_block_headers(&bhs, self.chain_opts());
let res = self.chain().sync_block_headers(&bhs, self.chain_opts());
if let &Err(ref e) = &res {
debug!(LOGGER, "Block headers refused by chain: {:?}", e);
@ -289,7 +275,7 @@ impl p2p::ChainAdapter for NetToChainAdapter {
let hh = header.height;
let mut headers = vec![];
for h in (hh + 1)..(hh + (p2p::MAX_BLOCK_HEADERS as u64)) {
let header = w(&self.chain).get_header_by_height(h);
let header = self.chain().get_header_by_height(h);
match header {
Ok(head) => headers.push(head),
Err(e) => match e.kind() {
@ -313,7 +299,7 @@ impl p2p::ChainAdapter for NetToChainAdapter {
/// Gets a full block by its hash.
fn get_block(&self, h: Hash) -> Option<core::Block> {
let b = w(&self.chain).get_block(&h);
let b = self.chain().get_block(&h);
match b {
Ok(b) => Some(b),
_ => None,
@ -324,7 +310,7 @@ impl p2p::ChainAdapter for NetToChainAdapter {
/// the required indexes for a consumer to rewind to a consistent state
/// at the provided block hash.
fn txhashset_read(&self, h: Hash) -> Option<p2p::TxHashSetRead> {
match w(&self.chain).txhashset_read(h.clone()) {
match self.chain().txhashset_read(h.clone()) {
Ok((out_index, kernel_index, read)) => Some(p2p::TxHashSetRead {
output_index: out_index,
kernel_index: kernel_index,
@ -354,7 +340,9 @@ impl p2p::ChainAdapter for NetToChainAdapter {
return true;
}
if let Err(e) = w(&self.chain).txhashset_write(h, txhashset_data, self.sync_state.as_ref())
if let Err(e) = self
.chain()
.txhashset_write(h, txhashset_data, self.sync_state.as_ref())
{
error!(LOGGER, "Failed to save txhashset archive: {}", e);
let is_good_data = !e.is_bad_data();
@ -372,7 +360,7 @@ impl NetToChainAdapter {
pub fn new(
sync_state: Arc<SyncState>,
archive_mode: bool,
chain_ref: Weak<chain::Chain>,
chain: Arc<chain::Chain>,
tx_pool: Arc<RwLock<pool::TransactionPool>>,
verifier_cache: Arc<RwLock<VerifierCache>>,
config: ServerConfig,
@ -380,7 +368,7 @@ impl NetToChainAdapter {
NetToChainAdapter {
sync_state,
archive_mode,
chain: chain_ref,
chain: Arc::downgrade(&chain),
tx_pool,
verifier_cache,
peers: OneTime::new(),
@ -390,8 +378,21 @@ impl NetToChainAdapter {
/// Initialize a NetToChainAdaptor with reference to a Peers object.
/// Should only be called once.
pub fn init(&self, peers: Weak<p2p::Peers>) {
self.peers.init(peers);
pub fn init(&self, peers: Arc<p2p::Peers>) {
self.peers.init(Arc::downgrade(&peers));
}
fn peers(&self) -> Arc<p2p::Peers> {
self.peers
.borrow()
.upgrade()
.expect("Failed to upgrade weak ref to our peers.")
}
fn chain(&self) -> Arc<chain::Chain> {
self.chain
.upgrade()
.expect("Failed to upgrade weak ref to our chain.")
}
// recursively go back through the locator vector and stop when we find
@ -402,13 +403,12 @@ impl NetToChainAdapter {
return None;
}
let chain = w(&self.chain);
let known = chain.get_block_header(&locator[0]);
let known = self.chain().get_block_header(&locator[0]);
match known {
Ok(header) => {
// even if we know the block, it may not be on our winning chain
let known_winning = chain.get_header_by_height(header.height);
let known_winning = self.chain().get_header_by_height(header.height);
if let Ok(known_winning) = known_winning {
if known_winning.hash() != header.hash() {
self.find_common_header(locator[1..].to_vec())
@ -434,9 +434,8 @@ impl NetToChainAdapter {
// pushing the new block through the chain pipeline
// remembering to reset the head if we have a bad block
fn process_block(&self, b: core::Block, addr: SocketAddr) -> bool {
let chain = w(&self.chain);
if !self.archive_mode {
let head = chain.head().unwrap();
let head = self.chain().head().unwrap();
// we have a fast sync'd node and are sent a block older than our horizon,
// only sync can do something with that
if b.header.height < head
@ -449,7 +448,7 @@ impl NetToChainAdapter {
let prev_hash = b.header.previous;
let bhash = b.hash();
match chain.process_block(b, self.chain_opts()) {
match self.chain().process_block(b, self.chain_opts()) {
Ok(tip) => {
self.validate_chain(bhash);
self.check_compact(tip);
@ -460,7 +459,7 @@ impl NetToChainAdapter {
LOGGER,
"adapter: process_block: {} is a bad block, resetting head", bhash
);
let _ = chain.reset_head();
let _ = self.chain().reset_head();
// we potentially changed the state of the system here
// so check everything is still ok
@ -472,7 +471,7 @@ impl NetToChainAdapter {
match e.kind() {
chain::ErrorKind::Orphan => {
// make sure we did not miss the parent block
if !chain.is_orphan(&prev_hash) && !self.sync_state.is_syncing() {
if !self.chain().is_orphan(&prev_hash) && !self.sync_state.is_syncing() {
debug!(LOGGER, "adapter: process_block: received an orphan block, checking the parent: {:}", prev_hash);
self.request_block_by_hash(prev_hash, &addr)
}
@ -498,8 +497,7 @@ impl NetToChainAdapter {
// We are out of consensus at this point and want to track the problem
// down as soon as possible.
// Skip this if we are currently syncing (too slow).
let chain = w(&self.chain);
if chain.head().unwrap().height > 0
if self.chain().head().unwrap().height > 0
&& !self.sync_state.is_syncing()
&& self.config.chain_validation_mode == ChainValidationMode::EveryBlock
{
@ -510,8 +508,7 @@ impl NetToChainAdapter {
"adapter: process_block: ***** validating full chain state at {}", bhash,
);
let chain = w(&self.chain);
chain
self.chain()
.validate(true)
.expect("chain validation failed, hard stop");
@ -533,7 +530,7 @@ impl NetToChainAdapter {
// trigger compaction every 2000 blocks, uses a different thread to avoid
// blocking the caller thread (likely a peer)
if tip.height % 2000 == 0 {
let chain = w(&self.chain);
let chain = self.chain().clone();
let _ = thread::Builder::new()
.name("compactor".to_string())
.spawn(move || {
@ -570,8 +567,8 @@ impl NetToChainAdapter {
where
F: Fn(&p2p::Peer, Hash) -> Result<(), p2p::Error>,
{
match w(&self.chain).block_exists(h) {
Ok(false) => match wo(&self.peers).get_connected_peer(addr) {
match self.chain().block_exists(h) {
Ok(false) => match self.peers().get_connected_peer(addr) {
None => debug!(
LOGGER,
"send_block_request_to_peer: can't send request to peer {:?}, not connected",
@ -637,10 +634,10 @@ impl ChainAdapter for ChainToPoolAndNetAdapter {
if opts.contains(Options::MINE) {
// propagate compact block out if we mined the block
let cb: CompactBlock = b.clone().into();
wo(&self.peers).broadcast_compact_block(&cb);
self.peers().broadcast_compact_block(&cb);
} else {
// "header first" propagation if we are not the originator of this block
wo(&self.peers).broadcast_header(&b.header);
self.peers().broadcast_header(&b.header);
}
}
}
@ -660,8 +657,15 @@ impl ChainToPoolAndNetAdapter {
/// Initialize a ChainToPoolAndNetAdapter instance with handle to a Peers
/// object. Should only be called once.
pub fn init(&self, peers: Weak<p2p::Peers>) {
self.peers.init(peers);
pub fn init(&self, peers: Arc<p2p::Peers>) {
self.peers.init(Arc::downgrade(&peers));
}
fn peers(&self) -> Arc<p2p::Peers> {
self.peers
.borrow()
.upgrade()
.expect("Failed to upgrade weak ref to our peers.")
}
}
@ -673,13 +677,13 @@ pub struct PoolToNetAdapter {
impl pool::PoolAdapter for PoolToNetAdapter {
fn stem_tx_accepted(&self, tx: &core::Transaction) -> Result<(), pool::PoolError> {
wo(&self.peers)
self.peers()
.broadcast_stem_transaction(tx)
.map_err(|_| pool::PoolError::DandelionError)?;
Ok(())
}
fn tx_accepted(&self, tx: &core::Transaction) {
wo(&self.peers).broadcast_transaction(tx);
self.peers().broadcast_transaction(tx);
}
}
@ -692,8 +696,15 @@ impl PoolToNetAdapter {
}
/// Setup the p2p server on the adapter
pub fn init(&self, peers: Weak<p2p::Peers>) {
self.peers.init(peers);
pub fn init(&self, peers: Arc<p2p::Peers>) {
self.peers.init(Arc::downgrade(&peers));
}
fn peers(&self) -> Arc<p2p::Peers> {
self.peers
.borrow()
.upgrade()
.expect("Failed to upgrade weak ref to our peers.")
}
}
@ -714,44 +725,51 @@ impl PoolToChainAdapter {
}
/// Set the pool adapter's chain. Should only be called once.
pub fn set_chain(&self, chain_ref: Weak<chain::Chain>) {
self.chain.init(chain_ref);
pub fn set_chain(&self, chain_ref: Arc<chain::Chain>) {
self.chain.init(Arc::downgrade(&chain_ref));
}
fn chain(&self) -> Arc<chain::Chain> {
self.chain
.borrow()
.upgrade()
.expect("Failed to upgrade the weak ref to our chain.")
}
}
impl pool::BlockChain for PoolToChainAdapter {
fn chain_head(&self) -> Result<BlockHeader, pool::PoolError> {
wo(&self.chain)
self.chain()
.head_header()
.map_err(|_| pool::PoolError::Other(format!("failed to get head_header")))
}
fn get_block_header(&self, hash: &Hash) -> Result<BlockHeader, pool::PoolError> {
wo(&self.chain)
self.chain()
.get_block_header(hash)
.map_err(|_| pool::PoolError::Other(format!("failed to get block_header")))
}
fn get_block_sums(&self, hash: &Hash) -> Result<BlockSums, pool::PoolError> {
wo(&self.chain)
self.chain()
.get_block_sums(hash)
.map_err(|_| pool::PoolError::Other(format!("failed to get block_sums")))
}
fn validate_tx(&self, tx: &Transaction) -> Result<(), pool::PoolError> {
wo(&self.chain)
self.chain()
.validate_tx(tx)
.map_err(|_| pool::PoolError::Other(format!("failed to validate tx")))
}
fn verify_coinbase_maturity(&self, tx: &Transaction) -> Result<(), pool::PoolError> {
wo(&self.chain)
self.chain()
.verify_coinbase_maturity(tx)
.map_err(|_| pool::PoolError::ImmatureCoinbase)
}
fn verify_tx_lock_height(&self, tx: &Transaction) -> Result<(), pool::PoolError> {
wo(&self.chain)
self.chain()
.verify_tx_lock_height(tx)
.map_err(|_| pool::PoolError::ImmatureTransaction)
}

View file

@ -53,8 +53,6 @@ pub fn monitor_transactions(
let patience_secs = dandelion_config.patience_secs.unwrap();
thread::sleep(Duration::from_secs(patience_secs));
let tx_pool = tx_pool.clone();
// Step 1: find all "ToStem" entries in stempool from last run.
// Aggregate them up to give a single (valid) aggregated tx and propagate it
// to the next Dandelion relay along the stem.
@ -90,7 +88,7 @@ fn process_stem_phase(
) -> Result<(), PoolError> {
let mut tx_pool = tx_pool.write().unwrap();
let header = tx_pool.blockchain.chain_head()?;
let header = tx_pool.chain_head()?;
let txpool_tx = tx_pool.txpool.aggregate_transaction()?;
let stem_txs = tx_pool
@ -137,7 +135,7 @@ fn process_fluff_phase(
) -> Result<(), PoolError> {
let mut tx_pool = tx_pool.write().unwrap();
let header = tx_pool.blockchain.chain_head()?;
let header = tx_pool.chain_head()?;
let txpool_tx = tx_pool.txpool.aggregate_transaction()?;
let stem_txs = tx_pool
@ -239,7 +237,7 @@ fn process_expired_entries(
{
let mut tx_pool = tx_pool.write().unwrap();
let header = tx_pool.blockchain.chain_head()?;
let header = tx_pool.chain_head()?;
for entry in expired_entries {
let src = TxSource {

View file

@ -168,14 +168,14 @@ impl Server {
archive_mode,
)?);
pool_adapter.set_chain(Arc::downgrade(&shared_chain));
pool_adapter.set_chain(shared_chain.clone());
let awaiting_peers = Arc::new(AtomicBool::new(false));
let net_adapter = Arc::new(NetToChainAdapter::new(
sync_state.clone(),
archive_mode,
Arc::downgrade(&shared_chain),
shared_chain.clone(),
tx_pool.clone(),
verifier_cache.clone(),
config.clone(),
@ -197,9 +197,9 @@ impl Server {
archive_mode,
block_1_hash,
)?);
chain_adapter.init(Arc::downgrade(&p2p_server.peers));
pool_net_adapter.init(Arc::downgrade(&p2p_server.peers));
net_adapter.init(Arc::downgrade(&p2p_server.peers));
chain_adapter.init(p2p_server.peers.clone());
pool_net_adapter.init(p2p_server.peers.clone());
net_adapter.init(p2p_server.peers.clone());
if config.p2p_config.seeding_type.clone() != p2p::Seeding::Programmatic {
let seeder = match config.p2p_config.seeding_type.clone() {
@ -258,9 +258,9 @@ impl Server {
let api_secret = get_first_line(config.api_secret_path.clone());
api::start_rest_apis(
config.api_http_addr.clone(),
Arc::downgrade(&shared_chain),
Arc::downgrade(&tx_pool),
Arc::downgrade(&p2p_server.peers),
shared_chain.clone(),
tx_pool.clone(),
p2p_server.peers.clone(),
api_secret,
None,
);

View file

@ -58,9 +58,9 @@ pub mod macros;
// other utils
use byteorder::{BigEndian, ByteOrder};
use std::cell::{Ref, RefCell};
#[allow(unused_imports)]
use std::ops::Deref;
use std::sync::{Arc, RwLock};
mod hex;
pub use hex::*;
@ -70,44 +70,42 @@ pub mod file;
/// Compress and decompress zip bz2 archives
pub mod zip;
/// Encapsulation of a RefCell<Option<T>> for one-time initialization after
/// construction. This implementation will purposefully fail hard if not used
/// properly, for example if it's not initialized before being first used
/// Encapsulation of a RwLock<Option<T>> for one-time initialization.
/// This implementation will purposefully fail hard if not used
/// properly, for example if not initialized before being first used
/// (borrowed).
#[derive(Clone)]
pub struct OneTime<T> {
/// inner
inner: RefCell<Option<T>>,
/// The inner value.
inner: Arc<RwLock<Option<T>>>,
}
unsafe impl<T> Sync for OneTime<T> {}
unsafe impl<T> Send for OneTime<T> {}
impl<T> OneTime<T> {
impl<T> OneTime<T>
where
T: Clone,
{
/// Builds a new uninitialized OneTime.
pub fn new() -> OneTime<T> {
OneTime {
inner: RefCell::new(None),
inner: Arc::new(RwLock::new(None)),
}
}
/// Initializes the OneTime, should only be called once after construction.
/// Will panic (via assert) if called more than once.
pub fn init(&self, value: T) {
let mut inner_mut = self.inner.borrow_mut();
*inner_mut = Some(value);
}
/// Whether the OneTime has been initialized
pub fn is_initialized(&self) -> bool {
match self.inner.try_borrow() {
Ok(inner) => inner.is_some(),
Err(_) => false,
}
let mut inner = self.inner.write().unwrap();
assert!(inner.is_none());
*inner = Some(value);
}
/// Borrows the OneTime, should only be called after initialization.
pub fn borrow(&self) -> Ref<T> {
Ref::map(self.inner.borrow(), |o| o.as_ref().unwrap())
/// Will panic (via expect) if called before initialization.
pub fn borrow(&self) -> T {
let inner = self.inner.read().unwrap();
inner
.clone()
.expect("Cannot borrow one_time before initialization.")
}
}