Add a peers_preferred option in Grin (#1401)

* Preferred peers option
* Move P2P configuration into proper config part
* Fix tests
This commit is contained in:
Quentin Le Sceller 2018-08-23 21:16:04 +02:00 committed by Ignotus Peverell
parent 2251a82404
commit 8cd1b23f7a
14 changed files with 174 additions and 109 deletions

View file

@ -25,14 +25,6 @@ api_http_addr = "127.0.0.1:13413"
db_root = ".grin"
#How to seed this server, can be None, List, WebStatic or DNSSeed
#
#seeding_type = "None"
#If seeding_type = List, the list of peers to connect to.
#
#seeds = ["192.168.0.1:13414","192.168.0.2:13414"]
#The chain type, which defines the genesis block and the set of cuckoo
#parameters used for mining. Can be:
#AutomatedTesting - For CI builds and instant blockchain creation
@ -51,10 +43,6 @@ chain_type = "Testnet3"
#run the node in "full archive" mode (default is fast-sync, pruned node)
#archive_mode = false
#7 = Bit flags for FULL_NODE, this structure needs to be changed
#internally to make it more configurable
capabilities = [7]
#skip waiting for sync on startup, (optional param, mostly for testing)
skip_sync_wait = false
@ -98,11 +86,25 @@ stem_probability = 90
host = "0.0.0.0"
port = 13414
#How to seed this server, can be None, List, WebStatic or DNSSeed
#
#seeding_type = "None"
#If seeding_type = List, the list of peers to connect to.
#
#seeds = ["192.168.0.1:13414","192.168.0.2:13414"]
#7 = Bit flags for FULL_NODE, this structure needs to be changed
#internally to make it more configurable
capabilities = [7]
#hardcoded peer lists for allow/deny
#will *only* connect to peers in allow list
#peers_allow = ["192.168.0.1:13414", "192.168.0.2:13414"]
#will *never* connect to peers in deny list
#peers_deny = ["192.168.0.3:13414", "192.168.0.4:13414"]
#a list of preferred peers to connect to
#peers_preferred = ["192.168.0.1:13414","192.168.0.2:13414"]
#how long a banned peer should stay banned
#ban_window = 10800

View file

@ -54,5 +54,7 @@ pub use peer::Peer;
pub use peers::Peers;
pub use serv::{DummyAdapter, Server};
pub use store::{PeerData, State};
pub use types::{Capabilities, ChainAdapter, Direction, Error, P2PConfig, PeerInfo, ReasonForBan,
TxHashSetRead, MAX_BLOCK_HEADERS, MAX_LOCATORS, MAX_PEER_ADDRS};
pub use types::{
Capabilities, ChainAdapter, Direction, Error, P2PConfig, PeerInfo, ReasonForBan, Seeding,
TxHashSetRead, MAX_BLOCK_HEADERS, MAX_LOCATORS, MAX_PEER_ADDRS,
};

View file

@ -161,8 +161,7 @@ impl Peer {
/// Send the ban reason before banning
pub fn send_ban_reason(&self, ban_reason: ReasonForBan) {
let ban_reason_msg = BanReason { ban_reason };
match self
.connection
match self.connection
.as_ref()
.unwrap()
.send(ban_reason_msg, msg::Type::BanReason)

View file

@ -109,8 +109,7 @@ impl Peers {
/// Get vec of peers we are currently connected to.
pub fn connected_peers(&self) -> Vec<Arc<RwLock<Peer>>> {
let mut res = self
.peers
let mut res = self.peers
.read()
.unwrap()
.values()

View file

@ -169,9 +169,10 @@ impl MessageHandler for Protocol {
let headers = adapter.locate_headers(loc.hashes);
// serialize and send all the headers over
Ok(Some(
msg.respond(Type::Headers, Headers { headers: headers }),
))
Ok(Some(msg.respond(
Type::Headers,
Headers { headers: headers },
)))
}
// "header first" block propagation - if we have not yet seen this block
@ -275,8 +276,7 @@ impl MessageHandler for Protocol {
);
let tmp_zip = File::open(tmp)?;
let res = self
.adapter
let res = self.adapter
.txhashset_write(sm_arch.hash, tmp_zip, self.addr);
debug!(

View file

@ -96,10 +96,24 @@ pub struct P2PConfig {
pub host: IpAddr,
pub port: u16,
/// Method used to get the list of seed nodes for initial bootstrap.
#[serde(default)]
pub seeding_type: Seeding,
/// The list of seed nodes, if using Seeding as a seed type
pub seeds: Option<Vec<String>>,
/// Capabilities expose by this node, also conditions which other peers this
/// node will have an affinity toward when connection.
pub capabilities: Capabilities,
pub peers_allow: Option<Vec<String>>,
pub peers_deny: Option<Vec<String>>,
/// The list of preferred peers that we will try to connect to
pub peers_preferred: Option<Vec<String>>,
pub ban_window: Option<i64>,
pub peer_max_count: Option<u32>,
@ -114,8 +128,12 @@ impl Default for P2PConfig {
P2PConfig {
host: ipaddr,
port: 13414,
capabilities: Capabilities::FULL_NODE,
seeding_type: Seeding::default(),
seeds: None,
peers_allow: None,
peers_deny: None,
peers_preferred: None,
ban_window: None,
peer_max_count: None,
peer_min_preferred_count: None,
@ -151,6 +169,27 @@ impl P2PConfig {
}
}
/// Type of seeding the server will use to find other peers on the network.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum Seeding {
/// No seeding, mostly for tests that programmatically connect
None,
/// A list of seed addresses provided to the server
List,
/// Automatically download a text file with a list of server addresses
WebStatic,
/// Automatically get a list of seeds from multiple DNS
DNSSeed,
/// Mostly for tests, where connections are initiated programmatically
Programmatic,
}
impl Default for Seeding {
fn default() -> Seeding {
Seeding::DNSSeed
}
}
bitflags! {
/// Options for what type of interaction a peer supports
#[derive(Serialize, Deserialize)]
@ -206,7 +245,7 @@ pub struct PeerInfo {
}
/// The full txhashset data along with indexes required for a consumer to
/// rewind to a consistant requested state.
/// rewind to a consistent requested state.
pub struct TxHashSetRead {
/// Output tree index the receiver should rewind to
pub output_index: u64,

View file

@ -164,8 +164,7 @@ impl p2p::ChainAdapter for NetToChainAdapter {
}
};
let chain = self
.chain
let chain = self.chain
.upgrade()
.expect("failed to upgrade weak ref to chain");
@ -438,8 +437,7 @@ impl NetToChainAdapter {
// we have a fast sync'd node and are sent a block older than our horizon,
// only sync can do something with that
if b.header.height
< head
.height
< head.height
.saturating_sub(global::cut_through_horizon() as u64)
{
return true;

View file

@ -103,27 +103,6 @@ impl Default for ChainValidationMode {
}
}
/// Type of seeding the server will use to find other peers on the network.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum Seeding {
/// No seeding, mostly for tests that programmatically connect
None,
/// A list of seed addresses provided to the server
List,
/// Automatically download a text file with a list of server addresses
WebStatic,
/// Automatically get a list of seeds from multiple DNS
DNSSeed,
/// Mostly for tests, where connections are initiated programmatically
Programmatic,
}
impl Default for Seeding {
fn default() -> Seeding {
Seeding::DNSSeed
}
}
/// Full server configuration, aggregating configurations required for the
/// different components.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
@ -145,19 +124,6 @@ pub struct ServerConfig {
#[serde(default)]
pub chain_validation_mode: ChainValidationMode,
/// Method used to get the list of seed nodes for initial bootstrap.
#[serde(default)]
pub seeding_type: Seeding,
/// TODO - move this into p2p_config?
/// The list of seed nodes, if using Seeding as a seed type
pub seeds: Option<Vec<String>>,
/// TODO - move this into p2p_config?
/// Capabilities expose by this node, also conditions which other peers this
/// node will have an affinity toward when connection.
pub capabilities: p2p::Capabilities,
/// Configuration for the peer-to-peer server
pub p2p_config: p2p::P2PConfig,
@ -201,9 +167,6 @@ impl Default for ServerConfig {
ServerConfig {
db_root: ".grin".to_string(),
api_http_addr: "127.0.0.1:13413".to_string(),
capabilities: p2p::Capabilities::FULL_NODE,
seeding_type: Seeding::default(),
seeds: None,
p2p_config: p2p::P2PConfig::default(),
dandelion_config: pool::DandelionConfig::default(),
stratum_mining_config: Some(StratumServerConfig::default()),

View file

@ -42,6 +42,7 @@ pub fn connect_and_monitor(
capabilities: p2p::Capabilities,
dandelion_config: DandelionConfig,
seed_list: Box<Fn() -> Vec<SocketAddr> + Send>,
preferred_peers: Option<Vec<SocketAddr>>,
stop: Arc<AtomicBool>,
) {
let _ = thread::Builder::new()
@ -54,7 +55,12 @@ pub fn connect_and_monitor(
let (tx, rx) = mpsc::channel();
// check seeds first
connect_to_seeds(peers.clone(), tx.clone(), seed_list);
connect_to_seeds_and_preferred_peers(
peers.clone(),
tx.clone(),
seed_list,
preferred_peers.clone(),
);
let mut prev = Utc::now() - Duration::seconds(60);
loop {
@ -72,6 +78,7 @@ pub fn connect_and_monitor(
p2p_server.config.clone(),
capabilities,
tx.clone(),
preferred_peers.clone(),
);
update_dandelion_relay(peers.clone(), dandelion_config.clone());
@ -93,6 +100,7 @@ fn monitor_peers(
config: p2p::P2PConfig,
capabilities: p2p::Capabilities,
tx: mpsc::Sender<SocketAddr>,
preferred_peers_list: Option<Vec<SocketAddr>>,
) {
// regularly check if we need to acquire more peers and if so, gets
// them from db
@ -142,15 +150,31 @@ fn monitor_peers(
// loop over connected peers
// ask them for their list of peers
let mut connected_peers: Vec<SocketAddr> = vec![];
for p in peers.connected_peers() {
if let Ok(p) = p.try_read() {
debug!(LOGGER, "monitor_peers: ask {} for more peers", p.info.addr);
let _ = p.send_peer_request(capabilities);
connected_peers.push(p.info.addr)
} else {
warn!(LOGGER, "monitor_peers: failed to get read lock on peer");
}
}
// Attempt to connect to preferred peers if there is some
match preferred_peers_list {
Some(preferred_peers) => {
for mut p in preferred_peers {
if !connected_peers.is_empty() {
if connected_peers.contains(&p) {
tx.send(p).unwrap();
}
}
}
}
None => debug!(LOGGER, "monitor_peers: no preferred peers"),
}
// find some peers from our db
// and queue them up for a connection attempt
let new_peers = peers.find_peers(
@ -183,21 +207,28 @@ fn update_dandelion_relay(peers: Arc<p2p::Peers>, dandelion_config: DandelionCon
// Check if we have any pre-existing peer in db. If so, start with those,
// otherwise use the seeds provided.
fn connect_to_seeds(
fn connect_to_seeds_and_preferred_peers(
peers: Arc<p2p::Peers>,
tx: mpsc::Sender<SocketAddr>,
seed_list: Box<Fn() -> Vec<SocketAddr>>,
peers_preferred_list: Option<Vec<SocketAddr>>,
) {
// check if we have some peers in db
let peers = peers.find_peers(p2p::State::Healthy, p2p::Capabilities::FULL_HIST, 100);
// if so, get their addresses, otherwise use our seeds
let peer_addrs = if peers.len() > 3 {
let mut peer_addrs = if peers.len() > 3 {
peers.iter().map(|p| p.addr).collect::<Vec<_>>()
} else {
seed_list()
};
// If we have preferred peers add them to the connection
match peers_preferred_list {
Some(mut peers_preferred) => peer_addrs.append(&mut peers_preferred),
None => debug!(LOGGER, "No preferred peers"),
};
if peer_addrs.len() == 0 {
warn!(LOGGER, "No seeds were retrieved.");
}
@ -274,8 +305,7 @@ pub fn dns_seeds() -> Box<Fn() -> Vec<SocketAddr> + Send> {
pub fn web_seeds() -> Box<Fn() -> Vec<SocketAddr> + Send> {
Box::new(|| {
let text: String = api::client::get(SEEDS_URL).expect("Failed to resolve seeds");
let addrs = text
.split_whitespace()
let addrs = text.split_whitespace()
.map(|s| s.parse().unwrap())
.collect::<Vec<_>>();
debug!(LOGGER, "Retrieved seed addresses: {:?}", addrs);
@ -293,3 +323,18 @@ pub fn predefined_seeds(addrs_str: Vec<String>) -> Box<Fn() -> Vec<SocketAddr> +
.collect::<Vec<_>>()
})
}
/// Convenience function when the seed list is immediately known. Mostly used
/// for tests.
pub fn preferred_peers(addrs_str: Vec<String>) -> Option<Vec<SocketAddr>> {
if addrs_str.is_empty() {
None
} else {
Some(
addrs_str
.iter()
.map(|s| s.parse().unwrap())
.collect::<Vec<_>>(),
)
}
}

View file

@ -23,10 +23,11 @@ use std::{thread, time};
use api;
use chain;
use common::adapters::{ChainToPoolAndNetAdapter, NetToChainAdapter, PoolToChainAdapter,
PoolToNetAdapter};
use common::adapters::{
ChainToPoolAndNetAdapter, NetToChainAdapter, PoolToChainAdapter, PoolToNetAdapter,
};
use common::stats::{DiffBlock, DiffStats, PeerStats, ServerStateInfo, ServerStats};
use common::types::{Error, Seeding, ServerConfig, StratumServerConfig, SyncState};
use common::types::{Error, ServerConfig, StratumServerConfig, SyncState};
use core::core::hash::Hashed;
use core::core::target::Difficulty;
use core::{consensus, genesis, global, pow};
@ -105,8 +106,16 @@ impl Server {
};
// If archive mode is enabled then the flags should contains the FULL_HIST flag
if archive_mode && !config.capabilities.contains(p2p::Capabilities::FULL_HIST) {
config.capabilities.insert(p2p::Capabilities::FULL_HIST);
if archive_mode
&& !config
.p2p_config
.capabilities
.contains(p2p::Capabilities::FULL_HIST)
{
config
.p2p_config
.capabilities
.insert(p2p::Capabilities::FULL_HIST);
}
let stop = Arc::new(AtomicBool::new(false));
@ -165,7 +174,7 @@ impl Server {
let p2p_server = Arc::new(p2p::Server::new(
db_env,
config.capabilities,
config.p2p_config.capabilities,
config.p2p_config.clone(),
net_adapter.clone(),
genesis.hash(),
@ -177,25 +186,34 @@ impl Server {
pool_net_adapter.init(Arc::downgrade(&p2p_server.peers));
net_adapter.init(Arc::downgrade(&p2p_server.peers));
if config.seeding_type.clone() != Seeding::Programmatic {
let seeder = match config.seeding_type.clone() {
Seeding::None => {
if config.p2p_config.seeding_type.clone() != p2p::Seeding::Programmatic {
let seeder = match config.p2p_config.seeding_type.clone() {
p2p::Seeding::None => {
warn!(
LOGGER,
"No seed configured, will stay solo until connected to"
);
seed::predefined_seeds(vec![])
}
Seeding::List => seed::predefined_seeds(config.seeds.as_mut().unwrap().clone()),
Seeding::DNSSeed => seed::dns_seeds(),
Seeding::WebStatic => seed::web_seeds(),
p2p::Seeding::List => {
seed::predefined_seeds(config.p2p_config.seeds.as_mut().unwrap().clone())
}
p2p::Seeding::DNSSeed => seed::dns_seeds(),
p2p::Seeding::WebStatic => seed::web_seeds(),
_ => unreachable!(),
};
let peers_preferred = match config.p2p_config.peers_preferred.clone() {
Some(peers_preferred) => seed::preferred_peers(peers_preferred),
None => None,
};
seed::connect_and_monitor(
p2p_server.clone(),
config.capabilities,
config.p2p_config.capabilities,
config.dandelion_config.clone(),
seeder,
peers_preferred,
stop.clone(),
);
}
@ -292,12 +310,7 @@ impl Server {
let _ = thread::Builder::new()
.name("stratum_server".to_string())
.spawn(move || {
stratum_server.run_loop(
stratum_stats,
cuckoo_size as u32,
proof_size,
sync_state,
);
stratum_server.run_loop(stratum_stats, cuckoo_size as u32, proof_size, sync_state);
});
}

View file

@ -469,17 +469,23 @@ impl SyncInfo {
let (timeout, latest_height, prev_height) = self.prev_header_sync;
// received all necessary headers, can ask for more
let all_headers_received = header_head.height >= prev_height + (p2p::MAX_BLOCK_HEADERS as u64) - 4;
let all_headers_received =
header_head.height >= prev_height + (p2p::MAX_BLOCK_HEADERS as u64) - 4;
// no headers processed and we're past timeout, need to ask for more
let stalling = header_head.height == latest_height && now > timeout;
if all_headers_received || stalling {
self.prev_header_sync = (now + Duration::seconds(10), header_head.height, header_head.height);
self.prev_header_sync = (
now + Duration::seconds(10),
header_head.height,
header_head.height,
);
true
} else {
// resetting the timeout as long as we progress
if header_head.height > latest_height {
self.prev_header_sync = (now + Duration::seconds(2), header_head.height, prev_height);
self.prev_header_sync =
(now + Duration::seconds(2), header_head.height, prev_height);
}
false
}

View file

@ -54,6 +54,6 @@ mod mining;
mod webwallet;
pub use common::stats::{DiffBlock, PeerStats, ServerStats, StratumStats, WorkerStats};
pub use common::types::{Seeding, ServerConfig, StratumServerConfig};
pub use common::types::{ServerConfig, StratumServerConfig};
pub use grin::server::Server;
pub use webwallet::server::start_webwallet_server;

View file

@ -179,11 +179,11 @@ impl LocalServerContainer {
pub fn run_server(&mut self, duration_in_seconds: u64) -> servers::Server {
let api_addr = format!("{}:{}", self.config.base_addr, self.config.api_server_port);
let mut seeding_type = servers::Seeding::None;
let mut seeding_type = p2p::Seeding::None;
let mut seeds = Vec::new();
if self.config.seed_addr.len() > 0 {
seeding_type = servers::Seeding::List;
seeding_type = p2p::Seeding::List;
seeds = vec![self.config.seed_addr.to_string()];
}
@ -192,10 +192,10 @@ impl LocalServerContainer {
db_root: format!("{}/.grin", self.working_dir),
p2p_config: p2p::P2PConfig {
port: self.config.p2p_server_port,
..p2p::P2PConfig::default()
},
seeds: Some(seeds),
seeding_type: seeding_type,
..p2p::P2PConfig::default()
},
chain_type: core::global::ChainTypes::AutomatedTesting,
skip_sync_wait: Some(true),
stratum_mining_config: None,
@ -585,10 +585,10 @@ pub fn config(n: u16, test_name_dir: &str, seed_n: u16) -> servers::ServerConfig
db_root: format!("target/tmp/{}/grin-sync-{}", test_name_dir, n),
p2p_config: p2p::P2PConfig {
port: 10000 + n,
seeding_type: p2p::Seeding::List,
seeds: Some(vec![format!("127.0.0.1:{}", 10000 + seed_n)]),
..p2p::P2PConfig::default()
},
seeding_type: servers::Seeding::List,
seeds: Some(vec![format!("127.0.0.1:{}", 10000 + seed_n)]),
chain_type: core::global::ChainTypes::AutomatedTesting,
archive_mode: Some(true),
skip_sync_wait: Some(true),

View file

@ -13,7 +13,6 @@
// limitations under the License.
/// Grin server commands processing
use std::env::current_dir;
use std::process::exit;
use std::sync::atomic::{AtomicBool, Ordering};
@ -21,14 +20,15 @@ use std::sync::Arc;
use std::thread;
use std::time::Duration;
use ctrlc;
use clap::ArgMatches;
use ctrlc;
use daemonize::Daemonize;
use cmd::wallet;
use config::GlobalConfig;
use core::global;
use grin_wallet::controller;
use p2p::Seeding;
use servers;
use tui::ui;
use util::LOGGER;
@ -118,8 +118,8 @@ pub fn server_command(server_args: Option<&ArgMatches>, mut global_config: Globa
}
if let Some(seeds) = a.values_of("seed") {
server_config.seeding_type = servers::Seeding::List;
server_config.seeds = Some(seeds.map(|s| s.to_string()).collect());
server_config.p2p_config.seeding_type = Seeding::List;
server_config.p2p_config.seeds = Some(seeds.map(|s| s.to_string()).collect());
}
}
@ -192,4 +192,3 @@ pub fn server_command(server_args: Option<&ArgMatches>, mut global_config: Globa
start_server(server_config);
}
}