Clean server shutdown, generalizes usage of Weak (#700)

* Clean server shutdown, generalizes usage of `Weak`. Introduces 2 main changes:
  * A shared `AtomicBool` that signals server shutdown. All server threads regularly check it and break out of their main loops when it changes to `true`.
  * Breaking of circular `Arc` references, which can never be destroyed, by downgrading to `Weak` instead. Only the main server keeps the `Arc`, while all other components get the `Weak` variant.

  Both of these are required for all long-lived structs to be cleanly destroyed. Note that in Rust this is fairly important, as most resource-freeing logic is associated with `drop`, which is only called when the struct goes out of scope and no `Arc` references to it remain. Should address most of #536 (only the stop hook still needs to call `Server` shutdown).

* Test for fast sync, followed by restart, followed by re-fast-sync
* P2P test fix
* Double sync is taking too long for Travis, commenting it out for now
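As a rough illustration of the two patterns this change combines — a shared `AtomicBool` that worker loops poll to know when to exit, and `Weak` handles held by everything except the owning server so that `drop` can actually run — here is a minimal, self-contained Rust sketch. The `Chain` and `Worker` names are placeholders for illustration only, not the actual grin types:

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Weak};
use std::thread;
use std::time::Duration;

struct Chain;

impl Drop for Chain {
    fn drop(&mut self) {
        // resource-freeing logic runs only once the last `Arc` is gone
        println!("chain dropped cleanly");
    }
}

struct Worker {
    chain: Weak<Chain>,    // downgraded reference, never keeps the chain alive
    stop: Arc<AtomicBool>, // shared shutdown signal
}

impl Worker {
    fn run(&self) {
        loop {
            if self.stop.load(Ordering::Relaxed) {
                break;
            }
            // upgrade only for the duration of the work
            if let Some(chain) = self.chain.upgrade() {
                let _ = &*chain; // ... do some work against the chain ...
            }
            thread::sleep(Duration::from_millis(100));
        }
    }
}

fn main() {
    let stop = Arc::new(AtomicBool::new(false));
    let chain = Arc::new(Chain);

    let worker = Worker {
        chain: Arc::downgrade(&chain),
        stop: stop.clone(),
    };
    let handle = thread::spawn(move || worker.run());

    thread::sleep(Duration::from_millis(300));

    // clean shutdown: signal the flag, join the thread, then drop the only `Arc`
    stop.store(true, Ordering::Relaxed);
    handle.join().unwrap();
    drop(chain); // `Chain::drop` runs here because no cycle keeps it alive
}

The key point is the last few lines: once the flag is set and the thread joined, dropping the single remaining `Arc` immediately runs `Chain::drop`, which would never happen if the worker also held an `Arc`.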
This commit is contained in:
parent 7dcbb8824d
commit 76796738c1

11 changed files with 282 additions and 179 deletions
Changed paths: api/src, grin, p2p
@@ -13,7 +13,7 @@
 // limitations under the License.

 use std::io::Read;
-use std::sync::{Arc, RwLock};
+use std::sync::{Arc, Weak, RwLock};
 use std::thread;

 use iron::prelude::*;

@@ -36,6 +36,14 @@ use types::*;
 use util;
 use util::LOGGER;

+// All handlers use `Weak` references instead of `Arc` to avoid cycles that
+// can never be destroyed. These 2 functions are simple helpers to reduce the
+// boilerplate of dealing with `Weak`.
+fn w<T>(weak: &Weak<T>) -> Arc<T> {
+    weak.upgrade().unwrap()
+}
+
+
 // RESTful index of available api endpoints
 // GET /v1/
 struct IndexHandler {

@@ -55,7 +63,7 @@ impl Handler for IndexHandler {
 // GET /v1/chain/utxos/byids?id=xxx&id=yyy&id=zzz
 // GET /v1/chain/utxos/byheight?start_height=101&end_height=200
 struct UtxoHandler {
-    chain: Arc<chain::Chain>,
+    chain: Weak<chain::Chain>,
 }

 impl UtxoHandler {

@@ -73,7 +81,7 @@ impl UtxoHandler {
        ];

        for x in outputs.iter() {
-           if let Ok(_) = self.chain.is_unspent(&x) {
+           if let Ok(_) = w(&self.chain).is_unspent(&x) {
                return Ok(Utxo::new(&commit))
            }
        }

@@ -109,11 +117,10 @@ impl UtxoHandler {
        commitments: Vec<Commitment>,
        include_proof: bool,
    ) -> BlockOutputs {
-       let header = self.chain
-           .clone()
+       let header = w(&self.chain)
            .get_header_by_height(block_height)
            .unwrap();
-       let block = self.chain.clone().get_block(&header.hash()).unwrap();
+       let block = w(&self.chain).get_block(&header.hash()).unwrap();
        let outputs = block
            .outputs
            .iter()

@@ -121,7 +128,7 @@ impl UtxoHandler {
                commitments.is_empty() || commitments.contains(&output.commit)
            })
            .map(|output| {
-               OutputPrintable::from_output(output, self.chain.clone(), include_proof)
+               OutputPrintable::from_output(output, w(&self.chain), include_proof)
            })
            .collect();
        BlockOutputs {

@@ -207,28 +214,28 @@ impl Handler for UtxoHandler {
 // GET /v1/sumtrees/lastrangeproofs
 // GET /v1/sumtrees/lastkernels
 struct SumTreeHandler {
-    chain: Arc<chain::Chain>,
+    chain: Weak<chain::Chain>,
 }

 impl SumTreeHandler {
    // gets roots
    fn get_roots(&self) -> SumTrees {
-       SumTrees::from_head(self.chain.clone())
+       SumTrees::from_head(w(&self.chain))
    }

    // gets last n utxos inserted in to the tree
    fn get_last_n_utxo(&self, distance: u64) -> Vec<SumTreeNode> {
-       SumTreeNode::get_last_n_utxo(self.chain.clone(), distance)
+       SumTreeNode::get_last_n_utxo(w(&self.chain), distance)
    }

    // gets last n utxos inserted in to the tree
    fn get_last_n_rangeproof(&self, distance: u64) -> Vec<SumTreeNode> {
-       SumTreeNode::get_last_n_rangeproof(self.chain.clone(), distance)
+       SumTreeNode::get_last_n_rangeproof(w(&self.chain), distance)
    }

    // gets last n utxos inserted in to the tree
    fn get_last_n_kernel(&self, distance: u64) -> Vec<SumTreeNode> {
-       SumTreeNode::get_last_n_kernel(self.chain.clone(), distance)
+       SumTreeNode::get_last_n_kernel(w(&self.chain), distance)
    }
 }

@@ -261,24 +268,24 @@ impl Handler for SumTreeHandler {
 }

 pub struct PeersAllHandler {
-    pub peers: p2p::Peers,
+    pub peers: Weak<p2p::Peers>,
 }

 impl Handler for PeersAllHandler {
    fn handle(&self, _req: &mut Request) -> IronResult<Response> {
-       let peers = &self.peers.all_peers();
+       let peers = &w(&self.peers).all_peers();
        json_response_pretty(&peers)
    }
 }

 pub struct PeersConnectedHandler {
-    pub peers: p2p::Peers,
+    pub peers: Weak<p2p::Peers>,
 }

 impl Handler for PeersConnectedHandler {
    fn handle(&self, _req: &mut Request) -> IronResult<Response> {
        let mut peers = vec![];
-       for p in &self.peers.connected_peers() {
+       for p in &w(&self.peers).connected_peers() {
            let p = p.read().unwrap();
            let peer_info = p.info.clone();
            peers.push(peer_info);

@@ -291,7 +298,7 @@ impl Handler for PeersConnectedHandler {
 /// POST /v1/peers/10.12.12.13/ban
 /// POST /v1/peers/10.12.12.13/unban
 pub struct PeerPostHandler {
-    pub peers: p2p::Peers,
+    pub peers: Weak<p2p::Peers>,
 }

 impl Handler for PeerPostHandler {

@@ -305,7 +312,7 @@ impl Handler for PeerPostHandler {
            "ban" => {
                path_elems.pop();
                if let Ok(addr) = path_elems.last().unwrap().parse() {
-                   self.peers.ban_peer(&addr);
+                   w(&self.peers).ban_peer(&addr);
                    Ok(Response::with((status::Ok, "")))
                } else {
                    Ok(Response::with((status::BadRequest, "")))

@@ -314,7 +321,7 @@ impl Handler for PeerPostHandler {
            "unban" => {
                path_elems.pop();
                if let Ok(addr) = path_elems.last().unwrap().parse() {
-                   self.peers.unban_peer(&addr);
+                   w(&self.peers).unban_peer(&addr);
                    Ok(Response::with((status::Ok, "")))
                } else {
                    Ok(Response::with((status::BadRequest, "")))

@@ -327,7 +334,7 @@ impl Handler for PeerPostHandler {

 /// Get details about a given peer
 pub struct PeerGetHandler {
-    pub peers: p2p::Peers,
+    pub peers: Weak<p2p::Peers>,
 }

 impl Handler for PeerGetHandler {

@@ -338,7 +345,7 @@ impl Handler for PeerGetHandler {
            path_elems.pop();
        }
        if let Ok(addr) = path_elems.last().unwrap().parse() {
-           match self.peers.get_peer(addr) {
+           match w(&self.peers).get_peer(addr) {
                Ok(peer) => json_response(&peer),
                Err(_) => Ok(Response::with((status::BadRequest, ""))),
            }

@@ -351,13 +358,13 @@ impl Handler for PeerGetHandler {
 // Status handler. Post a summary of the server status
 // GET /v1/status
 pub struct StatusHandler {
-    pub chain: Arc<chain::Chain>,
-    pub peers: p2p::Peers,
+    pub chain: Weak<chain::Chain>,
+    pub peers: Weak<p2p::Peers>,
 }

 impl StatusHandler {
    fn get_status(&self) -> Status {
-       Status::from_tip_and_peers(self.chain.head().unwrap(), self.peers.peer_count())
+       Status::from_tip_and_peers(w(&self.chain).head().unwrap(), w(&self.peers).peer_count())
    }
 }

@@ -370,12 +377,12 @@ impl Handler for StatusHandler {
 // Chain handler. Get the head details.
 // GET /v1/chain
 pub struct ChainHandler {
-    pub chain: Arc<chain::Chain>,
+    pub chain: Weak<chain::Chain>,
 }

 impl ChainHandler {
    fn get_tip(&self) -> Tip {
-       Tip::from_tip(self.chain.head().unwrap())
+       Tip::from_tip(w(&self.chain).head().unwrap())
    }
 }

@@ -393,31 +400,31 @@ impl Handler for ChainHandler {
 /// GET /v1/blocks/<hash>?compact
 ///
 pub struct BlockHandler {
-    pub chain: Arc<chain::Chain>,
+    pub chain: Weak<chain::Chain>,
 }

 impl BlockHandler {
    fn get_block(&self, h: &Hash) -> Result<BlockPrintable, Error> {
-       let block = self.chain.clone().get_block(h).map_err(|_| Error::NotFound)?;
+       let block = w(&self.chain).get_block(h).map_err(|_| Error::NotFound)?;
        Ok(BlockPrintable::from_block(
            &block,
-           self.chain.clone(),
+           w(&self.chain),
            false,
        ))
    }

    fn get_compact_block(&self, h: &Hash) -> Result<CompactBlockPrintable, Error> {
-       let block = self.chain.clone().get_block(h).map_err(|_| Error::NotFound)?;
+       let block = w(&self.chain).get_block(h).map_err(|_| Error::NotFound)?;
        Ok(CompactBlockPrintable::from_compact_block(
            &block.as_compact_block(),
-           self.chain.clone(),
+           w(&self.chain),
        ))
    }

    // Try to decode the string as a height or a hash.
    fn parse_input(&self, input: String) -> Result<Hash, Error> {
        if let Ok(height) = input.parse() {
-           match self.chain.clone().get_header_by_height(height) {
+           match w(&self.chain).get_header_by_height(height) {
                Ok(header) => return Ok(header.hash()),
                Err(_) => return Err(Error::NotFound),
            }

@@ -462,7 +469,7 @@ impl Handler for BlockHandler {

 // Get basic information about the transaction pool.
 struct PoolInfoHandler<T> {
-    tx_pool: Arc<RwLock<pool::TransactionPool<T>>>,
+    tx_pool: Weak<RwLock<pool::TransactionPool<T>>>,
 }

 impl<T> Handler for PoolInfoHandler<T>

@@ -470,7 +477,8 @@
    T: pool::BlockChain + Send + Sync + 'static,
 {
    fn handle(&self, _req: &mut Request) -> IronResult<Response> {
-       let pool = self.tx_pool.read().unwrap();
+       let pool_arc = w(&self.tx_pool);
+       let pool = pool_arc.read().unwrap();
        json_response(&PoolInfo {
            pool_size: pool.pool_size(),
            orphans_size: pool.orphans_size(),

@@ -488,7 +496,7 @@ struct TxWrapper {
 // Push new transactions to our transaction pool, that should broadcast it
 // to the network if valid.
 struct PoolPushHandler<T> {
-    tx_pool: Arc<RwLock<pool::TransactionPool<T>>>,
+    tx_pool: Weak<RwLock<pool::TransactionPool<T>>>,
 }

 impl<T> Handler for PoolPushHandler<T>

@@ -517,10 +525,8 @@
            tx.outputs.len()
        );

-       let res = self.tx_pool
-           .write()
-           .unwrap()
-           .add_to_memory_pool(source, tx);
+       let pool_arc = w(&self.tx_pool);
+       let res = pool_arc.write().unwrap().add_to_memory_pool(source, tx);

        match res {
            Ok(()) => Ok(Response::with(status::Ok)),

@@ -556,11 +562,17 @@
 }
 /// Start all server HTTP handlers. Register all of them with Iron
 /// and runs the corresponding HTTP server.
+///
+/// Hyper currently has a bug that prevents clean shutdown. In order
+/// to avoid having references kept forever by handlers, we only pass
+/// weak references. Note that this likely means a crash if the handlers are
+/// used after a server shutdown (which should normally never happen,
+/// except during tests).
 pub fn start_rest_apis<T>(
    addr: String,
-   chain: Arc<chain::Chain>,
-   tx_pool: Arc<RwLock<pool::TransactionPool<T>>>,
-   peers: p2p::Peers,
+   chain: Weak<chain::Chain>,
+   tx_pool: Weak<RwLock<pool::TransactionPool<T>>>,
+   peers: Weak<p2p::Peers>,
 ) where
    T: pool::BlockChain + Send + Sync + 'static,
 {
@@ -14,7 +14,8 @@

 use std::fs::File;
 use std::net::SocketAddr;
-use std::sync::{Arc, RwLock};
+use std::ops::Deref;
+use std::sync::{Arc, Weak, RwLock};
 use std::sync::atomic::{AtomicBool, Ordering};
 use rand;
 use rand::Rng;

@@ -31,23 +32,34 @@ use util::OneTime;
 use store;
 use util::LOGGER;

+// All adapters use `Weak` references instead of `Arc` to avoid cycles that
+// can never be destroyed. These 2 functions are simple helpers to reduce the
+// boilerplate of dealing with `Weak`.
+fn w<T>(weak: &Weak<T>) -> Arc<T> {
+    weak.upgrade().unwrap()
+}
+
+fn wo<T>(weak_one: &OneTime<Weak<T>>) -> Arc<T> {
+    w(weak_one.borrow().deref())
+}
+
 /// Implementation of the NetAdapter for the blockchain. Gets notified when new
 /// blocks and transactions are received and forwards to the chain and pool
 /// implementations.
 pub struct NetToChainAdapter {
    currently_syncing: Arc<AtomicBool>,
-   chain: Arc<chain::Chain>,
+   chain: Weak<chain::Chain>,
    tx_pool: Arc<RwLock<pool::TransactionPool<PoolToChainAdapter>>>,
-   peers: OneTime<p2p::Peers>,
+   peers: OneTime<Weak<p2p::Peers>>,
 }

 impl p2p::ChainAdapter for NetToChainAdapter {
    fn total_difficulty(&self) -> Difficulty {
-       self.chain.total_difficulty()
+       w(&self.chain).total_difficulty()
    }

    fn total_height(&self) -> u64 {
-       self.chain.head().unwrap().height
+       w(&self.chain).head().unwrap().height
    }

    fn transaction_received(&self, tx: core::Transaction) {

@@ -120,13 +132,13 @@ impl p2p::ChainAdapter for NetToChainAdapter {

        // pushing the new block header through the header chain pipeline
        // we will go ask for the block if this is a new header
-       let res = self.chain.process_block_header(&bh, self.chain_opts());
+       let res = w(&self.chain).process_block_header(&bh, self.chain_opts());

        if let &Err(ref e) = &res {
            debug!(LOGGER, "Block header {} refused by chain: {:?}", bhash, e);
            if e.is_bad_data() {
                debug!(LOGGER, "header_received: {} is a bad header, resetting header head", bhash);
-               let _ = self.chain.reset_head();
+               let _ = w(&self.chain).reset_head();
                return false;
            } else {
                // we got an error when trying to process the block header

@@ -154,7 +166,7 @@ impl p2p::ChainAdapter for NetToChainAdapter {
        // try to add each header to our header chain
        let mut added_hs = vec![];
        for bh in bhs {
-           let res = self.chain.sync_block_header(&bh, self.chain_opts());
+           let res = w(&self.chain).sync_block_header(&bh, self.chain_opts());
            match res {
                Ok(_) => {
                    added_hs.push(bh.hash());

@@ -185,7 +197,7 @@ impl p2p::ChainAdapter for NetToChainAdapter {
            }
        }

-       let header_head = self.chain.get_header_head().unwrap();
+       let header_head = w(&self.chain).get_header_head().unwrap();
        info!(
            LOGGER,
            "Added {} headers to the header chain. Last: {} at {}.",

@@ -217,7 +229,7 @@ impl p2p::ChainAdapter for NetToChainAdapter {
        let hh = header.height;
        let mut headers = vec![];
        for h in (hh + 1)..(hh + (p2p::MAX_BLOCK_HEADERS as u64)) {
-           let header = self.chain.get_header_by_height(h);
+           let header = w(&self.chain).get_header_by_height(h);
            match header {
                Ok(head) => headers.push(head),
                Err(chain::Error::StoreErr(store::Error::NotFoundErr, _)) => break,

@@ -239,7 +251,7 @@ impl p2p::ChainAdapter for NetToChainAdapter {

    /// Gets a full block by its hash.
    fn get_block(&self, h: Hash) -> Option<core::Block> {
-       let b = self.chain.get_block(&h);
+       let b = w(&self.chain).get_block(&h);
        match b {
            Ok(b) => Some(b),
            _ => None,

@@ -250,7 +262,7 @@ impl p2p::ChainAdapter for NetToChainAdapter {
    /// the required indexes for a consumer to rewind to a consistant state
    /// at the provided block hash.
    fn sumtrees_read(&self, h: Hash) -> Option<p2p::SumtreesRead> {
-       match self.chain.sumtrees_read(h.clone()) {
+       match w(&self.chain).sumtrees_read(h.clone()) {
            Ok((out_index, kernel_index, read)) => Some(p2p::SumtreesRead {
                output_index: out_index,
                kernel_index: kernel_index,

@@ -272,7 +284,9 @@ impl p2p::ChainAdapter for NetToChainAdapter {
                     rewind_to_output: u64, rewind_to_kernel: u64,
                     sumtree_data: File, peer_addr: SocketAddr) -> bool {
        // TODO check whether we should accept any sumtree now
-       if let Err(e) = self.chain.sumtrees_write(h, rewind_to_output, rewind_to_kernel, sumtree_data) {
+       if let Err(e) = w(&self.chain).
+           sumtrees_write(h, rewind_to_output, rewind_to_kernel, sumtree_data) {
+
            error!(LOGGER, "Failed to save sumtree archive: {:?}", e);
            !e.is_bad_data()
        } else {

@@ -286,7 +300,7 @@ impl p2p::ChainAdapter for NetToChainAdapter {
 impl NetToChainAdapter {
    pub fn new(
        currently_syncing: Arc<AtomicBool>,
-       chain_ref: Arc<chain::Chain>,
+       chain_ref: Weak<chain::Chain>,
        tx_pool: Arc<RwLock<pool::TransactionPool<PoolToChainAdapter>>>,
    ) -> NetToChainAdapter {
        NetToChainAdapter {

@@ -297,7 +311,7 @@ impl NetToChainAdapter {
        }
    }

-   pub fn init(&self, peers: p2p::Peers) {
+   pub fn init(&self, peers: Weak<p2p::Peers>) {
        self.peers.init(peers);
    }

@@ -309,12 +323,13 @@ impl NetToChainAdapter {
            return None;
        }

-       let known = self.chain.get_block_header(&locator[0]);
+       let chain = w(&self.chain);
+       let known = chain.get_block_header(&locator[0]);

        match known {
            Ok(header) => {
                // even if we know the block, it may not be on our winning chain
-               let known_winning = self.chain.get_header_by_height(header.height);
+               let known_winning = chain.get_header_by_height(header.height);
                if let Ok(known_winning) = known_winning {
                    if known_winning.hash() != header.hash() {
                        self.find_common_header(locator[1..].to_vec())

@@ -339,12 +354,13 @@ impl NetToChainAdapter {
    // remembering to reset the head if we have a bad block
    fn process_block(&self, b: core::Block) -> bool {
        let bhash = b.hash();
-       let res = self.chain.process_block(b, self.chain_opts());
+       let chain = w(&self.chain);
+       let res = chain.process_block(b, self.chain_opts());
        if let Err(ref e) = res {
            debug!(LOGGER, "Block {} refused by chain: {:?}", bhash, e);
            if e.is_bad_data() {
                debug!(LOGGER, "adapter: process_block: {} is a bad block, resetting head", bhash);
-               let _ = self.chain.reset_head();
+               let _ = chain.reset_head();
                return false;
            }
        };

@@ -358,8 +374,8 @@ impl NetToChainAdapter {
    // TODO - currently only request block from a single peer
    // consider additional peers for redundancy?
    fn request_block(&self, bh: &BlockHeader, addr: &SocketAddr) {
-       if let None = self.peers.borrow().adapter.get_block(bh.hash()) {
-           if let Some(peer) = self.peers.borrow().get_connected_peer(addr) {
+       if let None = wo(&self.peers).adapter.get_block(bh.hash()) {
+           if let Some(peer) = wo(&self.peers).get_connected_peer(addr) {
                if let Ok(peer) = peer.read() {
                    let _ = peer.send_block_request(bh.hash());
                }

@@ -376,8 +392,8 @@ impl NetToChainAdapter {
    // TODO - currently only request block from a single peer
    // consider additional peers for redundancy?
    fn request_compact_block(&self, bh: &BlockHeader, addr: &SocketAddr) {
-       if let None = self.peers.borrow().adapter.get_block(bh.hash()) {
-           if let Some(peer) = self.peers.borrow().get_connected_peer(addr) {
+       if let None = wo(&self.peers).adapter.get_block(bh.hash()) {
+           if let Some(peer) = wo(&self.peers).get_connected_peer(addr) {
                if let Ok(peer) = peer.read() {
                    let _ = peer.send_compact_block_request(bh.hash());
                }

@@ -403,7 +419,7 @@ impl NetToChainAdapter {
 /// the network to broadcast the block
 pub struct ChainToPoolAndNetAdapter {
    tx_pool: Arc<RwLock<pool::TransactionPool<PoolToChainAdapter>>>,
-   peers: OneTime<p2p::Peers>,
+   peers: OneTime<Weak<p2p::Peers>>,
 }

 impl ChainAdapter for ChainToPoolAndNetAdapter {

@@ -436,16 +452,16 @@ impl ChainAdapter for ChainToPoolAndNetAdapter {

            let mut rng = rand::thread_rng();
            if rng.gen() {
-               self.peers.borrow().broadcast_block(&b);
+               wo(&self.peers).broadcast_block(&b);
            } else {
-               self.peers.borrow().broadcast_compact_block(&cb);
+               wo(&self.peers).broadcast_compact_block(&cb);
            }
        } else {
-           self.peers.borrow().broadcast_compact_block(&cb);
+           wo(&self.peers).broadcast_compact_block(&cb);
        }
    } else {
        // "header first" propagation if we are not the originator of this block
-       self.peers.borrow().broadcast_header(&b.header);
+       wo(&self.peers).broadcast_header(&b.header);
    }
 }
 }

@@ -459,7 +475,7 @@ impl ChainToPoolAndNetAdapter {
            peers: OneTime::new(),
        }
    }
-   pub fn init(&self, peers: p2p::Peers) {
+   pub fn init(&self, peers: Weak<p2p::Peers>) {
        self.peers.init(peers);
    }
 }

@@ -467,12 +483,12 @@ impl ChainToPoolAndNetAdapter {
 /// Adapter between the transaction pool and the network, to relay
 /// transactions that have been accepted.
 pub struct PoolToNetAdapter {
-   peers: OneTime<p2p::Peers>,
+   peers: OneTime<Weak<p2p::Peers>>,
 }

 impl pool::PoolAdapter for PoolToNetAdapter {
    fn tx_accepted(&self, tx: &core::Transaction) {
-       self.peers.borrow().broadcast_transaction(tx);
+       wo(&self.peers).broadcast_transaction(tx);
    }
 }

@@ -485,7 +501,7 @@ impl PoolToNetAdapter {
    }

    /// Setup the p2p server on the adapter
-   pub fn init(&self, peers: p2p::Peers) {
+   pub fn init(&self, peers: Weak<p2p::Peers>) {
        self.peers.init(peers);
    }
 }

@@ -495,7 +511,7 @@ impl PoolToNetAdapter {
 /// dependency between the pool and the chain.
 #[derive(Clone)]
 pub struct PoolToChainAdapter {
-   chain: OneTime<Arc<chain::Chain>>,
+   chain: OneTime<Weak<chain::Chain>>,
 }

 impl PoolToChainAdapter {

@@ -506,15 +522,14 @@ impl PoolToChainAdapter {
        }
    }

-   pub fn set_chain(&self, chain_ref: Arc<chain::Chain>) {
+   pub fn set_chain(&self, chain_ref: Weak<chain::Chain>) {
        self.chain.init(chain_ref);
    }
 }

 impl pool::BlockChain for PoolToChainAdapter {
    fn is_unspent(&self, output_ref: &OutputIdentifier) -> Result<(), pool::PoolError> {
-       self.chain
-           .borrow()
+       wo(&self.chain)
            .is_unspent(output_ref)
            .map_err(|e| match e {
                chain::types::Error::OutputNotFound => pool::PoolError::OutputNotFound,

@@ -524,8 +539,7 @@ impl pool::BlockChain for PoolToChainAdapter {
    }

    fn is_matured(&self, input: &Input, height: u64) -> Result<(), pool::PoolError> {
-       self.chain
-           .borrow()
+       wo(&self.chain)
            .is_matured(input, height)
            .map_err(|e| match e {
                chain::types::Error::OutputNotFound => pool::PoolError::OutputNotFound,

@@ -534,9 +548,9 @@ impl pool::BlockChain for PoolToChainAdapter {
    }

    fn head_header(&self) -> Result<BlockHeader, pool::PoolError> {
-       self.chain
-           .borrow()
+       wo(&self.chain)
            .head_header()
            .map_err(|_| pool::PoolError::GenericPoolError)
    }
 }
@@ -17,6 +17,7 @@

 use rand::{self, Rng};
 use std::sync::{Arc, RwLock};
+use std::sync::atomic::{AtomicBool, Ordering};
 use std::thread;
 use std::time::Duration;
 use time;

@@ -117,6 +118,7 @@ pub struct Miner {
    config: MinerConfig,
    chain: Arc<chain::Chain>,
    tx_pool: Arc<RwLock<pool::TransactionPool<PoolToChainAdapter>>>,
+   stop: Arc<AtomicBool>,

    // Just to hold the port we're on, so this miner can be identified
    // while watching debug output

@@ -130,12 +132,14 @@ impl Miner {
        config: MinerConfig,
        chain_ref: Arc<chain::Chain>,
        tx_pool: Arc<RwLock<pool::TransactionPool<PoolToChainAdapter>>>,
+       stop: Arc<AtomicBool>,
    ) -> Miner {
        Miner {
            config: config,
            chain: chain_ref,
            tx_pool: tx_pool,
            debug_output_id: String::from("none"),
+           stop: stop,
        }
    }

@@ -556,6 +560,10 @@ impl Miner {
            );
            key_id = block_fees.key_id();
        }
+
+       if self.stop.load(Ordering::Relaxed) {
+           break;
+       }
    }
 }
@@ -20,9 +20,10 @@ use std::io::Read;
 use std::net::SocketAddr;
 use std::str;
 use std::sync::{Arc, mpsc};
-use std::time;
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::time::Duration;
 use std::thread;
-use time::now_utc;
+use time::{self, now_utc};

 use hyper;

@@ -38,6 +39,7 @@ pub fn connect_and_monitor(
    p2p_server: Arc<p2p::Server>,
    capabilities: p2p::Capabilities,
    seed_list: Box<Fn() -> Vec<SocketAddr> + Send>,
+   stop: Arc<AtomicBool>,
 ) {

    let _ = thread::Builder::new()

@@ -52,20 +54,31 @@ pub fn connect_and_monitor(
            // check seeds first
            connect_to_seeds(peers.clone(), tx.clone(), seed_list);

+           let mut prev = time::now_utc() - time::Duration::seconds(60);
            loop {
-               // try to connect to any address sent to the channel
-               listen_for_addrs(peers.clone(), p2p_server.clone(), capabilities, &rx);
+               let current_time = time::now_utc();

-               // monitor additional peers if we need to add more
-               monitor_peers(peers.clone(), capabilities, tx.clone());
+               if current_time - prev > time::Duration::seconds(20) {
+                   // try to connect to any address sent to the channel
+                   listen_for_addrs(peers.clone(), p2p_server.clone(), capabilities, &rx);

-               thread::sleep(time::Duration::from_secs(20));
+                   // monitor additional peers if we need to add more
+                   monitor_peers(peers.clone(), capabilities, tx.clone());
+
+                   prev = current_time;
+               }
+
+               thread::sleep(Duration::from_secs(1));
+
+               if stop.load(Ordering::Relaxed) {
+                   break;
+               }
            }
        });
 }

 fn monitor_peers(
-   peers: p2p::Peers,
+   peers: Arc<p2p::Peers>,
    capabilities: p2p::Capabilities,
    tx: mpsc::Sender<SocketAddr>,
 ) {

@@ -143,7 +156,7 @@ fn monitor_peers(
 // Check if we have any pre-existing peer in db. If so, start with those,
 // otherwise use the seeds provided.
 fn connect_to_seeds(
-   peers: p2p::Peers,
+   peers: Arc<p2p::Peers>,
    tx: mpsc::Sender<SocketAddr>,
    seed_list: Box<Fn() -> Vec<SocketAddr>>,
 ) {

@@ -172,7 +185,7 @@ fn connect_to_seeds(
 /// connection if the max peer count isn't exceeded. A request for more
 /// peers is also automatically sent after connection.
 fn listen_for_addrs(
-   peers: p2p::Peers,
+   peers: Arc<p2p::Peers>,
    p2p: Arc<p2p::Server>,
    capab: p2p::Capabilities,
    rx: &mpsc::Receiver<SocketAddr>,
@@ -40,17 +40,18 @@ pub struct Server {
    /// server config
    pub config: ServerConfig,
    /// handle to our network server
-   p2p: Arc<p2p::Server>,
+   pub p2p: Arc<p2p::Server>,
    /// data store access
-   chain: Arc<chain::Chain>,
+   pub chain: Arc<chain::Chain>,
    /// in-memory transaction pool
    tx_pool: Arc<RwLock<pool::TransactionPool<PoolToChainAdapter>>>,
    currently_syncing: Arc<AtomicBool>,
+   stop: Arc<AtomicBool>,
 }

 impl Server {
    /// Instantiates and starts a new server.
-   pub fn start(config: ServerConfig) -> Result<Server, Error> {
+   pub fn start(config: ServerConfig) -> Result<(), Error> {
        let mut mining_config = config.mining_config.clone();
        let serv = Server::new(config)?;
        if mining_config.as_mut().unwrap().enable_mining {

@@ -58,12 +59,17 @@ impl Server {
        }

        loop {
-           thread::sleep(time::Duration::from_secs(10));
+           thread::sleep(time::Duration::from_secs(1));
+           if serv.stop.load(Ordering::Relaxed) {
+               return Ok(());
+           }
        }
    }

    /// Instantiates a new server associated with the provided future reactor.
    pub fn new(mut config: ServerConfig) -> Result<Server, Error> {
+       let stop = Arc::new(AtomicBool::new(false));
+
        let pool_adapter = Arc::new(PoolToChainAdapter::new());
        let pool_net_adapter = Arc::new(PoolToNetAdapter::new());
        let tx_pool = Arc::new(RwLock::new(pool::TransactionPool::new(

@@ -92,13 +98,13 @@ impl Server {
            pow::verify_size,
        )?);

-       pool_adapter.set_chain(shared_chain.clone());
+       pool_adapter.set_chain(Arc::downgrade(&shared_chain));

        let currently_syncing = Arc::new(AtomicBool::new(true));

        let net_adapter = Arc::new(NetToChainAdapter::new(
            currently_syncing.clone(),
-           shared_chain.clone(),
+           Arc::downgrade(&shared_chain),
            tx_pool.clone(),
        ));

@@ -109,10 +115,11 @@ impl Server {
            p2p_config,
            net_adapter.clone(),
            genesis.hash(),
+           stop.clone(),
        )?);
-       chain_adapter.init(p2p_server.peers.clone());
-       pool_net_adapter.init(p2p_server.peers.clone());
-       net_adapter.init(p2p_server.peers.clone());
+       chain_adapter.init(Arc::downgrade(&p2p_server.peers));
+       pool_net_adapter.init(Arc::downgrade(&p2p_server.peers));
+       net_adapter.init(Arc::downgrade(&p2p_server.peers));

        if config.seeding_type.clone() != Seeding::Programmatic {

@@ -129,7 +136,8 @@ impl Server {
            }
            _ => unreachable!(),
        };
-       seed::connect_and_monitor(p2p_server.clone(), config.capabilities, seeder);
+       seed::connect_and_monitor(
+           p2p_server.clone(), config.capabilities, seeder, stop.clone());
        }

        let skip_sync_wait = match config.skip_sync_wait {

@@ -143,6 +151,7 @@ impl Server {
            shared_chain.clone(),
            skip_sync_wait,
            !config.archive_mode,
+           stop.clone(),
        );

        let p2p_inner = p2p_server.clone();

@@ -154,9 +163,9 @@ impl Server {

        api::start_rest_apis(
            config.api_http_addr.clone(),
-           shared_chain.clone(),
-           tx_pool.clone(),
-           p2p_server.peers.clone(),
+           Arc::downgrade(&shared_chain),
+           Arc::downgrade(&tx_pool),
+           Arc::downgrade(&p2p_server.peers),
        );

        warn!(LOGGER, "Grin server started.");

@@ -166,6 +175,7 @@ impl Server {
            chain: shared_chain,
            tx_pool: tx_pool,
            currently_syncing: currently_syncing,
+           stop: stop,
        })
    }

@@ -187,7 +197,8 @@ impl Server {
        let proof_size = global::proofsize();
        let currently_syncing = self.currently_syncing.clone();

-       let mut miner = miner::Miner::new(config.clone(), self.chain.clone(), self.tx_pool.clone());
+       let mut miner = miner::Miner::new(
+           config.clone(), self.chain.clone(), self.tx_pool.clone(), self.stop.clone());
        miner.set_debug_output_id(format!("Port {}", self.config.p2p_config.port));
        let _ = thread::Builder::new()
            .name("miner".to_string())

@@ -217,11 +228,15 @@ impl Server {
    /// can be updated over time to include any information needed by tests or
    /// other
    /// consumers

    pub fn get_server_stats(&self) -> Result<ServerStats, Error> {
        Ok(ServerStats {
            peer_count: self.peer_count(),
            head: self.head(),
        })
    }
+
+   pub fn stop(&self) {
+       self.p2p.stop();
+       self.stop.store(true, Ordering::Relaxed);
+   }
 }
@@ -29,10 +29,11 @@ use util::LOGGER;
 /// Starts the syncing loop, just spawns two threads that loop forever
 pub fn run_sync(
    currently_syncing: Arc<AtomicBool>,
-   peers: p2p::Peers,
+   peers: Arc<p2p::Peers>,
    chain: Arc<chain::Chain>,
    skip_sync_wait: bool,
    fast_sync: bool,
+   stop: Arc<AtomicBool>,
 ) {

    let chain = chain.clone();

@@ -113,11 +114,15 @@ pub fn run_sync(
            }
        }
        thread::sleep(Duration::from_secs(1));
+
+       if stop.load(Ordering::Relaxed) {
+           break;
+       }
        }
    });
 }

-fn body_sync(peers: Peers, chain: Arc<chain::Chain>) {
+fn body_sync(peers: Arc<Peers>, chain: Arc<chain::Chain>) {

    let body_head: chain::Tip = chain.head().unwrap();
    let header_head: chain::Tip = chain.get_header_head().unwrap();

@@ -190,7 +195,7 @@ fn body_sync(peers: Peers, chain: Arc<chain::Chain>) {
    }
 }

-pub fn header_sync(peers: Peers, chain: Arc<chain::Chain>) {
+pub fn header_sync(peers: Arc<Peers>, chain: Arc<chain::Chain>) {
    if let Ok(header_head) = chain.get_header_head() {
        let difficulty = header_head.total_difficulty;

@@ -237,7 +242,7 @@ fn request_headers(
 /// just receiving blocks through gossip.
 pub fn needs_syncing(
    currently_syncing: Arc<AtomicBool>,
-   peers: Peers,
+   peers: Arc<Peers>,
    chain: Arc<chain::Chain>,
    header_only: bool) -> bool {
@@ -173,7 +173,7 @@ fn test_p2p() {
    let _ = thread::spawn(move || server_two.run_server(120));

    // Let them do the handshake
-   thread::sleep(time::Duration::from_millis(1000));
+   thread::sleep(time::Duration::from_millis(1500));

    // Starting tests
    warn!(LOGGER, "Starting P2P Tests");
@@ -23,6 +23,8 @@ extern crate grin_wallet as wallet;

 mod framework;

+use std::fs;
+use std::sync::Arc;
 use std::thread;
 use std::time;
 use std::default::Default;

@@ -185,21 +187,6 @@ fn a_simulate_block_propagation() {
    let test_name_dir = "grin-prop";
    framework::clean_all_output(test_name_dir);

-   let mut plugin_config = pow::types::CuckooMinerPluginConfig::default();
-   let mut plugin_config_vec: Vec<pow::types::CuckooMinerPluginConfig> = Vec::new();
-   plugin_config.type_filter = String::from("mean_cpu");
-   plugin_config_vec.push(plugin_config);
-
-   let miner_config = pow::types::MinerConfig {
-       enable_mining: true,
-       burn_reward: true,
-       use_cuckoo_miner: false,
-       cuckoo_miner_async_mode: None,
-       cuckoo_miner_plugin_dir: Some(String::from("../target/debug/deps")),
-       cuckoo_miner_plugin_config: Some(plugin_config_vec),
-       ..Default::default()
-   };
-
    // instantiates 5 servers on different ports
    let mut servers = vec![];
    for n in 0..5 {

@@ -221,7 +208,7 @@ fn a_simulate_block_propagation() {
    }

    // start mining
-   servers[0].start_miner(miner_config);
+   servers[0].start_miner(miner_config());
    let original_height = servers[0].head().height;

    // monitor for a change of head on a different server and check whether

@@ -252,24 +239,9 @@ fn simulate_full_sync() {
    let test_name_dir = "grin-sync";
    framework::clean_all_output(test_name_dir);

-   let mut plugin_config = pow::types::CuckooMinerPluginConfig::default();
-   let mut plugin_config_vec: Vec<pow::types::CuckooMinerPluginConfig> = Vec::new();
-   plugin_config.type_filter = String::from("mean_cpu");
-   plugin_config_vec.push(plugin_config);
-
-   let miner_config = pow::types::MinerConfig {
-       enable_mining: true,
-       burn_reward: true,
-       use_cuckoo_miner: false,
-       cuckoo_miner_async_mode: Some(false),
-       cuckoo_miner_plugin_dir: Some(String::from("../target/debug/deps")),
-       cuckoo_miner_plugin_config: Some(plugin_config_vec),
-       ..Default::default()
-   };
-
    let s1 = grin::Server::new(config(0, "grin-sync")).unwrap();
    // mine a few blocks on server 1
-   s1.start_miner(miner_config);
+   s1.start_miner(miner_config());
    thread::sleep(time::Duration::from_secs(8));

    let mut conf = config(1, "grin-sync");

@@ -291,24 +263,9 @@ fn simulate_fast_sync() {
    let test_name_dir = "grin-fast";
    framework::clean_all_output(test_name_dir);

-   let mut plugin_config = pow::types::CuckooMinerPluginConfig::default();
-   let mut plugin_config_vec: Vec<pow::types::CuckooMinerPluginConfig> = Vec::new();
-   plugin_config.type_filter = String::from("mean_cpu");
-   plugin_config_vec.push(plugin_config);
-
-   let miner_config = pow::types::MinerConfig {
-       enable_mining: true,
-       burn_reward: true,
-       use_cuckoo_miner: false,
-       cuckoo_miner_async_mode: Some(false),
-       cuckoo_miner_plugin_dir: Some(String::from("../target/debug/deps")),
-       cuckoo_miner_plugin_config: Some(plugin_config_vec),
-       ..Default::default()
-   };
-
    let s1 = grin::Server::new(config(1000, "grin-fast")).unwrap();
    // mine a few blocks on server 1
-   s1.start_miner(miner_config);
+   s1.start_miner(miner_config());
    thread::sleep(time::Duration::from_secs(8));

    let mut conf = config(1001, "grin-fast");

@@ -320,6 +277,45 @@ fn simulate_fast_sync() {
    }
 }

+// #[test]
+fn simulate_fast_sync_double() {
+   util::init_test_logger();
+
+   // we actually set the chain_type in the ServerConfig below
+   global::set_mining_mode(ChainTypes::AutomatedTesting);
+
+   framework::clean_all_output("grin-double-fast1");
+   framework::clean_all_output("grin-double-fast2");
+
+   let s1 = grin::Server::new(config(1000, "grin-double-fast1")).unwrap();
+   // mine a few blocks on server 1
+   s1.start_miner(miner_config());
+   thread::sleep(time::Duration::from_secs(8));
+
+   {
+       let mut conf = config(1001, "grin-double-fast2");
+       conf.archive_mode = false;
+       conf.seeds = Some(vec!["127.0.0.1:12000".to_string()]);
+       let s2 = grin::Server::new(conf).unwrap();
+       while s2.head().height != s2.header_head().height || s2.head().height < 20 {
+           thread::sleep(time::Duration::from_millis(1000));
+       }
+       s2.stop();
+   }
+   // locks files don't seem to be cleaned properly until process exit
+   std::fs::remove_file("target/grin-double-fast2/grin-sync-1001/chain/LOCK");
+   std::fs::remove_file("target/grin-double-fast2/grin-sync-1001/peers/LOCK");
+   thread::sleep(time::Duration::from_secs(20));
+
+   let mut conf = config(1001, "grin-double-fast2");
+   conf.archive_mode = false;
+   conf.seeds = Some(vec!["127.0.0.1:12000".to_string()]);
+   let s2 = grin::Server::new(conf).unwrap();
+   while s2.head().height != s2.header_head().height || s2.head().height < 50 {
+       thread::sleep(time::Duration::from_millis(1000));
+   }
+}
+
 fn config(n: u16, test_name_dir: &str) -> grin::ServerConfig {
    grin::ServerConfig {
        api_http_addr: format!("127.0.0.1:{}", 19000 + n),

@@ -336,3 +332,20 @@ fn config(n: u16, test_name_dir: &str) -> grin::ServerConfig {
        ..Default::default()
    }
 }
+
+fn miner_config() -> pow::types::MinerConfig {
+   let mut plugin_config = pow::types::CuckooMinerPluginConfig::default();
+   let mut plugin_config_vec: Vec<pow::types::CuckooMinerPluginConfig> = Vec::new();
+   plugin_config.type_filter = String::from("mean_cpu");
+   plugin_config_vec.push(plugin_config);
+
+   pow::types::MinerConfig {
+       enable_mining: true,
+       burn_reward: true,
+       use_cuckoo_miner: false,
+       cuckoo_miner_async_mode: Some(false),
+       cuckoo_miner_plugin_dir: Some(String::from("../target/debug/deps")),
+       cuckoo_miner_plugin_config: Some(plugin_config_vec),
+       ..Default::default()
+   }
+}
@@ -15,7 +15,7 @@
 use std::collections::HashMap;
 use std::fs::File;
 use std::net::SocketAddr;
-use std::sync::{Arc, RwLock};
+use std::sync::{Arc, RwLock, atomic};

 use rand::{thread_rng, Rng};

@@ -29,11 +29,10 @@ use peer::Peer;
 use store::{PeerStore, PeerData, State};
 use types::*;

-#[derive(Clone)]
 pub struct Peers {
    pub adapter: Arc<ChainAdapter>,
-   store: Arc<PeerStore>,
-   peers: Arc<RwLock<HashMap<SocketAddr, Arc<RwLock<Peer>>>>>,
+   store: PeerStore,
+   peers: RwLock<HashMap<SocketAddr, Arc<RwLock<Peer>>>>,
    config: P2PConfig,
 }

@@ -44,8 +43,8 @@ impl Peers {
    pub fn new(store: PeerStore, adapter: Arc<ChainAdapter>, config: P2PConfig) -> Peers {
        Peers {
            adapter,
-           store: Arc::new(store),
-           peers: Arc::new(RwLock::new(HashMap::new())),
+           store,
+           peers: RwLock::new(HashMap::new()),
            config,
        }
    }

@@ -425,9 +424,9 @@ impl Peers {
        }
    }

-   pub fn stop(self) {
-       let peers = self.connected_peers();
-       for peer in peers {
+   pub fn stop(&self) {
+       let mut peers = self.peers.write().unwrap();
+       for (_, peer) in peers.drain() {
            let peer = peer.read().unwrap();
            peer.stop();
        }
@@ -13,8 +13,10 @@
 // limitations under the License.

 use std::fs::File;
+use std::io;
 use std::net::{TcpListener, TcpStream, SocketAddr, Shutdown};
 use std::sync::{Arc, RwLock};
+use std::sync::atomic::{AtomicBool, Ordering};
 use std::thread;
 use std::time::Duration;

@@ -34,7 +36,8 @@ pub struct Server {
    config: P2PConfig,
    capabilities: Capabilities,
    handshake: Arc<Handshake>,
-   pub peers: Peers,
+   pub peers: Arc<Peers>,
+   stop: Arc<AtomicBool>,
 }

 unsafe impl Sync for Server {}

@@ -50,13 +53,15 @@ impl Server {
        config: P2PConfig,
        adapter: Arc<ChainAdapter>,
        genesis: Hash,
+       stop: Arc<AtomicBool>,
    ) -> Result<Server, Error> {

        Ok(Server {
            config: config.clone(),
            capabilities: capab,
            handshake: Arc::new(Handshake::new(genesis, config.clone())),
-           peers: Peers::new(PeerStore::new(db_root)?, adapter, config),
+           peers: Arc::new(Peers::new(PeerStore::new(db_root)?, adapter, config)),
+           stop: stop,
        })
    }

@@ -65,37 +70,49 @@ impl Server {
    pub fn listen(&self) -> Result<(), Error> {
        // start peer monitoring thread
        let peers_inner = self.peers.clone();
+       let stop = self.stop.clone();
        let _ = thread::Builder::new().name("p2p-monitor".to_string()).spawn(move || {
            loop {
                let total_diff = peers_inner.total_difficulty();
                let total_height = peers_inner.total_height();
                peers_inner.check_all(total_diff, total_height);
-               thread::sleep(Duration::from_secs(20));
+               thread::sleep(Duration::from_secs(10));
+               if stop.load(Ordering::Relaxed) {
+                   break;
+               }
            }
        });

        // start TCP listener and handle incoming connections
        let addr = SocketAddr::new(self.config.host, self.config.port);
        let listener = TcpListener::bind(addr)?;
+       listener.set_nonblocking(true)?;

-       for stream in listener.incoming() {
-           match stream {
-               Ok(stream) => {
+       let sleep_time = Duration::from_millis(1);
+       loop {
+           match listener.accept() {
+               Ok((stream, peer_addr)) => {
                    if !self.check_banned(&stream) {
-                       let peer_addr = stream.peer_addr();
                        if let Err(e) = self.handle_new_peer(stream) {
                            debug!(
                                LOGGER,
                                "Error accepting peer {}: {:?}",
-                               peer_addr.map(|a| a.to_string()).unwrap_or("?".to_owned()),
+                               peer_addr.to_string(),
                                e);
                        }
                    }
                }
+               Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+                   // nothing to do, will retry in next iteration
+               }
                Err(e) => {
                    warn!(LOGGER, "Couldn't establish new client connection: {:?}", e);
                }
            }
+           if self.stop.load(Ordering::Relaxed) {
+               break;
+           }
+           thread::sleep(sleep_time);
        }
        Ok(())
    }

@@ -126,7 +143,7 @@ impl Server {
            total_diff,
            addr,
            &self.handshake,
-           Arc::new(self.peers.clone()),
+           self.peers.clone(),
        )?;
        let added = self.peers.add_connected(peer);
        {

@@ -151,7 +168,7 @@ impl Server {
            self.capabilities,
            total_diff,
            &self.handshake,
-           Arc::new(self.peers.clone()),
+           self.peers.clone(),
        )?;
        let added = self.peers.add_connected(peer);
        let mut peer = added.write().unwrap();

@@ -172,6 +189,11 @@ impl Server {
        }
        false
    }
+
+   pub fn stop(&self) {
+       self.stop.store(true, Ordering::Relaxed);
+       self.peers.stop();
+   }
 }

 /// A no-op network adapter used for testing.
@@ -18,6 +18,7 @@ extern crate grin_util as util;

 use std::net::{SocketAddr, TcpListener, TcpStream};
 use std::sync::Arc;
+use std::sync::atomic::AtomicBool;
 use std::thread;
 use std::time;

@@ -52,6 +53,7 @@ fn peer_handshake() {
        p2p_conf.clone(),
        net_adapter.clone(),
        Hash::from_vec(vec![]),
+       Arc::new(AtomicBool::new(false)),
    ).unwrap());

    let p2p_inner = server.clone();